
Commit c164298

globalsort: use partitioned prefix to store intermediate files (#62905) (#62963)
close #62904
1 parent 81afa5c commit c164298
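This change stores each writer's intermediate data, stat, and duplicate files under a prefix that contains a random partition component, instead of one fixed directory per writer. A likely motivation (the linked issue #62904 has the details) is that object stores such as Amazon S3 apply request-rate limits per key prefix, so spreading the intermediate files of a highly concurrent global sort over several prefixes avoids throttling a single hot prefix. Because the exact output path is no longer predictable, the test changes below either scan the storage for files (the new getKVAndStatFilesByScan helper) or capture the generated names from the writer's close summary, rather than hard-coding paths.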

17 files changed: 363 additions, 194 deletions

br/pkg/storage/memstore.go

Lines changed: 0 additions & 21 deletions
@@ -69,9 +69,6 @@ func (s *MemStorage) DeleteFile(ctx context.Context, name string) error {
     default:
         // continue on
     }
-    if !path.IsAbs(name) {
-        return errors.Errorf("file name is not an absolute path: %s", name)
-    }
     s.rwm.Lock()
     defer s.rwm.Unlock()
     if _, ok := s.dataStore[name]; !ok {
@@ -102,9 +99,6 @@ func (s *MemStorage) WriteFile(ctx context.Context, name string, data []byte) er
     default:
         // continue on
     }
-    if !path.IsAbs(name) {
-        return errors.Errorf("file name is not an absolute path: %s", name)
-    }
     fileData := slices.Clone(data)
     s.rwm.Lock()
     defer s.rwm.Unlock()
@@ -128,9 +122,6 @@ func (s *MemStorage) ReadFile(ctx context.Context, name string) ([]byte, error)
     default:
         // continue on
     }
-    if !path.IsAbs(name) {
-        return nil, errors.Errorf("file name is not an absolute path: %s", name)
-    }
     theFile, ok := s.loadMap(name)
     if !ok {
         return nil, errors.Errorf("cannot find the file: %s", name)
@@ -148,9 +139,6 @@ func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error)
     default:
         // continue on
     }
-    if !path.IsAbs(name) {
-        return false, errors.Errorf("file name is not an absolute path: %s", name)
-    }
     _, ok := s.loadMap(name)
     return ok, nil
 }
@@ -161,9 +149,6 @@ func (s *MemStorage) Open(ctx context.Context, filePath string, o *ReaderOption)
     if err := ctx.Err(); err != nil {
         return nil, errors.Trace(err)
     }
-    if !path.IsAbs(filePath) {
-        return nil, errors.Errorf("file name is not an absolute path: %s", filePath)
-    }
     theFile, ok := s.loadMap(filePath)
     if !ok {
         return nil, errors.Errorf("cannot find the file: %s", filePath)
@@ -248,9 +233,6 @@ func (s *MemStorage) Create(ctx context.Context, name string, _ *WriterOption) (
     default:
         // continue on
     }
-    if !path.IsAbs(name) {
-        return nil, errors.Errorf("file name is not an absolute path: %s", name)
-    }
     s.rwm.Lock()
     defer s.rwm.Unlock()
     theFile := new(memFile)
@@ -269,9 +251,6 @@ func (s *MemStorage) Rename(ctx context.Context, oldFileName, newFileName string
     default:
         // continue on
     }
-    if !path.IsAbs(newFileName) {
-        return errors.Errorf("new file name is not an absolute path: %s", newFileName)
-    }
     s.rwm.Lock()
     defer s.rwm.Unlock()
     theFile, ok := s.dataStore[oldFileName]
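The MemStorage hunks drop the requirement that file names be absolute paths, so the in-memory test storage now accepts relative keys the same way the object-store backends do. A plausible reason (an assumption, not stated in the diff) is that tests now address files by whatever relative names the listing and the partitioned prefixes produce. A minimal sketch of what this enables, using only the MemStorage methods shown above; the example path is made up:

package main

import (
    "context"
    "fmt"

    "github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
    ctx := context.Background()
    store := storage.NewMemStorage()

    // Before this commit, a relative name was rejected with
    // "file name is not an absolute path"; now it is stored under the
    // relative key as-is. The partitioned layout below is hypothetical.
    name := "testprefix/p3/0/one-file"
    if err := store.WriteFile(ctx, name, []byte("kv data")); err != nil {
        panic(err)
    }
    data, err := store.ReadFile(ctx, name)
    if err != nil {
        panic(err)
    }
    fmt.Printf("%q\n", data)
}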

pkg/disttask/importinto/planner_test.go

Lines changed: 5 additions & 3 deletions
@@ -344,9 +344,11 @@ func TestSplitForOneSubtask(t *testing.T) {
             multiFileStat = s.MultipleFilesStats
         }).
         Build(store, "/mock-test", "0")
-    _, _, err = external.MockExternalEngineWithWriter(
-        store, writer, "/mock-test", keys, values,
-    )
+    for i := range keys {
+        err := writer.WriteRow(ctx, keys[i], values[i], nil)
+        require.NoError(t, err)
+    }
+    require.NoError(t, writer.Close(ctx))
     require.NoError(t, err)
     kvMeta := &external.SortedKVMeta{
         StartKey: keys[0],

pkg/executor/test/indexmergereadtest/BUILD.bazel

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test")
 
 go_test(
     name = "indexmergereadtest_test",
-    timeout = "short",
+    timeout = "moderate",
     srcs = [
         "index_merge_reader_test.go",
         "main_test.go",

pkg/lightning/backend/external/bench_test.go

Lines changed: 34 additions & 11 deletions
@@ -15,12 +15,14 @@
 package external
 
 import (
+    "bytes"
     "context"
     "encoding/hex"
     goerrors "errors"
     "flag"
     "fmt"
     "io"
+    "strings"
     "testing"
     "time"
 
@@ -88,11 +90,9 @@ func writePlainFile(s *writeTestSuite) {
 }
 
 func cleanOldFiles(ctx context.Context, store storage.ExternalStorage, subDir string) {
-    dataFiles, statFiles, err := GetAllFileNames(ctx, store, subDir)
+    filenames, err := GetAllFileNames(ctx, store, subDir)
     intest.AssertNoError(err)
-    err = store.DeleteFiles(ctx, dataFiles)
-    intest.AssertNoError(err)
-    err = store.DeleteFiles(ctx, statFiles)
+    err = store.DeleteFiles(ctx, filenames)
     intest.AssertNoError(err)
 }
 
@@ -249,7 +249,7 @@ type readTestSuite struct {
 
 func readFileSequential(t *testing.T, s *readTestSuite) {
     ctx := context.Background()
-    files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
+    files, _, err := getKVAndStatFilesByScan(ctx, s.store, "/"+s.subDir)
     intest.AssertNoError(err)
 
     buf := make([]byte, s.memoryLimit)
@@ -285,9 +285,32 @@ func readFileSequential(t *testing.T, s *readTestSuite) {
     )
 }
 
+func getKVAndStatFilesByScan(ctx context.Context,
+    store storage.ExternalStorage,
+    nonPartitionedDir string,
+) ([]string, []string, error) {
+    names, err := GetAllFileNames(ctx, store, nonPartitionedDir)
+    if err != nil {
+        return nil, nil, err
+    }
+    var data, stats []string
+    for _, path := range names {
+        bs := []byte(path)
+        lastIdx := bytes.LastIndexByte(bs, '/')
+        secondLastIdx := bytes.LastIndexByte(bs[:lastIdx], '/')
+        parentDir := path[secondLastIdx+1 : lastIdx]
+        if strings.HasSuffix(parentDir, statSuffix) {
+            stats = append(stats, path)
+        } else {
+            data = append(data, path)
+        }
+    }
+    return data, stats, nil
+}
+
 func readFileConcurrently(t *testing.T, s *readTestSuite) {
     ctx := context.Background()
-    files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
+    files, _, err := getKVAndStatFilesByScan(ctx, s.store, "/"+s.subDir)
     intest.AssertNoError(err)
 
     conc := min(s.concurrency, len(files))
@@ -336,7 +359,7 @@ func readFileConcurrently(t *testing.T, s *readTestSuite) {
 
 func readMergeIter(t *testing.T, s *readTestSuite) {
     ctx := context.Background()
-    files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
+    files, _, err := getKVAndStatFilesByScan(ctx, s.store, "/"+s.subDir)
     intest.AssertNoError(err)
 
     if s.beforeCreateReader != nil {
@@ -484,7 +507,7 @@ type mergeTestSuite struct {
 
 func mergeStep(t *testing.T, s *mergeTestSuite) {
     ctx := context.Background()
-    datas, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
+    datas, _, err := getKVAndStatFilesByScan(ctx, s.store, "/"+s.subDir)
     intest.AssertNoError(err)
 
     mergeOutput := "merge_output"
@@ -527,7 +550,7 @@ func mergeStep(t *testing.T, s *mergeTestSuite) {
 
 func newMergeStep(t *testing.T, s *mergeTestSuite) {
     ctx := context.Background()
-    datas, stats, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
+    datas, stats, err := getKVAndStatFilesByScan(ctx, s.store, "/"+s.subDir)
     intest.AssertNoError(err)
 
     mergeOutput := "merge_output"
@@ -647,7 +670,7 @@ func TestReadAllDataLargeFiles(t *testing.T) {
     writeExternalOneFile(suite2)
    t.Logf("minKey: %s, maxKey: %s", minKey, maxKey)
 
-    dataFiles, statFiles, err := GetAllFileNames(ctx, store, "")
+    dataFiles, statFiles, err := getKVAndStatFilesByScan(ctx, store, "")
     intest.AssertNoError(err)
     intest.Assert(len(dataFiles) == 2)
 
@@ -800,7 +823,7 @@ func TestReadAllData(t *testing.T) {
 
 finishCreateFiles:
 
-    dataFiles, statFiles, err := GetAllFileNames(ctx, store, "/")
+    dataFiles, statFiles, err := getKVAndStatFilesByScan(ctx, store, "/")
     require.NoError(t, err)
     require.Equal(t, 2091, len(dataFiles))
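With this PR, GetAllFileNames returns a single flat listing (see the cleanOldFiles hunk), so the benchmarks recover the data/stat split themselves: getKVAndStatFilesByScan walks the listing and classifies each file by whether its immediate parent directory ends with the stat suffix. Below is a standalone sketch of the same classification; the "_stat" suffix value and the partitioned example paths are assumptions for illustration:

package main

import (
    "fmt"
    "path"
    "strings"
)

// statSuffix mirrors the package-level constant used above; its actual value
// is assumed to be "_stat" for this standalone sketch.
const statSuffix = "_stat"

// classify splits a flat listing into data and stat files by inspecting the
// immediate parent directory, equivalent to the bytes.LastIndexByte logic in
// getKVAndStatFilesByScan.
func classify(names []string) (data, stats []string) {
    for _, name := range names {
        parentDir := path.Base(path.Dir(name))
        if strings.HasSuffix(parentDir, statSuffix) {
            stats = append(stats, name)
        } else {
            data = append(data, name)
        }
    }
    return data, stats
}

func main() {
    // Hypothetical partitioned layout: only the parent-directory suffix
    // distinguishes stat files from data files, regardless of the random
    // partition component elsewhere in the path.
    names := []string{
        "testprefix/p07/0/one-file",
        "testprefix/p07/0_stat/one-file",
    }
    data, stats := classify(names)
    fmt.Println("data: ", data)
    fmt.Println("stats:", stats)
}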
pkg/lightning/backend/external/byte_reader_test.go

Lines changed: 4 additions & 2 deletions
@@ -214,10 +214,12 @@ func TestSwitchMode(t *testing.T) {
     t.Logf("seed: %d", seed)
     st := storage.NewMemStorage()
     // Prepare
+    var kvAndStat [2]string
     ctx := context.Background()
     writer := NewWriterBuilder().
         SetPropSizeDistance(100).
         SetPropKeysDistance(2).
+        SetOnCloseFunc(func(summary *WriterSummary) { kvAndStat = summary.MultipleFilesStats[0].Filenames[0] }).
         BuildOneFile(st, "/test", "0")
 
     writer.InitPartSizeAndLogger(ctx, 5*1024*1024)
@@ -244,9 +246,9 @@ func TestSwitchMode(t *testing.T) {
     require.NoError(t, err)
     pool := membuf.NewPool()
     ConcurrentReaderBufferSizePerConc = rand.Intn(100) + 1
-    kvReader, err := NewKVReader(context.Background(), "/test/0/one-file", st, 0, 64*1024)
+    kvReader, err := NewKVReader(context.Background(), kvAndStat[0], st, 0, 64*1024)
     require.NoError(t, err)
-    kvReader.byteReader.enableConcurrentRead(st, "/test/0/one-file", 100, ConcurrentReaderBufferSizePerConc, pool.NewBuffer())
+    kvReader.byteReader.enableConcurrentRead(st, kvAndStat[0], 100, ConcurrentReaderBufferSizePerConc, pool.NewBuffer())
     modeUseCon := false
     i := 0
     for {
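Because the one-file writer's output directory now contains a random partition component, TestSwitchMode can no longer hard-code "/test/0/one-file"; it registers an OnClose callback, takes the generated data/stat file pair from the WriterSummary, and feeds the real data-file path to NewKVReader and enableConcurrentRead.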
pkg/lightning/backend/external/iter_test.go

Lines changed: 1 addition & 1 deletion
@@ -305,7 +305,7 @@ func testMergeIterSwitchMode(t *testing.T, f func([]byte, int) []byte) {
     err := writer.Close(context.Background())
     require.NoError(t, err)
 
-    dataNames, _, err := GetAllFileNames(context.Background(), st, "")
+    dataNames, _, err := getKVAndStatFilesByScan(context.Background(), st, "testprefix")
     require.NoError(t, err)
 
     offsets := make([]uint64, len(dataNames))

pkg/lightning/backend/external/onefile_writer.go

Lines changed: 9 additions & 3 deletions
@@ -17,6 +17,7 @@ package external
 import (
     "context"
     "encoding/binary"
+    "math/rand"
     "path/filepath"
     "slices"
     "time"
@@ -62,6 +63,7 @@ type OneFileWriter struct {
     // file information.
     writerID       string
     filenamePrefix string
+    rnd            *rand.Rand
     dataFile       string
     statFile       string
     dataWriter     storage.ExternalFileWriter
@@ -97,14 +99,14 @@ func (w *OneFileWriter) lazyInitWriter(ctx context.Context) (err error) {
         return nil
     }
 
-    dataFile := filepath.Join(w.filenamePrefix, "one-file")
+    dataFile := filepath.Join(w.getPartitionedPrefix(), "one-file")
     dataWriter, err := w.store.Create(ctx, dataFile, &storage.WriterOption{
         Concurrency: maxUploadWorkersPerThread,
         PartSize:    w.partSize})
     if err != nil {
         return err
     }
-    statFile := filepath.Join(w.filenamePrefix+statSuffix, "one-file")
+    statFile := filepath.Join(w.getPartitionedPrefix()+statSuffix, "one-file")
     statWriter, err := w.store.Create(ctx, statFile, &storage.WriterOption{
         Concurrency: maxUploadWorkersPerThread,
         PartSize:    MinUploadPartSize})
@@ -127,7 +129,7 @@ func (w *OneFileWriter) lazyInitDupFile(ctx context.Context) error {
         return nil
     }
 
-    dupFile := filepath.Join(w.filenamePrefix+dupSuffix, "one-file")
+    dupFile := filepath.Join(w.getPartitionedPrefix()+dupSuffix, "one-file")
     dupWriter, err := w.store.Create(ctx, dupFile, &storage.WriterOption{
         // too many duplicates will cause duplicate resolution part very slow,
         // we temporarily use 1 as we don't expect too many duplicates, if there
@@ -347,6 +349,10 @@ func (w *OneFileWriter) closeImpl(ctx context.Context) (err error) {
     return nil
 }
 
+func (w *OneFileWriter) getPartitionedPrefix() string {
+    return randPartitionedPrefix(w.filenamePrefix, w.rnd)
+}
+
 // caller should make sure the buf is large enough to hold the encoded data.
 func encodeToBuf(buf, key, value []byte) {
     intest.Assert(len(buf) == lengthBytes*2+len(key)+len(value))

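The new getPartitionedPrefix delegates to randPartitionedPrefix, which is defined elsewhere in this PR and not shown in these hunks. Purely as an illustration of the idea, a helper like it could look as follows; the fan-out, the placement of the partition component, and the way the data file and its "_stat" sibling are kept under matching prefixes are all assumptions here, not the actual implementation:

package main

import (
    "fmt"
    "hash/fnv"
    "math/rand"
    "path"
)

// partitionCount is a made-up fan-out used only for this sketch.
const partitionCount = 16

// sketchPartitionedPrefix illustrates the idea behind getPartitionedPrefix:
// place intermediate files under one of several sub-prefixes so that uploads
// spread across object-store key prefixes. It derives the partition from the
// prefix and a per-writer seed so that repeated calls agree, which keeps the
// data file and its "_stat" counterpart under matching prefixes. The real
// randPartitionedPrefix(prefix, rnd) may choose the partition differently.
func sketchPartitionedPrefix(prefix string, seed int64) string {
    h := fnv.New64a()
    h.Write([]byte(prefix))
    rnd := rand.New(rand.NewSource(seed ^ int64(h.Sum64())))
    return path.Join(prefix, fmt.Sprintf("p%02d", rnd.Intn(partitionCount)))
}

func main() {
    // Hypothetical writer prefix, e.g. "{task}/{writer-id}".
    p := sketchPartitionedPrefix("task-123/0", 42)
    fmt.Println(path.Join(p, "one-file"))         // data file
    fmt.Println(path.Join(p+"_stat", "one-file")) // stat file
}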