7 changes: 3 additions & 4 deletions coordinator/changefeed/changefeed.go
@@ -19,7 +19,6 @@ import (
 	"sync"

 	"github.com/pingcap/log"
-	"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
 	"github.com/pingcap/ticdc/heartbeatpb"
 	"github.com/pingcap/ticdc/pkg/common"
 	"github.com/pingcap/ticdc/pkg/config"
@@ -261,13 +260,13 @@ func RemoveMaintainerMessage(id common.ChangeFeedID, server node.ID, casCade bool

 // getSinkType returns the sink type of the url.
 func getSinkType(scheme string) common.SinkType {
-	if helper.IsMySQLCompatibleScheme(scheme) {
+	if config.IsMySQLCompatibleScheme(scheme) {
 		return common.MysqlSinkType
 	}
-	if helper.IsMQScheme(scheme) {
+	if config.IsMQScheme(scheme) {
 		return common.KafkaSinkType
 	}
-	if helper.IsStorageScheme(scheme) {
+	if config.IsStorageScheme(scheme) {
 		return common.CloudStorageSinkType
 	}
 	return common.BlackHoleSinkType
77 changes: 0 additions & 77 deletions downstreamadapter/sink/helper/helper.go
@@ -113,80 +113,3 @@ func GetFileExtension(protocol config.Protocol) string {
 		return ".unknown"
 	}
 }
-
-const (
-	// KafkaScheme indicates the scheme is kafka.
-	KafkaScheme = "kafka"
-	// KafkaSSLScheme indicates the scheme is kafka+ssl.
-	KafkaSSLScheme = "kafka+ssl"
-	// BlackHoleScheme indicates the scheme is blackhole.
-	BlackHoleScheme = "blackhole"
-	// MySQLScheme indicates the scheme is MySQL.
-	MySQLScheme = "mysql"
-	// MySQLSSLScheme indicates the scheme is MySQL+ssl.
-	MySQLSSLScheme = "mysql+ssl"
-	// TiDBScheme indicates the scheme is TiDB.
-	TiDBScheme = "tidb"
-	// TiDBSSLScheme indicates the scheme is TiDB+ssl.
-	TiDBSSLScheme = "tidb+ssl"
-	// S3Scheme indicates the scheme is s3.
-	S3Scheme = "s3"
-	// FileScheme indicates the scheme is local fs or NFS.
-	FileScheme = "file"
-	// GCSScheme indicates the scheme is gcs.
-	GCSScheme = "gcs"
-	// GSScheme is an alias for "gcs"
-	GSScheme = "gs"
-	// AzblobScheme indicates the scheme is azure blob storage.
-	AzblobScheme = "azblob"
-	// AzureScheme is an alias for "azblob"
-	AzureScheme = "azure"
-	// CloudStorageNoopScheme indicates the scheme is noop.
-	CloudStorageNoopScheme = "noop"
-	// PulsarScheme indicates the scheme is pulsar
-	PulsarScheme = "pulsar"
-	// PulsarSSLScheme indicates the scheme is pulsar+ssl
-	PulsarSSLScheme = "pulsar+ssl"
-	// PulsarHTTPScheme indicates the schema is pulsar with http protocol
-	PulsarHTTPScheme = "pulsar+http"
-	// PulsarHTTPSScheme indicates the schema is pulsar with https protocol
-	PulsarHTTPSScheme = "pulsar+https"
-)
-
-// IsMQScheme returns true if the scheme belong to mq scheme.
-func IsMQScheme(scheme string) bool {
-	return scheme == KafkaScheme || scheme == KafkaSSLScheme ||
-		scheme == PulsarScheme || scheme == PulsarSSLScheme || scheme == PulsarHTTPScheme || scheme == PulsarHTTPSScheme
-}
-
-// IsMySQLCompatibleScheme returns true if the scheme is compatible with MySQL.
-func IsMySQLCompatibleScheme(scheme string) bool {
-	return scheme == MySQLScheme || scheme == MySQLSSLScheme ||
-		scheme == TiDBScheme || scheme == TiDBSSLScheme
-}
-
-// IsStorageScheme returns true if the scheme belong to storage scheme.
-func IsStorageScheme(scheme string) bool {
-	return scheme == FileScheme || scheme == S3Scheme || scheme == GCSScheme ||
-		scheme == GSScheme || scheme == AzblobScheme || scheme == AzureScheme || scheme == CloudStorageNoopScheme
-}
-
-// IsPulsarScheme returns true if the scheme belong to pulsar scheme.
-func IsPulsarScheme(scheme string) bool {
-	return scheme == PulsarScheme || scheme == PulsarSSLScheme || scheme == PulsarHTTPScheme || scheme == PulsarHTTPSScheme
-}
-
-// IsBlackHoleScheme returns true if the scheme belong to blackhole scheme.
-func IsBlackHoleScheme(scheme string) bool {
-	return scheme == BlackHoleScheme
-}
-
-// GetScheme returns the scheme of the url.
-func GetScheme(url *url.URL) string {
-	return strings.ToLower(url.Scheme)
-}
-
-// IsPulsarSupportedProtocols returns whether the protocol is supported by pulsar.
-func IsPulsarSupportedProtocols(p config.Protocol) bool {
-	return p == config.ProtocolCanalJSON
-}
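Note that these helpers are not dropped outright: they move verbatim into pkg/config, as the call-site updates in the files below show. For orientation, here is a minimal standalone sketch of how a caller classifies a sink URI after the move. The function names and signatures are corroborated by the updated call sites in this PR; the sample URI is purely hypothetical.

package main

import (
	"fmt"
	"net/url"

	"github.com/pingcap/ticdc/pkg/config"
)

func main() {
	// Hypothetical sink URI, used only for illustration.
	sinkURI, err := url.Parse("kafka://127.0.0.1:9092/topic")
	if err != nil {
		panic(err)
	}
	// GetScheme lower-cases url.Scheme before matching.
	scheme := config.GetScheme(sinkURI)
	switch {
	case config.IsMySQLCompatibleScheme(scheme):
		fmt.Println("mysql-compatible sink") // mysql/tidb, plain or +ssl
	case config.IsMQScheme(scheme):
		fmt.Println("message-queue sink") // kafka or pulsar variants
	case config.IsStorageScheme(scheme):
		fmt.Println("cloud-storage sink") // s3, gcs, azblob, file, noop
	default:
		fmt.Println("blackhole or unknown sink")
	}
}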
26 changes: 13 additions & 13 deletions downstreamadapter/sink/sink.go
@@ -19,13 +19,13 @@ import (

 	"github.com/pingcap/ticdc/downstreamadapter/sink/blackhole"
 	"github.com/pingcap/ticdc/downstreamadapter/sink/cloudstorage"
-	"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
 	"github.com/pingcap/ticdc/downstreamadapter/sink/kafka"
 	"github.com/pingcap/ticdc/downstreamadapter/sink/mysql"
 	"github.com/pingcap/ticdc/downstreamadapter/sink/pulsar"
 	"github.com/pingcap/ticdc/pkg/common"
 	commonEvent "github.com/pingcap/ticdc/pkg/common/event"
 	"github.com/pingcap/ticdc/pkg/config"
+	pkgConfig "github.com/pingcap/ticdc/pkg/config"
 	"github.com/pingcap/ticdc/pkg/errors"
 	"github.com/pingcap/ticdc/pkg/sink/util"
 )
@@ -48,17 +48,17 @@ func New(ctx context.Context, config *config.ChangefeedConfig, changefeedID comm
 	if err != nil {
 		return nil, errors.WrapError(errors.ErrSinkURIInvalid, err)
 	}
-	scheme := helper.GetScheme(sinkURI)
+	scheme := pkgConfig.GetScheme(sinkURI)
 	switch scheme {
-	case helper.MySQLScheme, helper.MySQLSSLScheme, helper.TiDBScheme, helper.TiDBSSLScheme:
+	case pkgConfig.MySQLScheme, pkgConfig.MySQLSSLScheme, pkgConfig.TiDBScheme, pkgConfig.TiDBSSLScheme:
 		return mysql.New(ctx, changefeedID, config, sinkURI)
-	case helper.KafkaScheme, helper.KafkaSSLScheme:
+	case pkgConfig.KafkaScheme, pkgConfig.KafkaSSLScheme:
 		return kafka.New(ctx, changefeedID, sinkURI, config.SinkConfig)
-	case helper.PulsarScheme, helper.PulsarSSLScheme, helper.PulsarHTTPScheme, helper.PulsarHTTPSScheme:
+	case pkgConfig.PulsarScheme, pkgConfig.PulsarSSLScheme, pkgConfig.PulsarHTTPScheme, pkgConfig.PulsarHTTPSScheme:
 		return pulsar.New(ctx, changefeedID, sinkURI, config.SinkConfig)
-	case helper.S3Scheme, helper.FileScheme, helper.GCSScheme, helper.GSScheme, helper.AzblobScheme, helper.AzureScheme, helper.CloudStorageNoopScheme:
+	case pkgConfig.S3Scheme, pkgConfig.FileScheme, pkgConfig.GCSScheme, pkgConfig.GSScheme, pkgConfig.AzblobScheme, pkgConfig.AzureScheme, pkgConfig.CloudStorageNoopScheme:
 		return cloudstorage.New(ctx, changefeedID, sinkURI, config.SinkConfig, nil)
-	case helper.BlackHoleScheme:
+	case pkgConfig.BlackHoleScheme:
 		return blackhole.New()
 	}
 	return nil, errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
@@ -69,17 +69,17 @@ func Verify(ctx context.Context, config *config.ChangefeedConfig, changefeedID c
 	if err != nil {
 		return errors.WrapError(errors.ErrSinkURIInvalid, err)
 	}
-	scheme := helper.GetScheme(sinkURI)
+	scheme := pkgConfig.GetScheme(sinkURI)
 	switch scheme {
-	case helper.MySQLScheme, helper.MySQLSSLScheme, helper.TiDBScheme, helper.TiDBSSLScheme:
+	case pkgConfig.MySQLScheme, pkgConfig.MySQLSSLScheme, pkgConfig.TiDBScheme, pkgConfig.TiDBSSLScheme:
 		return mysql.Verify(ctx, sinkURI, config)
-	case helper.KafkaScheme, helper.KafkaSSLScheme:
+	case pkgConfig.KafkaScheme, pkgConfig.KafkaSSLScheme:
 		return kafka.Verify(ctx, changefeedID, sinkURI, config.SinkConfig)
-	case helper.PulsarScheme, helper.PulsarSSLScheme, helper.PulsarHTTPScheme, helper.PulsarHTTPSScheme:
+	case pkgConfig.PulsarScheme, pkgConfig.PulsarSSLScheme, pkgConfig.PulsarHTTPScheme, pkgConfig.PulsarHTTPSScheme:
 		return pulsar.Verify(ctx, changefeedID, sinkURI, config.SinkConfig)
-	case helper.S3Scheme, helper.FileScheme, helper.GCSScheme, helper.GSScheme, helper.AzblobScheme, helper.AzureScheme, helper.CloudStorageNoopScheme:
+	case pkgConfig.S3Scheme, pkgConfig.FileScheme, pkgConfig.GCSScheme, pkgConfig.GSScheme, pkgConfig.AzblobScheme, pkgConfig.AzureScheme, pkgConfig.CloudStorageNoopScheme:
 		return cloudstorage.Verify(ctx, changefeedID, sinkURI, config.SinkConfig)
-	case helper.BlackHoleScheme:
+	case pkgConfig.BlackHoleScheme:
 		return nil
 	}
 	return errors.ErrSinkURIInvalid.GenWithStackByArgs(sinkURI)
6 changes: 3 additions & 3 deletions maintainer/barrier_event_test.go
@@ -39,7 +39,7 @@ func TestScheduleEvent(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "test1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
 	event := NewBlockEvent(cfID, tableTriggerEventDispatcherID, spanController, operatorController, &heartbeatpb.State{
@@ -91,7 +91,7 @@ func TestResendAction(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)
@@ -196,7 +196,7 @@ func TestUpdateSchemaID(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
 	require.Equal(t, 1, spanController.GetAbsentSize())
18 changes: 9 additions & 9 deletions maintainer/barrier_test.go
@@ -41,7 +41,7 @@ func TestOneBlockEvent(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	startTs := uint64(10)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, startTs)
@@ -169,7 +169,7 @@ func TestNormalBlock(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	var blockedDispatcherIDS []*heartbeatpb.DispatcherID
 	for id := 1; id < 4; id++ {
@@ -336,7 +336,7 @@ func TestNormalBlockWithTableTrigger(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	var blockedDispatcherIDS []*heartbeatpb.DispatcherID
 	for id := 1; id < 3; id++ {
@@ -481,7 +481,7 @@ func TestSchemaBlock(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)

 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
@@ -658,7 +658,7 @@ func TestSyncPointBlock(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 1}, 1)
 	spanController.AddNewTable(commonEvent.Table{SchemaID: 1, TableID: 2}, 1)
@@ -819,7 +819,7 @@ func TestNonBlocked(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	barrier := NewBarrier(spanController, operatorController, false, nil)

@@ -868,7 +868,7 @@ func TestUpdateCheckpointTs(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	barrier := NewBarrier(spanController, operatorController, false, nil)
 	msg := barrier.HandleStatus("node1", &heartbeatpb.BlockStatusRequest{
@@ -923,7 +923,7 @@ func TestHandleBlockBootstrapResponse(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)

 	var dispatcherIDs []*heartbeatpb.DispatcherID
@@ -1084,7 +1084,7 @@ func TestSyncPointBlockPerf(t *testing.T) {
 		ComponentStatus: heartbeatpb.ComponentState_Working,
 		CheckpointTs:    1,
 	}, "node1")
-	spanController := span.NewController(cfID, ddlSpan, nil, false)
+	spanController := span.NewController(cfID, ddlSpan, nil, false, true)
 	operatorController := operator.NewOperatorController(cfID, spanController, 1000)
 	barrier := NewBarrier(spanController, operatorController, true, nil)
 	for id := 1; id < 1000; id++ {
5 changes: 2 additions & 3 deletions maintainer/maintainer.go
@@ -23,7 +23,6 @@ import (

 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
-	"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
 	"github.com/pingcap/ticdc/heartbeatpb"
 	"github.com/pingcap/ticdc/maintainer/replica"
 	"github.com/pingcap/ticdc/pkg/bootstrap"
@@ -669,8 +668,8 @@ func isMysqlCompatible(sinkURIStr string) (bool, error) {
 	if err != nil {
 		return false, errors.WrapError(errors.ErrSinkURIInvalid, err)
 	}
-	scheme := helper.GetScheme(sinkURI)
-	return helper.IsMySQLCompatibleScheme(scheme), nil
+	scheme := config.GetScheme(sinkURI)
+	return config.IsMySQLCompatibleScheme(scheme), nil
 }

 func (m *Maintainer) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) {
4 changes: 3 additions & 1 deletion maintainer/maintainer_controller.go
@@ -70,16 +70,18 @@ func NewController(changefeedID common.ChangeFeedID,
 	mc := appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter)

 	enableTableAcrossNodes := false
+	enableSplittableCheck := false
 	var splitter *split.Splitter
 	if cfConfig != nil && cfConfig.Scheduler.EnableTableAcrossNodes {
 		enableTableAcrossNodes = true
+		enableSplittableCheck = cfConfig.Scheduler.EnableSplittableCheck
 		splitter = split.NewSplitter(changefeedID, pdAPIClient, cfConfig.Scheduler)
 	}

 	nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)

 	// Create span controller
-	spanController := span.NewController(changefeedID, ddlSpan, splitter, enableTableAcrossNodes)
+	spanController := span.NewController(changefeedID, ddlSpan, splitter, enableTableAcrossNodes, enableSplittableCheck)

 	// Create operator controller using spanController
 	oc := operator.NewOperatorController(changefeedID, spanController, batchSize)
5 changes: 4 additions & 1 deletion maintainer/span/span_controller.go
@@ -71,6 +71,7 @@ type Controller struct {
 	nodeManager            *watcher.NodeManager
 	splitter               *split.Splitter
 	enableTableAcrossNodes bool
+	enableSplittableCheck  bool
 	ddlDispatcherID        common.DispatcherID
 }

@@ -80,6 +81,7 @@ func NewController(
 	ddlSpan *replica.SpanReplication,
 	splitter *split.Splitter,
 	enableTableAcrossNodes bool,
+	enableSplittableCheck bool,
 ) *Controller {
 	c := &Controller{
 		changefeedID:           changefeedID,
@@ -88,6 +90,7 @@
 		nodeManager:            appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName),
 		splitter:               splitter,
 		enableTableAcrossNodes: enableTableAcrossNodes,
+		enableSplittableCheck:  enableSplittableCheck,
 		ddlDispatcherID:        ddlSpan.ID,
 	}

@@ -142,7 +145,7 @@ func (c *Controller) AddNewTable(table commonEvent.Table, startTs uint64) {
 		EndKey: span.EndKey,
 	}
 	tableSpans := []*heartbeatpb.TableSpan{tableSpan}
-	if c.enableTableAcrossNodes && table.Splitable && c.splitter != nil && c.nodeManager != nil && len(c.nodeManager.GetAliveNodes()) > 1 {
+	if c.enableTableAcrossNodes && c.splitter != nil && (table.Splitable || !c.enableSplittableCheck) {

[Inline review comment — medium] The condition for splitting tables has been changed to allow splitting on a single node by removing the len(c.nodeManager.GetAliveNodes()) > 1 check. While this enables a new capability, it also changes the behavior of the enableTableAcrossNodes flag, which now controls table splitting in general, not just across nodes. This could be confusing for future maintainers. Consider adding a comment to clarify that table splitting is now supported on a single node when this flag is enabled. (A sketch of the behavior change follows this file's diff.)

 		// split the whole table span base on region count if table region count is exceed the limit
 		tableSpans = c.splitter.SplitSpansByRegion(context.Background(), tableSpan)
 	}
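To make the review comment concrete, here is a minimal, self-contained sketch of the old and new split predicates. It is illustrative only, not the project's real types: the controller struct, hasSplitter (standing in for c.splitter != nil), and aliveNodes (standing in for len(c.nodeManager.GetAliveNodes())) are hypothetical simplifications. It shows the two behavior changes: splitting no longer requires more than one alive node, and a table's Splitable flag can be bypassed when enableSplittableCheck is off.

package main

import "fmt"

// controller is a simplified stand-in for span.Controller; field names follow the diff.
type controller struct {
	enableTableAcrossNodes bool
	enableSplittableCheck  bool
	hasSplitter            bool // stands in for c.splitter != nil
	aliveNodes             int  // stands in for len(c.nodeManager.GetAliveNodes())
}

// oldShouldSplit mirrors the condition before this PR.
func (c controller) oldShouldSplit(splitable bool) bool {
	return c.enableTableAcrossNodes && splitable && c.hasSplitter && c.aliveNodes > 1
}

// newShouldSplit mirrors the condition after this PR: the alive-node count no
// longer matters, and Splitable can be bypassed by disabling enableSplittableCheck.
func (c controller) newShouldSplit(splitable bool) bool {
	return c.enableTableAcrossNodes && c.hasSplitter && (splitable || !c.enableSplittableCheck)
}

func main() {
	c := controller{enableTableAcrossNodes: true, enableSplittableCheck: true, hasSplitter: true, aliveNodes: 1}
	// With a single alive node, the old condition refuses to split while the new one allows it.
	fmt.Println(c.oldShouldSplit(true), c.newShouldSplit(true)) // false true
}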
1 change: 1 addition & 0 deletions pkg/common/event/util.go
@@ -33,6 +33,7 @@ import (
 	"github.com/pingcap/tidb/pkg/parser"
 	"github.com/pingcap/tidb/pkg/parser/ast"
 	"github.com/pingcap/tidb/pkg/parser/format"
+
 	// NOTE: Do not remove the `test_driver` import.
 	// For details, refer to: https://github.com/pingcap/parser/issues/43
 	_ "github.com/pingcap/tidb/pkg/parser/test_driver"
6 changes: 3 additions & 3 deletions pkg/config/changefeed.go
@@ -450,7 +450,7 @@ func (info *ChangeFeedInfo) RmUnusedFields() {
 		info.rmStorageOnlyFields()
 	}

-	if !sink.IsMySQLCompatibleScheme(uri.Scheme) {
+	if !IsMySQLCompatibleScheme(uri.Scheme) {
 		info.rmDBOnlyFields()
 	} else {
 		// remove fields only being used by MQ and Storage downstream
@@ -579,7 +579,7 @@ func (info *ChangeFeedInfo) fixMySQLSinkProtocol() {
 		return
 	}

-	if sink.IsMQScheme(uri.Scheme) {
+	if IsMQScheme(uri.Scheme) {
 		return
 	}

@@ -604,7 +604,7 @@ func (info *ChangeFeedInfo) fixMQSinkProtocol() {
 		return
 	}

-	if !sink.IsMQScheme(uri.Scheme) {
+	if !IsMQScheme(uri.Scheme) {
 		return
 	}
2 changes: 1 addition & 1 deletion pkg/config/replica_config.go
@@ -285,7 +285,7 @@ func (c *ReplicaConfig) ValidateAndAdjust(sinkURI *url.URL) error { // check sin
 	if c.Scheduler == nil {
 		c.FixScheduler(false)
 	} else {
-		err := c.Scheduler.Validate()
+		err := c.Scheduler.ValidateAndAdjust(sinkURI)
 		if err != nil {
 			return err
 		}