Skip to content

Commit facab51

Browse files
authored
Support fallback when resolveLockAsync (#2651)
1 parent cec7ab8 commit facab51

File tree

9 files changed

+225
-31
lines changed

9 files changed

+225
-31
lines changed

.github/workflows/license-check.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ on:
77
pull_request:
88
branches:
99
- master
10+
- release-**
1011

1112
jobs:
1213
check-license:

.github/workflows/verify.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ on:
55
push:
66
branches: [ master ]
77
pull_request:
8-
branches: [ master ]
8+
branches: [ master,release-** ]
99

1010
jobs:
1111
fmt:

core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
/*
2+
*
3+
* Copyright 2022 PingCAP, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
119
package com.pingcap.tispark.safepoint
220

321
import com.pingcap.tikv.TiSession
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
*
3+
* Copyright 2022 PingCAP, Inc.
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package com.pingcap.tikv.exception;
20+
21+
public class NonAsyncCommitLockException extends RuntimeException {
22+
23+
public NonAsyncCommitLockException(String msg) {
24+
super(msg);
25+
}
26+
}

tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ public void prewrite(
421421
long startTs,
422422
long lockTTL)
423423
throws TiClientInternalException, KeyException, RegionException {
424-
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null);
424+
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null, false);
425425
}
426426

427427
/**
@@ -437,7 +437,8 @@ public void prewrite(
437437
long ttl,
438438
boolean skipConstraintCheck,
439439
boolean useAsyncCommit,
440-
Iterable<ByteString> secondaries)
440+
Iterable<ByteString> secondaries,
441+
boolean fallbackTest)
441442
throws TiClientInternalException, KeyException, RegionException {
442443
boolean forWrite = true;
443444
while (true) {
@@ -463,6 +464,10 @@ public void prewrite(
463464
if (secondaries != null) {
464465
builder.addAllSecondaries(secondaries);
465466
}
467+
// just for test
468+
if (fallbackTest) {
469+
builder.setMaxCommitTs(1);
470+
}
466471
}
467472
return builder.build();
468473
};

tikv-client/src/main/java/com/pingcap/tikv/txn/AsyncResolveData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.pingcap.tikv.txn;
2020

2121
import com.google.protobuf.ByteString;
22+
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
2223
import com.pingcap.tikv.exception.ResolveLockException;
2324
import java.util.ArrayList;
2425
import java.util.List;
@@ -114,6 +115,10 @@ public synchronized void addKeys(
114115
"unexpected timestamp, expected: %d, found: %d",
115116
startTs, lockInfo.getLockVersion()));
116117
}
118+
if (!lockInfo.getUseAsyncCommit()) {
119+
LOG.info("non-async commit lock found in async commit recovery");
120+
throw new NonAsyncCommitLockException("non-async commit lock found");
121+
}
117122
if (!this.missingLock && lockInfo.getMinCommitTs() > this.commitTs) {
118123
this.commitTs = lockInfo.getMinCommitTs();
119124
}

tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClientV4.java

Lines changed: 43 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.pingcap.tikv.PDClient;
2828
import com.pingcap.tikv.TiConfiguration;
2929
import com.pingcap.tikv.exception.KeyException;
30+
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
3031
import com.pingcap.tikv.exception.RegionException;
3132
import com.pingcap.tikv.exception.TiClientInternalException;
3233
import com.pingcap.tikv.exception.TiKVException;
@@ -124,21 +125,8 @@ public ResolveLockResult resolveLocks(
124125
Set<Long> pushed = new HashSet<>(locks.size());
125126

126127
for (Lock l : locks) {
127-
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS);
128-
129-
if (status.getTtl() == 0) {
130-
Set<RegionVerID> cleanRegion =
131-
cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
132-
133-
if (status.getPrimaryLock() != null && status.getPrimaryLock().getUseAsyncCommit()) {
134-
resolveLockAsync(bo, l, status);
135-
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
136-
resolvePessimisticLock(bo, l, cleanRegion);
137-
} else {
138-
resolveLock(bo, l, status, cleanRegion);
139-
}
140-
141-
} else {
128+
TxnStatus status = resolve(l, bo, callerStartTS, cleanTxns, false);
129+
if (status.getTtl() != 0) {
142130
long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
143131
msBeforeTxnExpired.update(msBeforeLockExpired);
144132

@@ -169,6 +157,36 @@ public ResolveLockResult resolveLocks(
169157
return new ResolveLockResult(msBeforeTxnExpired.value(), pushed);
170158
}
171159

160+
private TxnStatus resolve(
161+
Lock l,
162+
BackOffer bo,
163+
long callerStartTS,
164+
Map<Long, Set<RegionVerID>> cleanTxns,
165+
boolean forceSyncCommit) {
166+
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit);
167+
if (status.getTtl() != 0) {
168+
return status;
169+
}
170+
Set<RegionVerID> cleanRegion = cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
171+
172+
if (status.getPrimaryLock() != null
173+
&& status.getPrimaryLock().getUseAsyncCommit()
174+
&& !forceSyncCommit) {
175+
try {
176+
resolveLockAsync(bo, l, status);
177+
} catch (NonAsyncCommitLockException e) {
178+
logger.info("fallback because of the non async commit lock");
179+
return resolve(l, bo, callerStartTS, cleanTxns, true);
180+
}
181+
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
182+
resolvePessimisticLock(bo, l, cleanRegion);
183+
} else {
184+
resolveLock(bo, l, status, cleanRegion);
185+
}
186+
187+
return status;
188+
}
189+
172190
private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cleanRegion) {
173191
while (true) {
174192
region = regionManager.getRegionByKey(lock.getKey());
@@ -225,7 +243,8 @@ private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cl
225243
}
226244
}
227245

228-
private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStartTS) {
246+
private TxnStatus getTxnStatusFromLock(
247+
BackOffer bo, Lock lock, long callerStartTS, boolean forceSyncCommit) {
229248
long currentTS;
230249

231250
if (lock.getTtl() == 0) {
@@ -249,7 +268,8 @@ private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStart
249268
callerStartTS,
250269
currentTS,
251270
rollbackIfNotExist,
252-
lock);
271+
lock,
272+
forceSyncCommit);
253273
} catch (TxnNotFoundException e) {
254274
// If the error is something other than txnNotFoundErr, throw the error (network
255275
// unavailable, tikv down, backoff timeout etc) to the caller.
@@ -293,7 +313,8 @@ private TxnStatus getTxnStatus(
293313
Long callerStartTS,
294314
Long currentTS,
295315
boolean rollbackIfNotExist,
296-
Lock lock) {
316+
Lock lock,
317+
boolean forceSyncCommit) {
297318
TxnStatus status = getResolved(txnID);
298319
if (status != null) {
299320
return status;
@@ -317,6 +338,7 @@ private TxnStatus getTxnStatus(
317338
.setCallerStartTs(callerStartTS)
318339
.setCurrentTs(currentTS)
319340
.setRollbackIfNotExist(rollbackIfNotExist)
341+
.setForceSyncCommit(forceSyncCommit)
320342
.build();
321343
};
322344

@@ -481,6 +503,9 @@ private AsyncResolveData checkAllSecondaries(BackOffer bo, Lock lock, TxnStatus
481503
Thread.currentThread().interrupt();
482504
throw new TiKVException("Current thread interrupted.", e);
483505
} catch (ExecutionException e) {
506+
if (e.getCause() != null && e.getCause() instanceof NonAsyncCommitLockException) {
507+
throw (NonAsyncCommitLockException) e.getCause();
508+
}
484509
logger.info("async commit recovery (sending CheckSecondaryLocks) finished with errors", e);
485510
throw new TiKVException("Execution exception met.", e);
486511
} catch (Throwable e) {

tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverAsyncCommitTest.java

Lines changed: 115 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,13 @@ public void prewriteWtihoutSecondaryKeyTest() {
226226
long startTs = session.getTimestamp().getVersion();
227227
Assert.assertTrue(
228228
prewriteStringUsingAsyncCommit(
229-
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
229+
primaryKey,
230+
primaryKeyValue,
231+
startTs,
232+
primaryKey,
233+
ASYNC_COMMIT_TTL,
234+
secondaries,
235+
false));
230236

231237
for (int i = 0; i < secondaryKeyList.size(); i++) {
232238
if (i == secondarySize - 1) {
@@ -240,7 +246,8 @@ public void prewriteWtihoutSecondaryKeyTest() {
240246
startTs,
241247
primaryKey,
242248
ASYNC_COMMIT_TTL,
243-
null));
249+
null,
250+
false));
244251
}
245252
}
246253

@@ -279,7 +286,8 @@ public void prewriteWtihoutPrimaryKeyTest() {
279286
startTs,
280287
primaryKey,
281288
ASYNC_COMMIT_TTL,
282-
null));
289+
null,
290+
false));
283291
}
284292

285293
// skip commitString primary key
@@ -323,6 +331,67 @@ public void ttlExpiredTest() throws InterruptedException {
323331
}
324332
}
325333

334+
@Test
335+
public void fallBackTest() {
336+
if (!check()) {
337+
return;
338+
}
339+
340+
// Case 1: Fallback primary, read primary
341+
String primaryKey = genRandomKey(64);
342+
List<String> secondaryKeyList = randomSecondaryKeyList();
343+
fallback(primaryKey, secondaryKeyList, true, false);
344+
assertEquals(pointGet(primaryKey), oldValue);
345+
for (int i = 0; i < secondarySize; i++) {
346+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
347+
}
348+
349+
// Case 2: Fallback primary, read secondary
350+
primaryKey = genRandomKey(64);
351+
secondaryKeyList = randomSecondaryKeyList();
352+
fallback(primaryKey, secondaryKeyList, true, false);
353+
for (int i = 0; i < secondarySize; i++) {
354+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
355+
}
356+
assertEquals(pointGet(primaryKey), oldValue);
357+
358+
// Case 3: Fallback secondary, read primary
359+
primaryKey = genRandomKey(64);
360+
secondaryKeyList = randomSecondaryKeyList();
361+
fallback(primaryKey, secondaryKeyList, false, true);
362+
assertEquals(pointGet(primaryKey), oldValue);
363+
for (int i = 0; i < secondarySize; i++) {
364+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
365+
}
366+
367+
// Case 4: Fallback secondary, read secondary
368+
primaryKey = genRandomKey(64);
369+
secondaryKeyList = randomSecondaryKeyList();
370+
fallback(primaryKey, secondaryKeyList, false, true);
371+
for (int i = 0; i < secondarySize; i++) {
372+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
373+
}
374+
assertEquals(pointGet(primaryKey), oldValue);
375+
376+
// Case 5: Fallback both, read primary
377+
primaryKey = genRandomKey(64);
378+
secondaryKeyList = randomSecondaryKeyList();
379+
fallback(primaryKey, secondaryKeyList, true, true);
380+
assertEquals(pointGet(primaryKey), oldValue);
381+
for (int i = 0; i < secondarySize; i++) {
382+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
383+
}
384+
385+
// Case 6: Fallback both, read secondary
386+
primaryKey = genRandomKey(64);
387+
secondaryKeyList = randomSecondaryKeyList();
388+
fallback(primaryKey, secondaryKeyList, true, true);
389+
for (int i = 0; i < secondarySize; i++) {
390+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
391+
}
392+
assertEquals(pointGet(primaryKey), oldValue);
393+
}
394+
326395
private boolean check() {
327396
if (!init) {
328397
skipTestInit();
@@ -365,7 +434,13 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
365434
// prewriteString <primary key, value1, secondaries>
366435
Assert.assertTrue(
367436
prewriteStringUsingAsyncCommit(
368-
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
437+
primaryKey,
438+
primaryKeyValue,
439+
startTs,
440+
primaryKey,
441+
ASYNC_COMMIT_TTL,
442+
secondaries,
443+
false));
369444

370445
// prewriteString secondaryKeys
371446
for (int i = 0; i < secondaryKeyList.size(); i++) {
@@ -376,7 +451,42 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
376451
startTs,
377452
primaryKey,
378453
ASYNC_COMMIT_TTL,
379-
null));
454+
null,
455+
false));
456+
}
457+
}
458+
459+
private void fallback(
460+
String primaryKey,
461+
List<String> secondaryKeyList,
462+
boolean fallbackPrimary,
463+
boolean fallbackSecondary) {
464+
// put
465+
putAll(primaryKey, secondaryKeyList);
466+
// prewrite primary key
467+
long startTs = session.getTimestamp().getVersion();
468+
// prewrite secondaryKeys
469+
List<ByteString> secondaries =
470+
secondaryKeyList.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList());
471+
Assert.assertTrue(
472+
prewriteStringUsingAsyncCommit(
473+
primaryKey,
474+
primaryKeyValue,
475+
startTs,
476+
primaryKey,
477+
ASYNC_COMMIT_TTL,
478+
secondaries,
479+
fallbackPrimary));
480+
for (int i = 0; i < secondaryKeyList.size(); i++) {
481+
Assert.assertTrue(
482+
prewriteStringUsingAsyncCommit(
483+
secondaryKeyList.get(i),
484+
secondaryKeyValueList[i],
485+
startTs,
486+
primaryKey,
487+
ASYNC_COMMIT_TTL,
488+
null,
489+
fallbackSecondary));
380490
}
381491
}
382492
}

0 commit comments

Comments
 (0)