Skip to content

Commit 9ff50fe

Browse files
committed
async_fallback
1 parent 88a12aa commit 9ff50fe

File tree

7 files changed

+199
-54
lines changed

7 files changed

+199
-54
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.pingcap.tikv.exception;
2+
3+
public class NonAsyncCommitLockException extends RuntimeException {
4+
5+
public NonAsyncCommitLockException(String msg) {
6+
super(msg);
7+
}
8+
}

tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -421,7 +421,7 @@ public void prewrite(
421421
long startTs,
422422
long lockTTL)
423423
throws TiClientInternalException, KeyException, RegionException {
424-
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null);
424+
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null, false);
425425
}
426426

427427
/**
@@ -437,7 +437,8 @@ public void prewrite(
437437
long ttl,
438438
boolean skipConstraintCheck,
439439
boolean useAsyncCommit,
440-
Iterable<ByteString> secondaries)
440+
Iterable<ByteString> secondaries,
441+
boolean fallbackTest)
441442
throws TiClientInternalException, KeyException, RegionException {
442443
boolean forWrite = true;
443444
while (true) {
@@ -463,6 +464,10 @@ public void prewrite(
463464
if (secondaries != null) {
464465
builder.addAllSecondaries(secondaries);
465466
}
467+
// just for test
468+
if (fallbackTest) {
469+
builder.setMaxCommitTs(1);
470+
}
466471
}
467472
return builder.build();
468473
};

tikv-client/src/main/java/com/pingcap/tikv/txn/AsyncResolveData.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package com.pingcap.tikv.txn;
2020

2121
import com.google.protobuf.ByteString;
22+
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
2223
import com.pingcap.tikv.exception.ResolveLockException;
2324
import java.util.ArrayList;
2425
import java.util.List;
@@ -114,6 +115,10 @@ public synchronized void addKeys(
114115
"unexpected timestamp, expected: %d, found: %d",
115116
startTs, lockInfo.getLockVersion()));
116117
}
118+
if (!lockInfo.getUseAsyncCommit()) {
119+
LOG.info("non-async commit lock found in async commit recovery");
120+
throw new NonAsyncCommitLockException("non-async commit lock found");
121+
}
117122
if (!this.missingLock && lockInfo.getMinCommitTs() > this.commitTs) {
118123
this.commitTs = lockInfo.getMinCommitTs();
119124
}

tikv-client/src/main/java/com/pingcap/tikv/txn/LockResolverClientV4.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.pingcap.tikv.PDClient;
2828
import com.pingcap.tikv.TiConfiguration;
2929
import com.pingcap.tikv.exception.KeyException;
30+
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
3031
import com.pingcap.tikv.exception.RegionException;
3132
import com.pingcap.tikv.exception.TiClientInternalException;
3233
import com.pingcap.tikv.exception.TiKVException;
@@ -131,7 +132,16 @@ public ResolveLockResult resolveLocks(
131132
cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());
132133

133134
if (status.getPrimaryLock() != null && status.getPrimaryLock().getUseAsyncCommit()) {
134-
resolveLockAsync(bo, l, status);
135+
try {
136+
resolveLockAsync(bo, l, status);
137+
} catch (NonAsyncCommitLockException e) {
138+
logger.info("fallback because of the non async commit lock");
139+
if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
140+
resolvePessimisticLock(bo, l, cleanRegion);
141+
} else {
142+
resolveLock(bo, l, status, cleanRegion);
143+
}
144+
}
135145
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
136146
resolvePessimisticLock(bo, l, cleanRegion);
137147
} else {
@@ -481,6 +491,9 @@ private AsyncResolveData checkAllSecondaries(BackOffer bo, Lock lock, TxnStatus
481491
Thread.currentThread().interrupt();
482492
throw new TiKVException("Current thread interrupted.", e);
483493
} catch (ExecutionException e) {
494+
if (e.getCause() != null && e.getCause() instanceof NonAsyncCommitLockException) {
495+
throw (NonAsyncCommitLockException) e.getCause();
496+
}
484497
logger.info("async commit recovery (sending CheckSecondaryLocks) finished with errors", e);
485498
throw new TiKVException("Execution exception met.", e);
486499
} catch (Throwable e) {

tikv-client/src/test/java/com/pingcap/tikv/KVMockServer.java

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -123,28 +123,28 @@ public void rawGet(
123123
}
124124
}
125125

126-
/** */
127-
public void rawPut(
128-
org.tikv.kvproto.Kvrpcpb.RawPutRequest request,
129-
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.RawPutResponse> responseObserver) {
130-
try {
131-
verifyContext(request.getContext());
132-
ByteString key = request.getKey();
133-
134-
Kvrpcpb.RawPutResponse.Builder builder = Kvrpcpb.RawPutResponse.newBuilder();
135-
Integer errorCode = errorMap.remove(key);
136-
Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder();
137-
if (errorCode != null) {
138-
setErrorInfo(errorCode, errBuilder);
139-
builder.setRegionError(errBuilder.build());
140-
// builder.setError("");
126+
/** */
127+
public void rawPut(
128+
org.tikv.kvproto.Kvrpcpb.RawPutRequest request,
129+
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.RawPutResponse> responseObserver) {
130+
try {
131+
verifyContext(request.getContext());
132+
ByteString key = request.getKey();
133+
134+
Kvrpcpb.RawPutResponse.Builder builder = Kvrpcpb.RawPutResponse.newBuilder();
135+
Integer errorCode = errorMap.remove(key);
136+
Errorpb.Error.Builder errBuilder = Errorpb.Error.newBuilder();
137+
if (errorCode != null) {
138+
setErrorInfo(errorCode, errBuilder);
139+
builder.setRegionError(errBuilder.build());
140+
// builder.setError("");
141+
}
142+
responseObserver.onNext(builder.build());
143+
responseObserver.onCompleted();
144+
} catch (Exception e) {
145+
responseObserver.onError(Status.INTERNAL.asRuntimeException());
141146
}
142-
responseObserver.onNext(builder.build());
143-
responseObserver.onCompleted();
144-
} catch (Exception e) {
145-
responseObserver.onError(Status.INTERNAL.asRuntimeException());
146147
}
147-
}
148148

149149
private void setErrorInfo(int errorCode, Errorpb.Error.Builder errBuilder) {
150150
if (errorCode == NOT_LEADER) {
@@ -344,29 +344,29 @@ public void coprocessor(
344344
}
345345
}
346346

347-
@Override
348-
public void kvPrewrite(
349-
org.tikv.kvproto.Kvrpcpb.PrewriteRequest request,
350-
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.PrewriteResponse> responseObserver) {
351-
Kvrpcpb.PrewriteResponse.Builder builder = Kvrpcpb.PrewriteResponse.newBuilder();
352-
ByteString key = request.getPrimaryLock();
353-
Integer errorCode = errorMap.remove(key);
354-
Kvrpcpb.KeyError.Builder errBuilder = Kvrpcpb.KeyError.newBuilder();
355-
356-
if (errorCode != null) {
357-
if (errorCode == ABORT) {
358-
errBuilder.setAbort("ABORT");
359-
} else if (errorCode == RETRY) {
360-
errBuilder.setRetryable("Retry");
361-
} else if (errorCode == WRITE_CONFLICT) {
362-
Kvrpcpb.WriteConflict.Builder writeConflict = Kvrpcpb.WriteConflict.newBuilder();
363-
errBuilder.setConflict(writeConflict);
347+
@Override
348+
public void kvPrewrite(
349+
org.tikv.kvproto.Kvrpcpb.PrewriteRequest request,
350+
io.grpc.stub.StreamObserver<org.tikv.kvproto.Kvrpcpb.PrewriteResponse> responseObserver) {
351+
Kvrpcpb.PrewriteResponse.Builder builder = Kvrpcpb.PrewriteResponse.newBuilder();
352+
ByteString key = request.getPrimaryLock();
353+
Integer errorCode = errorMap.remove(key);
354+
Kvrpcpb.KeyError.Builder errBuilder = Kvrpcpb.KeyError.newBuilder();
355+
356+
if (errorCode != null) {
357+
if (errorCode == ABORT) {
358+
errBuilder.setAbort("ABORT");
359+
} else if (errorCode == RETRY) {
360+
errBuilder.setRetryable("Retry");
361+
} else if (errorCode == WRITE_CONFLICT) {
362+
Kvrpcpb.WriteConflict.Builder writeConflict = Kvrpcpb.WriteConflict.newBuilder();
363+
errBuilder.setConflict(writeConflict);
364+
}
365+
builder.addErrors(errBuilder);
364366
}
365-
builder.addErrors(errBuilder);
367+
responseObserver.onNext(builder.build());
368+
responseObserver.onCompleted();
366369
}
367-
responseObserver.onNext(builder.build());
368-
responseObserver.onCompleted();
369-
}
370370

371371
public int start(TiRegion region) throws IOException {
372372
try (ServerSocket s = new ServerSocket(0)) {

tikv-client/src/test/java/com/pingcap/tikv/txn/LockResolverAsyncCommitTest.java

Lines changed: 115 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,13 @@ public void prewriteWtihoutSecondaryKeyTest() {
226226
long startTs = session.getTimestamp().getVersion();
227227
Assert.assertTrue(
228228
prewriteStringUsingAsyncCommit(
229-
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
229+
primaryKey,
230+
primaryKeyValue,
231+
startTs,
232+
primaryKey,
233+
ASYNC_COMMIT_TTL,
234+
secondaries,
235+
false));
230236

231237
for (int i = 0; i < secondaryKeyList.size(); i++) {
232238
if (i == secondarySize - 1) {
@@ -240,7 +246,8 @@ public void prewriteWtihoutSecondaryKeyTest() {
240246
startTs,
241247
primaryKey,
242248
ASYNC_COMMIT_TTL,
243-
null));
249+
null,
250+
false));
244251
}
245252
}
246253

@@ -279,7 +286,8 @@ public void prewriteWtihoutPrimaryKeyTest() {
279286
startTs,
280287
primaryKey,
281288
ASYNC_COMMIT_TTL,
282-
null));
289+
null,
290+
false));
283291
}
284292

285293
// skip commitString primary key
@@ -323,6 +331,67 @@ public void ttlExpiredTest() throws InterruptedException {
323331
}
324332
}
325333

334+
@Test
335+
public void fallBackTest() {
336+
if (!check()) {
337+
return;
338+
}
339+
340+
// Case 1: Fallback primary, read primary
341+
String primaryKey = genRandomKey(64);
342+
List<String> secondaryKeyList = randomSecondaryKeyList();
343+
fallback(primaryKey, secondaryKeyList, true, false);
344+
assertEquals(pointGet(primaryKey), oldValue);
345+
for (int i = 0; i < secondarySize; i++) {
346+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
347+
}
348+
349+
// Case 2: Fallback primary, read secondary
350+
primaryKey = genRandomKey(64);
351+
secondaryKeyList = randomSecondaryKeyList();
352+
fallback(primaryKey, secondaryKeyList, true, false);
353+
for (int i = 0; i < secondarySize; i++) {
354+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
355+
}
356+
assertEquals(pointGet(primaryKey), oldValue);
357+
358+
// Case 3: Fallback secondary, read primary
359+
primaryKey = genRandomKey(64);
360+
secondaryKeyList = randomSecondaryKeyList();
361+
fallback(primaryKey, secondaryKeyList, false, true);
362+
assertEquals(pointGet(primaryKey), oldValue);
363+
for (int i = 0; i < secondarySize; i++) {
364+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
365+
}
366+
367+
// Case 4: Fallback secondary, read secondary
368+
primaryKey = genRandomKey(64);
369+
secondaryKeyList = randomSecondaryKeyList();
370+
fallback(primaryKey, secondaryKeyList, false, true);
371+
for (int i = 0; i < secondarySize; i++) {
372+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
373+
}
374+
assertEquals(pointGet(primaryKey), oldValue);
375+
376+
// Case 5: Fallback both, read primary
377+
primaryKey = genRandomKey(64);
378+
secondaryKeyList = randomSecondaryKeyList();
379+
fallback(primaryKey, secondaryKeyList, true, true);
380+
assertEquals(pointGet(primaryKey), oldValue);
381+
for (int i = 0; i < secondarySize; i++) {
382+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
383+
}
384+
385+
// Case 6: Fallback both, read secondary
386+
primaryKey = genRandomKey(64);
387+
secondaryKeyList = randomSecondaryKeyList();
388+
fallback(primaryKey, secondaryKeyList, true, true);
389+
for (int i = 0; i < secondarySize; i++) {
390+
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
391+
}
392+
assertEquals(pointGet(primaryKey), oldValue);
393+
}
394+
326395
private boolean check() {
327396
if (!init) {
328397
skipTestInit();
@@ -365,7 +434,13 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
365434
// prewriteString <primary key, value1, secondaries>
366435
Assert.assertTrue(
367436
prewriteStringUsingAsyncCommit(
368-
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
437+
primaryKey,
438+
primaryKeyValue,
439+
startTs,
440+
primaryKey,
441+
ASYNC_COMMIT_TTL,
442+
secondaries,
443+
false));
369444

370445
// prewriteString secondaryKeys
371446
for (int i = 0; i < secondaryKeyList.size(); i++) {
@@ -376,7 +451,42 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
376451
startTs,
377452
primaryKey,
378453
ASYNC_COMMIT_TTL,
379-
null));
454+
null,
455+
false));
456+
}
457+
}
458+
459+
private void fallback(
460+
String primaryKey,
461+
List<String> secondaryKeyList,
462+
boolean fallbackPrimary,
463+
boolean fallbackSecondary) {
464+
// put
465+
putAll(primaryKey, secondaryKeyList);
466+
// prewrite primary key
467+
long startTs = session.getTimestamp().getVersion();
468+
// prewrite secondaryKeys
469+
List<ByteString> secondaries =
470+
secondaryKeyList.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList());
471+
Assert.assertTrue(
472+
prewriteStringUsingAsyncCommit(
473+
primaryKey,
474+
primaryKeyValue,
475+
startTs,
476+
primaryKey,
477+
ASYNC_COMMIT_TTL,
478+
secondaries,
479+
fallbackPrimary));
480+
for (int i = 0; i < secondaryKeyList.size(); i++) {
481+
Assert.assertTrue(
482+
prewriteStringUsingAsyncCommit(
483+
secondaryKeyList.get(i),
484+
secondaryKeyValueList[i],
485+
startTs,
486+
primaryKey,
487+
ASYNC_COMMIT_TTL,
488+
null,
489+
fallbackSecondary));
380490
}
381491
}
382492
}

0 commit comments

Comments
 (0)