4 changes: 4 additions & 0 deletions metamorphic/config.go
@@ -28,6 +28,7 @@ const (
OpDBFlush
OpDBRatchetFormatMajorVersion
OpDBRestart
OpDBCrashDuringOpen
OpDBEstimateDiskUsage
OpIterClose
OpIterFirst
@@ -159,6 +160,7 @@ func DefaultOpConfig() OpConfig {
OpDBFlush: 2,
OpDBRatchetFormatMajorVersion: 1,
OpDBRestart: 2,
OpDBCrashDuringOpen: 1,
OpDBEstimateDiskUsage: 1,
OpIterClose: 5,
OpIterFirst: 100,
@@ -221,6 +223,7 @@ func ReadOpConfig() OpConfig {
OpDBFlush: 0,
OpDBRatchetFormatMajorVersion: 0,
OpDBRestart: 0,
OpDBCrashDuringOpen: 0,
OpDBEstimateDiskUsage: 0,
OpIterClose: 5,
OpIterFirst: 100,
@@ -280,6 +283,7 @@ func WriteOpConfig() OpConfig {
OpDBFlush: 2,
OpDBRatchetFormatMajorVersion: 1,
OpDBRestart: 2,
OpDBCrashDuringOpen: 1,
OpDBEstimateDiskUsage: 1,
OpIterClose: 0,
OpIterFirst: 0,
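The weights above are relative frequencies: under DefaultOpConfig, OpDBCrashDuringOpen (weight 1) should be generated roughly half as often as OpDBRestart (weight 2). Below is a minimal sketch of weighted selection over such a table, assuming the generator draws ops in proportion to these weights; pickOp and opWeight are hypothetical names, not the actual generator code.

```go
package main

import (
	"fmt"
	"math/rand/v2"
)

// opWeight pairs an op name with its relative frequency, mirroring OpConfig.
type opWeight struct {
	name   string
	weight int
}

// pickOp draws one op with probability proportional to its weight.
func pickOp(rng *rand.Rand, table []opWeight) string {
	total := 0
	for _, ow := range table {
		total += ow.weight
	}
	n := rng.IntN(total)
	for _, ow := range table {
		if n < ow.weight {
			return ow.name
		}
		n -= ow.weight
	}
	panic("unreachable")
}

func main() {
	table := []opWeight{
		{name: "OpDBRestart", weight: 2},
		{name: "OpDBCrashDuringOpen", weight: 1},
		{name: "OpIterFirst", weight: 100},
	}
	rng := rand.New(rand.NewPCG(1, 2))
	fmt.Println(pickOp(rng, table))
}
```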
49 changes: 28 additions & 21 deletions metamorphic/generator.go
@@ -167,7 +167,8 @@ func (g *generator) generate(count uint64) []op {
OpDBDownload: g.dbDownload,
OpDBFlush: g.dbFlush,
OpDBRatchetFormatMajorVersion: g.dbRatchetFormatMajorVersion,
OpDBRestart: g.dbRestart,
OpDBRestart: g.dbRestart(false /* shouldCrashDuringOpen */),
OpDBCrashDuringOpen: g.dbRestart(true /* shouldCrashDuringOpen */),
OpDBEstimateDiskUsage: g.dbEstimateDiskUsage,
OpIterClose: g.randIter(g.iterClose),
OpIterFirst: g.randIter(g.iterFirst),
@@ -465,27 +466,33 @@ func (g *generator) dbRatchetFormatMajorVersion() {
g.add(&dbRatchetFormatMajorVersionOp{dbID: dbID, vers: vers})
}

func (g *generator) dbRestart() {
// Close any live iterators and snapshots, so that we can close the DB
// cleanly.
dbID := g.dbs.rand(g.rng)
for len(g.liveIters) > 0 {
g.randIter(g.iterClose)()
}
for len(g.liveSnapshots) > 0 {
g.snapshotClose()
}
// Close the batches.
for len(g.liveBatches) > 0 {
batchID := g.liveBatches[0]
g.removeBatchFromGenerator(batchID)
g.add(&closeOp{objID: batchID})
}
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
len(g.liveReaders), len(g.liveWriters)))
func (g *generator) dbRestart(shouldCrashDuringOpen bool) func() {
return func() {
// Close any live iterators and snapshots, so that we can close the DB
// cleanly.
dbID := g.dbs.rand(g.rng)
for len(g.liveIters) > 0 {
g.randIter(g.iterClose)()
}
for len(g.liveSnapshots) > 0 {
g.snapshotClose()
}
// Close the batches.
for len(g.liveBatches) > 0 {
batchID := g.liveBatches[0]
g.removeBatchFromGenerator(batchID)
g.add(&closeOp{objID: batchID})
}
if len(g.liveReaders) != len(g.dbs) || len(g.liveWriters) != len(g.dbs) {
panic(fmt.Sprintf("unexpected counts: liveReaders %d, liveWriters: %d",
len(g.liveReaders), len(g.liveWriters)))
}
if shouldCrashDuringOpen {
g.add(&dbUncleanRestartOp{dbID: dbID})
} else {
g.add(&dbRestartOp{dbID: dbID})
}
}
g.add(&dbRestartOp{dbID: dbID})
}

// maybeSetSnapshotIterBounds must be called whenever creating a new iterator or
1 change: 1 addition & 0 deletions metamorphic/key_manager.go
@@ -883,6 +883,7 @@ func opWrittenKeys(untypedOp op) [][]byte {
case *closeOp:
case *compactOp:
case *dbRestartOp:
case *dbUncleanRestartOp:
case *deleteOp:
return [][]byte{t.key}
case *deleteRangeOp:
2 changes: 1 addition & 1 deletion metamorphic/meta.go
@@ -560,7 +560,7 @@ func RunOnce(t TestingT, runDir string, seed uint64, historyPath string, rOpts .
// multi-instance mode.
testOpts.Opts.WALFailover = nil
} else {
testOpts.Opts.WALFailover.Secondary.FS = opts.FS
testOpts.Opts.WALFailover.Secondary.FS = vfs.NewCrashableMem()
}
}

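For context on the swap to vfs.NewCrashableMem above: a crashable MemFS supports taking a "crash clone" that preserves synced data and only a random fraction of unsynced writes, which is what lets the WAL-failover secondary participate in simulated crashes. The following is a minimal sketch of wiring such a filesystem as the secondary WAL directory and later cloning it, assuming only the APIs that appear elsewhere in this diff (vfs.NewCrashableMem, MemFS.CrashClone, pebble.WALFailoverOptions); it mirrors, rather than reproduces, the options.go and test.go changes below.

```go
package main

import (
	"math/rand/v2"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/wal"
)

func main() {
	// A crashable in-memory FS as the WAL failover secondary.
	secondary := vfs.NewCrashableMem()
	opts := &pebble.Options{}
	opts.WALFailover = &pebble.WALFailoverOptions{
		Secondary: wal.Dir{
			FS:      secondary,
			Dirname: pebble.MakeStoreRelativePath(secondary, "wal_secondary"),
		},
	}

	// ... run a workload against a DB opened with opts ...

	// Simulate a crash: keep all synced data but only a random fraction of
	// unsynced writes, then point the options at the clone for the reopen.
	rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
	clone := secondary.CrashClone(vfs.CrashCloneCfg{
		UnsyncedDataPercent: rng.IntN(101),
		RNG:                 rng,
	})
	opts.WALFailover.Secondary.FS = clone
}
```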
33 changes: 32 additions & 1 deletion metamorphic/ops.go
@@ -1965,7 +1965,7 @@ type dbRestartOp struct {
}

func (o *dbRestartOp) run(t *Test, h historyRecorder) {
if err := t.restartDB(o.dbID); err != nil {
if err := t.restartDB(o.dbID, false /* shouldCrashDuringOpen */); err != nil {
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
h.history.err.Store(errors.Wrap(err, "dbRestartOp"))
} else {
@@ -1980,6 +1980,37 @@ func (o *dbRestartOp) syncObjs() objIDSlice { return o.affectedObjec
func (o *dbRestartOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *dbRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }

// dbUncleanRestartOp performs an unclean restart like dbRestartOp, but also
// starts a concurrent goroutine that calls CrashClone during the Open and uses
// that clone to do a second Open. This tests crashing during Open with
// concurrent operations.
type dbUncleanRestartOp struct {
dbID objID

// affectedObjects is the list of additional objects that are affected by this
// operation, and which syncObjs() must return so that we don't perform the
// restart in parallel with other operations to affected objects.
affectedObjects []objID
}

func (o *dbUncleanRestartOp) run(t *Test, h historyRecorder) {
if err := t.restartDB(o.dbID, true /* shouldCrashDuringOpen */); err != nil {
h.Recordf("%s // %v", o.formattedString(t.testOpts.KeyFormat), err)
h.history.err.Store(errors.Wrap(err, "dbCrashDuringOpenOp"))
} else {
h.Recordf("%s", o.formattedString(t.testOpts.KeyFormat))
}
}

func (o *dbUncleanRestartOp) formattedString(KeyFormat) string {
return fmt.Sprintf("%s.RestartWithCrashClone()", o.dbID)
}
func (o *dbUncleanRestartOp) receiver() objID { return o.dbID }
func (o *dbUncleanRestartOp) syncObjs() objIDSlice { return o.affectedObjects }

func (o *dbUncleanRestartOp) rewriteKeys(func(UserKey) UserKey) {}
func (o *dbUncleanRestartOp) diagramKeyRanges() []pebble.KeyRange { return nil }

func formatOps(kf KeyFormat, ops []op) string {
var buf strings.Builder
for _, op := range ops {
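In generated test scripts, the two restart variants serialize through formattedString above and the parser method table in parser.go below; illustrative script lines (assuming db objIDs render as db1):

```
db1.Restart()
db1.RestartWithCrashClone()
```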
3 changes: 2 additions & 1 deletion metamorphic/options.go
@@ -741,8 +741,9 @@ func RandomOptions(rng *rand.Rand, kf KeyFormat, cfg RandomOptionsCfg) *TestOpti
// maintains a maximum history of 120 entries, so the healthy interval
// must not exceed 119x the probe interval.
healthyInterval := scaleDuration(probeInterval, 1.0, 119.0)
newMem := vfs.NewCrashableMem()
opts.WALFailover = &pebble.WALFailoverOptions{
Secondary: wal.Dir{FS: vfs.Default, Dirname: pebble.MakeStoreRelativePath(vfs.Default, "wal_secondary")},
Secondary: wal.Dir{FS: newMem, Dirname: pebble.MakeStoreRelativePath(newMem, "wal_secondary")},
FailoverOptions: wal.FailoverOptions{
PrimaryDirProbeInterval: probeInterval,
HealthyProbeLatencyThreshold: healthyThreshold,
13 changes: 13 additions & 0 deletions metamorphic/parser.go
@@ -62,6 +62,8 @@ func opArgs(op op) (receiverID *objID, targetID *objID, args []interface{}) {
return &t.dbID, nil, []interface{}{&t.vers}
case *dbRestartOp:
return &t.dbID, nil, nil
case *dbUncleanRestartOp:
return &t.dbID, nil, nil
case *deleteOp:
return &t.writerID, nil, []interface{}{&t.key}
case *deleteRangeOp:
@@ -177,6 +179,7 @@ var methods = map[string]*methodInfo{
"RatchetFormatMajorVersion": makeMethod(dbRatchetFormatMajorVersionOp{}, dbTag),
"Replicate": makeMethod(replicateOp{}, dbTag),
"Restart": makeMethod(dbRestartOp{}, dbTag),
"RestartWithCrashClone": makeMethod(dbUncleanRestartOp{}, dbTag),
"SeekGE": makeMethod(iterSeekGEOp{}, iterTag),
"SeekLT": makeMethod(iterSeekLTOp{}, iterTag),
"SeekPrefixGE": makeMethod(iterSeekPrefixGEOp{}, iterTag),
@@ -749,6 +752,16 @@ func computeDerivedFields(ops []op) {
}
// Sort so the output is deterministic.
slices.Sort(v.affectedObjects)
case *dbUncleanRestartOp:
// Find all objects that use this db.
v.affectedObjects = nil
for obj, db := range objToDB {
if db == v.dbID {
v.affectedObjects = append(v.affectedObjects, obj)
}
}
// Sort so the output is deterministic.
slices.Sort(v.affectedObjects)
case *ingestOp:
v.derivedDBIDs = make([]objID, len(v.batchIDs))
for i := range v.batchIDs {
106 changes: 97 additions & 9 deletions metamorphic/test.go
@@ -7,6 +7,7 @@ package metamorphic
import (
"fmt"
"io"
"math/rand/v2"
"os"
"path"
"runtime/debug"
@@ -321,10 +322,11 @@ func (t *Test) minFMV() pebble.FormatMajorVersion {
return minVersion
}

func (t *Test) restartDB(dbID objID) error {
func (t *Test) restartDB(dbID objID, shouldCrashDuringOpen bool) error {
db := t.getDB(dbID)
// If strictFS is not used, we use pebble.NoSync for writeOpts, so we can't
// restart the database (even if we don't revert to synced data).
// If strictFS is not used, no-op since we end up using pebble.NoSync for
// writeOpts. In the case of pebble.NoSync, we can't restart the database
// even if we don't revert to synced data.
if !t.testOpts.strictFS {
return nil
}
@@ -348,15 +350,26 @@
}
}
t.opts.FS = crashFS
var slowFS *errorfs.FS
// If we should crash during Open, inject some latency into the filesystem
// so that the first Open is slow enough for us to capture some arbitrary
// intermediate state.
if shouldCrashDuringOpen {
seed := time.Now().UnixNano()
t.opts.Logger.Infof("seed %d", seed)
mean := time.Duration(rand.IntN(20)+10) * time.Millisecond
t.opts.Logger.Infof("Injecting mean %s of latency with p=%.3f", mean, 1.0)
slowFS = errorfs.Wrap(crashFS,
errorfs.RandomLatency(errorfs.Randomly(1.0, seed), mean, seed, time.Second))
t.opts.FS = slowFS
}
t.opts.WithFSDefaults()
// We want to set the new FS in testOpts too, so they are propagated to the
// TestOptions that were used with metamorphic.New().
t.testOpts.Opts.FS = t.opts.FS
if t.opts.WALFailover != nil {
t.opts.WALFailover.Secondary.FS = t.opts.FS
t.testOpts.Opts.WALFailover.Secondary.FS = t.opts.FS
}

firstOpenDone := make(chan struct{})
secondOpenDone := make(chan struct{})
// TODO(jackson): Audit errorRate and ensure custom options' hooks semantics
// are well defined within the context of retries.
err := t.withRetries(func() (err error) {
@@ -373,15 +386,90 @@
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
}
o := t.finalizeOptions()
if shouldCrashDuringOpen {
go func() {
err = t.simulateCrashDuringOpen(dbID, slowFS, secondOpenDone, firstOpenDone)
}()
if err != nil {
return err
}
}
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
if err != nil {
return err
if shouldCrashDuringOpen {
firstOpenDone <- struct{}{}
}
return err
})
if shouldCrashDuringOpen {
<-secondOpenDone
}
return err
}

func (t *Test) simulateCrashDuringOpen(
dbID objID, slowFS *errorfs.FS, secondOpenDone, firstOpenDone chan struct{},
) error {
defer func() { secondOpenDone <- struct{}{} }()

// Wait a bit for the first Open to make some progress.
time.Sleep(30 * time.Millisecond)

// Create a crash clone of the current filesystem state.
rng := rand.New(rand.NewPCG(0, uint64(time.Now().UnixNano())))
crashCloneFS, err := slowFS.CrashClone(vfs.CrashCloneCfg{
UnsyncedDataPercent: rng.IntN(101),
RNG: rng,
})
if err != nil {
return err
}

// After the first Open has completed, close the resulting DB and open the
// second DB.
<-firstOpenDone
err = t.dbs[dbID.slot()-1].Close()
if err != nil {
return err
}
// Release any resources held by custom options. This may be used, for
// example, by the encryption-at-rest custom option (within the Cockroach
// repository) to close the file registry.
for i := range t.testOpts.CustomOpts {
if err := t.testOpts.CustomOpts[i].Close(t.opts); err != nil {
return err
}
}
t.opts.FS = crashCloneFS
if t.opts.WALFailover != nil {
ccsmemFS := t.opts.WALFailover.Secondary.FS.(*vfs.MemFS)
crashCloneSecondaryFS := ccsmemFS.CrashClone(vfs.CrashCloneCfg{
UnsyncedDataPercent: rng.IntN(101),
RNG: rng,
})
t.testOpts.Opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
t.opts.WALFailover.Secondary.FS = crashCloneSecondaryFS
}
// Reacquire any resources required by custom options. This may be used, for
// example, by the encryption-at-rest custom option (within the Cockroach
// repository) to reopen the file registry.
for i := range t.testOpts.CustomOpts {
if err := t.testOpts.CustomOpts[i].Open(t.opts); err != nil {
return err
}
}
// Create a copy of options for the second DB.
dir := t.dir
if len(t.dbs) > 1 {
dir = path.Join(dir, fmt.Sprintf("db%d", dbID.slot()))
}
o := t.finalizeOptions()
t.dbs[dbID.slot()-1], err = pebble.Open(dir, &o)
if err != nil {
return err
}
return nil
}

func (t *Test) saveInMemoryDataInternal() error {
if rootFS := vfs.Root(t.opts.FS); rootFS != vfs.Default {
// t.opts.FS is an in-memory system; copy it to disk.
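Distilled from restartDB and simulateCrashDuringOpen above, here is a condensed sketch of the crash-during-Open protocol: slow the filesystem with errorfs latency injection, start the first Open, take a crash clone mid-Open from a separate goroutine, then close the first DB and reopen from the clone. This is a sketch under the APIs shown in this diff; crashDuringOpen is an illustrative name, the 20ms mean latency and 30ms sleep are arbitrary, and the custom-options and WAL-failover plumbing from test.go is omitted.

```go
package metamorphicsketch

import (
	"math/rand/v2"
	"time"

	"github.com/cockroachdb/pebble"
	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/errorfs"
)

func crashDuringOpen(dir string, base *vfs.MemFS, opts *pebble.Options) error {
	seed := time.Now().UnixNano()
	// Slow every filesystem operation down so that the first Open takes long
	// enough to be "crashed" at an arbitrary intermediate point.
	slowFS := errorfs.Wrap(base, errorfs.RandomLatency(
		errorfs.Randomly(1.0, seed), 20*time.Millisecond, seed, time.Second))
	opts.FS = slowFS

	// Concurrently with the first Open, capture a crash clone of whatever
	// state the filesystem happens to be in at that moment.
	cloneCh := make(chan *vfs.MemFS, 1)
	go func() {
		time.Sleep(30 * time.Millisecond) // let the first Open make some progress
		rng := rand.New(rand.NewPCG(0, uint64(seed)))
		clone, err := slowFS.CrashClone(vfs.CrashCloneCfg{
			UnsyncedDataPercent: rng.IntN(101),
			RNG:                 rng,
		})
		if err != nil {
			clone = nil
		}
		cloneCh <- clone
	}()

	// First Open against the slowed filesystem.
	db, err := pebble.Open(dir, opts)
	if err != nil {
		return err
	}
	if err := db.Close(); err != nil {
		return err
	}

	// Second Open from the mid-Open crash clone, as if the process had died
	// while the first Open was still running.
	if clone := <-cloneCh; clone != nil {
		opts.FS = clone
		db2, err := pebble.Open(dir, opts)
		if err != nil {
			return err
		}
		return db2.Close()
	}
	return nil
}
```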
9 changes: 9 additions & 0 deletions vfs/errorfs/errorfs.go
@@ -457,6 +457,15 @@ func (fs *FS) Stat(name string) (vfs.FileInfo, error) {
return fs.fs.Stat(name)
}

// CrashClone implements MemFS.CrashClone.
func (fs *FS) CrashClone(cfg vfs.CrashCloneCfg) (*vfs.MemFS, error) {
memFs, ok := fs.fs.(*vfs.MemFS)
if !ok {
return nil, errors.New("not a MemFS")
}
return memFs.CrashClone(cfg), nil
}

// errorFile implements vfs.File. The interface is implemented on the pointer
// type to allow pointer equality comparisons.
type errorFile struct {
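A short usage note for the new method: CrashClone on an errorfs.FS only works when the wrapped filesystem is a *vfs.MemFS; anything else returns an error. An illustrative check follows (the injector choice is arbitrary: ErrInjected with a never-firing predicate).

```go
package main

import (
	"fmt"

	"github.com/cockroachdb/pebble/vfs"
	"github.com/cockroachdb/pebble/vfs/errorfs"
)

func main() {
	// Wrapping the real filesystem rather than a MemFS: CrashClone refuses.
	fs := errorfs.Wrap(vfs.Default, errorfs.ErrInjected.If(errorfs.Randomly(0, 0)))
	if _, err := fs.CrashClone(vfs.CrashCloneCfg{}); err != nil {
		fmt.Println(err) // not a MemFS
	}
}
```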