Skip to content

Commit 091eeab

Browse files
committed
cloud: create and use an objstorage.Writable writer wrapper
Previously, CRDB code interacting with the pebble SST writer used a wrapper that no-op'd Finish/Abort calls on the cloud object. This caused us to write invalid SSTs to cloud storage when we would have otherwise aborted the write.

Implementing Finish/Abort also lets us clean up some of the lifetime handling. Creating an SST writer normally takes ownership of the object. But since we no-op'd the cleanup methods, we had to hold on to the object and manually Close it to trigger the flush.

Issue: #153055
Epic: CRDB-53946
Release note: none
1 parent 50fb4c1 commit 091eeab

31 files changed

+217
-170
lines changed

pkg/backup/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -343,6 +343,7 @@ go_test(
343343
"@com_github_cockroachdb_datadriven//:datadriven",
344344
"@com_github_cockroachdb_errors//:errors",
345345
"@com_github_cockroachdb_errors//oserror",
346+
"@com_github_cockroachdb_pebble//objstorage",
346347
"@com_github_cockroachdb_pebble//sstable",
347348
"@com_github_cockroachdb_pebble//vfs",
348349
"@com_github_cockroachdb_redact//:redact",

pkg/backup/backupinfo/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ go_library(
5151
"//pkg/util/timeutil",
5252
"//pkg/util/tracing",
5353
"@com_github_cockroachdb_errors//:errors",
54+
"@com_github_cockroachdb_pebble//objstorage",
5455
"@com_github_klauspost_compress//gzip",
5556
],
5657
)

pkg/backup/backupinfo/desc_sst.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -49,19 +49,15 @@ func WriteDescsSST(
4949
if err != nil {
5050
return err
5151
}
52-
defer w.Close()
5352
descSST := storage.MakeTransportSSTWriter(ctx, dest.Settings(), w)
5453
defer descSST.Close()
5554

56-
if err := writeDescsToMetadata(ctx, descSST, m); err != nil {
57-
return err
58-
}
59-
60-
if err := descSST.Finish(); err != nil {
55+
err = writeDescsToMetadata(ctx, descSST, m)
56+
if err != nil {
6157
return err
6258
}
6359

64-
return w.Close()
60+
return descSST.Finish()
6561
}
6662

6763
func DescChangesLess(

pkg/backup/backupinfo/external_sst_util.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backupinfo
88
import (
99
"bytes"
1010
"context"
11-
"io"
1211

1312
"github.com/cockroachdb/cockroach/pkg/backup/backupencryption"
1413
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
@@ -17,6 +16,7 @@ import (
1716
"github.com/cockroachdb/cockroach/pkg/keys"
1817
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
1918
"github.com/cockroachdb/cockroach/pkg/storage"
19+
"github.com/cockroachdb/pebble/objstorage"
2020
)
2121

2222
func makeWriter(
@@ -25,8 +25,8 @@ func makeWriter(
2525
filename string,
2626
enc *jobspb.BackupEncryptionOptions,
2727
kmsEnv cloud.KMSEnv,
28-
) (io.WriteCloser, error) {
29-
w, err := dest.Writer(ctx, filename)
28+
) (objstorage.Writable, error) {
29+
w, err := cloud.OpenPebbleWriter(ctx, dest, filename)
3030
if err != nil {
3131
return nil, err
3232
}

pkg/backup/backupinfo/file_sst.go

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ func writeFilesSST(
5555
if err != nil {
5656
return err
5757
}
58-
defer w.Close()
5958
fileSST := storage.MakeTransportSSTWriter(ctx, dest.Settings(), w)
6059
defer fileSST.Close()
6160

@@ -75,11 +74,7 @@ func writeFilesSST(
7574
}
7675
}
7776

78-
err = fileSST.Finish()
79-
if err != nil {
80-
return err
81-
}
82-
return w.Close()
77+
return fileSST.Finish()
8378
}
8479

8580
func encodeFileSSTKey(spanStart roachpb.Key, filename string) roachpb.Key {

pkg/backup/backupsink/BUILD.bazel

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ go_library(
2626
"//pkg/util/unique",
2727
"@com_github_cockroachdb_errors//:errors",
2828
"@com_github_gogo_protobuf//types",
29-
"@com_github_kr_pretty//:pretty",
3029
],
3130
)
3231

@@ -55,6 +54,7 @@ go_test(
5554
"//pkg/util/leaktest",
5655
"//pkg/util/log",
5756
"@com_github_cockroachdb_errors//:errors",
57+
"@com_github_cockroachdb_pebble//objstorage",
5858
"@com_github_gogo_protobuf//types",
5959
"@com_github_stretchr_testify//require",
6060
],

pkg/backup/backupsink/file_sst_sink.go

Lines changed: 15 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backupsink
88
import (
99
"bytes"
1010
"context"
11-
io "io"
1211

1312
"github.com/cockroachdb/cockroach/pkg/backup/backuppb"
1413
"github.com/cockroachdb/cockroach/pkg/base"
@@ -25,7 +24,6 @@ import (
2524
"github.com/cockroachdb/cockroach/pkg/util/log"
2625
"github.com/cockroachdb/errors"
2726
gogotypes "github.com/gogo/protobuf/types"
28-
"github.com/kr/pretty"
2927
)
3028

3129
var (
@@ -58,10 +56,11 @@ type FileSSTSink struct {
5856
conf SSTSinkConf
5957
pacer *admission.Pacer
6058

61-
sst storage.SSTWriter
59+
isOpen bool
60+
sst storage.SSTWriter
61+
6262
ctx context.Context
6363
cancel func()
64-
out io.WriteCloser
6564
outName string
6665

6766
flushedFiles []backuppb.BackupManifest_File
@@ -133,7 +132,7 @@ func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key
133132
}
134133

135134
// Initialize the writer if needed.
136-
if s.out == nil {
135+
if !s.isOpen {
137136
if err := s.open(ctx); err != nil {
138137
return nil, err
139138
}
@@ -232,16 +231,14 @@ func (s *FileSSTSink) Close() error {
232231
s.cancel()
233232
}
234233

235-
var err error
236-
if s.out != nil {
237-
err = s.out.Close()
238-
}
239234
s.sst.Close()
240-
return err
235+
s.isOpen = false
236+
237+
return nil
241238
}
242239

243240
func (s *FileSSTSink) Flush(ctx context.Context) error {
244-
if s.out == nil {
241+
if !s.isOpen {
245242
// If the writer was not initialized but the sink has reported completed
246243
// spans then there were empty ExportRequests that were processed by the
247244
// owner of this sink. These still need to be reported to the coordinator as
@@ -279,13 +276,9 @@ func (s *FileSSTSink) Flush(ctx context.Context) error {
279276
if err := s.sst.Finish(); err != nil {
280277
return err
281278
}
282-
if err := s.out.Close(); err != nil {
283-
log.Warningf(ctx, "failed to close write in FileSSTSink: % #v", pretty.Formatter(err))
284-
return errors.Wrap(err, "writing SST")
285-
}
286279
wroteSize := s.sst.Meta.Size
287280
s.outName = ""
288-
s.out = nil
281+
s.isOpen = false
289282

290283
for i := range s.flushedFiles {
291284
s.flushedFiles[i].BackingFileSize = wroteSize
@@ -322,30 +315,29 @@ func (s *FileSSTSink) open(ctx context.Context) error {
322315
if s.ctx == nil {
323316
s.ctx, s.cancel = context.WithCancel(ctx)
324317
}
325-
w, err := s.dest.Writer(s.ctx, s.outName)
318+
w, err := cloud.OpenPebbleWriter(s.ctx, s.dest, s.outName)
326319
if err != nil {
327320
return err
328321
}
329-
s.out = w
330322
if s.conf.Enc != nil {
331-
e, err := storageccl.EncryptingWriter(w, s.conf.Enc.Key)
323+
w, err = storageccl.EncryptingWriter(w, s.conf.Enc.Key)
332324
if err != nil {
333325
return err
334326
}
335-
s.out = e
336327
}
337-
// TODO(dt): make ExternalStorage.Writer return objstorage.Writable.
338-
//
328+
// NOTE: the sst writer takes ownership of the object writer. So we don't
329+
// hold on to it to close it.
339330
// Value blocks are disabled since such SSTs can be huge (e.g. 750MB in the
340331
// mixed_version_backup.go roachtest), which can cause OOMs due to value
341332
// block buffering.
342333
s.sst = storage.MakeIngestionSSTWriterWithOverrides(
343-
ctx, s.dest.Settings(), storage.NoopFinishAbortWritable(s.out),
334+
ctx, s.dest.Settings(), w,
344335
storage.WithValueBlocksDisabled,
345336
storage.WithCompressionFromClusterSetting(
346337
ctx, s.dest.Settings(), storage.CompressionAlgorithmBackupStorage,
347338
),
348339
)
340+
s.isOpen = true
349341

350342
return nil
351343
}

pkg/backup/backupsink/file_sst_sink_test.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
package backupsink
77

88
import (
9-
"bytes"
109
"context"
1110
"fmt"
1211
"reflect"
@@ -29,6 +28,7 @@ import (
2928
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3029
"github.com/cockroachdb/cockroach/pkg/util/log"
3130
"github.com/cockroachdb/errors"
31+
"github.com/cockroachdb/pebble/objstorage"
3232
"github.com/gogo/protobuf/types"
3333
"github.com/stretchr/testify/require"
3434
)
@@ -43,13 +43,13 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) {
4343
ctx := context.Background()
4444

4545
getKeys := func(prefix string, n int) []byte {
46-
var b bytes.Buffer
46+
var b objstorage.MemObj
4747
sst := storage.MakeTransportSSTWriter(ctx, cluster.MakeTestingClusterSettings(), &b)
4848
for i := 0; i < n; i++ {
4949
require.NoError(t, sst.PutUnversioned([]byte(fmt.Sprintf("%s%08d", prefix, i)), nil))
5050
}
5151
sst.Close()
52-
return b.Bytes()
52+
return b.Data()
5353
}
5454

5555
exportResponse1 := ExportedSpan{
@@ -661,8 +661,8 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
661661
},
662662
} {
663663
t.Run(tt.name, func(t *testing.T) {
664-
buf := &bytes.Buffer{}
665-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
664+
buf := objstorage.MemObj{}
665+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
666666
sink := FileSSTSink{sst: sst}
667667
compareSST := true
668668

@@ -713,7 +713,7 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) {
713713
UpperBound: keys.MaxKey,
714714
}
715715

716-
iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts)
716+
iter, err := storage.NewMemSSTIterator(buf.Data(), false, iterOpts)
717717
if err != nil {
718718
t.Fatal(err)
719719
}
@@ -840,8 +840,8 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
840840
},
841841
} {
842842
t.Run(tt.name, func(t *testing.T) {
843-
buf := &bytes.Buffer{}
844-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
843+
buf := objstorage.MemObj{}
844+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
845845
sink := FileSSTSink{sst: sst}
846846
compareSST := true
847847

@@ -887,7 +887,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) {
887887
UpperBound: keys.MaxKey,
888888
}
889889

890-
iter, err := storage.NewMemSSTIterator(buf.Bytes(), false, iterOpts)
890+
iter, err := storage.NewMemSSTIterator(buf.Data(), false, iterOpts)
891891
if err != nil {
892892
t.Fatal(err)
893893
}
@@ -1029,8 +1029,8 @@ func (b *exportedSpanBuilder) build() ExportedSpan {
10291029
func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb.Key) ExportedSpan {
10301030
ctx := context.Background()
10311031
settings := cluster.MakeTestingClusterSettings()
1032-
buf := &bytes.Buffer{}
1033-
sst := storage.MakeTransportSSTWriter(ctx, settings, buf)
1032+
buf := objstorage.MemObj{}
1033+
sst := storage.MakeTransportSSTWriter(ctx, settings, &buf)
10341034
for _, d := range b.keyValues {
10351035
v := roachpb.Value{}
10361036
v.SetBytes(d.value)
@@ -1058,7 +1058,7 @@ func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb
10581058

10591059
sst.Close()
10601060

1061-
b.es.DataSST = buf.Bytes()
1061+
b.es.DataSST = buf.Data()
10621062

10631063
return *b.es
10641064
}

pkg/backup/backupsink/sst_sink_key_writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func (s *SSTSinkKeyWriter) Reset(ctx context.Context, newSpan roachpb.Span) erro
135135
return err
136136
}
137137
}
138-
if s.out == nil {
138+
if !s.isOpen {
139139
if err := s.open(ctx); err != nil {
140140
return err
141141
}

pkg/backup/bench_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ package backup
88
import (
99
"context"
1010
"fmt"
11-
"io"
1211
"math/rand"
1312
"testing"
1413

@@ -29,6 +28,7 @@ import (
2928
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
3029
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
3130
"github.com/cockroachdb/cockroach/pkg/util/log"
31+
"github.com/cockroachdb/pebble/objstorage"
3232
"github.com/stretchr/testify/require"
3333
)
3434

@@ -59,8 +59,8 @@ func BenchmarkIteratorMemory(b *testing.B) {
5959
makeWriter := func(
6060
store cloud.ExternalStorage,
6161
filename string,
62-
enc *jobspb.BackupEncryptionOptions) (io.WriteCloser, error) {
63-
w, err := store.Writer(ctx, filename)
62+
enc *jobspb.BackupEncryptionOptions) (objstorage.Writable, error) {
63+
w, err := cloud.OpenPebbleWriter(ctx, store, filename)
6464
if err != nil {
6565
return nil, err
6666
}
@@ -87,7 +87,7 @@ func BenchmarkIteratorMemory(b *testing.B) {
8787
}
8888
}
8989

90-
writeSST := func(w io.WriteCloser, store cloud.ExternalStorage, payloadSize int, numKeys int) {
90+
writeSST := func(w objstorage.Writable, store cloud.ExternalStorage, payloadSize int, numKeys int) {
9191
sst := storage.MakeTransportSSTWriter(ctx, store.Settings(), w)
9292

9393
buf := make([]byte, payloadSize)
@@ -156,7 +156,7 @@ func BenchmarkIteratorMemory(b *testing.B) {
156156
require.NoError(b, err)
157157

158158
writeSST(w, store, 100, rows)
159-
require.NoError(b, w.Close())
159+
require.NoError(b, w.Finish())
160160

161161
sz, err := store.Size(ctx, filename)
162162
require.NoError(b, err)

0 commit comments

Comments
 (0)