Skip to content

Commit a4761ee

Browse files
committed
cache: readShard: use a separate mutex
We use a separate `readShard` mutex to protect the `readMap` and each entry's ref count. This removes the possibility of a couple of corner cases, making the code much cleaner. We also now avoid write-locking the entire cache shard when we start a read. This will also potentially allow moving read buffer management into `readShard` in the future (and having separate `readShard` sets per store).
1 parent 1646635 commit a4761ee

File tree

4 files changed

+54
-103
lines changed

4 files changed

+54
-103
lines changed

internal/cache/clockpro.go

Lines changed: 2 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -147,25 +147,11 @@ func (c *shard) getWithMaybeReadEntry(k key, desireReadEntry bool) (*Value, *rea
147147
e.referenced.Store(true)
148148
}
149149
}
150-
c.mu.RUnlock()
151150
var re *readEntry
152151
if value == nil && desireReadEntry {
153-
c.mu.Lock()
154-
// After the c.mu.RUnlock(), someone could have inserted the value in the
155-
// cache. We could tolerate the race and do a file read, or do another map
156-
// lookup. We choose to do the latter, since the cost of a map lookup is
157-
// insignificant compared to the cost of reading a block from a file.
158-
if e, _ := c.blocks.Get(k); e != nil {
159-
value = e.acquireValue()
160-
if value != nil {
161-
e.referenced.Store(true)
162-
}
163-
}
164-
if value == nil {
165-
re = c.readShard.acquireReadEntryLocked(k)
166-
}
167-
c.mu.Unlock()
152+
re = c.readShard.acquireReadEntry(k)
168153
}
154+
c.mu.RUnlock()
169155
if value == nil {
170156
c.misses.Add(1)
171157
} else {

internal/cache/read_shard.go

Lines changed: 50 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"sync"
1010
"time"
1111

12+
"github.com/cockroachdb/pebble/internal/invariants"
1213
"github.com/cockroachdb/swiss"
1314
)
1415

@@ -39,16 +40,15 @@ import (
3940
//
4041
// Design choices and motivation:
4142
//
42-
// - readShard is tightly integrated with a cache shard: At its core,
43-
// readShard is a map with synchronization. For the same reason the cache is
44-
// sharded (for higher concurrency by sharding the mutex), it is beneficial
45-
// to shard synchronization on readShard. By making readShard a member of
46-
// shard, this sharding is trivially accomplished. Additionally, the code
47-
// feels cleaner when there isn't a race between a cache miss, followed by
48-
// creating a readEntry that is no longer needed because someone else has
49-
// done the read since the miss and inserted into the cache. By making the
50-
// readShard use shard.mu, such a race is avoided. A side benefit is that
51-
// the cache interaction can be hidden behind readEntry.SetReadValue. One
43+
// - At its core, readShard is a map with synchronization. For the same reason
44+
// the cache is sharded (for higher concurrency by sharding the mutex), it
45+
// is beneficial to shard synchronization on readShard. By making readShard
46+
// a member of shard, this sharding is trivially accomplished. readShard has
47+
// its own mutex (separate from shard.mu), in order to avoid write-locking
48+
// shard.mu when we start a read.
49+
//
50+
// - readShard is integrated with the corresponding cache shard; this allows
51+
// the cache interaction to be hidden behind readEntry.SetReadValue. One
5252
// disadvantage of this tightly integrated design is that it does not
5353
// encompass readers that will put the read value into a block.BufferPool --
5454
// we don't worry about those since block.BufferPool is only used for
@@ -69,10 +69,8 @@ type readShard struct {
6969
// shard is only used for locking, and calling shard.Set.
7070
shard *shard
7171
// Protected by shard.mu.
72-
//
73-
// shard.mu is never held when acquiring readEntry.mu. shard.mu is a shared
74-
// resource and must be released quickly.
75-
shardMu struct {
72+
mu struct {
73+
sync.Mutex
7674
readMap swiss.Map[key, *readEntry]
7775
}
7876
}
@@ -82,27 +80,35 @@ func (rs *readShard) Init(shard *shard) *readShard {
8280
shard: shard,
8381
}
8482
// Choice of 16 is arbitrary.
85-
rs.shardMu.readMap.Init(16)
83+
rs.mu.readMap.Init(16)
8684
return rs
8785
}
8886

89-
// acquireReadEntryLocked gets a *readEntry for (id, fileNum, offset). shard.mu is
90-
// already write locked.
91-
func (rs *readShard) acquireReadEntryLocked(k key) *readEntry {
92-
e, ok := rs.shardMu.readMap.Get(k)
93-
if !ok {
94-
e = newReadEntry(rs, k)
95-
rs.shardMu.readMap.Put(k, e)
96-
} else {
97-
e.refCount.acquireAllowZero()
87+
// acquireReadEntry acquires a *readEntry for (id, fileNum, offset), creating
88+
// one if necessary.
89+
func (rs *readShard) acquireReadEntry(k key) *readEntry {
90+
rs.mu.Lock()
91+
defer rs.mu.Unlock()
92+
93+
if e, ok := rs.mu.readMap.Get(k); ok {
94+
// An entry we found in the map while holding the mutex must have a non-zero
95+
// reference count.
96+
if e.refCount < 1 {
97+
panic("invalid reference count")
98+
}
99+
e.refCount++
100+
return e
98101
}
102+
103+
e := newReadEntry(rs, k)
104+
rs.mu.readMap.Put(k, e)
99105
return e
100106
}
101107

102108
func (rs *readShard) lenForTesting() int {
103-
rs.shard.mu.Lock()
104-
defer rs.shard.mu.Unlock()
105-
return rs.shardMu.readMap.Len()
109+
rs.mu.Lock()
110+
defer rs.mu.Unlock()
111+
return rs.mu.readMap.Len()
106112
}
107113

108114
// readEntry is used to coordinate between concurrent attempted readers of the
@@ -146,10 +152,8 @@ type readEntry struct {
146152
errorDuration time.Duration
147153
readStart time.Time
148154
}
149-
// Count of ReadHandles that refer to this readEntry. Increments always hold
150-
// shard.mu. So if this is found to be 0 while holding shard.mu, it is safe
151-
// to delete readEntry from readShard.shardMu.readMap.
152-
refCount refcnt
155+
// Count of ReadHandles that refer to this readEntry. Protected by readShard.mu.
156+
refCount int32
153157
}
154158

155159
var readEntryPool = sync.Pool{
@@ -163,8 +167,8 @@ func newReadEntry(rs *readShard, k key) *readEntry {
163167
*e = readEntry{
164168
readShard: rs,
165169
key: k,
170+
refCount: 1,
166171
}
167-
e.refCount.init(1)
168172
return e
169173
}
170174

@@ -261,40 +265,26 @@ func (e *readEntry) waitForReadPermissionOrHandle(
261265

262266
// unrefAndTryRemoveFromMap reduces the reference count of e and removes e.key
263267
// => e from the readMap if necessary.
264-
//
265-
// It is possible that after unreffing that s.e has already been removed, and
266-
// is now back in the sync.Pool, or being reused (for the same or different
267-
// key). This is because after unreffing, which caused the s.e.refCount to
268-
// become zero, but before acquiring shard.mu, it could have been incremented
269-
// and decremented concurrently, and some other goroutine could have observed
270-
// a different decrement to 0, and raced ahead and deleted s.e from the
271-
// readMap.
272268
func (e *readEntry) unrefAndTryRemoveFromMap() {
273-
// Save the fields we need from entry; once we release the last refcount, it
274-
// is possible that the entry is found and reused and then freed.
275269
rs := e.readShard
276-
k := e.key
277-
if !e.refCount.release() {
270+
rs.mu.Lock()
271+
e.refCount--
272+
if e.refCount > 0 {
273+
// Entry still in use.
274+
rs.mu.Unlock()
278275
return
279276
}
280-
// Once we release the refcount, it is possible that it the entry is reused
281-
// again and freed before we get the lock.
282-
rs.shard.mu.Lock()
283-
e2, ok := rs.shardMu.readMap.Get(k)
284-
if !ok || e2 != e {
285-
// Already removed.
286-
rs.shard.mu.Unlock()
287-
return
277+
if e.refCount < 0 {
278+
panic("invalid reference count")
288279
}
289-
if e.refCount.value() != 0 {
290-
// The entry was reused.
291-
rs.shard.mu.Unlock()
292-
return
280+
// The refcount is now 0; remove from the map.
281+
if invariants.Enabled {
282+
if e2, ok := rs.mu.readMap.Get(e.key); !ok || e2 != e {
283+
panic("entry not in readMap")
284+
}
293285
}
294-
// e.refCount == 0. And it cannot be incremented since
295-
// shard.mu.Lock() is held. So remove from map.
296-
rs.shardMu.readMap.Delete(k)
297-
rs.shard.mu.Unlock()
286+
rs.mu.readMap.Delete(e.key)
287+
rs.mu.Unlock()
298288

299289
// Free s.e.
300290
e.mu.v.Release()

internal/cache/refcnt_normal.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,8 @@ func (v *refcnt) acquire() {
3737
}
3838
}
3939

40-
// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be
41-
// called with a zero refcnt. This is useful for cases where the entry which
42-
// is being reference counted is inside a container and the container does not
43-
// hold a reference. The container uses release() returning true to attempt to
44-
// do a cleanup from the container.
45-
func (v *refcnt) acquireAllowZero() {
46-
v.val.Add(1)
47-
}
48-
40+
// release decrements the reference count and returns true when the reference
41+
// count becomes 0.
4942
func (v *refcnt) release() bool {
5043
switch v := v.val.Add(-1); {
5144
case v < 0:
@@ -57,10 +50,6 @@ func (v *refcnt) release() bool {
5750
}
5851
}
5952

60-
func (v *refcnt) value() int32 {
61-
return v.val.Load()
62-
}
63-
6453
func (v *refcnt) trace(msg string) {
6554
}
6655

internal/cache/refcnt_tracing.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -41,16 +41,6 @@ func (v *refcnt) acquire() {
4141
v.trace("acquire")
4242
}
4343

44-
// acquireAllowZero is the same as acquire, but allows acquireAllowZero to be
45-
// called with a zero refcnt. This is useful for cases where the entry which
46-
// is being reference counted is inside a container and the container does not
47-
// hold a reference. The container uses release() returning true to attempt to
48-
// do a cleanup from the container.
49-
func (v *refcnt) acquireAllowZero() {
50-
v.val.Add(1)
51-
v.trace("acquire")
52-
}
53-
5444
func (v *refcnt) release() bool {
5545
n := v.val.Add(-1)
5646
switch {
@@ -61,10 +51,6 @@ func (v *refcnt) release() bool {
6151
return n == 0
6252
}
6353

64-
func (v *refcnt) value() int32 {
65-
return v.val.Load()
66-
}
67-
6854
func (v *refcnt) trace(msg string) {
6955
s := fmt.Sprintf("%s: refs=%d\n%s", msg, v.refs(), debug.Stack())
7056
v.Lock()

0 commit comments

Comments
 (0)