
Commit 88a12aa

Support service safe point (#2634)
1 parent bc02e50 commit 88a12aa

File tree

11 files changed: 167 additions & 1 deletion

core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala

Lines changed: 6 additions & 0 deletions

@@ -79,4 +79,10 @@ object TiConfigConst {
   val HOST_MAPPING: String = "spark.tispark.host_mapping"

   val NEW_COLLATION_ENABLE: String = "spark.tispark.new_collation_enable"
+
+  // GC
+  val GC_MAX_WAIT_TIME: String = "spark.tispark.gc_max_wait_time"
+  val DEFAULT_GC_MAX_WAIT_TIME: Long = 24 * 60 * 60
+  val DEFAULT_GC_SAFE_POINT_TTL: Int = 5 * 60
+
 }
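Both defaults are expressed in seconds: GC may be held back for at most 24 hours, and the service safe point registered in PD carries a 5-minute TTL that the background refresher (added below) renews every minute. Only the wait time is exposed as a Spark configuration key; a minimal, hypothetical sketch of overriding it when building the session (the 12-hour value is illustrative, not from this commit):

// Hypothetical session setup; only spark.tispark.gc_max_wait_time is introduced by this commit.
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("tispark-gc-demo")
  // Let TiSpark hold GC back for at most 12 hours (value is in seconds);
  // the default added in this commit is 24 * 60 * 60.
  .config("spark.tispark.gc_max_wait_time", (12 * 60 * 60).toString)
  .getOrCreate()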
core/src/main/scala/com/pingcap/tispark/safepoint/ServiceSafePoint.scala (new file)

Lines changed: 97 additions & 0 deletions

package com.pingcap.tispark.safepoint

import com.pingcap.tikv.TiSession
import com.pingcap.tikv.exception.TiInternalException
import com.pingcap.tikv.meta.TiTimestamp
import com.pingcap.tikv.util.{BackOffer, ConcreteBackOffer}
import org.slf4j.LoggerFactory

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

case class ServiceSafePoint(
    serviceId: String,
    ttl: Long,
    GCMaxWaitTime: Long,
    tiSession: TiSession) {

  private final val logger = LoggerFactory.getLogger(getClass.getName)
  private var minStartTs = Long.MaxValue
  val service: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
  service.scheduleAtFixedRate(
    () => {
      if (minStartTs != Long.MaxValue) {
        val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
          serviceId,
          ttl,
          minStartTs,
          ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
        if (safePoint > minStartTs) {
          // will not happen unless someone deletes the TiSpark service safe point in PD forcibly
          logger.error(
            s"Failed to register service GC safe point because the current minimum safe point $safePoint is newer than what we assume $minStartTs. Maybe you delete the TiSpark safe point in PD.")
        } else {
          logger.info(s"register service GC safe point $minStartTs success.")
        }
      }
    },
    0,
    1,
    TimeUnit.MINUTES)

  // TiSpark can only decrease minStartTs for now: we cannot know which transactions have
  // finished, so we cannot increase it.
  def updateStartTs(startTimeStamp: TiTimestamp): Unit = {
    this.synchronized {
      val now = tiSession.getTimestamp
      if (now.getPhysical - startTimeStamp.getPhysical >= GCMaxWaitTime * 1000) {
        throw new TiInternalException(
          s"Can not pause GC more than spark.tispark.gc_max_wait_time=$GCMaxWaitTime s. start_ts: ${startTimeStamp.getVersion}, now: ${now.getVersion}. You can adjust spark.tispark.gc_max_wait_time to increase the gc max wait time.")
      }
      val startTs = startTimeStamp.getVersion
      if (startTs >= minStartTs) {
        // minStartTs >= safe point, so startTs must be >= safe point. Check it in case someone
        // deleted the TiSpark service safe point in PD forcibly.
        checkServiceSafePoint(startTs)
      } else {
        // applyServiceSafePoint may throw an exception. Consider startTs < safePoint < minStartTs.
        applyServiceSafePoint(startTs)
        // set minStartTs = startTs after applyServiceSafePoint succeeds
        minStartTs = startTs
      }
    }
  }

  private def checkServiceSafePoint(startTs: Long): Unit = {
    val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
      serviceId,
      ttl,
      minStartTs,
      ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
    if (safePoint > startTs) {
      throw new TiInternalException(
        s"Failed to check service GC safe point because the current minimum safe point $safePoint is newer than start_ts $startTs.")
    }
    logger.info(s"check start_ts $startTs success.")
  }

  private def applyServiceSafePoint(startTs: Long): Unit = {
    val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
      serviceId,
      ttl,
      startTs,
      ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
    if (safePoint > startTs) {
      throw new TiInternalException(
        s"Failed to register service GC safe point because the current minimum safe point $safePoint is newer than what we assume $startTs.")
    }
    logger.info(s"register service GC safe point $startTs success.")
  }

  def stopRegisterSafePoint(): Unit = {
    minStartTs = Long.MaxValue
    tiSession.getPDClient.updateServiceGCSafePoint(
      serviceId,
      ttl,
      Long.MaxValue,
      ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
    service.shutdownNow()
  }
}
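The class above encodes a small lifecycle: create one instance per TiContext, call updateStartTs with the start timestamp of every read or write that must survive GC, and call stopRegisterSafePoint when the application ends. A rough sketch of that flow, assuming an already-built TiSession (the helper, service id, and TTL values are illustrative and mirror how TiContext wires the class up below):

// Hypothetical helper; not part of the commit.
import java.util.UUID
import com.pingcap.tikv.TiSession
import com.pingcap.tispark.safepoint.ServiceSafePoint

def withServiceSafePoint(tiSession: TiSession)(job: => Unit): Unit = {
  val safePoint = ServiceSafePoint(
    serviceId = "tispark_" + UUID.randomUUID,
    ttl = 5 * 60, // seconds; renewed every minute by the scheduler above
    GCMaxWaitTime = 24 * 60 * 60, // seconds; upper bound on how long GC may be paused
    tiSession = tiSession)
  try {
    // Register the job's start_ts before any read or write touches TiKV.
    safePoint.updateStartTs(tiSession.getTimestamp)
    job
  } finally {
    // Lift the safe point and stop the background refresher on shutdown.
    safePoint.stopRegisterSafePoint()
  }
}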

core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala

Lines changed: 1 addition & 0 deletions

@@ -186,6 +186,7 @@ class TiBatchWrite(
     val startTimeStamp = tiSession.getTimestamp
     startTs = startTimeStamp.getVersion
     logger.info(s"startTS: $startTs")
+    tiContext.serviceSafePoint.updateStartTs(startTimeStamp)

     // pre calculate
     val shuffledRDD: RDD[(SerializableKey, Array[Byte])] = {
core/src/main/scala/org/apache/spark/sql/TiContext.scala

Lines changed: 19 additions & 1 deletion

@@ -21,19 +21,20 @@ import com.pingcap.tikv.{TiConfiguration, TiSession}
 import com.pingcap.tispark._
 import com.pingcap.tispark.auth.TiAuthorization
 import com.pingcap.tispark.listener.CacheInvalidateListener
+import com.pingcap.tispark.safepoint.ServiceSafePoint
 import com.pingcap.tispark.statistics.StatisticsManager
 import com.pingcap.tispark.utils.TiUtil
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
-import org.apache.spark.sql.catalyst.catalog._
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST._
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
 import scalaj.http.Http

 import java.lang
+import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.collection.mutable

@@ -49,11 +50,28 @@ class TiContext(val sparkSession: SparkSession) extends Serializable with Loggin
   } else Option.empty)
   final val tiSession: TiSession = TiSession.getInstance(tiConf)
   lazy val sqlContext: SQLContext = sparkSession.sqlContext
+  // GC
+  val GCMaxWaitTime: Long =
+    try {
+      conf
+        .get(TiConfigConst.GC_MAX_WAIT_TIME, TiConfigConst.DEFAULT_GC_MAX_WAIT_TIME.toString)
+        .toLong
+    } catch {
+      case _: Exception => TiConfigConst.DEFAULT_GC_MAX_WAIT_TIME
+    }
+
+  val serviceSafePoint: ServiceSafePoint =
+    ServiceSafePoint(
+      "tispark_" + UUID.randomUUID,
+      TiConfigConst.DEFAULT_GC_SAFE_POINT_TTL,
+      GCMaxWaitTime,
+      tiSession)

   sparkSession.sparkContext.addSparkListener(new SparkListener() {
     override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
       if (tiSession != null) {
         try {
+          serviceSafePoint.stopRegisterSafePoint()
           tiSession.close()
         } catch {
           case e: Throwable => logWarning("fail to close TiSession!", e)
spark-wrapper/spark-3.0/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 1 addition & 0 deletions

@@ -107,6 +107,7 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
       } else {
         tiContext.tiSession.getSnapshotTimestamp
       }
+    tiContext.serviceSafePoint.updateStartTs(ts)

     if (plan.isStreaming) {
       // We should use a new timestamp for next batch execution.
spark-wrapper/spark-3.1/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 2 additions & 0 deletions

@@ -110,6 +110,8 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
       } else {
         tiContext.tiSession.getSnapshotTimestamp
       }
+    tiContext.serviceSafePoint.updateStartTs(ts)
+
     if (plan.isStreaming) {
       // We should use a new timestamp for next batch execution.
       // Otherwise Spark Structure Streaming will not see new data in TiDB.

spark-wrapper/spark-3.2/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 1 addition & 0 deletions

@@ -110,6 +110,7 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
       } else {
         tiContext.tiSession.getSnapshotTimestamp
       }
+    tiContext.serviceSafePoint.updateStartTs(ts)

     if (plan.isStreaming) {
       // We should use a new timestamp for next batch execution.

spark-wrapper/spark-3.3/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 1 addition & 0 deletions

@@ -110,6 +110,7 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
       } else {
         tiContext.tiSession.getSnapshotTimestamp
       }
+    tiContext.serviceSafePoint.updateStartTs(ts)

     if (plan.isStreaming) {
       // We should use a new timestamp for next batch execution.

tikv-client/src/main/java/com/pingcap/tikv/PDClient.java

Lines changed: 26 additions & 0 deletions

@@ -87,6 +87,7 @@
 import org.tikv.kvproto.Pdpb.Timestamp;
 import org.tikv.kvproto.Pdpb.TsoRequest;
 import org.tikv.kvproto.Pdpb.TsoResponse;
+import org.tikv.kvproto.Pdpb.UpdateServiceGCSafePointRequest;

 public class PDClient extends AbstractGRPCClient<PDBlockingStub, PDStub>
     implements ReadOnlyPDClient {

@@ -298,6 +299,17 @@ private Supplier<GetAllStoresRequest> buildGetAllStoresReq() {
     return () -> GetAllStoresRequest.newBuilder().setHeader(header).build();
   }

+  private Supplier<UpdateServiceGCSafePointRequest> buildUpdateServiceGCSafePointRequest(
+      ByteString serviceId, long ttl, long safePoint) {
+    return () ->
+        UpdateServiceGCSafePointRequest.newBuilder()
+            .setHeader(header)
+            .setSafePoint(safePoint)
+            .setServiceId(serviceId)
+            .setTTL(ttl)
+            .build();
+  }
+
   private <T> PDErrorHandler<GetStoreResponse> buildPDErrorHandler() {
     return new PDErrorHandler<>(
         r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null, this);

@@ -336,6 +348,20 @@ public List<Store> getAllStores(BackOffer backOffer) {
         .getStoresList();
   }

+  @Override
+  public Long updateServiceGCSafePoint(
+      String serviceId, long ttl, long safePoint, BackOffer backOffer) {
+    return callWithRetry(
+            backOffer,
+            PDGrpc.getUpdateServiceGCSafePointMethod(),
+            buildUpdateServiceGCSafePointRequest(
+                ByteString.copyFromUtf8(serviceId), ttl, safePoint),
+            new PDErrorHandler<>(
+                r -> r.getHeader().hasError() ? buildFromPdpbError(r.getHeader().getError()) : null,
+                this))
+        .getMinSafePoint();
+  }
+
   @Override
   public void close() throws InterruptedException {
     etcdClient.close();

tikv-client/src/main/java/com/pingcap/tikv/ReadOnlyPDClient.java

Lines changed: 12 additions & 0 deletions

@@ -67,4 +67,16 @@ public interface ReadOnlyPDClient {
   Future<Store> getStoreAsync(BackOffer backOffer, long storeId);

   List<Store> getAllStores(BackOffer backOffer);
+
+  /**
+   * Update ServiceGCSafePoint.
+   *
+   * @param serviceId ServiceId
+   * @param ttl TTL in seconds
+   * @param safePoint the TiTimestamp you want to set; setting it to start_ts.getPrevious() is a
+   *     good practice
+   * @return the minimum safe point across all services. If this value is greater than safePoint,
+   *     the update failed.
+   */
+  Long updateServiceGCSafePoint(String serviceId, long ttl, long safePoint, BackOffer backOffer);
 }
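The return-value convention in this javadoc is the same one ServiceSafePoint relies on: if the minimum safe point PD reports back is newer than the safePoint we asked for, the registration did not take effect. A minimal illustration of that contract from Scala, assuming an existing TiSession (the service id and TTL are illustrative, not part of the commit):

// Hypothetical caller; not part of the commit.
import com.pingcap.tikv.TiSession
import com.pingcap.tikv.util.{BackOffer, ConcreteBackOffer}

def tryHoldGcAt(tiSession: TiSession, startTs: Long): Boolean = {
  val minSafePoint = tiSession.getPDClient.updateServiceGCSafePoint(
    "my_service",
    5 * 60, // TTL in seconds; PD expires the entry if it is not refreshed in time
    startTs,
    ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
  // Per the interface contract: a returned minimum safe point newer than the
  // requested one means the safe point could not be held at startTs.
  minSafePoint <= startTs
}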
