Skip to content

Commit a6e1213

Browse files
committed
Merge pull request #585 from pingcap/siddontang/update-reorg
update reorg
2 parents 301ea1c + 737215b commit a6e1213

File tree

5 files changed

+97
-5
lines changed

5 files changed

+97
-5
lines changed

ddl/column.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,10 @@ func (d *ddl) backfillColumnData(t table.Table, columnInfo *model.ColumnInfo, ha
335335
log.Info("backfill column...", handle)
336336

337337
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
338+
if err := d.isOwnerInReorg(txn); err != nil {
339+
return errors.Trace(err)
340+
}
341+
338342
// First check if row exists.
339343
exist, err := checkRowExist(txn, t, handle)
340344
if err != nil {
@@ -401,6 +405,10 @@ func (d *ddl) dropTableColumn(t table.Table, colInfo *model.ColumnInfo, reorgInf
401405
seekHandle = handles[len(handles)-1] + 1
402406

403407
err = kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
408+
if err := d.isOwnerInReorg(txn); err != nil {
409+
return errors.Trace(err)
410+
}
411+
404412
var h int64
405413
for _, h = range handles {
406414
key := t.RecordKey(h, col)

ddl/index.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -421,6 +421,10 @@ func (d *ddl) backfillTableIndex(t table.Table, indexInfo *model.IndexInfo, hand
421421
log.Debug("building index...", handle)
422422

423423
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
424+
if err := d.isOwnerInReorg(txn); err != nil {
425+
return errors.Trace(err)
426+
}
427+
424428
// first check row exists
425429
exist, err := checkRowExist(txn, t, handle)
426430
if err != nil {

ddl/reorg.go

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,28 +108,54 @@ func (d *ddl) runReorgJob(f func() error) error {
108108
}()
109109
}
110110

111-
waitTimeout := chooseLeaseTime(d.lease, waitReorgTimeout)
111+
waitTimeout := waitReorgTimeout
112+
// if d.lease is 0, we are using a local storage,
113+
// and we can wait the reorganization to be done here.
114+
// if d.lease > 0, we don't need to wait here because
115+
// we will wait 2 * lease outer and try checking again,
116+
// so we use a very little timeout here.
117+
if d.lease > 0 {
118+
waitTimeout = 1 * time.Millisecond
119+
}
112120

113121
// wait reorganization job done or timeout
114122
select {
115123
case err := <-d.reorgDoneCh:
116124
d.reorgDoneCh = nil
117125
return errors.Trace(err)
118-
case <-time.After(waitTimeout):
119-
// if timeout, we will return, check the owner and retry wait job done again.
120-
return errWaitReorgTimeout
121126
case <-d.quitCh:
122127
// we return errWaitReorgTimeout here too, so that outer loop will break.
123128
return errWaitReorgTimeout
129+
case <-time.After(waitTimeout):
130+
// if timeout, we will return, check the owner and retry to wait job done again.
131+
return errWaitReorgTimeout
124132
}
125133
}
126134

135+
func (d *ddl) isOwnerInReorg(txn kv.Transaction) error {
136+
t := meta.NewMeta(txn)
137+
owner, err := t.GetDDLOwner()
138+
if err != nil {
139+
return errors.Trace(err)
140+
} else if owner == nil || owner.OwnerID != d.uuid {
141+
// if no owner, we will try later, so here just return error.
142+
// or another server is owner, return error too.
143+
return errors.Trace(ErrNotOwner)
144+
}
145+
146+
return nil
147+
}
148+
127149
func (d *ddl) delKeysWithPrefix(prefix string) error {
128150
keys := make([]string, maxBatchSize)
129151

130152
for {
131153
keys := keys[0:0]
132154
err := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
155+
if err := d.isOwnerInReorg(txn); err != nil {
156+
return errors.Trace(err)
157+
}
158+
133159
iter, err := txn.Seek([]byte(prefix))
134160
if err != nil {
135161
return errors.Trace(err)

ddl/reorg_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,57 @@ func (s *testDDLSuite) TestReorg(c *C) {
112112
err = info.RemoveHandle()
113113
c.Assert(err, IsNil)
114114
}
115+
116+
func (s *testDDLSuite) TestReorgOwner(c *C) {
117+
store := testCreateStore(c, "test_reorg_owner")
118+
defer store.Close()
119+
120+
lease := 50 * time.Millisecond
121+
122+
d1 := newDDL(store, nil, nil, lease)
123+
defer d1.close()
124+
125+
ctx := testNewContext(c, d1)
126+
127+
testCheckOwner(c, d1, true)
128+
129+
d2 := newDDL(store, nil, nil, lease)
130+
defer d2.close()
131+
132+
dbInfo := testSchemaInfo(c, d1, "test")
133+
testCreateSchema(c, ctx, d1, dbInfo)
134+
135+
tblInfo := testTableInfo(c, d1, "t", 3)
136+
testCreateTable(c, ctx, d1, dbInfo, tblInfo)
137+
138+
t := testGetTable(c, d1, dbInfo.ID, tblInfo.ID)
139+
140+
num := 10
141+
for i := 0; i < num; i++ {
142+
_, err := t.AddRecord(ctx, []interface{}{i, i, i})
143+
c.Assert(err, IsNil)
144+
}
145+
146+
err := ctx.FinishTxn(false)
147+
c.Assert(err, IsNil)
148+
149+
tc := &testDDLCallback{}
150+
tc.onJobRunBefore = func(job *model.Job) {
151+
if job.SchemaState == model.StateDeleteReorganization {
152+
d1.close()
153+
}
154+
}
155+
156+
d1.hook = tc
157+
158+
testDropSchema(c, ctx, d1, dbInfo)
159+
160+
err = kv.RunInNewTxn(d1.store, false, func(txn kv.Transaction) error {
161+
t := meta.NewMeta(txn)
162+
db, err1 := t.GetDatabase(dbInfo.ID)
163+
c.Assert(err1, IsNil)
164+
c.Assert(db, IsNil)
165+
return nil
166+
})
167+
c.Assert(err, IsNil)
168+
}

tidb.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func (dm *domainMap) Get(store kv.Storage) (d *domain.Domain, err error) {
7474
// if not local storage, first we will use a little lease time to
7575
// bootstrap quickly, after bootstrapped, we will reset the lease time.
7676
if !localstore.IsLocalStore(store) {
77-
lease = 100 * time.Second
77+
lease = 100 * time.Millisecond
7878
}
7979
d, err = domain.NewDomain(store, lease)
8080
if err != nil {

0 commit comments

Comments
 (0)