Skip to content

Commit bc02e50

Browse files
authored
Support probe tiflash status (#2619)
1 parent e9fb1ba commit bc02e50

File tree

8 files changed

+128
-48
lines changed

8 files changed

+128
-48
lines changed

docs/userguide_3.0.md

Lines changed: 42 additions & 42 deletions
Large diffs are not rendered by default.

tikv-client/scripts/proto.sh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ CURRENT_DIR=`pwd`
1818
TISPARK_HOME="$(cd "`dirname "$0"`"/../..; pwd)"
1919
cd $TISPARK_HOME/tikv-client
2020

21-
kvproto_hash=6ed99a08e262d8a32d6355dcba91cf99cb92074a
21+
kvproto_hash=4d69c6f95e683dfb5859277563bf896aca06ec34
2222

2323
raft_rs_hash=b9891b673573fad77ebcf9bbe0969cf945841926
2424

tikv-client/src/main/java/com/pingcap/tikv/TiConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
@Accessors(chain = true)
4343
public class TiConfiguration implements Serializable {
4444
private static final DateTimeZone DEF_TIMEZONE = Converter.getLocalTimezone();
45-
private static final int DEF_TIMEOUT = 10;
45+
private static final int DEF_TIMEOUT = 3;
4646
private static final TimeUnit DEF_TIMEOUT_UNIT = TimeUnit.MINUTES;
4747
private static final int DEF_SCAN_BATCH_SIZE = 10480;
4848
private static final boolean DEF_IGNORE_TRUNCATE = true;

tikv-client/src/main/java/com/pingcap/tikv/TiSession.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,11 @@
3939
import java.util.HashMap;
4040
import java.util.List;
4141
import java.util.Map;
42+
import java.util.concurrent.ConcurrentHashMap;
4243
import java.util.concurrent.ExecutorService;
4344
import java.util.concurrent.Executors;
45+
import java.util.concurrent.ScheduledExecutorService;
46+
import java.util.concurrent.TimeUnit;
4447
import java.util.function.Function;
4548
import java.util.stream.Collectors;
4649
import org.slf4j.Logger;
@@ -68,6 +71,9 @@ public class TiSession implements AutoCloseable {
6871
private boolean isClosed = false;
6972
private volatile TiTimestamp snapshotTimestamp;
7073
private volatile Catalog snapshotCatalog;
74+
// storeStatusCache will be init at @see DAGIterator#isMppStoreAlive
75+
private volatile Map<String, Boolean> storeStatusCache;
76+
private ScheduledExecutorService storeStatusCacheExecutor;
7177

7278
private TiSession(TiConfiguration conf) {
7379
this.conf = conf;
@@ -113,6 +119,10 @@ public static TiSession getInstance(TiConfiguration conf) {
113119
}
114120
}
115121

122+
public ChannelFactory getChannelFactory() {
123+
return this.channelFactory;
124+
}
125+
116126
// if NewCollationEnabled is not set in configuration file,
117127
// we will set it to true when TiDB version is greater than or equal to v6.0.0.
118128
// Otherwise, we will set it to false
@@ -219,6 +229,28 @@ public synchronized RegionManager getRegionManager() {
219229
return res;
220230
}
221231

232+
public Map<String, Boolean> getStoreStatusCache() {
233+
if (storeStatusCache == null) {
234+
synchronized (this) {
235+
if (storeStatusCache == null) {
236+
storeStatusCache = new ConcurrentHashMap<>();
237+
storeStatusCacheExecutor = Executors.newScheduledThreadPool(1);
238+
storeStatusCacheExecutor.scheduleAtFixedRate(
239+
() -> {
240+
storeStatusCache.replaceAll(
241+
(k, v) ->
242+
RegionStoreClient.isMppAlive(
243+
channelFactory.getChannel(k, getPDClient().getHostMapping())));
244+
},
245+
0,
246+
5,
247+
TimeUnit.SECONDS);
248+
}
249+
}
250+
}
251+
return storeStatusCache;
252+
}
253+
222254
public ExecutorService getThreadPoolForIndexScan() {
223255
ExecutorService res = indexScanThreadPool;
224256
if (res == null) {
@@ -486,6 +518,9 @@ public synchronized void close() throws Exception {
486518
if (deleteRangeThreadPool != null) {
487519
deleteRangeThreadPool.shutdownNow();
488520
}
521+
if (storeStatusCacheExecutor != null) {
522+
storeStatusCacheExecutor.shutdownNow();
523+
}
489524
if (client != null) {
490525
getPDClient().close();
491526
}

tikv-client/src/main/java/com/pingcap/tikv/operation/iterator/DAGIterator.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.util.HashSet;
4040
import java.util.Iterator;
4141
import java.util.List;
42+
import java.util.Map;
4243
import java.util.Queue;
4344
import java.util.concurrent.ExecutorCompletionService;
4445
import org.slf4j.Logger;
@@ -215,9 +216,18 @@ private SelectResponse process(RangeSplitter.RegionTask regionTask) {
215216
try {
216217
RegionStoreClient client =
217218
session.getRegionStoreClientBuilder().build(region, store, storeType);
219+
// if mpp store is not alive, drop it and generate a new task.
220+
if (storeType == TiStoreType.TiFlash && !isMppStoreAlive(store.getAddress())) {
221+
logger.debug("Re-splitting region task due to TiFlash is unavailable");
222+
remainTasks.addAll(
223+
RangeSplitter.newSplitter(client.regionManager)
224+
.splitRangeByRegion(ranges, storeType));
225+
continue;
226+
}
218227
client.addResolvedLocks(startTs, resolvedLocks);
219228
Collection<RangeSplitter.RegionTask> tasks =
220229
client.coprocess(backOffer, dagRequest, region, ranges, responseQueue, startTs);
230+
221231
if (tasks != null) {
222232
remainTasks.addAll(tasks);
223233
}
@@ -269,4 +279,20 @@ private Iterator<SelectResponse> processByStreaming(RangeSplitter.RegionTask reg
269279
throw new TiClientInternalException("Error Closing Store client.", e);
270280
}
271281
}
282+
283+
// See https://github.com/pingcap/tispark/pull/2619 for more details
284+
public Boolean isMppStoreAlive(String address) {
285+
try {
286+
Map<String, Boolean> storeStatusCache = session.getStoreStatusCache();
287+
return storeStatusCache.computeIfAbsent(
288+
address,
289+
key ->
290+
RegionStoreClient.isMppAlive(
291+
session
292+
.getChannelFactory()
293+
.getChannel(address, session.getPDClient().getHostMapping())));
294+
} catch (Exception e) {
295+
throw new TiClientInternalException("Error get MppStore Status.", e);
296+
}
297+
}
272298
}

tikv-client/src/main/java/com/pingcap/tikv/region/AbstractRegionStoreClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public abstract class AbstractRegionStoreClient
3333
extends AbstractGRPCClient<TikvGrpc.TikvBlockingStub, TikvGrpc.TikvStub>
3434
implements RegionErrorReceiver {
3535

36-
protected final RegionManager regionManager;
36+
public final RegionManager regionManager;
3737
protected TiRegion region;
3838

3939
protected AbstractRegionStoreClient(

tikv-client/src/main/java/com/pingcap/tikv/region/RegionManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ public void invalidateRegion(TiRegion region) {
191191
}
192192

193193
public void invalidateRange(ByteString startKey, ByteString endKey) {
194-
cache.invalidateRange(startKey,endKey);
194+
cache.invalidateRange(startKey, endKey);
195195
}
196196

197197
public static class RegionCache {
@@ -259,7 +259,8 @@ private synchronized TiRegion getRegionFromCache(Key key) {
259259
private synchronized void invalidateRange(ByteString startKey, ByteString endKey) {
260260
regionCache.remove(makeRange(startKey, endKey));
261261
if (logger.isDebugEnabled()) {
262-
logger.debug(String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey));
262+
logger.debug(
263+
String.format("invalidateRange success, startKey[%s], endKey[%s]", startKey, endKey));
263264
}
264265
}
265266

tikv-client/src/main/java/com/pingcap/tikv/region/RegionStoreClient.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import com.pingcap.tikv.util.Pair;
5353
import com.pingcap.tikv.util.RangeSplitter;
5454
import io.grpc.ManagedChannel;
55+
import io.grpc.stub.ClientCalls;
5556
import java.util.ArrayList;
5657
import java.util.Collections;
5758
import java.util.HashMap;
@@ -62,6 +63,7 @@
6263
import java.util.Objects;
6364
import java.util.Queue;
6465
import java.util.Set;
66+
import java.util.concurrent.TimeUnit;
6567
import java.util.function.Supplier;
6668
import java.util.stream.Collectors;
6769
import org.slf4j.Logger;
@@ -106,6 +108,7 @@
106108
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatRequest;
107109
import org.tikv.kvproto.Kvrpcpb.TxnHeartBeatResponse;
108110
import org.tikv.kvproto.Metapb.Store;
111+
import org.tikv.kvproto.Mpp;
109112
import org.tikv.kvproto.TikvGrpc;
110113
import org.tikv.kvproto.TikvGrpc.TikvBlockingStub;
111114
import org.tikv.kvproto.TikvGrpc.TikvStub;
@@ -713,7 +716,7 @@ private List<RangeSplitter.RegionTask> handleCopResponse(
713716
// we need to invalidate cache when region not find
714717
if (regionError.hasRegionNotFound()) {
715718
logger.info("invalidateRange when Re-splitting region task because of region not find.");
716-
this.regionManager.invalidateRange(region.getStartKey(),region.getEndKey());
719+
this.regionManager.invalidateRange(region.getStartKey(), region.getEndKey());
717720
}
718721
// Split ranges
719722
return RangeSplitter.newSplitter(this.regionManager).splitRangeByRegion(ranges, storeType);
@@ -1202,6 +1205,21 @@ private void rawDeleteRangeHelper(RawDeleteRangeResponse resp) {
12021205
}
12031206
}
12041207

1208+
public static boolean isMppAlive(ManagedChannel channel) {
1209+
TikvGrpc.TikvBlockingStub stub =
1210+
TikvGrpc.newBlockingStub(channel).withDeadlineAfter(500, TimeUnit.MILLISECONDS);
1211+
Supplier<Mpp.IsAliveRequest> factory = () -> Mpp.IsAliveRequest.newBuilder().build();
1212+
try {
1213+
Mpp.IsAliveResponse resp =
1214+
ClientCalls.blockingUnaryCall(
1215+
stub.getChannel(), TikvGrpc.getIsAliveMethod(), stub.getCallOptions(), factory.get());
1216+
return resp != null && resp.getAvailable();
1217+
} catch (Exception e) {
1218+
logger.warn("Call mpp isAlive fail with Exception", e);
1219+
return false;
1220+
}
1221+
}
1222+
12051223
public enum RequestTypes {
12061224
REQ_TYPE_SELECT(101),
12071225
REQ_TYPE_INDEX(102),

0 commit comments

Comments
 (0)