9
9
"iter"
10
10
"time"
11
11
12
+ "github.com/kelindar/roaring"
12
13
"github.com/kelindar/tales/internal/codec"
13
14
"github.com/kelindar/tales/internal/seq"
14
- "github.com/weaviate/sroar"
15
15
)
16
16
17
17
// queryWarm queries the in-memory buffer for entries.
@@ -60,12 +60,13 @@ func (l *Service) queryDay(ctx context.Context, actors []uint32, day time.Time,
60
60
61
61
// For each chunk, load all relevant bitmaps and compute intersection
62
62
for _ , chunk := range meta .Chunks {
63
- // Skip chunks that don't overlap with query time range
64
63
if ! chunk .Between (fromSec , toSec ) {
65
64
continue
66
65
}
66
+
67
67
chunkKey := keyOfChunk (seq .FormatDate (day ), chunk .Offset ())
68
- var index * sroar.Bitmap
68
+
69
+ var index * roaring.Bitmap
69
70
for _ , a := range actors {
70
71
idx , ok := chunk .Actors [a ]
71
72
if ! ok || uint32 (idx [0 ]) < t0 || uint32 (idx [0 ]) > t1 {
@@ -82,14 +83,14 @@ func (l *Service) queryDay(ctx context.Context, actors []uint32, day time.Time,
82
83
switch {
83
84
case index == nil :
84
85
index = bitmap
85
- case ! index .IsEmpty () :
86
+ case index .Count () > 0 :
86
87
index .And (bitmap )
87
88
}
88
89
}
89
90
90
91
// Query log section with intersection bitmap
91
- if index != nil && ! index .IsEmpty () {
92
- if ! l .queryChunk (ctx , chunkKey , chunk , * index , day , from , to , yield ) {
92
+ if index != nil && index .Count () > 0 {
93
+ if ! l .queryChunk (ctx , chunkKey , chunk , index , day , from , to , yield ) {
93
94
return false
94
95
}
95
96
}
@@ -99,7 +100,7 @@ func (l *Service) queryDay(ctx context.Context, actors []uint32, day time.Time,
99
100
}
100
101
101
102
// loadBitmap downloads and decodes a single bitmap for a given index entry.
102
- func (l * Service ) loadBitmap (ctx context.Context , key string , entry codec.IndexEntry ) (* sroar .Bitmap , error ) {
103
+ func (l * Service ) loadBitmap (ctx context.Context , key string , entry codec.IndexEntry ) (* roaring .Bitmap , error ) {
103
104
i0 := int64 (entry [1 ])
104
105
i1 := i0 + int64 (entry [2 ]) - 1
105
106
@@ -109,27 +110,26 @@ func (l *Service) loadBitmap(ctx context.Context, key string, entry codec.IndexE
109
110
}
110
111
111
112
// Bitmaps are stored uncompressed, so just deserialize
112
- bm := sroar . FromBuffer (data )
113
+ bm := roaring . FromBytes (data )
113
114
return bm , nil
114
115
}
115
116
116
117
// queryChunk queries a specific log chunk for sequence IDs using an optimized bitmap iterator.
117
118
// This function efficiently filters log entries by leveraging the sorted nature of both the log entries
118
119
// and the bitmap, avoiding unnecessary bitmap lookups for entries that don't match.
119
- func (l * Service ) queryChunk (ctx context.Context , chunkKey string , chunk codec.ChunkEntry , sids sroar .Bitmap , day , from , to time.Time , yield func (time.Time , string ) bool ) bool {
120
- if sids .IsEmpty () {
120
+ func (l * Service ) queryChunk (ctx context.Context , chunkKey string , chunk codec.ChunkEntry , sids * roaring .Bitmap , day , from , to time.Time , yield func (time.Time , string ) bool ) bool {
121
+ if sids .Count () == 0 {
121
122
return true
122
123
}
123
124
124
- entries , err := l .rangeChunks (ctx , chunkKey , chunk , & sids )
125
+ entries , err := l .rangeChunks (ctx , chunkKey , chunk , sids )
125
126
if err != nil {
126
127
return true // Skip chunks that fail to process
127
128
}
128
129
129
130
// Process only the filtered entries
130
131
for entry := range entries {
131
- id := entry .ID ()
132
- ts := seq .TimeOf (id , day )
132
+ ts := entry .Time (day )
133
133
if ! ts .Before (from ) && ! ts .After (to ) && ! yield (ts , entry .Text ()) {
134
134
return false // Stop iteration
135
135
}
@@ -140,8 +140,8 @@ func (l *Service) queryChunk(ctx context.Context, chunkKey string, chunk codec.C
140
140
141
141
// rangeChunks downloads the log section from a chunk file, decompresses it, and returns
142
142
// an iterator over log entries that are filtered using an optimized bitmap iterator merge algorithm.
143
- func (l * Service ) rangeChunks (ctx context.Context , chunkKey string , chunk codec.ChunkEntry , sids * sroar .Bitmap ) (iter.Seq [codec.LogEntry ], error ) {
144
- if chunk .DataSize () == 0 || sids .IsEmpty () {
143
+ func (l * Service ) rangeChunks (ctx context.Context , chunkKey string , chunk codec.ChunkEntry , sids * roaring .Bitmap ) (iter.Seq [codec.LogEntry ], error ) {
144
+ if chunk .DataSize () == 0 || sids .Count () == 0 {
145
145
return func (yield func (codec.LogEntry ) bool ) {}, nil // Empty iterator
146
146
}
147
147
@@ -159,36 +159,34 @@ func (l *Service) rangeChunks(ctx context.Context, chunkKey string, chunk codec.
159
159
}
160
160
161
161
return func (yield func (codec.LogEntry ) bool ) {
162
- iter := sids .NewIterator ()
163
- idx := iter .Next ()
164
- for len (buffer ) > 4 && idx != 0 {
165
- entry := codec .LogEntry (buffer )
166
- size := entry .Size ()
167
- if size == 0 || uint32 (len (buffer )) < size {
168
- return // Invalid size or not enough data, stop iteration
169
- }
170
-
171
- // Advance bitmap iterator until we find a target >= current entry ID
172
- entryID := uint64 (entry .ID ())
173
- for idx != 0 && idx < entryID {
174
- idx = iter .Next ()
175
- }
162
+ sids .Range (func (sidToFind uint32 ) bool {
163
+ for len (buffer ) > 4 {
164
+ entry := codec .LogEntry (buffer )
165
+ size := entry .Size ()
166
+ if size == 0 || uint32 (len (buffer )) < size {
167
+ return false // Invalid size or not enough data, stop iteration
168
+ }
176
169
177
- // If we've exhausted all targets, we're done
178
- if idx == 0 {
179
- return
180
- }
170
+ entryID := entry .ID ()
171
+ if entryID < sidToFind {
172
+ buffer = buffer [size :] // Advance buffer
173
+ continue // Continue scanning buffer for current sid
174
+ }
181
175
182
- // If current entry matches current target, yield it
183
- if entryID == idx {
184
- if ! yield (entry [:size ]) {
185
- return // Stop iteration if yield returns false
176
+ // If we found the entry, yield it. If we overshot, the entry is not
177
+ // in the buffer, so we just move to the next sid.
178
+ if entryID == sidToFind {
179
+ if ! yield (entry [:size ]) {
180
+ return false // Stop iteration if yield returns false
181
+ }
182
+ buffer = buffer [size :] // Advance buffer
186
183
}
187
- idx = iter .Next ()
188
- }
189
184
190
- // Always advance buffer after processing each entry
191
- buffer = buffer [size :]
192
- }
185
+ // We either found the entry and yielded it, or we've overshot.
186
+ // In both cases, we are done with this sid and should get the next one.
187
+ return true // Continue to next sid
188
+ }
189
+ return false // Buffer is exhausted, stop iteration
190
+ })
193
191
}, nil
194
192
}
0 commit comments