@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/mpp"
 	"github.com/pingcap/tidb/pkg/config"
+	"github.com/pingcap/tidb/pkg/ddl/placement"
 	"github.com/pingcap/tidb/pkg/distsql"
 	"github.com/pingcap/tidb/pkg/executor/internal/builder"
 	"github.com/pingcap/tidb/pkg/executor/internal/util"
@@ -40,11 +41,13 @@ import (
 	"github.com/pingcap/tidb/pkg/store/copr"
 	"github.com/pingcap/tidb/pkg/store/driver/backoff"
 	derr "github.com/pingcap/tidb/pkg/store/driver/error"
+	"github.com/pingcap/tidb/pkg/store/helper"
 	util2 "github.com/pingcap/tidb/pkg/util"
 	"github.com/pingcap/tidb/pkg/util/execdetails"
 	"github.com/pingcap/tidb/pkg/util/logutil"
 	"github.com/pingcap/tidb/pkg/util/memory"
 	"github.com/pingcap/tipb/go-tipb"
+	"github.com/tikv/client-go/v2/tikv"
 	clientutil "github.com/tikv/client-go/v2/util"
 	"go.uber.org/zap"
 )
@@ -189,7 +192,7 @@ func NewLocalMPPCoordinator(ctx context.Context, sctx sessionctx.Context, is inf
 	return coord
 }
 
-func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) error {
+func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment, allTiFlashZoneInfo map[string]string) error {
 	dagReq, err := builder.ConstructDAGReq(c.sessionCtx, []base.PhysicalPlan{pf.ExchangeSender}, kv.TiFlash)
 	if err != nil {
 		return errors.Trace(err)
@@ -202,6 +205,8 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) err
 	} else {
 		dagReq.EncodeType = tipb.EncodeType_TypeChunk
 	}
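+	// One zone helper per fragment; it caches exchange zone lookups shared by all of this fragment's tasks.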
+	zoneHelper := taskZoneInfoHelper{}
+	zoneHelper.init(allTiFlashZoneInfo)
 	for _, mppTask := range pf.ExchangeSender.Tasks {
 		if mppTask.PartitionTableIDs != nil {
 			err = util.UpdateExecutorTableID(context.Background(), dagReq.RootExecutor, true, mppTask.PartitionTableIDs)
@@ -217,6 +222,9 @@ func (c *localMppCoordinator) appendMPPDispatchReq(pf *plannercore.Fragment) err
 		if err != nil {
 			return err
 		}
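+		// Refresh per-task state (root flag and this task's zone), then walk the
+		// plan tree to stamp same-zone flags before marshaling the DAG request.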
+		zoneHelper.isRoot = pf.IsRoot
+		zoneHelper.currentTaskZone = zoneHelper.allTiFlashZoneInfo[mppTask.Meta.GetAddress()]
+		zoneHelper.fillSameZoneFlagForExchange(dagReq.RootExecutor)
 		pbData, err := dagReq.Marshal()
 		if err != nil {
 			return errors.Trace(err)
@@ -343,6 +351,127 @@ func (c *localMppCoordinator) fixTaskForCTEStorageAndReader(exec *tipb.Executor,
 	return nil
 }
 
+// taskZoneInfoHelper helps set the same-zone flags of exchange executors
+type taskZoneInfoHelper struct {
+	allTiFlashZoneInfo map[string]string
+	// exchangeZoneInfo caches one mpp task's zone info:
+	// key is the executor id, value is the zone info array.
+	// For an ExchangeSender it holds the target TiFlash nodes' zone info; for an ExchangeReceiver, the source TiFlash nodes' zone info.
+	exchangeZoneInfo map[string][]string
+	tidbZone         string
+	currentTaskZone  string
+	isRoot           bool
+}
+
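+// init caches the TiDB zone label from the global config and the TiFlash zone map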
+func (h *taskZoneInfoHelper) init(allTiFlashZoneInfo map[string]string) {
+	h.tidbZone = config.GetGlobalConfig().Labels[placement.DCLabelKey]
+	h.allTiFlashZoneInfo = allTiFlashZoneInfo
+	// initial capacity of 2: one exchange sender and one exchange receiver
+	h.exchangeZoneInfo = make(map[string][]string, 2)
+}
+
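+// tryQuickFillWithUncertainZones covers the fast paths where the flags can be
+// decided without peer zone info: missing executor id or unknown current zone,
+// a root exchange sender (data returns to TiDB), and CTE exchanges (local data).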
+func (h *taskZoneInfoHelper) tryQuickFillWithUncertainZones(exec *tipb.Executor, slots int, sameZoneFlags []bool) (bool, []bool) {
+	if exec.ExecutorId == nil || len(h.currentTaskZone) == 0 {
+		for i := 0; i < slots; i++ {
+			sameZoneFlags = append(sameZoneFlags, true)
+		}
+		return true, sameZoneFlags
+	}
+	if h.isRoot && exec.Tp == tipb.ExecType_TypeExchangeSender {
+		sameZoneFlags = append(sameZoneFlags, len(h.tidbZone) == 0 || h.currentTaskZone == h.tidbZone)
+		return true, sameZoneFlags
+	}
+
+	// For CTE exchange nodes, data is passed locally, so set all flags to true
+	if (exec.Tp == tipb.ExecType_TypeExchangeSender && len(exec.ExchangeSender.UpstreamCteTaskMeta) != 0) ||
+		(exec.Tp == tipb.ExecType_TypeExchangeReceiver && len(exec.ExchangeReceiver.OriginalCtePrdocuerTaskMeta) != 0) {
+		for i := 0; i < slots; i++ {
+			sameZoneFlags = append(sameZoneFlags, true)
+		}
+		return true, sameZoneFlags
+	}
+
+	return false, sameZoneFlags
+}
+
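+// collectExchangeZoneInfos decodes the peer task metas of an exchange and maps
+// each address to its zone; a meta that fails to unmarshal yields an empty zone.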
+func (h *taskZoneInfoHelper) collectExchangeZoneInfos(encodedTaskMeta [][]byte, slots int) []string {
+	zoneInfos := make([]string, 0, slots)
+	for _, taskBytes := range encodedTaskMeta {
+		taskMeta := &mpp.TaskMeta{}
+		err := taskMeta.Unmarshal(taskBytes)
+		if err != nil {
+			zoneInfos = append(zoneInfos, "")
+			continue
+		}
+		zoneInfos = append(zoneInfos, h.allTiFlashZoneInfo[taskMeta.GetAddress()])
+	}
+	return zoneInfos
+}
+
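+// inferSameZoneFlag returns, for each peer task of this exchange, whether it is
+// in the same zone as the current task; unknown zones conservatively yield true.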
+func (h *taskZoneInfoHelper) inferSameZoneFlag(exec *tipb.Executor, encodedTaskMeta [][]byte) []bool {
+	slots := len(encodedTaskMeta)
+	sameZoneFlags := make([]bool, 0, slots)
+	filled := false
+	if filled, sameZoneFlags = h.tryQuickFillWithUncertainZones(exec, slots, sameZoneFlags); filled {
+		return sameZoneFlags
+	}
+	zoneInfos, exist := h.exchangeZoneInfo[*exec.ExecutorId]
+	if !exist {
+		zoneInfos = h.collectExchangeZoneInfos(encodedTaskMeta, slots)
+		h.exchangeZoneInfo[*exec.ExecutorId] = zoneInfos
+	}
+
+	if len(zoneInfos) != slots {
+		// This branch is for safety only and is not expected to be taken
+		for i := 0; i < slots; i++ {
+			sameZoneFlags = append(sameZoneFlags, true)
+		}
+		return sameZoneFlags
+	}
+
+	for i := 0; i < slots; i++ {
+		sameZoneFlags = append(sameZoneFlags, len(zoneInfos[i]) == 0 || h.currentTaskZone == zoneInfos[i])
+	}
+	return sameZoneFlags
+}
+
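+// fillSameZoneFlagForExchange walks the executor tree and sets the same-zone
+// flags on every ExchangeSender and ExchangeReceiver it visits.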
438
+ func (h * taskZoneInfoHelper ) fillSameZoneFlagForExchange (exec * tipb.Executor ) {
439
+ children := make ([]* tipb.Executor , 0 , 2 )
440
+ switch exec .Tp {
441
+ case tipb .ExecType_TypeTableScan , tipb .ExecType_TypePartitionTableScan , tipb .ExecType_TypeIndexScan :
442
+ case tipb .ExecType_TypeSelection :
443
+ children = append (children , exec .Selection .Child )
444
+ case tipb .ExecType_TypeAggregation , tipb .ExecType_TypeStreamAgg :
445
+ children = append (children , exec .Aggregation .Child )
446
+ case tipb .ExecType_TypeTopN :
447
+ children = append (children , exec .TopN .Child )
448
+ case tipb .ExecType_TypeLimit :
449
+ children = append (children , exec .Limit .Child )
450
+ case tipb .ExecType_TypeExchangeSender :
451
+ children = append (children , exec .ExchangeSender .Child )
452
+ exec .ExchangeSender .SameZoneFlag = h .inferSameZoneFlag (exec , exec .ExchangeSender .EncodedTaskMeta )
453
+ case tipb .ExecType_TypeExchangeReceiver :
454
+ exec .ExchangeReceiver .SameZoneFlag = h .inferSameZoneFlag (exec , exec .ExchangeReceiver .EncodedTaskMeta )
455
+ case tipb .ExecType_TypeJoin :
456
+ children = append (children , exec .Join .Children ... )
457
+ case tipb .ExecType_TypeProjection :
458
+ children = append (children , exec .Projection .Child )
459
+ case tipb .ExecType_TypeWindow :
460
+ children = append (children , exec .Window .Child )
461
+ case tipb .ExecType_TypeSort :
462
+ children = append (children , exec .Sort .Child )
463
+ case tipb .ExecType_TypeExpand :
464
+ children = append (children , exec .Expand .Child )
465
+ case tipb .ExecType_TypeExpand2 :
466
+ children = append (children , exec .Expand2 .Child )
467
+ default :
468
+ logutil .BgLogger ().Warn (fmt .Sprintf ("unknown new tipb protocol %d" , exec .Tp ))
469
+ }
470
+ for _ , child := range children {
471
+ h .fillSameZoneFlagForExchange (child )
472
+ }
473
+ }
474
+
346
475
func getActualPhysicalPlan (plan base.Plan ) base.PhysicalPlan {
347
476
if plan == nil {
348
477
return nil
@@ -788,8 +917,24 @@ func (c *localMppCoordinator) Execute(ctx context.Context) (kv.Response, []kv.Ke
 	}
 	c.nodeCnt = len(nodeInfo)
 
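+	// Build the TiFlash address -> zone map once per query; each fragment's dispatch requests reuse it.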
+	var allTiFlashZoneInfo map[string]string
+	if c.sessionCtx.GetStore() == nil {
+		allTiFlashZoneInfo = make(map[string]string)
+	} else if tikvStore, ok := c.sessionCtx.GetStore().(helper.Storage); ok {
+		cache := tikvStore.GetRegionCache()
+		allTiFlashStores := cache.GetTiFlashStores(tikv.LabelFilterNoTiFlashWriteNode)
+		allTiFlashZoneInfo = make(map[string]string, len(allTiFlashStores))
+		for _, tiflashStore := range allTiFlashStores {
+			tiflashStoreAddr := tiflashStore.GetAddr()
+			if tiflashZone, isSet := tiflashStore.GetLabelValue(placement.DCLabelKey); isSet {
+				allTiFlashZoneInfo[tiflashStoreAddr] = tiflashZone
+			}
+		}
+	} else {
+		allTiFlashZoneInfo = make(map[string]string)
+	}
 	for _, frag := range frags {
-		err = c.appendMPPDispatchReq(frag)
+		err = c.appendMPPDispatchReq(frag, allTiFlashZoneInfo)
 		if err != nil {
 			return nil, nil, errors.Trace(err)
 		}