Skip to content

Commit cb779f0

Browse files
authored
Support service safe point (#2634) (#2645)
1 parent c083fc5 commit cb779f0

File tree

11 files changed

+827
-1
lines changed

11 files changed

+827
-1
lines changed

core/src/main/scala/com/pingcap/tispark/TiConfigConst.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,12 @@ object TiConfigConst {
7575
val DEFAULT_STALE_READ: String = ""
7676

7777
val TELEMETRY_ENABEL: String = "spark.tispark.telemetry.enable"
78+
79+
val NEW_COLLATION_ENABLE: String = "spark.tispark.new_collation_enable"
80+
81+
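// GC: service safe point settings. spark.tispark.gc_max_wait_time and its default are in seconds (default: 24 hours).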
82+
val GC_MAX_WAIT_TIME: String = "spark.tispark.gc_max_wait_time"
83+
val DEFAULT_GC_MAX_WAIT_TIME: Long = 24 * 60 * 60
84+
val DEFAULT_GC_SAFE_POINT_TTL: Int = 5 * 60
85+
7886
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
package com.pingcap.tispark.safepoint
2+
3+
import com.pingcap.tikv.TiSession
4+
import com.pingcap.tikv.exception.TiInternalException
5+
import com.pingcap.tikv.meta.TiTimestamp
6+
import com.pingcap.tikv.util.{BackOffer, ConcreteBackOffer}
7+
import org.slf4j.LoggerFactory
8+
9+
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
10+
11+
case class ServiceSafePoint(
12+
serviceId: String,
13+
ttl: Long,
14+
GCMaxWaitTime: Long,
15+
tiSession: TiSession) {
16+
17+
private final val logger = LoggerFactory.getLogger(getClass.getName)
18+
private var minStartTs = Long.MaxValue
19+
val service: ScheduledExecutorService = Executors.newSingleThreadScheduledExecutor()
20+
service.scheduleAtFixedRate(
21+
() => {
22+
if (minStartTs != Long.MaxValue) {
23+
val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
24+
serviceId,
25+
ttl,
26+
minStartTs,
27+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
28+
if (safePoint > minStartTs) {
29+
// will not happen unless someone delete the TiSpark service safe point in PD compulsively
30+
logger.error(
31+
s"Failed to register service GC safe point because the current minimum safe point $safePoint is newer than what we assume $minStartTs. Maybe you delete the TiSpark safe point in PD.")
32+
} else {
33+
logger.info(s"register service GC safe point $minStartTs success.")
34+
}
35+
}
36+
},
37+
0,
38+
1,
39+
TimeUnit.MINUTES)
40+
41+
// TiSpark can only decrease minStartTs now. Because we can not known which transaction is finished, so we can not increase minStartTs.
42+
def updateStartTs(startTimeStamp: TiTimestamp): Unit = {
43+
this.synchronized {
44+
val now = tiSession.getTimestamp
45+
if (now.getPhysical - startTimeStamp.getPhysical >= GCMaxWaitTime * 1000) {
46+
throw new TiInternalException(
47+
s"Can not pause GC more than spark.tispark.gc_max_wait_time=$GCMaxWaitTime s. start_ts: ${startTimeStamp.getVersion}, now: ${now.getVersion}. You can adjust spark.tispark.gc_max_wait_time to increase the gc max wait time.")
48+
}
49+
val startTs = startTimeStamp.getVersion
50+
if (startTs >= minStartTs) {
51+
// minStartTs >= safe point, so startTs must >= safe point. Check it in case some one delete the TiSpark service safe point in PD compulsively.
52+
checkServiceSafePoint(startTs)
53+
} else {
54+
// applyServiceSafePoint may throw exception. Consider startTs < safePoint < minStartTs.
55+
applyServiceSafePoint(startTs)
56+
// let minStartTs = startTs after applyServiceSafePoint success
57+
minStartTs = startTs
58+
}
59+
}
60+
}
61+
62+
private def checkServiceSafePoint(startTs: Long): Unit = {
63+
val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
64+
serviceId,
65+
ttl,
66+
minStartTs,
67+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
68+
if (safePoint > startTs) {
69+
throw new TiInternalException(
70+
s"Failed to check service GC safe point because the current minimum safe point $safePoint is newer than start_ts $startTs.")
71+
}
72+
logger.info(s"check start_ts $startTs success.")
73+
}
74+
75+
private def applyServiceSafePoint(startTs: Long): Unit = {
76+
val safePoint = tiSession.getPDClient.updateServiceGCSafePoint(
77+
serviceId,
78+
ttl,
79+
startTs,
80+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
81+
if (safePoint > startTs) {
82+
throw new TiInternalException(
83+
s"Failed to register service GC safe point because the current minimum safe point $safePoint is newer than what we assume $startTs.")
84+
}
85+
logger.info(s"register service GC safe point $startTs success.")
86+
}
87+
88+
def stopRegisterSafePoint(): Unit = {
89+
minStartTs = Long.MaxValue
90+
tiSession.getPDClient.updateServiceGCSafePoint(
91+
serviceId,
92+
ttl,
93+
Long.MaxValue,
94+
ConcreteBackOffer.newCustomBackOff(BackOffer.PD_UPDATE_SAFE_POINT_BACKOFF))
95+
service.shutdownNow()
96+
}
97+
}

core/src/main/scala/com/pingcap/tispark/write/TiBatchWrite.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ class TiBatchWrite(
196196
val startTimeStamp = tiSession.getTimestamp
197197
startTs = startTimeStamp.getVersion
198198
logger.info(s"startTS: $startTs")
199+
tiContext.serviceSafePoint.updateStartTs(startTimeStamp)
199200

200201
// pre calculate
201202
val shuffledRDD: RDD[(SerializableKey, Array[Byte])] = {

core/src/main/scala/org/apache/spark/sql/TiContext.scala

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@ import com.pingcap.tikv.{TiConfiguration, TiSession}
2121
import com.pingcap.tispark._
2222
import com.pingcap.tispark.auth.TiAuthorization
2323
import com.pingcap.tispark.listener.CacheInvalidateListener
24+
import com.pingcap.tispark.safepoint.ServiceSafePoint
2425
import com.pingcap.tispark.statistics.StatisticsManager
2526
import com.pingcap.tispark.utils.TiUtil
2627
import org.apache.spark.SparkConf
2728
import org.apache.spark.internal.Logging
2829
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
29-
import org.apache.spark.sql.catalyst.catalog._
3030
import org.json4s.DefaultFormats
3131
import org.json4s.JsonAST._
3232
import org.json4s.JsonDSL._
3333
import org.json4s.jackson.JsonMethods._
3434
import scalaj.http.Http
3535

3636
import java.lang
37+
import java.util.UUID
3738
import scala.collection.JavaConverters._
3839
import scala.collection.mutable
3940

@@ -49,11 +50,28 @@ class TiContext(val sparkSession: SparkSession) extends Serializable with Loggin
4950
} else Option.empty)
5051
final val tiSession: TiSession = TiSession.getInstance(tiConf)
5152
lazy val sqlContext: SQLContext = sparkSession.sqlContext
53+
// GC
54+
// Maximum time, in seconds, that TiSpark may hold back GC for one start_ts. Read from
// spark.tispark.gc_max_wait_time; a value that cannot be read as a Long falls back to the
// default, and the fallback is logged so a mistyped setting does not go unnoticed.
val GCMaxWaitTime: Long =
  try {
    conf
      .get(TiConfigConst.GC_MAX_WAIT_TIME, TiConfigConst.DEFAULT_GC_MAX_WAIT_TIME.toString)
      .toLong
  } catch {
    case e: Exception =>
      logWarning(
        s"Invalid value for ${TiConfigConst.GC_MAX_WAIT_TIME}, falling back to the default " +
          s"${TiConfigConst.DEFAULT_GC_MAX_WAIT_TIME} s.",
        e)
      TiConfigConst.DEFAULT_GC_MAX_WAIT_TIME
  }
62+
63+
// Service GC safe point for this context, registered under a random "tispark_<uuid>" id
// with the default TTL and the configured maximum GC wait time.
val serviceSafePoint: ServiceSafePoint = {
  val serviceId = s"tispark_${UUID.randomUUID}"
  val safePointTtl: Long = TiConfigConst.DEFAULT_GC_SAFE_POINT_TTL
  ServiceSafePoint(serviceId, safePointTtl, GCMaxWaitTime, tiSession)
}
5269

5370
sparkSession.sparkContext.addSparkListener(new SparkListener() {
5471
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
5572
if (tiSession != null) {
5673
try {
74+
serviceSafePoint.stopRegisterSafePoint()
5775
tiSession.close()
5876
} catch {
5977
case e: Throwable => logWarning("fail to close TiSession!", e)

spark-wrapper/spark-3.0/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
107107
} else {
108108
tiContext.tiSession.getSnapshotTimestamp
109109
}
110+
tiContext.serviceSafePoint.updateStartTs(ts)
110111

111112
if (plan.isStreaming) {
112113
// We should use a new timestamp for next batch execution.

spark-wrapper/spark-3.1/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,8 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
110110
} else {
111111
tiContext.tiSession.getSnapshotTimestamp
112112
}
113+
tiContext.serviceSafePoint.updateStartTs(ts)
114+
113115
if (plan.isStreaming) {
114116
// We should use a new timestamp for next batch execution.
115117
// Otherwise Spark Structure Streaming will not see new data in TiDB.

spark-wrapper/spark-3.2/src/main/scala/org/apache/spark/sql/extensions/TiStrategy.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ case class TiStrategy(getOrCreateTiContext: SparkSession => TiContext)(sparkSess
110110
} else {
111111
tiContext.tiSession.getSnapshotTimestamp
112112
}
113+
tiContext.serviceSafePoint.updateStartTs(ts)
113114

114115
if (plan.isStreaming) {
115116
// We should use a new timestamp for next batch execution.

0 commit comments

Comments
 (0)