Skip to content

Commit de0c5a1

Browse files
committed
db: rework SSTable corruption handling
This commit reworks SSTable corruption handling. Instead of Pebble calling `Fatalf` in some cases, and the higher layer (CRDB) calling it in other cases, we add a `DataCorruption` event. The default handler calls `Fatalf` but a custom handler can be used for different behavior. We rework the `MustExist` semantics to mark the not-exist error as a corruption error and rely on the same reporting mechanism. We implement this by adding a callback to report corruption to `block.ReadEnv`. This was necessary because some of the information we need (like the user key bounds) are only available at a higher level.
1 parent a8591e2 commit de0c5a1

File tree

20 files changed

+325
-72
lines changed

20 files changed

+325
-72
lines changed

compaction.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2562,8 +2562,10 @@ func (d *DB) runCopyCompaction(
25622562

25632563
// NB: external files are always virtual.
25642564
var wrote uint64
2565-
err = d.fileCache.withVirtualReader(inputMeta.VirtualMeta(), func(r sstable.VirtualReader) error {
2565+
err = d.fileCache.withVirtualReader(ctx, block.NoReadEnv, inputMeta.VirtualMeta(), func(r sstable.VirtualReader, _ block.ReadEnv) error {
25662566
var err error
2567+
// TODO(radu): plumb a ReadEnv to CopySpan (it could use the buffer pool
2568+
// or update category stats).
25672569
wrote, err = sstable.CopySpan(ctx,
25682570
src, r.UnsafeReader(), d.opts.MakeReaderOptions(),
25692571
w, d.opts.MakeWriterOptions(c.outputLevel.level, d.TableFormat()),

error_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"github.com/cockroachdb/errors"
19+
"github.com/cockroachdb/pebble/internal/base"
1920
"github.com/cockroachdb/pebble/internal/testkeys"
2021
"github.com/cockroachdb/pebble/vfs"
2122
"github.com/cockroachdb/pebble/vfs/errorfs"
@@ -271,7 +272,7 @@ func TestCorruptReadError(t *testing.T) {
271272
opts := &Options{
272273
DisableTableStats: true,
273274
FS: fs,
274-
Logger: panicLogger{},
275+
Logger: base.NoopLoggerAndTracer{},
275276
FormatMajorVersion: formatVersion,
276277
}
277278
d, err := Open("", opts)

event.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ import (
1616
"github.com/cockroachdb/pebble/internal/humanize"
1717
"github.com/cockroachdb/pebble/internal/invariants"
1818
"github.com/cockroachdb/pebble/internal/manifest"
19+
"github.com/cockroachdb/pebble/objstorage"
20+
"github.com/cockroachdb/pebble/objstorage/remote"
1921
"github.com/cockroachdb/pebble/vfs"
2022
"github.com/cockroachdb/redact"
2123
)
@@ -45,6 +47,35 @@ func formatFileNums(tables []TableInfo) string {
4547
return buf.String()
4648
}
4749

50+
// DataCorruptionInfo contains the information for a DataCorruption event.
51+
type DataCorruptionInfo struct {
52+
// Path of the file that is corrupted. For remote files the path starts with
53+
// "remote://".
54+
Path string
55+
IsRemote bool
56+
// Locator is only set when IsRemote is true (note that an empty Locator is
57+
// valid even then).
58+
Locator remote.Locator
59+
// Bounds indicates the keyspace range that is affected.
60+
Bounds base.UserKeyBounds
61+
// Details of the error. See cockroachdb/error for how to format with or
62+
// without redaction.
63+
Details error
64+
}
65+
66+
func (i DataCorruptionInfo) String() string {
67+
return redact.StringWithoutMarkers(i)
68+
}
69+
70+
// SafeFormat implements redact.SafeFormatter.
71+
func (i DataCorruptionInfo) SafeFormat(w redact.SafePrinter, _ rune) {
72+
w.Printf("on-disk corruption: %s", redact.Safe(i.Path))
73+
if i.IsRemote {
74+
w.Printf(" (remote locator %q)", redact.Safe(i.Locator))
75+
}
76+
w.Printf("; bounds: %s; details: %+v", i.Bounds.String(), i.Details)
77+
}
78+
4879
// LevelInfo contains info pertaining to a particular level.
4980
type LevelInfo struct {
5081
Level int
@@ -693,6 +724,10 @@ type EventListener struct {
693724
// operation such as flush or compaction.
694725
BackgroundError func(error)
695726

727+
// DataCorruption is invoked when an on-disk corruption is detected. It should
728+
// not block, as it is called synchronously in read paths.
729+
DataCorruption func(DataCorruptionInfo)
730+
696731
// CompactionBegin is invoked after the inputs to a compaction have been
697732
// determined, but before the compaction has produced any output.
698733
CompactionBegin func(CompactionInfo)
@@ -797,6 +832,15 @@ func (l *EventListener) EnsureDefaults(logger Logger) {
797832
l.BackgroundError = func(error) {}
798833
}
799834
}
835+
if l.DataCorruption == nil {
836+
if logger != nil {
837+
l.DataCorruption = func(info DataCorruptionInfo) {
838+
logger.Fatalf("%s", info)
839+
}
840+
} else {
841+
l.DataCorruption = func(info DataCorruptionInfo) {}
842+
}
843+
}
800844
if l.CompactionBegin == nil {
801845
l.CompactionBegin = func(info CompactionInfo) {}
802846
}
@@ -873,6 +917,9 @@ func MakeLoggingEventListener(logger Logger) EventListener {
873917
BackgroundError: func(err error) {
874918
backgroundError(logger, err)
875919
},
920+
DataCorruption: func(info DataCorruptionInfo) {
921+
logger.Errorf("%s", info)
922+
},
876923
CompactionBegin: func(info CompactionInfo) {
877924
logger.Infof("%s", info)
878925
},
@@ -948,6 +995,10 @@ func TeeEventListener(a, b EventListener) EventListener {
948995
a.BackgroundError(err)
949996
b.BackgroundError(err)
950997
},
998+
DataCorruption: func(info DataCorruptionInfo) {
999+
a.DataCorruption(info)
1000+
b.DataCorruption(info)
1001+
},
9511002
CompactionBegin: func(info CompactionInfo) {
9521003
a.CompactionBegin(info)
9531004
b.CompactionBegin(info)
@@ -1098,3 +1149,35 @@ func (r *lowDiskSpaceReporter) findThreshold(
10981149
}
10991150
return threshold, ok
11001151
}
1152+
1153+
func (d *DB) reportSSTableCorruption(meta *manifest.TableMetadata, err error) {
1154+
if invariants.Enabled && err == nil {
1155+
panic("nil error")
1156+
}
1157+
objMeta, lookupErr := d.objProvider.Lookup(base.FileTypeTable, meta.FileBacking.DiskFileNum)
1158+
if lookupErr != nil {
1159+
// If the object is not known to the provider, it must be a local object
1160+
// that was missing when we opened the store. Remote objects have their
1161+
// metadata in a catalog, so even if the backing object is deleted, the
1162+
// DiskFileNum would still be known.
1163+
objMeta = objstorage.ObjectMetadata{DiskFileNum: meta.FileBacking.DiskFileNum, FileType: base.FileTypeTable}
1164+
}
1165+
path := d.objProvider.Path(objMeta)
1166+
if objMeta.IsRemote() {
1167+
// Remote path (which include the locator and full path) might not always be
1168+
// safe.
1169+
err = errors.WithHintf(err, "path: %s", path)
1170+
} else {
1171+
// Local paths are safe: they start with the store directory and the
1172+
// filename is generated by Pebble.
1173+
err = errors.WithHintf(err, "path: %s", redact.Safe(path))
1174+
}
1175+
info := DataCorruptionInfo{
1176+
Path: path,
1177+
IsRemote: objMeta.IsRemote(),
1178+
Locator: objMeta.Remote.Locator,
1179+
Bounds: meta.UserKeyBounds(),
1180+
Details: err,
1181+
}
1182+
d.opts.EventListener.DataCorruption(info)
1183+
}

event_listener_test.go

Lines changed: 75 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"fmt"
1111
"reflect"
1212
"runtime"
13+
"slices"
1314
"strings"
1415
"sync"
1516
"sync/atomic"
@@ -20,6 +21,7 @@ import (
2021
"github.com/cockroachdb/datadriven"
2122
"github.com/cockroachdb/errors"
2223
"github.com/cockroachdb/pebble/internal/base"
24+
"github.com/cockroachdb/pebble/internal/testutils"
2325
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
2426
"github.com/cockroachdb/pebble/sstable"
2527
"github.com/cockroachdb/pebble/vfs"
@@ -350,12 +352,6 @@ func TestEventListenerRedact(t *testing.T) {
350352
require.Equal(t, "[JOB 5] WAL delete error: unredacted error: ‹×›\n", log.String())
351353
}
352354

353-
func TestEventListenerEnsureDefaultsBackgroundError(t *testing.T) {
354-
e := EventListener{}
355-
e.EnsureDefaults(nil)
356-
e.BackgroundError(errors.New("an example error"))
357-
}
358-
359355
func TestEventListenerEnsureDefaultsSetsAllCallbacks(t *testing.T) {
360356
e := EventListener{}
361357
e.EnsureDefaults(nil)
@@ -542,3 +538,76 @@ type mockDiskUsageFS struct {
542538
func (fs *mockDiskUsageFS) GetDiskUsage(path string) (vfs.DiskUsage, error) {
543539
return fs.usage.Load().(vfs.DiskUsage), nil
544540
}
541+
542+
func TestSSTCorruptionEvent(t *testing.T) {
543+
for _, test := range []string{"missing-file", "missing-before-open", "meta-block-corruption", "data-block-corruption"} {
544+
t.Run(test, func(t *testing.T) {
545+
var mu sync.Mutex
546+
var events []DataCorruptionInfo
547+
fs := vfs.NewMem()
548+
opts := &Options{
549+
FS: fs,
550+
// We use panicLogger to avoid errors being printed and make sure Fatalf
551+
// is not called.
552+
Logger: panicLogger{},
553+
EventListener: &EventListener{
554+
DataCorruption: func(info DataCorruptionInfo) {
555+
mu.Lock()
556+
defer mu.Unlock()
557+
events = append(events, info)
558+
},
559+
},
560+
DisableAutomaticCompactions: true,
561+
}
562+
d, err := Open("", opts)
563+
require.NoError(t, err)
564+
key := func(k int) []byte {
565+
return []byte(fmt.Sprintf("key-%05d", k))
566+
}
567+
568+
for i := 0; i < 100; i++ {
569+
d.Set(key(i), []byte(fmt.Sprintf("value-%05d", i)), nil)
570+
}
571+
require.NoError(t, d.Flush())
572+
require.NoError(t, d.Compact([]byte("a"), []byte("z"), false /* parallelize */))
573+
574+
// We expect a single sst file.
575+
files := testutils.CheckErr(fs.List(""))
576+
files = slices.DeleteFunc(files, func(name string) bool {
577+
return !strings.HasSuffix(name, ".sst")
578+
})
579+
require.Lenf(t, files, 1, "expected a single sst file, got %v", files)
580+
sstFileName := files[0]
581+
582+
switch test {
583+
case "missing-file":
584+
require.NoError(t, fs.Remove(sstFileName))
585+
case "missing-before-open":
586+
require.NoError(t, d.Close())
587+
require.NoError(t, fs.Remove(sstFileName))
588+
opts.DisableConsistencyCheck = true
589+
d, err = Open("", opts)
590+
require.NoError(t, err)
591+
case "meta-block-corruption":
592+
buf, err := fs.UnsafeGetFileDataBuffer(sstFileName)
593+
require.NoError(t, err)
594+
buf[len(buf)-100]++
595+
case "data-block-corruption":
596+
buf, err := fs.UnsafeGetFileDataBuffer(sstFileName)
597+
require.NoError(t, err)
598+
buf[20]++
599+
default:
600+
t.Fatalf("invalid test")
601+
}
602+
_, _, err = d.Get(key(5))
603+
require.Error(t, err)
604+
require.Greater(t, len(events), 0)
605+
info := events[0]
606+
require.Equal(t, info.Path, sstFileName)
607+
require.False(t, info.IsRemote)
608+
require.Equal(t, base.UserKeyBoundsInclusive(key(0), key(99)), info.Bounds)
609+
610+
d.Close()
611+
})
612+
}
613+
}

0 commit comments

Comments
 (0)