Skip to content

Commit 8aed757

Browse files
committed
fix 'import cycle' for package tidb/ddl.
create sysvar TiDBFastDDL (#3) * Change TiDBFastDDL to Global scope. * TiDBFastDDL Variable usage 1, At beginning of doReorgWorkForCreateIndex, use global variable TiDBFastDDL to detemine whether enable lightning backfill process. After set up lightning, then set reorgInfo.IsLightningEnabled to true to show the lightning add index is uded for this DDL job. 2, Later, check reorgInfo.IsLightningEnabled to control the DDL job level lightning's process. Complete lightning env initlize procdeure (#4) 1. Get max open file limition for lightning, default 1024 2. Set light sorted path: 2.1 If sysVar DataDir is not a start with / data dir path, then set path to /tmp/lightning 2.2 otherwise set path to DataDir + "lightning" Check whether the lightning sorted path is exist and a dir, if yes then keep use it. otherwise create new one. 3, Set the lightning dir quota, default 10G Add one unit test uniform lightning related errmessage text to lightning_error file. (#5) complete memory track module work. refacter memmory manager to resource manager for expand to control concurrent base available cpu core number. Finsih integrate with lightning concurrency process logic. Add log for lightning processing. refactor some code` complete memory track module work. (#7) complete user cancel and exception part logic implement * complete memory track module work. * refacter memmory manager to resource manager for expand to control concurrent base available cpu core number. Finsih integrate with lightning concurrency process logic. * Add log for lightning processing. refactor some code` restore logic first stage adjust metric of ddl process progress value for lightning solution refine import cycle set up disk quota refine code and add some ut and ft. Add config paramemter TiDBlightningSortPath for setting sort parth for add index. fix mem reclaim problems Add log infromation combine the optimizes: 1, prundecode 2, json expression Add some log information
1 parent 2ba2a9e commit 8aed757

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

51 files changed

+3411
-327
lines changed

br/pkg/conn/util/util.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package util
2+
3+
import (
4+
"context"
5+
6+
"github.com/pingcap/errors"
7+
"github.com/pingcap/kvproto/pkg/metapb"
8+
pd "github.com/tikv/pd/client"
9+
10+
errors2 "github.com/pingcap/tidb/br/pkg/errors"
11+
"github.com/pingcap/tidb/br/pkg/version"
12+
)
13+
14+
// GetAllTiKVStores returns all TiKV stores registered to the PD client. The
15+
// stores must not be a tombstone and must never contain a label `engine=tiflash`.
16+
func GetAllTiKVStores(
17+
ctx context.Context,
18+
pdClient pd.Client,
19+
storeBehavior StoreBehavior,
20+
) ([]*metapb.Store, error) {
21+
// get all live stores.
22+
stores, err := pdClient.GetAllStores(ctx, pd.WithExcludeTombstone())
23+
if err != nil {
24+
return nil, errors.Trace(err)
25+
}
26+
27+
// filter out all stores which are TiFlash.
28+
j := 0
29+
for _, store := range stores {
30+
isTiFlash := false
31+
if version.IsTiFlash(store) {
32+
if storeBehavior == SkipTiFlash {
33+
continue
34+
} else if storeBehavior == ErrorOnTiFlash {
35+
return nil, errors.Annotatef(errors2.ErrPDInvalidResponse,
36+
"cannot restore to a cluster with active TiFlash stores (store %d at %s)", store.Id, store.Address)
37+
}
38+
isTiFlash = true
39+
}
40+
if !isTiFlash && storeBehavior == TiFlashOnly {
41+
continue
42+
}
43+
stores[j] = store
44+
j++
45+
}
46+
return stores[:j], nil
47+
}
48+
49+
// StoreBehavior is the action to do in GetAllTiKVStores when a non-TiKV
50+
// store (e.g. TiFlash store) is found.
51+
type StoreBehavior uint8
52+
53+
const (
54+
// ErrorOnTiFlash causes GetAllTiKVStores to return error when the store is
55+
// found to be a TiFlash node.
56+
ErrorOnTiFlash StoreBehavior = 0
57+
// SkipTiFlash causes GetAllTiKVStores to skip the store when it is found to
58+
// be a TiFlash node.
59+
SkipTiFlash StoreBehavior = 1
60+
// TiFlashOnly caused GetAllTiKVStores to skip the store which is not a
61+
// TiFlash node.
62+
TiFlashOnly StoreBehavior = 2
63+
)

br/pkg/lightning/backend/backend.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,9 @@ type AbstractBackend interface {
209209
// ResolveDuplicateRows resolves duplicated rows by deleting/inserting data
210210
// according to the required algorithm.
211211
ResolveDuplicateRows(ctx context.Context, tbl table.Table, tableName string, algorithm config.DuplicateResolutionAlgorithm) error
212+
213+
// Total Memory usage. This is only used for local backend
214+
TotalMemoryConsume() int64
212215
}
213216

214217
// Backend is the delivery target for Lightning
@@ -280,6 +283,10 @@ func (be Backend) FlushAll(ctx context.Context) error {
280283
return be.abstract.FlushAllEngines(ctx)
281284
}
282285

286+
func (be Backend) TotalMemoryConsume() int64 {
287+
return be.abstract.TotalMemoryConsume()
288+
}
289+
283290
// CheckDiskQuota verifies if the total engine file size is below the given
284291
// quota. If the quota is exceeded, this method returns an array of engines,
285292
// which after importing can decrease the total size below quota.
@@ -406,11 +413,20 @@ func (engine *OpenedEngine) LocalWriter(ctx context.Context, cfg *LocalWriterCon
406413
return &LocalEngineWriter{writer: w, tableName: engine.tableName}, nil
407414
}
408415

416+
func (engine *OpenedEngine) TotalMemoryConsume() int64 {
417+
return engine.engine.backend.TotalMemoryConsume()
418+
}
419+
409420
// WriteRows writes a collection of encoded rows into the engine.
410421
func (w *LocalEngineWriter) WriteRows(ctx context.Context, columnNames []string, rows kv.Rows) error {
411422
return w.writer.AppendRows(ctx, w.tableName, columnNames, rows)
412423
}
413424

425+
// WriteRows writes a collection of encoded rows into the engine.
426+
func (w *LocalEngineWriter) WriteRow(ctx context.Context, columnNames []string, kvs []common.KvPair) error {
427+
return w.writer.AppendRow(ctx, w.tableName, columnNames, kvs)
428+
}
429+
414430
func (w *LocalEngineWriter) Close(ctx context.Context) (ChunkFlushStatus, error) {
415431
return w.writer.Close(ctx)
416432
}
@@ -493,6 +509,16 @@ type EngineWriter interface {
493509
columnNames []string,
494510
rows kv.Rows,
495511
) error
512+
AppendRow(
513+
ctx context.Context,
514+
tableName string,
515+
columnNames []string,
516+
kvs []common.KvPair,
517+
) error
496518
IsSynced() bool
497519
Close(ctx context.Context) (ChunkFlushStatus, error)
498520
}
521+
522+
func (oe *OpenedEngine) GetEngineUuid() uuid.UUID {
523+
return oe.uuid
524+
}

br/pkg/lightning/backend/kv/session_test.go renamed to br/pkg/lightning/backend/kv/kvtest/session_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,18 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
package kv
15+
package kvtest
1616

1717
import (
1818
"testing"
1919

20+
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
2021
"github.com/pingcap/tidb/parser/mysql"
2122
"github.com/stretchr/testify/require"
2223
)
2324

2425
func TestSession(t *testing.T) {
25-
session := newSession(&SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
26+
session := kv.NewSession(&kv.SessionOptions{SQLMode: mysql.ModeNone, Timestamp: 1234567890})
2627
_, err := session.Txn(true)
2728
require.NoError(t, err)
2829
}

0 commit comments

Comments
 (0)