@@ -30,12 +30,14 @@ import (
30
30
31
31
"github.com/pingcap/errors"
32
32
"github.com/pingcap/failpoint"
33
+ "github.com/pingcap/tidb/pkg/config/kerneltype"
33
34
"github.com/pingcap/tidb/pkg/ddl/logutil"
34
35
ddlutil "github.com/pingcap/tidb/pkg/ddl/util"
35
36
"github.com/pingcap/tidb/pkg/domain/infosync"
36
37
"github.com/pingcap/tidb/pkg/infoschema"
37
38
infoschemacontext "github.com/pingcap/tidb/pkg/infoschema/context"
38
39
"github.com/pingcap/tidb/pkg/meta/model"
40
+ "github.com/pingcap/tidb/pkg/parser/ast"
39
41
"github.com/pingcap/tidb/pkg/sessionctx"
40
42
"github.com/pingcap/tidb/pkg/table"
41
43
"github.com/pingcap/tidb/pkg/util"
@@ -221,6 +223,8 @@ var (
221
223
PullTiFlashPdTick = atomicutil .NewUint64 (30 * 5 )
222
224
// UpdateTiFlashStoreTick indicates the number of intervals before we fully update TiFlash stores.
223
225
UpdateTiFlashStoreTick = atomicutil .NewUint64 (5 )
226
+ // RefreshRulesTick indicates the number of intervals before we refresh TiFlash rules.
227
+ RefreshRulesTick = atomicutil .NewUint64 (10 )
224
228
// PollTiFlashBackoffMaxTick is the max tick before we try to update TiFlash replica availability for one table.
225
229
PollTiFlashBackoffMaxTick TiFlashTick = 10
226
230
// PollTiFlashBackoffMinTick is the min tick before we try to update TiFlash replica availability for one table.
@@ -548,6 +552,99 @@ func (d *ddl) refreshTiFlashTicker(ctx sessionctx.Context, pollTiFlashContext *T
548
552
return nil
549
553
}
550
554
555
+ type pending struct {
556
+ ID int64
557
+ TableInfo * model.TableInfo
558
+ DBInfo * model.DBInfo
559
+ }
560
+
561
+ // refreshTiFlashPlacementRules will refresh the placement rules of TiFlash replicas if on tick.
562
+ // 1. It will scan all the meta and check if there is any TiFlash replica.
563
+ // 2. If there is, it will check if the placement rules are missing.
564
+ // 3. If the placement rules are missing, it will add by submit a ActionSetTiFlashReplica job to repair the entire table.
565
+ func (d * ddl ) refreshTiFlashPlacementRules (sctx sessionctx.Context , tick uint64 ) error {
566
+ if tick % RefreshRulesTick .Load () != 0 {
567
+ return nil
568
+ }
569
+ schema := d .infoCache .GetLatest ()
570
+ if schema == nil {
571
+ return errors .New ("schema is nil" )
572
+ }
573
+
574
+ var pendings []pending
575
+
576
+ for _ , dbResult := range schema .ListTablesWithSpecialAttribute (infoschemacontext .TiFlashAttribute ) {
577
+ db , ok := schema .SchemaByName (dbResult .DBName )
578
+ if ! ok {
579
+ return infoschema .ErrDatabaseNotExists .GenWithStackByArgs (dbResult .DBName .O )
580
+ }
581
+ for _ , tblInfo := range dbResult .TableInfos {
582
+ if tblInfo .TiFlashReplica == nil {
583
+ continue
584
+ }
585
+
586
+ if ps := tblInfo .GetPartitionInfo (); ps != nil {
587
+ collectPendings := func (ps []model.PartitionDefinition ) {
588
+ for _ , p := range ps {
589
+ pendings = append (pendings , pending {
590
+ ID : p .ID ,
591
+ TableInfo : tblInfo ,
592
+ DBInfo : db ,
593
+ })
594
+ }
595
+ }
596
+ collectPendings (ps .Definitions )
597
+ collectPendings (ps .AddingDefinitions )
598
+ } else {
599
+ pendings = append (pendings , pending {
600
+ ID : tblInfo .ID ,
601
+ TableInfo : tblInfo ,
602
+ DBInfo : db ,
603
+ })
604
+ }
605
+ }
606
+ }
607
+
608
+ fixed := make (map [int64 ]struct {})
609
+ for _ , replica := range pendings {
610
+ if _ , ok := fixed [replica .TableInfo .ID ]; ok {
611
+ continue
612
+ }
613
+ rule , err := infosync .GetPlacementRule (d .ctx , replica .ID )
614
+ if err != nil {
615
+ logutil .DDLLogger ().Warn ("get placement rule err" , zap .Error (err ))
616
+ continue
617
+ }
618
+ // pdhttp.GetPlacementRule returns the zero object instead of nil pointer when not found.
619
+ ruleIsMissing := rule == nil || len (rule .ID ) == 0
620
+ if ruleIsMissing && replica .TableInfo .TiFlashReplica .Count > 0 {
621
+ job := & model.Job {
622
+ Version : model .GetJobVerInUse (),
623
+ SchemaID : replica .DBInfo .ID ,
624
+ TableID : replica .TableInfo .ID ,
625
+ SchemaName : replica .DBInfo .Name .L ,
626
+ TableName : replica .TableInfo .Name .L ,
627
+ Type : model .ActionSetTiFlashReplica ,
628
+ BinlogInfo : & model.HistoryInfo {},
629
+ CDCWriteSource : sctx .GetSessionVars ().CDCWriteSource ,
630
+ SQLMode : sctx .GetSessionVars ().SQLMode ,
631
+ }
632
+ args := model.SetTiFlashReplicaArgs {TiflashReplica : ast.TiFlashReplicaSpec {
633
+ Count : replica .TableInfo .TiFlashReplica .Count ,
634
+ Labels : replica .TableInfo .TiFlashReplica .LocationLabels ,
635
+ }}
636
+ err = d .executor .doDDLJob2 (sctx , job , & args )
637
+ if err != nil {
638
+ logutil .DDLLogger ().Warn ("fix placement rule err" , zap .Error (err ))
639
+ } else {
640
+ logutil .DDLLogger ().Info ("fix placement rule success" , zap .Int64 ("tableID" , replica .TableInfo .ID ))
641
+ fixed [replica .TableInfo .ID ] = struct {}{}
642
+ }
643
+ }
644
+ }
645
+ return nil
646
+ }
647
+
551
648
func (d * ddl ) PollTiFlashRoutine () {
552
649
pollTiflashContext , err := NewTiFlashManagementContext ()
553
650
if err != nil {
@@ -594,6 +691,11 @@ func (d *ddl) PollTiFlashRoutine() {
594
691
logutil .DDLLogger ().Warn ("refreshTiFlashTicker returns error" , zap .Error (err ))
595
692
}
596
693
}
694
+ if kerneltype .IsNextGen () {
695
+ if err := d .refreshTiFlashPlacementRules (sctx , pollTiflashContext .PollCounter ); err != nil {
696
+ logutil .DDLLogger ().Warn ("refreshTiFlashPlacementRules returns error" , zap .Error (err ))
697
+ }
698
+ }
597
699
} else {
598
700
infosync .CleanTiFlashProgressCache ()
599
701
}
0 commit comments