@@ -159,33 +159,41 @@ const preloadBatchSize = 100
159
159
160
160
func (f * flusher ) recurse (t * transaction , seen map [bson.ObjectId ]* transaction , preloaded map [bson.ObjectId ]* transaction ) error {
161
161
seen [t .Id ] = t
162
+ // we shouldn't need this one anymore because we are processing it now
162
163
delete (preloaded , t .Id )
163
164
err := f .advance (t , nil , false )
164
165
if err != errPreReqs {
165
166
return err
166
167
}
168
+ toPreload := make ([]bson.ObjectId , 0 )
167
169
for _ , dkey := range t .docKeys () {
168
- remaining := make ([]bson. ObjectId , 0 , len ( f .queue [dkey ]))
169
- toPreload := make (map [ bson.ObjectId ] struct {}, len (f . queue [ dkey ] ))
170
+ queue := f .queue [dkey ]
171
+ remaining := make ([] bson.ObjectId , 0 , len (queue ))
170
172
for _ , dtt := range f .queue [dkey ] {
171
173
id := dtt .id ()
172
- if _ , scheduled := toPreload [ id ]; seen [ id ] != nil || scheduled || preloaded [id ] != nil {
174
+ if seen [id ] != nil {
173
175
continue
174
176
}
175
- toPreload [id ] = struct {}{}
176
177
remaining = append (remaining , id )
177
178
}
178
- // done with this map
179
- toPreload = nil
179
+
180
180
for len (remaining ) > 0 {
181
- batch := remaining
182
- if len (batch ) > preloadBatchSize {
183
- batch = remaining [:preloadBatchSize ]
181
+ toPreload = toPreload [:0 ]
182
+ batchSize := preloadBatchSize
183
+ if batchSize > len (remaining ) {
184
+ batchSize = len (remaining )
184
185
}
185
- remaining = remaining [len (batch ):]
186
- err := f .loadMulti (batch , preloaded )
187
- if err != nil {
188
- return err
186
+ batch := remaining [:batchSize ]
187
+ remaining = remaining [batchSize :]
188
+ for _ , id := range batch {
189
+ if preloaded [id ] == nil {
190
+ toPreload = append (toPreload , id )
191
+ }
192
+ }
193
+ if len (toPreload ) > 0 {
194
+ if err := f .loadMulti (toPreload , preloaded ); err != nil {
195
+ return err
196
+ }
189
197
}
190
198
for _ , id := range batch {
191
199
if seen [id ] != nil {
@@ -302,6 +310,8 @@ NextDoc:
302
310
if info .Remove == "" {
303
311
// Fast path, unless workload is insert/remove heavy.
304
312
revno [dkey ] = info .Revno
313
+ // We updated the Q, so this should force refresh
314
+ // TODO: We could *just* add the new txn-queue entry/reuse existing tokens
305
315
f .queue [dkey ] = tokensWithIds (info .Queue )
306
316
f .debugf ("[A] Prepared document %v with revno %d and queue: %v" , dkey , info .Revno , info .Queue )
307
317
continue NextDoc
@@ -363,8 +373,14 @@ NextDoc:
363
373
} else {
364
374
f .debugf ("[B] Prepared document %v with revno %d and queue: %v" , dkey , info .Revno , info .Queue )
365
375
}
366
- revno [dkey ] = info .Revno
367
- f .queue [dkey ] = tokensWithIds (info .Queue )
376
+ existRevno , rok := revno [dkey ]
377
+ existQ , qok := f .queue [dkey ]
378
+ if rok && qok && existRevno == info .Revno && len (existQ ) == len (info .Queue ) {
379
+ // We've already loaded this doc, no need to load it again
380
+ } else {
381
+ revno [dkey ] = info .Revno
382
+ f .queue [dkey ] = tokensWithIds (info .Queue )
383
+ }
368
384
continue NextDoc
369
385
}
370
386
}
@@ -498,15 +514,25 @@ func (f *flusher) rescan(t *transaction, force bool) (revnos []int64, err error)
498
514
goto RetryDoc
499
515
}
500
516
revno [dkey ] = info .Revno
501
-
517
+ // TODO(jam): 2017-05-31: linear search for each token in info.Queue during all rescans is potentially O(N^2)
518
+ // if we first checked to see that we've already loaded this info.Queue in f.queue, we could use a different
519
+ // structure (map) to do faster lookups to see if the tokens are already present.
502
520
found := false
503
521
for _ , id := range info .Queue {
504
522
if id == tt {
505
523
found = true
506
524
break
507
525
}
508
526
}
509
- f .queue [dkey ] = tokensWithIds (info .Queue )
527
+ // f.queue[dkey] = tokensWithIds(info.Queue, &RescanUpdatedQueue)
528
+ existQ , qok := f .queue [dkey ]
529
+ if qok && len (existQ ) == len (info .Queue ) {
530
+ // we could check that info.Q matches existQ.tt
531
+ } else {
532
+ if len (existQ ) != len (info .Queue ) {
533
+ }
534
+ f .queue [dkey ] = tokensWithIds (info .Queue )
535
+ }
510
536
if ! found {
511
537
// Rescanned transaction id was not in the queue. This could mean one
512
538
// of three things:
0 commit comments