Skip to content

Commit a8dbfaf

Browse files
authored
Merge pull request #4 from guycipher/iss-1-2-3
- minor race condition atomic lru adjustments.. issue #1 #2 #3
2 parents 7a642af + 1972a1c commit a8dbfaf

File tree

10 files changed

+627
-65
lines changed

10 files changed

+627
-65
lines changed

blockmanager/blockmanager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,9 @@ func (bm *BlockManager) backgroundSync() {
250250

251251
for {
252252
select {
253+
253254
case <-ticker.C:
254-
_ = syscall.Fdatasync(int(bm.fd)) // Use fdatasync for better performance
255+
_ = Fdatasync(bm.fd)
255256

256257
case <-bm.closeChan:
257258
return

blockmanager/fdatasync_windows.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
//go:build windows
2+
// +build windows
3+
4+
package blockmanager
5+
6+
var (
7+
modntdll = syscall.NewLazyDLL("ntdll.dll")
8+
procNtFlushBuffersFileEx = modntdll.NewProc("NtFlushBuffersFileEx")
9+
)
10+
11+
// Fdatasync__Win is a Windows-specific implementation of fdatasync.
12+
// https://learn.microsoft.com/en-us/windows-hardware/drivers/ddi/ntifs/nf-ntifs-ntflushbuffersfileex
13+
func Fdatasync(fd uintptr) error {
14+
15+
// Try to use NtFlushBuffersFileEx with FLUSH_FLAGS_FILE_DATA_SYNC_ONLY flag
16+
status, _, err := procNtFlushBuffersFileEx.Call(
17+
fd,
18+
FLUSH_FLAGS_FILE_DATA_SYNC_ONLY,
19+
0,
20+
0,
21+
0,
22+
)
23+
24+
// Check for error (0 means success in Windows API)
25+
if status != 0 {
26+
// Fall back to regular FlushFileBuffers if NtFlushBuffersFileEx fails or isn't available
27+
return syscall.FlushFileBuffers(syscall.Handle(fd))
28+
}
29+
30+
return nil
31+
}

blockmanager/fsyncdata.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
//go:build darwin || linux || freebsd || netbsd || openbsd
2+
3+
package blockmanager
4+
5+
import (
6+
"runtime"
7+
"syscall"
8+
)
9+
10+
func Fdatasync(fd uintptr) error {
11+
// On Darwin/macOS, Fdatasync is not available, so we fall back to Fsync..
12+
if runtime.GOOS == "darwin" {
13+
return syscall.Fsync(int(fd))
14+
}
15+
16+
err := syscall.Fdatasync(int(fd))
17+
if err != nil {
18+
return err
19+
}
20+
21+
return nil
22+
}

compactor.go

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -436,8 +436,18 @@ func (compactor *Compactor) compactSSTables(sstables []*SSTable, sourceLevel, ta
436436
}
437437

438438
// Add KLog and VLog managers to LRU cache
439-
compactor.db.lru.Put(klogPath, klogBm)
440-
compactor.db.lru.Put(vlogPath, vlogBm)
439+
compactor.db.lru.Put(klogPath, klogBm, func(key, value interface{}) {
440+
// Close the block manager when evicted from LRU
441+
if bm, ok := value.(*blockmanager.BlockManager); ok {
442+
_ = bm.Close()
443+
}
444+
})
445+
compactor.db.lru.Put(vlogPath, vlogBm, func(key, value interface{}) {
446+
// Close the block manager when evicted from LRU
447+
if bm, ok := value.(*blockmanager.BlockManager); ok {
448+
_ = bm.Close()
449+
}
450+
})
441451

442452
// Clean up the old SSTable files
443453
for _, table := range sstables {
@@ -714,7 +724,12 @@ func newSSTCompactionIterator(sst *SSTable) *sstCompactionIterator {
714724
if err != nil {
715725
return &sstCompactionIterator{eof: true}
716726
}
717-
sst.db.lru.Put(klogPath, klogBm)
727+
sst.db.lru.Put(klogPath, klogBm, func(key, value interface{}) {
728+
// Close the block manager when evicted from LRU
729+
if bm, ok := value.(*blockmanager.BlockManager); ok {
730+
_ = bm.Close()
731+
}
732+
})
718733
}
719734

720735
iter := &sstCompactionIterator{
@@ -794,7 +809,12 @@ func (iter *sstCompactionIterator) next() ([]byte, interface{}, int64, bool) {
794809
iter.eof = true
795810
return nil, nil, 0, false
796811
}
797-
iter.sstable.db.lru.Put(vlogPath, vlogBm)
812+
iter.sstable.db.lru.Put(vlogPath, vlogBm, func(key, value interface{}) {
813+
// Close the block manager when evicted from LRU
814+
if bm, ok := value.(*blockmanager.BlockManager); ok {
815+
_ = bm.Close()
816+
}
817+
})
798818
}
799819

800820
// Read the value from VLog

db.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,12 @@ func (db *DB) reinstate() error {
347347
}
348348

349349
// Add the WAL to the LRU cache
350-
db.lru.Put(newWalPath, walBm)
350+
db.lru.Put(newWalPath, walBm, func(key, value interface{}) {
351+
// Close the block manager when evicted from LRU
352+
if bm, ok := value.(*blockmanager.BlockManager); ok {
353+
_ = bm.Close()
354+
}
355+
})
351356

352357
// Initialize empty transactions slice
353358
txns := make([]*Txn, 0)
@@ -381,7 +386,12 @@ func (db *DB) reinstate() error {
381386
}
382387

383388
// Add WAL to LRU cache
384-
db.lru.Put(walPath, walBm)
389+
db.lru.Put(walPath, walBm, func(key, value interface{}) {
390+
// Close the block manager when evicted from LRU
391+
if bm, ok := value.(*blockmanager.BlockManager); ok {
392+
_ = bm.Close()
393+
}
394+
})
385395

386396
// Process all transaction entries in this WAL
387397
iter := walBm.Iterator()
@@ -542,7 +552,12 @@ func (db *DB) reinstate() error {
542552
}
543553

544554
// Update or add to LRU cache
545-
db.lru.Put(activeWALPath, activeWalBm)
555+
db.lru.Put(activeWALPath, activeWalBm, func(key, value interface{}) {
556+
// Close the block manager when evicted from LRU
557+
if bm, ok := value.(*blockmanager.BlockManager); ok {
558+
_ = bm.Close()
559+
}
560+
})
546561

547562
// Populate the active memtable with all committed transactions
548563
populateMemtableFromTxns(activeMemt, globalTxnMap, false)

flusher.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,12 @@ func (flusher *Flusher) queueMemtable() error {
6060
}
6161

6262
// Add the new WAL to the LRU cache
63-
flusher.db.lru.Put(newMemtable.wal.path, walBm)
63+
flusher.db.lru.Put(newMemtable.wal.path, walBm, func(key, value interface{}) {
64+
// Close the block manager when evicted from LRU
65+
if bm, ok := value.(*blockmanager.BlockManager); ok {
66+
_ = bm.Close()
67+
}
68+
})
6469

6570
// Push the current memtable to the immutable stack
6671
flusher.immutable.Enqueue(flusher.db.memtable.Load().(*Memtable))
@@ -232,8 +237,18 @@ func (flusher *Flusher) flushMemtable(memt *Memtable) error {
232237
flusher.db.log(fmt.Sprintf("Flushed %d entries to SSTable %d", entryCount, sstable.Id))
233238

234239
// Add both KLog and VLog to the LRU cache
235-
flusher.db.lru.Put(klogPath, klogBm)
236-
flusher.db.lru.Put(vlogPath, vlogBm)
240+
flusher.db.lru.Put(klogPath, klogBm, func(key, value interface{}) {
241+
// Close the block manager when evicted from LRU
242+
if bm, ok := value.(*blockmanager.BlockManager); ok {
243+
_ = bm.Close()
244+
}
245+
})
246+
flusher.db.lru.Put(vlogPath, vlogBm, func(key, value interface{}) {
247+
// Close the block manager when evicted from LRU
248+
if bm, ok := value.(*blockmanager.BlockManager); ok {
249+
_ = bm.Close()
250+
}
251+
})
237252

238253
// Add the SSTable to level 1
239254
levels := flusher.db.levels.Load()

0 commit comments

Comments
 (0)