1
+ // Copyright 2022 PingCAP, Inc.
2
+ //
3
+ // Licensed under the Apache License, Version 2.0 (the "License");
4
+ // you may not use this file except in compliance with the License.
5
+ // You may obtain a copy of the License at
6
+ //
7
+ // http://www.apache.org/licenses/LICENSE-2.0
8
+ //
9
+ // Unless required by applicable law or agreed to in writing, software
10
+ // distributed under the License is distributed on an "AS IS" BASIS,
11
+ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
+ // See the License for the specific language governing permissions and
13
+ // limitations under the License.
1
14
package lightning
2
15
3
16
import (
@@ -10,7 +23,6 @@ import (
10
23
"github.com/pingcap/tidb/br/pkg/lightning/backend"
11
24
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
12
25
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
13
- "github.com/pingcap/tidb/br/pkg/lightning/common"
14
26
"github.com/pingcap/tidb/br/pkg/lightning/config"
15
27
"github.com/pingcap/tidb/meta"
16
28
"github.com/pingcap/tidb/parser/model"
@@ -22,31 +34,23 @@ import (
22
34
"github.com/twmb/murmur3"
23
35
"go.uber.org/zap"
24
36
)
25
-
26
37
type engineInfo struct {
27
38
Id int32
28
39
key string
29
- backend * backend. Backend
40
+ BackCtx * BackendContext
30
41
OpenedEngine * backend.OpenedEngine
31
42
writer * backend.LocalEngineWriter
32
43
cfg * backend.EngineConfig
33
44
// TODO: use channel later;
34
45
ref int32
35
- kvs []common.KvPair
36
46
tbl * model.TableInfo
37
47
isOpened bool
38
- //exec *sessionPool
39
- }
40
-
41
- func (ei * engineInfo ) ResetCache () {
42
- ei .kvs = ei .kvs [:0 ]
43
- // ei.size = 0
44
48
}
45
49
46
- func (ei * engineInfo ) Init (key string , cfg * backend.EngineConfig , be * backend. Backend , en * backend.OpenedEngine , tbl * model.TableInfo ) {
50
+ func (ei * engineInfo ) Init (key string , cfg * backend.EngineConfig , bCtx * BackendContext , en * backend.OpenedEngine , tbl * model.TableInfo ) {
47
51
ei .key = key
48
52
ei .cfg = cfg
49
- ei .backend = be
53
+ ei .BackCtx = bCtx
50
54
ei .OpenedEngine = en
51
55
ei .tbl = tbl
52
56
ei .isOpened = false
@@ -79,7 +83,7 @@ func (ei *engineInfo) unsafeImportAndReset(ctx context.Context) error {
79
83
return fmt .Errorf ("FinishIndexOp err:%w" , err )
80
84
}
81
85
82
- if err = ei .backend .FlushAll (ctx ); err != nil {
86
+ if err = ei .BackCtx . Backend .FlushAll (ctx ); err != nil {
83
87
//LogError("flush engine for disk quota failed, check again later : %v", err)
84
88
return err
85
89
}
@@ -89,7 +93,7 @@ func (ei *engineInfo) unsafeImportAndReset(ctx context.Context) error {
89
93
}
90
94
ctx = context .WithValue (ctx , RegionSizeStats , ret )
91
95
_ , uuid := backend .MakeUUID (ei .tbl .Name .String (), ei .Id )
92
- return ei .backend .UnsafeImportAndReset (ctx , uuid , int64 (config .SplitRegionSize )* int64 (config .MaxSplitRegionSizeRatio ))
96
+ return ei .BackCtx . Backend .UnsafeImportAndReset (ctx , uuid , int64 (config .SplitRegionSize )* int64 (config .MaxSplitRegionSizeRatio ))
93
97
}
94
98
95
99
func GenEngineKey (schemaId int64 , tableId int64 , indexId int64 ) string {
@@ -124,14 +128,14 @@ func CreateEngine(ctx context.Context, job *model.Job, t *meta.Meta, backendKey
124
128
h .Write (b [:])
125
129
eid := int32 (h .Sum32 ())
126
130
127
- bc := GlobalLightningEnv .BackendCache . bcCache [backendKey ]
131
+ bc := GlobalLightningEnv .BackendCache [backendKey ]
128
132
be := bc .Backend
129
133
130
134
en , err := be .OpenEngine (ctx , & cfg , job .TableName , eid )
131
135
if err != nil {
132
136
return errors .Errorf ("PrepareIndexOp.OpenEngine err:%v" , err )
133
137
}
134
- ei .Init (engineKey , & cfg , be , en , tblInfo )
138
+ ei .Init (engineKey , & cfg , bc , en , tblInfo )
135
139
GlobalLightningEnv .EngineManager .engineCache [engineKey ] = ei
136
140
bc .EngineCache [engineKey ] = ei
137
141
@@ -156,7 +160,7 @@ func FlushKeyValSync(ctx context.Context, keyEngineInfo string, cache *WorkerKVC
156
160
err = ei .unsafeImportAndReset (ctx )
157
161
if err != nil {
158
162
// LogError("unsafeImportAndReset %s cost %v", size2str(sn.szInc.encodeSize), time.Now().Sub(start))
159
- // 仅仅是导入失败,下次还是可以继续导入,不影响 .
163
+ // Only import failed, next time can import continue .
160
164
return nil
161
165
}
162
166
sn .importAndReset (start )
@@ -229,7 +233,7 @@ func fetchTableRegionSizeStats(tblId int64, exec sqlexec.RestrictedSQLExecutor)
229
233
return ret , nil
230
234
}
231
235
232
- // TODO: 如果多个 线程 同时调用该函数,是否存在这样情况? 如果存在,该怎么处理 ?
236
+ // TODO: If multi thread call this function, how to handle this logic ?
233
237
func FinishIndexOp (ctx context.Context , keyEngineInfo string , tbl table.Table , unique bool ) (err error ) {
234
238
ei , err := GlobalLightningEnv .EngineManager .GetEngineInfo (keyEngineInfo )
235
239
if err != nil {
@@ -275,7 +279,7 @@ func FinishIndexOp(ctx context.Context, keyEngineInfo string, tbl table.Table, u
275
279
return errors .New ("engine.Cleanup err" )
276
280
}
277
281
if unique {
278
- hasDupe , err := ei .backend .CollectRemoteDuplicateRows (ctx , tbl , ei .tbl .Name .O , & kv.SessionOptions {
282
+ hasDupe , err := ei .BackCtx . Backend .CollectRemoteDuplicateRows (ctx , tbl , ei .tbl .Name .O , & kv.SessionOptions {
279
283
SQLMode : mysql .ModeStrictAllTables ,
280
284
SysVars : defaultImportantVariables ,
281
285
})
@@ -291,13 +295,3 @@ func FinishIndexOp(ctx context.Context, keyEngineInfo string, tbl table.Table, u
291
295
GlobalLightningEnv .EngineManager .ReleaseEngine (keyEngineInfo )
292
296
return nil
293
297
}
294
-
295
- var defaultImportantVariables = map [string ]string {
296
- "max_allowed_packet" : "67108864" ,
297
- "div_precision_increment" : "4" ,
298
- "time_zone" : "SYSTEM" ,
299
- "lc_time_names" : "en_US" ,
300
- "default_week_format" : "0" ,
301
- "block_encryption_mode" : "aes-128-ecb" ,
302
- "group_concat_max_len" : "1024" ,
303
- }
0 commit comments