18 changes: 15 additions & 3 deletions downstreamadapter/sink/kafka/sink.go
@@ -18,6 +18,7 @@ import (
 	"net/url"
 	"time"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/downstreamadapter/sink/helper"
 	commonType "github.com/pingcap/ticdc/pkg/common"
@@ -44,7 +45,6 @@ const (
 type sink struct {
 	changefeedID commonType.ChangeFeedID
 
-	adminClient      kafka.ClusterAdminClient
 	dmlProducer      kafka.AsyncProducer
 	ddlProducer      kafka.SyncProducer
 	metricsCollector kafka.MetricsCollector
@@ -64,6 +64,9 @@ type sink struct {
 	// isNormal indicates whether the sink is in the normal state.
 	isNormal *atomic.Bool
 	ctx      context.Context
+	// failpointCh is used to inject failpoints into the run loop.
+	// Only used in tests.
+	failpointCh chan error
 }
 
 func (s *sink) SinkType() commonType.SinkType {
@@ -115,8 +118,9 @@ func New(
 		eventChan: make(chan *commonEvent.DMLEvent, 32),
 		rowChan:   make(chan *commonEvent.MQRowEvent, 32),
 
-		isNormal: atomic.NewBool(true),
-		ctx:      ctx,
+		isNormal:    atomic.NewBool(true),
+		ctx:         ctx,
+		failpointCh: make(chan error, 1),
 	}, nil
 }

@@ -403,6 +407,14 @@ func (s *sink) sendMessages(ctx context.Context) error {
 
 	log.Debug("send message to kafka", zap.String("messageKey", string(message.Key)), zap.String("messageValue", string(message.Value)))
 
+	failpoint.Inject("KafkaSinkAsyncSendError", func() {
+		// Simulate the case where the message is sent to the input channel
+		// successfully but flushing it to Kafka fails.
+		log.Info("KafkaSinkAsyncSendError error injected", zap.String("namespace", s.changefeedID.Namespace()),
+			zap.String("changefeed", s.changefeedID.Name()))
+		s.failpointCh <- errors.New("kafka sink injected error")
+		failpoint.Return(nil)
+	})
 	if err = s.dmlProducer.AsyncSend(
 		ctx,
 		future.Key.Topic,
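The new failpointCh field gives the sink a path for surfacing the injected error outside sendMessages. As a rough illustration of the pattern only — the run method and main function below are invented for this sketch and are not the PR's actual code:

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Minimal stand-in for the sink type; only the field relevant to the
// failpoint is reproduced here.
type sink struct {
	failpointCh chan error
}

// run blocks until the context is cancelled or a failpoint error arrives.
func (s *sink) run(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	case err := <-s.failpointCh:
		// Propagate the injected error as if the async producer had
		// failed, so the changefeed observes a real sink error.
		return err
	}
}

func main() {
	s := &sink{failpointCh: make(chan error, 1)}
	s.failpointCh <- errors.New("kafka sink injected error")
	fmt.Println(s.run(context.Background()))
}
```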
10 changes: 10 additions & 0 deletions pkg/sink/mysql/mysql_writer_ddl.go
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"github.com/go-sql-driver/mysql"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/pkg/apperror"
 	"github.com/pingcap/ticdc/pkg/common"
@@ -61,6 +62,15 @@ func (w *Writer) execDDL(event *commonEvent.DDLEvent) error {
 		}
 	}
 
+	failpoint.Inject("MySQLSinkExecDDLDelay", func() {
+		select {
+		case <-ctx.Done():
+			failpoint.Return(ctx.Err())
+		case <-time.After(time.Hour):
+		}
+		failpoint.Return(nil)
+	})
+
 	tx, err := w.db.BeginTx(ctx, nil)
 	if err != nil {
 		return err
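For readers unfamiliar with pingcap/failpoint: the failpoint.Inject markers above are no-ops until failpoint-ctl rewrites them into failpoint.Eval calls at build time, and a failpoint only fires while it is enabled — in these tests via the GO_FAILPOINTS environment variable. A minimal sketch of the runtime mechanics; the failpoint name demo/injectedError is made up for illustration:

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// Integration tests enable failpoints through the GO_FAILPOINTS env
	// var; the programmatic equivalent is failpoint.Enable.
	if err := failpoint.Enable("demo/injectedError", "return(true)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable("demo/injectedError")

	// failpoint-ctl rewrites failpoint.Inject markers into Eval calls like
	// this one; Eval returns a nil error only while the failpoint is enabled.
	if val, err := failpoint.Eval("demo/injectedError"); err == nil {
		fmt.Printf("failpoint fired, value=%v\n", val)
	}
}
```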
tests/integration_tests/capture_session_done_during_task/run.sh
@@ -33,7 +33,7 @@ function run() {
 	run_sql "CREATE table capture_session_done_during_task.t (id int primary key auto_increment, a int)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 	start_ts=$(run_cdc_cli_tso_query ${UP_PD_HOST_1} ${UP_PD_PORT_1})
 	run_sql "INSERT INTO capture_session_done_during_task.t values (),(),(),(),(),(),()" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/downstreamadapter/dispatchermanager/NewEventDispatcherManagerDelay=sleep(4000)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/dispatchermanager/NewEventDispatcherManagerDelay=sleep(4000)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
 	# wait until the task is dispatched
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_partition_table/run.sh
@@ -39,7 +39,7 @@ function run() {
 	cleanup_process $CDC_BINARY
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix partition_table.server2
 
 	run_sql_file $CUR/data/prepare.sql ${UP_TIDB_HOST} ${UP_TIDB_PORT}
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_ddl/run.sh
@@ -47,7 +47,7 @@ function run() {
 	cleanup_process $CDC_BINARY
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true);github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix consistent_replicate_ddl.server2
 
 	# case 1:
2 changes: 1 addition & 1 deletion tests/integration_tests/consistent_replicate_gbk/run.sh
@@ -67,7 +67,7 @@ function run() {
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
 	cleanup_process $CDC_BINARY
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 	run_sql "create table consistent_replicate_gbk.GBKTABLE2 like consistent_replicate_gbk.GBKTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "insert into consistent_replicate_gbk.GBKTABLE2 select * from consistent_replicate_gbk.GBKTABLE" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
tests/integration_tests/consistent_replicate_storage_file/run.sh
@@ -46,7 +46,7 @@ function run() {
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
 	cleanup_process $CDC_BINARY
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 	run_sql "create table consistent_replicate_storage_file.usertable2 like consistent_replicate_storage_file.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "ALTER TABLE consistent_replicate_storage_file.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
tests/integration_tests/consistent_replicate_storage_file_large_value/run.sh
@@ -46,7 +46,7 @@ function run() {
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
 	cleanup_process $CDC_BINARY
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 	run_sql "create table consistent_replicate_storage_file_large_value.usertable2 like consistent_replicate_storage_file_large_value.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "ALTER TABLE consistent_replicate_storage_file_large_value.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_file_large_value.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
tests/integration_tests/consistent_replicate_storage_s3/run.sh
@@ -72,7 +72,7 @@ function run() {
 	# Inject the failpoint to prevent sink execution, while the global resolved ts can still move forward.
 	# Then we can apply the redo log to reach an eventually consistent state downstream.
 	cleanup_process $CDC_BINARY
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/txn/mysql/MySQLSinkHangLongTime=return(true)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkHangLongTime=return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
 	run_sql "create table consistent_replicate_storage_s3.usertable2 like consistent_replicate_storage_s3.usertable" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	run_sql "ALTER TABLE consistent_replicate_storage_s3.t1 EXCHANGE PARTITION p3 WITH TABLE consistent_replicate_storage_s3.t2" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
3 changes: 2 additions & 1 deletion tests/integration_tests/kafka_sink_error_resume/run.sh
@@ -28,7 +28,8 @@ function run() {
 
 	# Return a failpoint error to fail a kafka changefeed.
 	# Note we return only one error from the failpoint; if the owner retries the changefeed frequently, it may break the test.
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer/KafkaSinkAsyncSendError=1*return(true)'
+
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/downstreamadapter/sink/kafka/KafkaSinkAsyncSendError=1*return(true)'
 	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
 	changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1 | tail -n2 | head -n1 | awk '{print $2}')
 	run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?protocol=open-protocol&partition-num=4&version=${KAFKA_VERSION}&max-message-bytes=10485760"
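The failpoint terms used in these scripts follow the standard pingcap/failpoint grammar: return(true) fires on every evaluation, sleep(4000) delays for 4000 ms, and the 1* prefix above limits the failpoint to a single firing, which is why the comment warns about frequent changefeed retries. A small sketch of the once-only behavior, with a made-up name demo/onceError:

```go
package main

import (
	"fmt"

	"github.com/pingcap/failpoint"
)

func main() {
	// "1*return(true)" means the failpoint fires on the first evaluation
	// only; later evaluations behave as if it were disabled.
	if err := failpoint.Enable("demo/onceError", "1*return(true)"); err != nil {
		panic(err)
	}
	defer failpoint.Disable("demo/onceError")

	for i := 0; i < 3; i++ {
		_, err := failpoint.Eval("demo/onceError")
		fmt.Printf("eval %d fired: %v\n", i, err == nil)
	}
}
```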
2 changes: 1 addition & 1 deletion tests/integration_tests/kill_owner_with_ddl/run.sh
@@ -56,7 +56,7 @@ function run() {
 	run_sql "CREATE table kill_owner_with_ddl.t1 (id int primary key auto_increment, val int);" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
 	check_table_exists "kill_owner_with_ddl.t1" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
 
-	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/sink/ddlsink/mysql/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/tiflow/cdc/capture/ownerFlushIntervalInject=return(10)'
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/pkg/sink/mysql/MySQLSinkExecDDLDelay=return(true);github.com/pingcap/tiflow/cdc/capture/ownerFlushIntervalInject=return(10)'
 	kill_cdc_and_restart $pd_addr $WORK_DIR $CDC_BINARY
 
 	for i in $(seq 2 3); do