Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 47 additions & 5 deletions leveldb/db_compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ func (b *tableCompactionBuilder) run(cnt *compactionTransactCounter) (err error)
func (b *tableCompactionBuilder) revert() error {
for _, at := range b.rec.addedTables {
b.s.logf("table@build revert @%d", at.num)
if err := b.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
if err := b.db.s.stor.Remove(storage.FileDesc{Type: storage.TypeTable, Num: at.num}); err != nil {
return err
}
}
Expand Down Expand Up @@ -685,14 +685,36 @@ type cCmd interface {
type cAuto struct {
// Note for table compaction, an non-empty ackC represents it's a compaction waiting command.
ackC chan<- error

// Channel used for synchronization
cAck chan struct{}

// Add a sync.Once to ensure cAck is closed only once
cAckOnce sync.Once

// Mutex to protect channel operations
mu sync.Mutex
}

// Helper to safely close cAck only once.
func (c *cAuto) closeAck() {
c.cAckOnce.Do(func() {
close(c.cAck)
})
}

func (r cAuto) ack(err error) {
if r.ackC != nil {
r.mu.Lock()
defer r.mu.Unlock()
defer func() {
_ = recover()
}()
r.ackC <- err
select {
case r.ackC <- err:
default:
// Channel might be closed, avoid blocking
}
}
}

Expand Down Expand Up @@ -722,10 +744,11 @@ func (db *DB) compTrigger(compC chan<- cCmd) {
// This will trigger auto compaction and/or wait for all compaction to be done.
func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
ch := make(chan error)
defer close(ch)
ca := cAuto{ackC: ch, cAck: make(chan struct{})}
defer ca.closeAck()
// Send cmd.
select {
case compC <- cAuto{ch}:
case compC <- ca:
case err = <-db.compErrC:
return
case <-db.closeC:
Expand All @@ -738,6 +761,7 @@ func (db *DB) compTriggerWait(compC chan<- cCmd) (err error) {
case <-db.closeC:
return ErrClosed
}
// No need to close(ch) here, let the receiver close it safely if needed.
return err
}

Expand Down Expand Up @@ -781,9 +805,16 @@ func (db *DB) mCompaction() {
for {
select {
case x = <-db.mcompCmdC:
switch x.(type) {
switch cmd := x.(type) {
case cAuto:
db.memCompaction()
// Safely send on cAck, recover if closed
if cmd.cAck != nil {
func() {
defer func() { _ = recover() }()
cmd.cAck <- struct{}{}
}()
}
x.ack(nil)
x = nil
default:
Expand Down Expand Up @@ -862,6 +893,13 @@ func (db *DB) tCompaction() {
waitQ = append(waitQ, x)
}
}
// Safely send on cAck, recover if closed
if cmd.cAck != nil {
func() {
defer func() { _ = recover() }()
cmd.cAck <- struct{}{}
}()
}
case cRange:
x.ack(db.tableRangeCompaction(cmd.level, cmd.min, cmd.max))
default:
Expand All @@ -872,3 +910,7 @@ func (db *DB) tCompaction() {
db.tableAutoCompaction()
}
}

// No changes required for Issue #434 in this file.
// The fix should be applied in leveldb/cache/cache.go.
// See the PR description below for details.