Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/license-check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:
pull_request:
branches:
- master
- release-**

jobs:
check-license:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/verify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ on:
push:
branches: [ master ]
pull_request:
branches: [ master ]
branches: [ master,release-** ]

jobs:
fmt:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tispark.safepoint

import com.pingcap.tikv.TiSession
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
*
* Copyright 2022 PingCAP, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package com.pingcap.tikv.exception;

public class NonAsyncCommitLockException extends RuntimeException {

public NonAsyncCommitLockException(String msg) {
super(msg);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ public void prewrite(
long startTs,
long lockTTL)
throws TiClientInternalException, KeyException, RegionException {
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null);
this.prewrite(backOffer, primary, mutations, startTs, lockTTL, false, false, null, false);
}

/**
Expand All @@ -437,7 +437,8 @@ public void prewrite(
long ttl,
boolean skipConstraintCheck,
boolean useAsyncCommit,
Iterable<ByteString> secondaries)
Iterable<ByteString> secondaries,
boolean fallbackTest)
throws TiClientInternalException, KeyException, RegionException {
boolean forWrite = true;
while (true) {
Expand All @@ -463,6 +464,10 @@ public void prewrite(
if (secondaries != null) {
builder.addAllSecondaries(secondaries);
}
// just for test
if (fallbackTest) {
builder.setMaxCommitTs(1);
}
}
return builder.build();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.pingcap.tikv.txn;

import com.google.protobuf.ByteString;
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
import com.pingcap.tikv.exception.ResolveLockException;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -114,6 +115,10 @@ public synchronized void addKeys(
"unexpected timestamp, expected: %d, found: %d",
startTs, lockInfo.getLockVersion()));
}
if (!lockInfo.getUseAsyncCommit()) {
LOG.info("non-async commit lock found in async commit recovery");
throw new NonAsyncCommitLockException("non-async commit lock found");
}
if (!this.missingLock && lockInfo.getMinCommitTs() > this.commitTs) {
this.commitTs = lockInfo.getMinCommitTs();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.pingcap.tikv.PDClient;
import com.pingcap.tikv.TiConfiguration;
import com.pingcap.tikv.exception.KeyException;
import com.pingcap.tikv.exception.NonAsyncCommitLockException;
import com.pingcap.tikv.exception.RegionException;
import com.pingcap.tikv.exception.TiClientInternalException;
import com.pingcap.tikv.exception.TiKVException;
Expand Down Expand Up @@ -124,21 +125,8 @@ public ResolveLockResult resolveLocks(
Set<Long> pushed = new HashSet<>(locks.size());

for (Lock l : locks) {
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS);

if (status.getTtl() == 0) {
Set<RegionVerID> cleanRegion =
cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());

if (status.getPrimaryLock() != null && status.getPrimaryLock().getUseAsyncCommit()) {
resolveLockAsync(bo, l, status);
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
resolvePessimisticLock(bo, l, cleanRegion);
} else {
resolveLock(bo, l, status, cleanRegion);
}

} else {
TxnStatus status = resolve(l, bo, callerStartTS, cleanTxns, false);
if (status.getTtl() != 0) {
long msBeforeLockExpired = TsoUtils.untilExpired(l.getTxnID(), status.getTtl());
msBeforeTxnExpired.update(msBeforeLockExpired);

Expand Down Expand Up @@ -169,6 +157,36 @@ public ResolveLockResult resolveLocks(
return new ResolveLockResult(msBeforeTxnExpired.value(), pushed);
}

private TxnStatus resolve(
Lock l,
BackOffer bo,
long callerStartTS,
Map<Long, Set<RegionVerID>> cleanTxns,
boolean forceSyncCommit) {
TxnStatus status = getTxnStatusFromLock(bo, l, callerStartTS, forceSyncCommit);
if (status.getTtl() != 0) {
return status;
}
Set<RegionVerID> cleanRegion = cleanTxns.computeIfAbsent(l.getTxnID(), k -> new HashSet<>());

if (status.getPrimaryLock() != null
&& status.getPrimaryLock().getUseAsyncCommit()
&& !forceSyncCommit) {
try {
resolveLockAsync(bo, l, status);
} catch (NonAsyncCommitLockException e) {
logger.info("fallback because of the non async commit lock");
return resolve(l, bo, callerStartTS, cleanTxns, true);
}
} else if (l.getLockType() == org.tikv.kvproto.Kvrpcpb.Op.PessimisticLock) {
resolvePessimisticLock(bo, l, cleanRegion);
} else {
resolveLock(bo, l, status, cleanRegion);
}

return status;
}

private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cleanRegion) {
while (true) {
region = regionManager.getRegionByKey(lock.getKey());
Expand Down Expand Up @@ -225,7 +243,8 @@ private void resolvePessimisticLock(BackOffer bo, Lock lock, Set<RegionVerID> cl
}
}

private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStartTS) {
private TxnStatus getTxnStatusFromLock(
BackOffer bo, Lock lock, long callerStartTS, boolean forceSyncCommit) {
long currentTS;

if (lock.getTtl() == 0) {
Expand All @@ -249,7 +268,8 @@ private TxnStatus getTxnStatusFromLock(BackOffer bo, Lock lock, long callerStart
callerStartTS,
currentTS,
rollbackIfNotExist,
lock);
lock,
forceSyncCommit);
} catch (TxnNotFoundException e) {
// If the error is something other than txnNotFoundErr, throw the error (network
// unavailable, tikv down, backoff timeout etc) to the caller.
Expand Down Expand Up @@ -293,7 +313,8 @@ private TxnStatus getTxnStatus(
Long callerStartTS,
Long currentTS,
boolean rollbackIfNotExist,
Lock lock) {
Lock lock,
boolean forceSyncCommit) {
TxnStatus status = getResolved(txnID);
if (status != null) {
return status;
Expand All @@ -317,6 +338,7 @@ private TxnStatus getTxnStatus(
.setCallerStartTs(callerStartTS)
.setCurrentTs(currentTS)
.setRollbackIfNotExist(rollbackIfNotExist)
.setForceSyncCommit(forceSyncCommit)
.build();
};

Expand Down Expand Up @@ -481,6 +503,9 @@ private AsyncResolveData checkAllSecondaries(BackOffer bo, Lock lock, TxnStatus
Thread.currentThread().interrupt();
throw new TiKVException("Current thread interrupted.", e);
} catch (ExecutionException e) {
if (e.getCause() != null && e.getCause() instanceof NonAsyncCommitLockException) {
throw (NonAsyncCommitLockException) e.getCause();
}
logger.info("async commit recovery (sending CheckSecondaryLocks) finished with errors", e);
throw new TiKVException("Execution exception met.", e);
} catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,13 @@ public void prewriteWtihoutSecondaryKeyTest() {
long startTs = session.getTimestamp().getVersion();
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
false));

for (int i = 0; i < secondaryKeyList.size(); i++) {
if (i == secondarySize - 1) {
Expand All @@ -240,7 +246,8 @@ public void prewriteWtihoutSecondaryKeyTest() {
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}
}

Expand Down Expand Up @@ -279,7 +286,8 @@ public void prewriteWtihoutPrimaryKeyTest() {
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}

// skip commitString primary key
Expand Down Expand Up @@ -323,6 +331,67 @@ public void ttlExpiredTest() throws InterruptedException {
}
}

@Test
public void fallBackTest() {
if (!check()) {
return;
}

// Case 1: Fallback primary, read primary
String primaryKey = genRandomKey(64);
List<String> secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, false);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 2: Fallback primary, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, false);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);

// Case 3: Fallback secondary, read primary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, false, true);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 4: Fallback secondary, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, false, true);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);

// Case 5: Fallback both, read primary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, true);
assertEquals(pointGet(primaryKey), oldValue);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}

// Case 6: Fallback both, read secondary
primaryKey = genRandomKey(64);
secondaryKeyList = randomSecondaryKeyList();
fallback(primaryKey, secondaryKeyList, true, true);
for (int i = 0; i < secondarySize; i++) {
assertEquals(pointGet(secondaryKeyList.get(i)), oldValue);
}
assertEquals(pointGet(primaryKey), oldValue);
}

private boolean check() {
if (!init) {
skipTestInit();
Expand Down Expand Up @@ -365,7 +434,13 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
// prewriteString <primary key, value1, secondaries>
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey, primaryKeyValue, startTs, primaryKey, ASYNC_COMMIT_TTL, secondaries));
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
false));

// prewriteString secondaryKeys
for (int i = 0; i < secondaryKeyList.size(); i++) {
Expand All @@ -376,7 +451,42 @@ private void prewrite(long startTs, String primaryKey, List<String> secondaryKey
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null));
null,
false));
}
}

private void fallback(
String primaryKey,
List<String> secondaryKeyList,
boolean fallbackPrimary,
boolean fallbackSecondary) {
// put
putAll(primaryKey, secondaryKeyList);
// prewrite primary key
long startTs = session.getTimestamp().getVersion();
// prewrite secondaryKeys
List<ByteString> secondaries =
secondaryKeyList.stream().map(ByteString::copyFromUtf8).collect(Collectors.toList());
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
primaryKey,
primaryKeyValue,
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
secondaries,
fallbackPrimary));
for (int i = 0; i < secondaryKeyList.size(); i++) {
Assert.assertTrue(
prewriteStringUsingAsyncCommit(
secondaryKeyList.get(i),
secondaryKeyValueList[i],
startTs,
primaryKey,
ASYNC_COMMIT_TTL,
null,
fallbackSecondary));
}
}
}
Loading