Skip to content

Commit c083fc5

Browse files
authored
Support probe tiflash status (#2619) (#2635)
1 parent acd67ec commit c083fc5

File tree

8 files changed

+129
-48
lines changed

8 files changed

+129
-48
lines changed

docs/userguide_3.0.md

Lines changed: 42 additions & 42 deletions
Large diffs are not rendered by default.

tikv-client/scripts/proto.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ CURRENT_DIR=`pwd`
1818
TISPARK_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
1919
cd $TISPARK_HOME/tikv-client
2020

21-
kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a
21+
kvproto_hash=4d69c6f95e683dfb5859277563bf896aca06ec34
2222

2323
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926
2424

tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
@Accessors(chain = true)
4040
public class TiConfiguration implements Serializable {
4141
private static final DateTimeZone DEF_TIMEZONE = Converter.getLocalTimezone();
42-
private static final int DEF_TIMEOUT = 10;
42+
private static final int DEF_TIMEOUT = 3;
4343
private static final TimeUnit DEF_TIMEOUT_UNIT = TimeUnit.MINUTES;
4444
private static final int DEF_SCAN_BATCH_SIZE = 10480;
4545
private static final boolean DEF_IGNORE_TRUNCATE = true;

tikv-client/src/main/java/com/pingcap/tikv/TiSession.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,11 @@
3838
import java.util.HashMap;
3939
import java.util.List;
4040
import java.util.Map;
41+
import java.util.concurrent.ConcurrentHashMap;
4142
import java.util.concurrent.ExecutorService;
4243
import java.util.concurrent.Executors;
44+
import java.util.concurrent.ScheduledExecutorService;
45+
import java.util.concurrent.TimeUnit;
4346
import java.util.function.Function;
4447
import java.util.stream.Collectors;
4548
import org.slf4j.Logger;
@@ -67,6 +70,9 @@ public class TiSession implements AutoCloseable {
6770
private boolean isClosed = false;
6871
private volatile TiTimestamp snapshotTimestamp;
6972
private volatile Catalog snapshotCatalog;
73+
// storeStatusCache is lazily initialized on first use; see DAGIterator#isMppStoreAlive
74+
private volatile Map<String, Boolean> storeStatusCache;
75+
private ScheduledExecutorService storeStatusCacheExecutor;
7076

7177
private TiSession(TiConfiguration conf) {
7278
this.conf = conf;
@@ -111,6 +117,11 @@ public static TiSession getInstance(TiConfiguration conf) {
111117
}
112118
}
113119

120+
121+
public ChannelFactory getChannelFactory() {
122+
return this.channelFactory;
123+
}
124+
114125
public TxnKVClient createTxnClient() {
115126
return new TxnKVClient(conf, this.getRegionStoreClientBuilder(), this.getPDClient());
116127
}
@@ -204,6 +215,28 @@ public synchronized RegionManager getRegionManager() {
204215
return res;
205216
}
206217

218+
public Map<String, Boolean> getStoreStatusCache() {
219+
if (storeStatusCache == null) {
220+
synchronized (this) {
221+
if (storeStatusCache == null) {
222+
storeStatusCache = new ConcurrentHashMap<>();
223+
storeStatusCacheExecutor = Executors.newScheduledThreadPool(1);
224+
storeStatusCacheExecutor.scheduleAtFixedRate(
225+
() -> {
226+
storeStatusCache.replaceAll(
227+
(k, v) ->
228+
RegionStoreClient.isMppAlive(
229+
channelFactory.getChannel(k)));
230+
},
231+
0,
232+
5,
233+
TimeUnit.SECONDS);
234+
}
235+
}
236+
}
237+
return storeStatusCache;
238+
}
239+
207240
public ExecutorService getThreadPoolForIndexScan() {
208241
ExecutorService res = indexScanThreadPool;
209242
if (res == null) {
@@ -473,6 +506,9 @@ public synchronized void close() throws Exception {
473506
if (deleteRangeThreadPool != null) {
474507
deleteRangeThreadPool.shutdownNow();
475508
}
509+
if (storeStatusCacheExecutor != null) {
510+
storeStatusCacheExecutor.shutdownNow();
511+
}
476512
if (client != null) {
477513
getPDClient().close();
478514
}

tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.Iterator;
4141
import java.util.List;
42+
import java.util.Map;
4243
import java.util.Queue;
4344
import java.util.concurrent.ExecutorCompletionService;
4445
import org.slf4j.Logger;
@@ -215,9 +216,18 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
215216
try {
216217
RegionStoreClient client =
217218
session.getRegionStoreClientBuilder().build(region, store, storeType);
219+
// if mpp store is not alive, drop it and generate a new task.
220+
if (storeType == TiStoreType.TiFlash && !isMppStoreAlive(store.getAddress())) {
221+
logger.debug("Re-splitting region task due to TiFlash is unavailable");
222+
remainTasks.addAll(
223+
RangeSplitter.newSplitter(client.regionManager)
224+
.splitRangeByRegion(ranges, storeType));
225+
continue;
226+
}
218227
client.addResolvedLocks(startTs, resolvedLocks);
219228
Collection<RangeSplitter.RegionTask> tasks =
220229
client.coprocess(backOffer, dagRequest, region, ranges, responseQueue, startTs);
230+
221231
if (tasks != null) {
222232
remainTasks.addAll(tasks);
223233
}
@@ -269,4 +279,20 @@ private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask reg
269279
throw new TiClientInternalException("Error Closing Store client.", e);
270280
}
271281
}
282+
283+
// See https://github.com/pingcap/tispark/pull/2619 for more details
284+
public Boolean isMppStoreAlive(String address) {
285+
try {
286+
Map<String, Boolean> storeStatusCache = session.getStoreStatusCache();
287+
return storeStatusCache.computeIfAbsent(
288+
address,
289+
key ->
290+
RegionStoreClient.isMppAlive(
291+
session
292+
.getChannelFactory()
293+
.getChannel(address)));
294+
} catch (Exception e) {
295+
throw new TiClientInternalException("Error get MppStore Status.", e);
296+
}
297+
}
272298
}

tikv-client/src/main/java/com/pingcap/tikv/region/AbstractRegionStoreClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public abstract class AbstractRegionStoreClient
3333
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub>
3434
implements RegionErrorReceiver {
3535

36-
protected final RegionManager regionManager;
36+
public final RegionManager regionManager;
3737
protected TiRegion region;
3838

3939
protected AbstractRegionStoreClient(

tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ public void invalidateRegion(TiRegion region) {
196196
}
197197

198198
public void invalidateRange(ByteString startKey, ByteString endKey) {
199-
cache.invalidateRange(startKey,endKey);
199+
cache.invalidateRange(startKey, endKey);
200200
}
201201

202202
public static class RegionCache {
@@ -264,7 +264,8 @@ private synchronized TiRegion getRegionFromCache(Key key) {
264264
private synchronized void invalidateRange(ByteString startKey, ByteString endKey) {
265265
regionCache.remove(makeRange(startKey, endKey));
266266
if (logger.isDebugEnabled()) {
267-
logger.debug(String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey));
267+
logger.debug(
268+
String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey));
268269
}
269270
}
270271

tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.pingcap.tikv.util.Pair;
5353
import com.pingcap.tikv.util.RangeSplitter;
5454
import io.grpc.ManagedChannel;
55+
import io.grpc.stub.ClientCalls;
5556
import java.util.ArrayList;
5657
import java.util.Collections;
5758
import java.util.HashMap;
@@ -62,6 +63,7 @@
6263
import java.util.Objects;
6364
import java.util.Queue;
6465
import java.util.Set;
66+
import java.util.concurrent.TimeUnit;
6567
import java.util.function.Supplier;
6668
import java.util.stream.Collectors;
6769
import org.slf4j.Logger;
@@ -106,6 +108,7 @@
106108
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
107109
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
108110
import org.tikv.kvproto.Metapb.Store;
111+
import org.tikv.kvproto.Mpp;
109112
import org.tikv.kvproto.TikvGrpc;
110113
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
111114
import org.tikv.kvproto.TikvGrpc.TikvStub;
@@ -712,7 +715,7 @@ private List<RangeSplitter.RegionTask> handleCopResponse(
712715
// we need to invalidate the cache when the region is not found
713716
if (regionError.hasRegionNotFound()) {
714717
logger.info("invalidateRange when Re-splitting region task because of region not find.");
715-
this.regionManager.invalidateRange(region.getStartKey(),region.getEndKey());
718+
this.regionManager.invalidateRange(region.getStartKey(), region.getEndKey());
716719
}
717720
// Split ranges
718721
return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
@@ -1201,6 +1204,21 @@ private void rawDeleteRangeHelper(RawDeleteRangeResponse resp) {
12011204
}
12021205
}
12031206

1207+
public static boolean isMppAlive(ManagedChannel channel) {
1208+
TikvGrpc.TikvBlockingStub stub =
1209+
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(500, TimeUnit.MILLISECONDS);
1210+
Supplier<Mpp.IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
1211+
try {
1212+
Mpp.IsAliveResponse resp =
1213+
ClientCalls.blockingUnaryCall(
1214+
stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
1215+
return resp != null && resp.getAvailable();
1216+
} catch (Exception e) {
1217+
logger.warn("Call mpp isAlive fail with Exception", e);
1218+
return false;
1219+
}
1220+
}
1221+
12041222
public enum RequestTypes {
12051223
REQ_TYPE_SELECT(101),
12061224
REQ_TYPE_INDEX(102),

0 commit comments

Comments
 (0)