1 change: 1 addition & 0 deletions api/v2/api.go
@@ -57,6 +57,7 @@ func RegisterOpenAPIV2Routes(router *gin.Engine, api OpenAPIV2) {
changefeedGroup.POST("/:changefeed_id/resume", coordinatorMiddleware, authenticateMiddleware, api.ResumeChangefeed)
changefeedGroup.POST("/:changefeed_id/pause", coordinatorMiddleware, authenticateMiddleware, api.PauseChangefeed)
changefeedGroup.DELETE("/:changefeed_id", coordinatorMiddleware, authenticateMiddleware, api.DeleteChangefeed)
changefeedGroup.GET("/:changefeed_id/status", coordinatorMiddleware, authenticateMiddleware, api.status)
changefeedGroup.GET("/:changefeed_id/synced", coordinatorMiddleware, authenticateMiddleware, api.syncState)

// internal APIs
60 changes: 57 additions & 3 deletions api/v2/changefeed.go
@@ -306,17 +306,26 @@ func (h *OpenAPIV2) GetChangeFeed(c *gin.Context) {
c.JSON(http.StatusOK, detail)
}

func shouldShowRunningError(state config.FeedState) bool {
switch state {
case config.StateNormal, config.StateStopped, config.StateFinished, config.StateRemoved:
return false
default:
return true
}
}

func CfInfoToAPIModel(
info *config.ChangeFeedInfo,
status *config.ChangeFeedStatus,
taskStatus []config.CaptureTaskStatus,
) *ChangeFeedInfo {
var runningError *RunningError
var runningError *config.RunningError

// if the state is normal, we shall not return the error info
// because the changefeed is retrying. errors will confuse the users
if info.State != config.StateNormal && info.Error != nil {
runningError = &RunningError{
if info.Error != nil && shouldShowRunningError(info.State) {
runningError = &config.RunningError{
Addr: info.Error.Addr,
Code: info.Error.Code,
Message: info.Error.Message,
@@ -1090,6 +1099,51 @@ func (h *OpenAPIV2) getDispatcherCount(c *gin.Context) {
c.JSON(http.StatusOK, &DispatcherCount{Count: number})
}

// status returns the status of a changefeed.
// Usage:
// curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/changefeed-test1/status
func (h *OpenAPIV2) status(c *gin.Context) {
changefeedDisplayName := common.NewChangeFeedDisplayName(c.Param(api.APIOpVarChangefeedID), GetNamespaceValueWithDefault(c))
co, err := h.server.GetCoordinator()
if err != nil {
_ = c.Error(err)
return
}
info, status, err := co.GetChangefeed(c, changefeedDisplayName)
if err != nil {
_ = c.Error(err)
return
}
var (
lastError *config.RunningError
lastWarning *config.RunningError
)
if info.Error != nil &&
oracle.GetTimeFromTS(status.CheckpointTs).Before(info.Error.Time) {
err := &config.RunningError{
Time: info.Error.Time,
Addr: info.Error.Addr,
Code: info.Error.Code,
Message: info.Error.Message,
}
switch info.State {
case config.StateFailed:
lastError = err
case config.StateWarning:
lastWarning = err
}
}

c.JSON(http.StatusOK, &ChangefeedStatus{
State: string(info.State),
CheckpointTs: status.CheckpointTs,
// FIXME: add correct resolvedTs
ResolvedTs: status.CheckpointTs,
LastError: lastError,
LastWarning: lastWarning,
})
}

// syncState returns the sync state of a changefeed.
// Usage:
// curl -X GET http://127.0.0.1:8300/api/v2/changefeeds/changefeed-test1/synced
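For reference, a hypothetical call to the new endpoint with an illustrative response; the values are made up, and only the field shape follows the ChangefeedStatus model in this change (resolved_ts mirrors checkpoint_ts until the FIXME above is addressed, and last_error/last_warning follow the config.RunningError layout):

# Hypothetical invocation of the new status endpoint; every value in the response is illustrative.
curl -s -X GET http://127.0.0.1:8300/api/v2/changefeeds/changefeed-test1/status
# {
#   "state": "warning",
#   "resolved_ts": 449348220000000001,
#   "checkpoint_ts": 449348220000000001,
#   "last_warning": {
#     "time": "2024-01-01T00:00:00.000+08:00",
#     "addr": "127.0.0.1:8300",
#     "code": "CDC:ErrExampleWarning",
#     "message": "illustrative warning message"
#   }
# }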
29 changes: 10 additions & 19 deletions api/v2/model.go
@@ -1091,11 +1091,11 @@ type ChangeFeedInfo struct {
// The ChangeFeed will exit once it has synced to timestamp TargetTs
TargetTs uint64 `json:"target_ts,omitempty"`
// used for admin job notification, trigger watch event in capture
AdminJobType config.AdminJobType `json:"admin_job_type,omitempty"`
Config *ReplicaConfig `json:"config,omitempty"`
State config.FeedState `json:"state,omitempty"`
Error *RunningError `json:"error,omitempty"`
CreatorVersion string `json:"creator_version,omitempty"`
AdminJobType config.AdminJobType `json:"admin_job_type,omitempty"`
Config *ReplicaConfig `json:"config,omitempty"`
State config.FeedState `json:"state,omitempty"`
Error *config.RunningError `json:"error,omitempty"`
CreatorVersion string `json:"creator_version,omitempty"`

ResolvedTs uint64 `json:"resolved_ts"`
CheckpointTs uint64 `json:"checkpoint_ts"`
@@ -1116,15 +1116,6 @@ type SyncedStatus struct {
Info string `json:"info"`
}

// RunningError represents some running error from cdc components,
// such as processor.
type RunningError struct {
Time *time.Time `json:"time,omitempty"`
Addr string `json:"addr"`
Code string `json:"code"`
Message string `json:"message"`
}

// toCredential generates a security.Credential from a PDConfig
func (cfg *PDConfig) toCredential() *security.Credential {
credential := &security.Credential{
@@ -1313,11 +1304,11 @@ type CloudStorageConfig struct {

// ChangefeedStatus holds common information of a changefeed in cdc
type ChangefeedStatus struct {
State string `json:"state,omitempty"`
ResolvedTs uint64 `json:"resolved_ts"`
CheckpointTs uint64 `json:"checkpoint_ts"`
LastError *RunningError `json:"last_error,omitempty"`
LastWarning *RunningError `json:"last_warning,omitempty"`
State string `json:"state,omitempty"`
ResolvedTs uint64 `json:"resolved_ts"`
CheckpointTs uint64 `json:"checkpoint_ts"`
LastError *config.RunningError `json:"last_error,omitempty"`
LastWarning *config.RunningError `json:"last_warning,omitempty"`
}

// GlueSchemaRegistryConfig represents a glue schema registry configuration
2 changes: 1 addition & 1 deletion cmd/cdc/cli/cli_changefeed_query.go
@@ -42,7 +42,7 @@ type cfMeta struct {
CheckpointTime api.JSONTime `json:"checkpoint_time"`
Engine config.SortEngine `json:"sort_engine,omitempty"`
FeedState config.FeedState `json:"state"`
RunningError *v2.RunningError `json:"error,omitempty"`
RunningError *config.RunningError `json:"error,omitempty"`
ErrorHis []int64 `json:"error_history,omitempty"`
CreatorVersion string `json:"creator_version"`
TaskStatus []config.CaptureTaskStatus `json:"task_status,omitempty"`
11 changes: 6 additions & 5 deletions downstreamadapter/sink/kafka/sink.go
@@ -44,7 +44,6 @@ const (
type sink struct {
changefeedID commonType.ChangeFeedID

adminClient kafka.ClusterAdminClient
dmlProducer kafka.AsyncProducer
ddlProducer kafka.SyncProducer
metricsCollector kafka.MetricsCollector
@@ -64,6 +63,9 @@ type sink struct {
// isNormal indicate whether the sink is in the normal state.
isNormal *atomic.Bool
ctx context.Context
// failpointCh is used to inject failpoints to the run loop.
// Only used in test.
failpointCh chan error
}

func (s *sink) SinkType() commonType.SinkType {
@@ -115,8 +117,9 @@ func New(
eventChan: make(chan *commonEvent.DMLEvent, 32),
rowChan: make(chan *commonEvent.MQRowEvent, 32),

isNormal: atomic.NewBool(true),
ctx: ctx,
isNormal: atomic.NewBool(true),
ctx: ctx,
failpointCh: make(chan error, 1),
}, nil
}

@@ -400,9 +403,7 @@ func (s *sink) sendMessages(ctx context.Context) error {
start := time.Now()
if err = s.statistics.RecordBatchExecution(func() (int, int64, error) {
message.SetPartitionKey(future.Key.PartitionKey)

log.Debug("send message to kafka", zap.String("messageKey", string(message.Key)), zap.String("messageValue", string(message.Value)))

if err = s.dmlProducer.AsyncSend(
ctx,
future.Key.Topic,
13 changes: 13 additions & 0 deletions downstreamadapter/sink/pulsar/dml_producer.go
@@ -36,6 +36,8 @@ type dmlProducer interface {
ctx context.Context, topic string, message *common.Message,
) error

run(ctx context.Context) error

close()
}

@@ -108,6 +110,17 @@ func newDMLProducers(
return p, nil
}

func (p *dmlProducers) run(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-p.failpointCh:
return errors.Trace(err)
}
}
}

// asyncSendMessage Async send one message
func (p *dmlProducers) asyncSendMessage(
ctx context.Context, topic string, message *common.Message,
3 changes: 3 additions & 0 deletions downstreamadapter/sink/pulsar/sink.go
@@ -277,6 +277,9 @@ func (s *sink) sendDMLEvent(ctx context.Context) error {
g.Go(func() error {
return s.sendMessages(ctx)
})
g.Go(func() error {
return s.dmlProducer.run(ctx)
})
return g.Wait()
}

4 changes: 2 additions & 2 deletions pkg/diff/spliter_test.go
@@ -110,7 +110,7 @@ func TestSplitRangeByRandom(t *testing.T) {
},
}

for i, testCase := range testCases {
for _, testCase := range testCases {
tableInfo, err := dbutiltest.GetTableInfoBySQL(testCase.createTableSQL, parser.New())
require.NoError(t, err)

@@ -196,7 +196,7 @@ func TestRandomSplitter(t *testing.T) {
},
}

for i, testCase := range testCases {
for _, testCase := range testCases {
tableInfo, err := dbutiltest.GetTableInfoBySQL(testCase.createTableSQL, parser.New())
require.NoError(t, err)

9 changes: 9 additions & 0 deletions pkg/sink/kafka/factory.go
@@ -19,6 +19,7 @@ import (

"github.com/IBM/sarama"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
commonType "github.com/pingcap/ticdc/pkg/common"
cerror "github.com/pingcap/ticdc/pkg/errors"
@@ -254,6 +255,14 @@ func (p *saramaAsyncProducer) AsyncSend(
if p.closed.Load() {
return cerror.ErrKafkaProducerClosed.GenWithStackByArgs()
}
failpoint.Inject("KafkaSinkAsyncSendError", func() {
// simulate sending message to input channel successfully but flushing
// message to Kafka meets error
log.Info("KafkaSinkAsyncSendError error injected", zap.String("namespace", p.changefeedID.Namespace()),
zap.String("changefeed", p.changefeedID.Name()))
p.failpointCh <- errors.New("kafka sink injected error")
failpoint.Return(nil)
})
msg := &sarama.ProducerMessage{
Topic: topic,
Partition: partition,
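A sketch of how the injected error might be triggered from an integration test; the full failpoint path below is an assumption derived from the pkg/sink/kafka package path and is not confirmed by this diff:

# Assumed failpoint path (inferred from the package path); enable before starting the server.
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/kafka/KafkaSinkAsyncSendError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY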
10 changes: 10 additions & 0 deletions pkg/sink/mysql/mysql_writer_ddl.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/apperror"
"github.com/pingcap/ticdc/pkg/common"
@@ -61,6 +62,15 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error {
}
}

failpoint.Inject("MySQLSinkExecDDLDelay", func() {
select {
case <-ctx.Done():
failpoint.Return(ctx.Err())
case <-time.After(time.Hour):
}
failpoint.Return(nil)
})

tx, err := w.db.BeginTx(ctx, nil)
if err != nil {
return err
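This failpoint is what the updated integration-test scripts below turn on, at the new ticdc package path, so DDL execution stalls while redo logs accumulate; for example:

# As exported in the consistent_replicate_ddl run.sh below: hang DML execution and delay DDLs
# so the downstream is only brought to a consistent state by applying redo logs afterwards.
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix consistent_replicate_ddl.server2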
7 changes: 4 additions & 3 deletions tests/integration_tests/_utils/ensure
@@ -3,11 +3,12 @@
# $2: command to run

total=$1
shift
command=$2
shift 2
for ((i = 1; i <= $total; i++)); do
# run the command
echo "command: $*"
bash -c "$*"
echo "command: $command $@"
bash -c "${command} \"\$@\"" _ "$@"
if [ $? == 0 ]; then
echo "run task successfully"
exit 0
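To illustrate why the invocation changed (the caller below is hypothetical; only the quoting behavior matters): the old bash -c "$*" joined all arguments into one string and re-split it on whitespace, so an argument containing spaces was torn apart, while the new form expands the command once and forwards "$@" verbatim as the inner shell's positional parameters (the _ fills $0).

# Hypothetical caller: $1 is the retry count, $2 the command, and the rest are its arguments.
# Old:  bash -c "$*"                        -> the SQL string is re-split into separate words
# New:  bash -c "${command} \"\$@\"" _ "$@" -> the SQL string arrives as one intact argument
ensure 10 verify_row_count "select count(*) from test.t where c = 'hello world'"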
@@ -33,7 +33,7 @@ function run() {
run_sql "CREATE table capture_session_done_during_task.t (id int primary key auto_increment, a int)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
run_sql "INSERT INTO capture_session_done_during_task.t values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/downstreamadapter/dispatchermanager/NewEventDispatcherManagerDelay=sleep(4000)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatchermanager/NewEventDispatcherManagerDelay=sleep(4000)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
# wait task is dispatched
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_partition_table/run.sh
@@ -39,7 +39,7 @@ function run() {
cleanup_process $CDC_BINARY
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix partition_table.server2

run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_ddl/run.sh
@@ -47,7 +47,7 @@ function run() {
cleanup_process $CDC_BINARY
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix consistent_replicate_ddl.server2

# case 1:
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_gbk/run.sh
@@ -67,7 +67,7 @@ function run() {
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_gbk.GBKTABLE2 like consistent_replicate_gbk.GBKTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "insert into consistent_replicate_gbk.GBKTABLE2 select * from consistent_replicate_gbk.GBKTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -46,7 +46,7 @@ function run() {
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_storage_file.usertable2 like consistent_replicate_storage_file.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_storage_file.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -46,7 +46,7 @@ function run() {
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
@@ -72,7 +72,7 @@ function run() {
# Inject the failpoint to prevent sink execution, but the global resolved can be moved forward.
# Then we can apply redo log to reach an eventual consistent state in downstream.
cleanup_process $CDC_BINARY
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
run_sql "create table consistent_replicate_storage_s3.usertable2 like consistent_replicate_storage_s3.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
run_sql "ALTER TABLE consistent_replicate_storage_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}