Skip to content

Commit 412fa13

Browse files
authored
lightning: extract region job's accessing data to an interface (#45717)
ref #45719
1 parent fc9738b commit 412fa13

File tree

6 files changed

+201
-137
lines changed

6 files changed

+201
-137
lines changed

br/pkg/lightning/backend/local/engine.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -947,9 +947,11 @@ func (e *Engine) newKVIter(ctx context.Context, opts *pebble.IterOptions) Iter {
947947
return newDupDetectIter(e.getDB(), e.keyAdapter, opts, e.duplicateDB, logger, e.dupDetectOpt)
948948
}
949949

950-
// getFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
950+
var _ ingestData = (*Engine)(nil)
951+
952+
// GetFirstAndLastKey reads the first and last key in range [lowerBound, upperBound)
951953
// in the engine. Empty upperBound means unbounded.
952-
func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
954+
func (e *Engine) GetFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []byte, error) {
953955
if len(upperBound) == 0 {
954956
// we use empty slice for unbounded upper bound, but it means max value in pebble
955957
// so reset to nil
@@ -980,6 +982,22 @@ func (e *Engine) getFirstAndLastKey(lowerBound, upperBound []byte) ([]byte, []by
980982
return firstKey, lastKey, nil
981983
}
982984

985+
// NewIter implements ingestData interface.
986+
func (e *Engine) NewIter(ctx context.Context, lowerBound, upperBound []byte) ForwardIter {
987+
return e.newKVIter(ctx, &pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
988+
}
989+
990+
// GetTS implements ingestData interface.
991+
func (e *Engine) GetTS() uint64 {
992+
return e.TS
993+
}
994+
995+
// Finish implements ingestData interface.
996+
func (e *Engine) Finish(totalBytes, totalCount int64) {
997+
e.importedKVSize.Add(totalBytes)
998+
e.importedKVCount.Add(totalCount)
999+
}
1000+
9831001
type sstMeta struct {
9841002
path string
9851003
minKey []byte

br/pkg/lightning/backend/local/engine_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,27 +142,27 @@ func TestGetFirstAndLastKey(t *testing.T) {
142142
err = db.Set([]byte("e"), []byte("e"), nil)
143143
require.NoError(t, err)
144144

145-
first, last, err := f.getFirstAndLastKey(nil, nil)
145+
first, last, err := f.GetFirstAndLastKey(nil, nil)
146146
require.NoError(t, err)
147147
require.Equal(t, []byte("a"), first)
148148
require.Equal(t, []byte("e"), last)
149149

150-
first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("d"))
150+
first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("d"))
151151
require.NoError(t, err)
152152
require.Equal(t, []byte("c"), first)
153153
require.Equal(t, []byte("c"), last)
154154

155-
first, last, err = f.getFirstAndLastKey([]byte("b"), []byte("f"))
155+
first, last, err = f.GetFirstAndLastKey([]byte("b"), []byte("f"))
156156
require.NoError(t, err)
157157
require.Equal(t, []byte("c"), first)
158158
require.Equal(t, []byte("e"), last)
159159

160-
first, last, err = f.getFirstAndLastKey([]byte("y"), []byte("z"))
160+
first, last, err = f.GetFirstAndLastKey([]byte("y"), []byte("z"))
161161
require.NoError(t, err)
162162
require.Nil(t, first)
163163
require.Nil(t, last)
164164

165-
first, last, err = f.getFirstAndLastKey([]byte("e"), []byte(""))
165+
first, last, err = f.GetFirstAndLastKey([]byte("e"), []byte(""))
166166
require.NoError(t, err)
167167
require.Equal(t, []byte("e"), first)
168168
require.Equal(t, []byte("e"), last)

br/pkg/lightning/backend/local/iterator.go

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -28,25 +28,12 @@ import (
2828

2929
// Iter abstract iterator method for Ingester.
3030
type Iter interface {
31+
ForwardIter
3132
// Seek seek to specify position.
3233
// if key not found, seeks next key position in iter.
3334
Seek(key []byte) bool
34-
// Error return current error on this iter.
35-
Error() error
36-
// First moves this iter to the first key.
37-
First() bool
3835
// Last moves this iter to the last key.
3936
Last() bool
40-
// Valid check this iter reach the end.
41-
Valid() bool
42-
// Next moves this iter forward.
43-
Next() bool
44-
// Key represents current position pair's key.
45-
Key() []byte
46-
// Value represents current position pair's Value.
47-
Value() []byte
48-
// Close close this iter.
49-
Close() error
5037
// OpType represents operations of pair. currently we have two types.
5138
// 1. Put
5239
// 2. Delete

br/pkg/lightning/backend/local/local.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1024,7 +1024,7 @@ func (local *Backend) readAndSplitIntoRange(
10241024
sizeLimit int64,
10251025
keysLimit int64,
10261026
) ([]Range, error) {
1027-
firstKey, lastKey, err := engine.getFirstAndLastKey(nil, nil)
1027+
firstKey, lastKey, err := engine.GetFirstAndLastKey(nil, nil)
10281028
if err != nil {
10291029
return nil, err
10301030
}
@@ -1191,7 +1191,7 @@ var fakeRegionJobs map[[2]string]struct {
11911191
// It will retry internally when scan region meet error.
11921192
func (local *Backend) generateJobForRange(
11931193
ctx context.Context,
1194-
engine *Engine,
1194+
engine ingestData,
11951195
keyRange Range,
11961196
regionSplitSize, regionSplitKeys int64,
11971197
) ([]*regionJob, error) {
@@ -1210,7 +1210,7 @@ func (local *Backend) generateJobForRange(
12101210
})
12111211

12121212
start, end := keyRange.start, keyRange.end
1213-
pairStart, pairEnd, err := engine.getFirstAndLastKey(start, end)
1213+
pairStart, pairEnd, err := engine.GetFirstAndLastKey(start, end)
12141214
if err != nil {
12151215
return nil, err
12161216
}
@@ -1247,7 +1247,7 @@ func (local *Backend) generateJobForRange(
12471247
keyRange: intersectRange(region.Region, Range{start: start, end: end}),
12481248
region: region,
12491249
stage: regionScanned,
1250-
engine: engine,
1250+
ingestData: engine,
12511251
regionSplitSize: regionSplitSize,
12521252
regionSplitKeys: regionSplitKeys,
12531253
metrics: local.metrics,
@@ -1283,7 +1283,7 @@ func (local *Backend) startWorker(
12831283
case needRescan:
12841284
jobs, err2 := local.generateJobForRange(
12851285
ctx,
1286-
job.engine,
1286+
job.ingestData,
12871287
job.keyRange,
12881288
job.regionSplitSize,
12891289
job.regionSplitKeys,

0 commit comments

Comments
 (0)