Skip to content

Commit d3b6a6e

Browse files
authored
Merge pull request #11 from jameinel/txn-preload
TXN preload
2 parents 73a9463 + b5ff827 commit d3b6a6e

File tree

2 files changed

+56
-7
lines changed

2 files changed

+56
-7
lines changed

txn/flusher.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ func (f *flusher) run() (err error) {
6363

6464
f.debugf("Processing %s", f.goal)
6565
seen := make(map[bson.ObjectId]*transaction)
66-
if err := f.recurse(f.goal, seen); err != nil {
66+
preloaded := make(map[bson.ObjectId]*transaction)
67+
if err := f.recurse(f.goal, seen, preloaded); err != nil {
6768
return err
6869
}
6970
if f.goal.done() {
@@ -155,26 +156,54 @@ func (f *flusher) run() (err error) {
155156
return nil
156157
}
157158

158-
func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction) error {
159+
const preloadBatchSize = 100
160+
161+
func (f *flusher) recurse(t *transaction, seen map[bson.ObjectId]*transaction, preloaded map[bson.ObjectId]*transaction) error {
159162
seen[t.Id] = t
163+
delete(preloaded, t.Id)
160164
err := f.advance(t, nil, false)
161165
if err != errPreReqs {
162166
return err
163167
}
164168
for _, dkey := range t.docKeys() {
169+
remaining := make([]bson.ObjectId, 0, len(f.queue[dkey]))
170+
toPreload := make(map[bson.ObjectId]struct{}, len(f.queue[dkey]))
165171
for _, dtt := range f.queue[dkey] {
166172
id := dtt.id()
167-
if seen[id] != nil {
173+
if _, scheduled := toPreload[id]; seen[id] != nil || scheduled || preloaded[id] != nil {
168174
continue
169175
}
170-
qt, err := f.load(id)
171-
if err != nil {
172-
return err
176+
toPreload[id] = struct{}{}
177+
remaining = append(remaining, id)
178+
}
179+
// done with this map
180+
toPreload = nil
181+
for len(remaining) > 0 {
182+
batch := remaining
183+
if len(batch) > preloadBatchSize {
184+
batch = remaining[:preloadBatchSize]
173185
}
174-
err = f.recurse(qt, seen)
186+
remaining = remaining[len(batch):]
187+
err := f.loadMulti(batch, preloaded)
175188
if err != nil {
176189
return err
177190
}
191+
for _, id := range batch {
192+
if seen[id] != nil {
193+
continue
194+
}
195+
qt, ok := preloaded[id]
196+
if !ok {
197+
qt, err = f.load(id)
198+
if err != nil {
199+
return err
200+
}
201+
}
202+
err = f.recurse(qt, seen, preloaded)
203+
if err != nil {
204+
return err
205+
}
206+
}
178207
}
179208
}
180209
return nil

txn/txn.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,26 @@ func (r *Runner) load(id bson.ObjectId) (*transaction, error) {
492492
return &t, nil
493493
}
494494

495+
func (r *Runner) loadMulti(ids []bson.ObjectId, preloaded map[bson.ObjectId]*transaction) error {
496+
txns := make([]transaction, 0, len(ids))
497+
498+
query := r.tc.Find(bson.M{"_id": bson.M{"$in": ids}})
499+
// Not sure that this actually has much of an effect when using All()
500+
query.Batch(len(ids))
501+
err := query.All(&txns)
502+
if err == mgo.ErrNotFound {
503+
return fmt.Errorf("could not find a transaction in batch: %v", ids)
504+
} else if err != nil {
505+
return err
506+
}
507+
for i := range txns {
508+
t := &txns[i]
509+
preloaded[t.Id] = t
510+
}
511+
return nil
512+
}
513+
514+
495515
type typeNature int
496516

497517
const (

0 commit comments

Comments
 (0)