
Commit c9966c7

asddongmen authored and ti-chi-bot committed
This is an automated cherry-pick of pingcap#12287
Signed-off-by: ti-chi-bot <[email protected]>
1 parent 70d4f37 commit c9966c7

File tree

5 files changed: +530 −24 lines changed

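The change cherry-picked here passes each DDL job's transaction start ts (`job.StartTS`) into `filter.ShouldDiscardDDL`, so that DDL jobs listed under the changefeed's `ignore-txn-start-ts` filter rule can be discarded; the diff below still carries the unresolved conflict markers left by the automated cherry-pick. As a rough illustration only (not the TiCDC `filter.Filter` implementation), a start-ts based discard check could look like the following sketch; `sketchFilter`, `ignoredStartTs`, and `shouldDiscardByStartTs` are hypothetical names.

package main

import "fmt"

type sketchFilter struct {
    ignoredStartTs map[uint64]struct{} // start ts values taken from the changefeed's ignore-txn-start-ts list
}

// shouldDiscardByStartTs reports whether a DDL job whose transaction started
// at startTs is listed in ignore-txn-start-ts and should therefore be dropped.
func (f *sketchFilter) shouldDiscardByStartTs(startTs uint64) bool {
    _, ok := f.ignoredStartTs[startTs]
    return ok
}

func main() {
    f := &sketchFilter{ignoredStartTs: map[uint64]struct{}{438876723456789: {}}}
    fmt.Println(f.shouldDiscardByStartTs(438876723456789)) // true: the job is discarded
    fmt.Println(f.shouldDiscardByStartTs(438876723456790)) // false: the job is replicated
}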

cdc/puller/ddl_puller.go

Lines changed: 321 additions & 2 deletions
@@ -373,6 +373,7 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
             zap.String("schema", job.SchemaName),
             zap.String("table", job.TableName),
             zap.String("query", job.Query),
+<<<<<<< HEAD
             zap.String("job", job.String()))
         return true, nil
     }
@@ -381,6 +382,12 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
     if err := snap.FillSchemaName(job); err != nil {
         log.Info("failed to fill schema name for ddl job", zap.Error(err))
         if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName) {
+=======
+            zap.Uint64("startTs", job.StartTS),
+            zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
+            zap.Error(err))
+        if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS) {
+>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))
             return true, nil
         }
         return true, cerror.WrapError(cerror.ErrHandleDDLFailed,
@@ -393,6 +400,45 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
         if err != nil {
             return true, errors.Trace(err)
         }
+<<<<<<< HEAD
+=======
+    case timodel.ActionCreateTables:
+        querys, err := ddl.SplitQueries(job.Query)
+        if err != nil {
+            return false, errors.Trace(err)
+        }
+        // we only use multiTableInfos and Querys when we generate job event
+        // So if some table should be discard, we just need to delete the info from multiTableInfos and Querys
+        if len(querys) != len(job.BinlogInfo.MultipleTableInfos) {
+            log.Error("the number of queries in `Job.Query` is not equal to "+
+                "the number of `TableInfo` in `Job.BinlogInfo.MultipleTableInfos`",
+                zap.Int("numQueries", len(querys)),
+                zap.Int("numTableInfos", len(job.BinlogInfo.MultipleTableInfos)),
+                zap.String("Job.Query", job.Query),
+                zap.Any("Job.BinlogInfo.MultipleTableInfos", job.BinlogInfo.MultipleTableInfos),
+                zap.Error(cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()))
+            return false, cerror.ErrTiDBUnexpectedJobMeta.GenWithStackByArgs()
+        }
+
+        var newMultiTableInfos []*timodel.TableInfo
+        var newQuerys []string
+
+        multiTableInfos := job.BinlogInfo.MultipleTableInfos
+
+        for index, tableInfo := range multiTableInfos {
+            // judge each table whether need to be skip
+            if p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, tableInfo.Name.O, job.StartTS) {
+                continue
+            }
+            newMultiTableInfos = append(newMultiTableInfos, multiTableInfos[index])
+            newQuerys = append(newQuerys, querys[index])
+        }
+
+        skip = len(newMultiTableInfos) == 0
+
+        job.BinlogInfo.MultipleTableInfos = newMultiTableInfos
+        job.Query = strings.Join(newQuerys, "")
+>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))
     case timodel.ActionRenameTable:
         log.Info("rename table ddl job",
             zap.Int64("newSchemaID", job.SchemaID),
@@ -404,10 +450,11 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
         oldTable, ok := snap.PhysicalTableByID(job.TableID)
         if !ok {
             // 1. If we can not find the old table, and the new table name is in filter rule, return error.
-            discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O)
+            discard := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
             if !discard {
                 return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
             }
+<<<<<<< HEAD
             skip = true
         } else {
             log.Info("rename table ddl job",
@@ -424,14 +471,38 @@ func (p *ddlJobPullerImpl) handleJob(job *timodel.Job) (skip bool, err error) {
             if skipByOldTableName && skipByNewTableName {
                 skip = true
                 return true, nil
+=======
+            log.Warn("skip rename table ddl since cannot found the old table info",
+                zap.String("namespace", p.changefeedID.Namespace),
+                zap.String("changefeed", p.changefeedID.ID),
+                zap.Int64("tableID", job.TableID),
+                zap.Int64("newSchemaID", job.SchemaID),
+                zap.String("newSchemaName", job.SchemaName),
+                zap.String("oldTableName", job.BinlogInfo.TableInfo.Name.O),
+                zap.String("newTableName", job.TableName))
+            return true, nil
+        }
+        // since we can find the old table, it must be able to find the old schema.
+        // 2. If we can find the preTableInfo, we filter it by the old table name.
+        skipByOldTableName := p.filter.ShouldDiscardDDL(job.Type, oldTable.TableName.Schema, oldTable.TableName.Table, job.StartTS)
+        skipByNewTableName := p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.BinlogInfo.TableInfo.Name.O, job.StartTS)
+        if err != nil {
+            return false, cerror.WrapError(cerror.ErrHandleDDLFailed,
+                errors.Trace(err), job.Query, job.StartTS, job.StartTS)
+        }
+        // 3. If its old table name is not in filter rule, and its new table name in filter rule, return error.
+        if skipByOldTableName {
+            if !skipByNewTableName {
+                return false, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(job.TableID, job.Query)
+>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))
             }
         }
     default:
         // nil means it is a schema ddl job, it's no need to fill the table name.
         if job.BinlogInfo.TableInfo != nil {
             job.TableName = job.BinlogInfo.TableInfo.Name.O
         }
-        skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName)
+        skip = p.filter.ShouldDiscardDDL(job.Type, job.SchemaName, job.TableName, job.StartTS)
     }
 
     if skip {
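Both the single-table rename branch above and `handleRenameTables` in the next hunk apply the same three rules: keep the job when the old table name is replicated, skip it when both the old and new names are filtered out, and fail with `ErrSyncRenameTableFailed` when only the new name would be replicated (the renamed table would otherwise appear mid-stream without its history). The sketch below encodes just that decision table; `decideRename` and its error value are illustrative, not part of TiCDC.

package main

import (
    "errors"
    "fmt"
)

var errSyncRenameTable = errors.New("rename brings a filtered-out table into the replicated set")

// decideRename returns skip=true to drop the rename job, or an error when the
// old name was filtered out but the new name would be replicated.
func decideRename(discardOld, discardNew bool) (skip bool, err error) {
    switch {
    case discardOld && discardNew:
        return true, nil // neither side is replicated: skip the DDL
    case discardOld && !discardNew:
        return false, errSyncRenameTable // cannot start replicating the table mid-stream
    default:
        return false, nil // the old name is replicated: keep the DDL
    }
}

func main() {
    fmt.Println(decideRename(true, true))   // true <nil>
    fmt.Println(decideRename(true, false))  // false + errSyncRenameTable
    fmt.Println(decideRename(false, false)) // false <nil>
}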
@@ -519,6 +590,254 @@ func (p *ddlJobPullerImpl) checkIneligibleTableDDL(snapBefore *schema.Snapshot,
         "then resume the changefeed.", job.Query))
 }
 
+<<<<<<< HEAD
+=======
+// handleRenameTables gets all the tables that are renamed
+// in the DDL job out and filter them one by one,
+// if all the tables are filtered, skip it.
+func (p *ddlJobPullerImpl) handleRenameTables(job *timodel.Job) (skip bool, err error) {
+    var args *timodel.RenameTablesArgs
+    args, err = timodel.GetRenameTablesArgs(job)
+    if err != nil {
+        return true, errors.Trace(err)
+    }
+
+    multiTableInfos := job.BinlogInfo.MultipleTableInfos
+    if len(multiTableInfos) != len(args.RenameTableInfos) {
+        return true, cerror.ErrInvalidDDLJob.GenWithStackByArgs(job.ID)
+    }
+
+    // we filter subordinate rename table ddl by these principles:
+    // 1. old table name matches the filter rule, remain it.
+    // 2. old table name does not match and new table name matches the filter rule, return error.
+    // 3. old table name and new table name do not match the filter rule, skip it.
+    remainTables := make([]*timodel.TableInfo, 0, len(multiTableInfos))
+    snap := p.schemaStorage.GetLastSnapshot()
+
+    argsForRemaining := &timodel.RenameTablesArgs{}
+    for i, tableInfo := range multiTableInfos {
+        info := args.RenameTableInfos[i]
+        var shouldDiscardOldTable, shouldDiscardNewTable bool
+        oldTable, ok := snap.PhysicalTableByID(tableInfo.ID)
+        if !ok {
+            shouldDiscardOldTable = true
+        } else {
+            shouldDiscardOldTable = p.filter.ShouldDiscardDDL(job.Type, info.OldSchemaName.O, oldTable.Name.O, job.StartTS)
+        }
+
+        newSchemaName, ok := snap.SchemaByID(info.NewSchemaID)
+        if !ok {
+            // the new table name does not hit the filter rule, so we should discard the table.
+            shouldDiscardNewTable = true
+        } else {
+            shouldDiscardNewTable = p.filter.ShouldDiscardDDL(job.Type, newSchemaName.Name.O, info.NewTableName.O, job.StartTS)
+        }
+
+        if shouldDiscardOldTable && shouldDiscardNewTable {
+            // skip a rename table ddl only when its old table name and new table name are both filtered.
+            log.Info("RenameTables is filtered",
+                zap.Int64("tableID", tableInfo.ID),
+                zap.String("schema", info.OldSchemaName.O),
+                zap.String("query", job.Query))
+            continue
+        }
+        if shouldDiscardOldTable && !shouldDiscardNewTable {
+            // if old table is not in filter rule and its new name is in filter rule, return error.
+            return true, cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(tableInfo.ID, job.Query)
+        }
+        // old table name matches the filter rule, remain it.
+        argsForRemaining.RenameTableInfos = append(argsForRemaining.RenameTableInfos, &timodel.RenameTableArgs{
+            OldSchemaID:   info.OldSchemaID,
+            NewSchemaID:   info.NewSchemaID,
+            TableID:       info.TableID,
+            NewTableName:  info.NewTableName,
+            OldSchemaName: info.OldSchemaName,
+            OldTableName:  info.OldTableName,
+        })
+        remainTables = append(remainTables, tableInfo)
+    }
+
+    if len(remainTables) == 0 {
+        return true, nil
+    }
+
+    bakJob, err := entry.GetNewJobWithArgs(job, argsForRemaining)
+    if err != nil {
+        return true, errors.Trace(err)
+    }
+    job.RawArgs = bakJob.RawArgs
+    job.BinlogInfo.MultipleTableInfos = remainTables
+    return false, nil
+}
+
+// DDLPuller is the interface for DDL Puller, used by owner only.
+type DDLPuller interface {
+    // Run runs the DDLPuller
+    Run(ctx context.Context) error
+    // PopFrontDDL returns and pops the first DDL job in the internal queue
+    PopFrontDDL() (uint64, *timodel.Job)
+    // ResolvedTs returns the resolved ts of the DDLPuller
+    ResolvedTs() uint64
+    // Close closes the DDLPuller
+    Close()
+}
+
+type ddlPullerImpl struct {
+    ddlJobPuller DDLJobPuller
+
+    mu             sync.Mutex
+    resolvedTS     uint64
+    pendingDDLJobs []*timodel.Job
+    lastDDLJobID   int64
+    cancel         context.CancelFunc
+
+    changefeedID model.ChangeFeedID
+}
+
+// NewDDLPuller return a puller for DDL Event
+func NewDDLPuller(
+    up *upstream.Upstream,
+    startTs uint64,
+    changefeed model.ChangeFeedID,
+    schemaStorage entry.SchemaStorage,
+    filter filter.Filter,
+) DDLPuller {
+    var puller DDLJobPuller
+    // storage can be nil only in the test
+    if up.KVStorage != nil {
+        changefeed.ID += "_owner_ddl_puller"
+        puller = NewDDLJobPuller(up, startTs, config.GetGlobalServerConfig(),
+            changefeed, schemaStorage, filter)
+    }
+
+    return &ddlPullerImpl{
+        ddlJobPuller: puller,
+        resolvedTS:   startTs,
+        cancel:       func() {},
+        changefeedID: changefeed,
+    }
+}
+
+func (h *ddlPullerImpl) addToPending(job *timodel.Job) {
+    if job == nil {
+        return
+    }
+    if job.ID == h.lastDDLJobID {
+        log.Warn("ignore duplicated DDL job",
+            zap.String("namespace", h.changefeedID.Namespace),
+            zap.String("changefeed", h.changefeedID.ID),
+            zap.String("schema", job.SchemaName),
+            zap.String("table", job.TableName),
+
+            zap.String("query", job.Query),
+            zap.Uint64("startTs", job.StartTS),
+            zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
+            zap.Int64("jobID", job.ID))
+        return
+    }
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    h.pendingDDLJobs = append(h.pendingDDLJobs, job)
+    h.lastDDLJobID = job.ID
+    log.Info("ddl puller receives new pending job",
+        zap.String("namespace", h.changefeedID.Namespace),
+        zap.String("changefeed", h.changefeedID.ID),
+        zap.String("schema", job.SchemaName),
+        zap.String("table", job.TableName),
+        zap.String("query", job.Query),
+        zap.Uint64("startTs", job.StartTS),
+        zap.Uint64("finishTs", job.BinlogInfo.FinishedTS),
+        zap.Int64("jobID", job.ID))
+}
+
+// Run the ddl puller to receive DDL events
+func (h *ddlPullerImpl) Run(ctx context.Context) error {
+    g, ctx := errgroup.WithContext(ctx)
+    ctx, cancel := context.WithCancel(ctx)
+    h.cancel = cancel
+
+    g.Go(func() error { return h.ddlJobPuller.Run(ctx) })
+
+    g.Go(func() error {
+        cc := clock.New()
+        ticker := cc.Ticker(ddlPullerStuckWarnDuration)
+        defer ticker.Stop()
+        lastResolvedTsAdvancedTime := cc.Now()
+        for {
+            select {
+            case <-ctx.Done():
+                return ctx.Err()
+            case <-ticker.C:
+                duration := cc.Since(lastResolvedTsAdvancedTime)
+                if duration > ddlPullerStuckWarnDuration {
+                    log.Warn("ddl puller resolved ts has not advanced",
+                        zap.String("namespace", h.changefeedID.Namespace),
+                        zap.String("changefeed", h.changefeedID.ID),
+                        zap.Duration("duration", duration),
+                        zap.Uint64("resolvedTs", atomic.LoadUint64(&h.resolvedTS)))
+                }
+            case e := <-h.ddlJobPuller.Output():
+                if e.OpType == model.OpTypeResolved {
+                    if e.CRTs > atomic.LoadUint64(&h.resolvedTS) {
+                        atomic.StoreUint64(&h.resolvedTS, e.CRTs)
+                        lastResolvedTsAdvancedTime = cc.Now()
+                        continue
+                    }
+                }
+                h.addToPending(e.Job)
+            }
+        }
+    })
+
+    log.Info("DDL puller started",
+        zap.String("namespace", h.changefeedID.Namespace),
+        zap.String("changefeed", h.changefeedID.ID),
+        zap.Uint64("resolvedTS", atomic.LoadUint64(&h.resolvedTS)))
+
+    defer func() {
+        log.Info("DDL puller stopped",
+            zap.String("namespace", h.changefeedID.Namespace),
+            zap.String("changefeed", h.changefeedID.ID))
+    }()
+
+    return g.Wait()
+}
+
+// PopFrontDDL return the first pending DDL job and remove it from the pending list
+func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) {
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    if len(h.pendingDDLJobs) == 0 {
+        return atomic.LoadUint64(&h.resolvedTS), nil
+    }
+    job := h.pendingDDLJobs[0]
+    h.pendingDDLJobs = h.pendingDDLJobs[1:]
+    return job.BinlogInfo.FinishedTS, job
+}
+
+// Close the ddl puller, release all resources.
+func (h *ddlPullerImpl) Close() {
+    h.cancel()
+    if h.ddlJobPuller != nil {
+        h.ddlJobPuller.Close()
+    }
+    log.Info("DDL puller closed",
+        zap.String("namespace", h.changefeedID.Namespace),
+        zap.String("changefeed", h.changefeedID.ID))
+}
+
+func (h *ddlPullerImpl) ResolvedTs() uint64 {
+    h.mu.Lock()
+    defer h.mu.Unlock()
+    if len(h.pendingDDLJobs) == 0 {
+        return atomic.LoadUint64(&h.resolvedTS)
+    }
+    job := h.pendingDDLJobs[0]
+    return job.BinlogInfo.FinishedTS
+}
+
+// Below are some helper functions for ddl puller.
+>>>>>>> db43be26bf (puller: Support discarding unsupported DDL by setting `ignore-txn-start-ts` in filter. (#12287))
 func findDBByName(dbs []*timodel.DBInfo, name string) (*timodel.DBInfo, error) {
     for _, db := range dbs {
         if db.Name.L == name {
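The cherry-picked side of the conflict also brings in the owner-facing `DDLPuller` interface and its `ddlPullerImpl` implementation: `Run` drains `DDLJobPuller.Output()`, resolved-ts events advance `resolvedTS`, and DDL jobs queue up until the owner pops them. A typical consumer loop calls `PopFrontDDL` until it returns a nil job, treating the returned ts as the resolved ts, roughly as in the sketch below; `toyPuller` and its `Job` type are stand-ins, not the TiCDC types.

package main

import "fmt"

// Job is a stand-in for *timodel.Job with just the fields the loop needs.
type Job struct {
    Query      string
    FinishedTS uint64
}

type toyPuller struct {
    pending    []*Job
    resolvedTs uint64
}

// PopFrontDDL mirrors the contract in the diff above: return (FinishedTS, job)
// for the first pending job, or (resolvedTs, nil) when the queue is empty.
func (p *toyPuller) PopFrontDDL() (uint64, *Job) {
    if len(p.pending) == 0 {
        return p.resolvedTs, nil
    }
    job := p.pending[0]
    p.pending = p.pending[1:]
    return job.FinishedTS, job
}

func main() {
    p := &toyPuller{
        pending:    []*Job{{Query: "CREATE TABLE t1 (a INT)", FinishedTS: 100}},
        resolvedTs: 120,
    }
    for {
        ts, job := p.PopFrontDDL()
        if job == nil {
            fmt.Println("no pending DDL, resolved ts =", ts)
            break
        }
        fmt.Println("apply DDL", job.Query, "at ts", ts)
    }
}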
