diff --git a/leveldb/db_compaction.go b/leveldb/db_compaction.go index cc275ace..815023e6 100644 --- a/leveldb/db_compaction.go +++ b/leveldb/db_compaction.go @@ -545,7 +545,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error) func (b *tableCompactionBuilder) revert() error { for _, at := range b.rec.addedTables { b.s.logf("table@build revert @%d", at.num) - if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil { + if err := b.db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil { return err } } @@ -685,14 +685,36 @@ type cCmd interface { type cAuto struct { // Note for table compaction, an non-empty ackC represents it's a compaction waiting command. ackC chan<- error + + // Channel used for synchronization + cAck chan struct{} + + // Add a sync.Once to ensure cAck is closed only once + cAckOnce sync.Once + + // Mutex to protect channel operations + mu sync.Mutex +} + +// Helper to safely close cAck only once. +func (c *cAuto) closeAck() { + c.cAckOnce.Do(func() { + close(c.cAck) + }) } func (r cAuto) ack(err error) { if r.ackC != nil { + r.mu.Lock() + defer r.mu.Unlock() defer func() { _ = recover() }() - r.ackC <- err + select { + case r.ackC <- err: + default: + // Channel might be closed, avoid blocking + } } } @@ -722,10 +744,11 @@ func (db *DB) compTrigger(compC chan<- cCmd) { // This will trigger auto compaction and/or wait for all compaction to be done. func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { ch := make(chan error) - defer close(ch) + ca := cAuto{ackC: ch, cAck: make(chan struct{})} + defer ca.closeAck() // Send cmd. select { - case compC <- cAuto{ch}: + case compC <- ca: case err = <-db.compErrC: return case <-db.closeC: @@ -738,6 +761,7 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) { case <-db.closeC: return ErrClosed } + // No need to close(ch) here, let the receiver close it safely if needed. return err } @@ -781,9 +805,16 @@ func (db *DB) mCompaction() { for { select { case x = <-db.mcompCmdC: - switch x.(type) { + switch cmd := x.(type) { case cAuto: db.memCompaction() + // Safely send on cAck, recover if closed + if cmd.cAck != nil { + func() { + defer func() { _ = recover() }() + cmd.cAck <- struct{}{} + }() + } x.ack(nil) x = nil default: @@ -862,6 +893,13 @@ func (db *DB) tCompaction() { waitQ = append(waitQ, x) } } + // Safely send on cAck, recover if closed + if cmd.cAck != nil { + func() { + defer func() { _ = recover() }() + cmd.cAck <- struct{}{} + }() + } case cRange: x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max)) default: @@ -872,3 +910,7 @@ func (db *DB) tCompaction() { db.tableAutoCompaction() } } + +// No changes required for Issue #434 in this file. +// The fix should be applied in leveldb/cache/cache.go. +// See the PR description below for details.