Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions domain/schema_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"sync"
"time"

"github.com/pingcap/tidb/store/tikv/oracle"
log "github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -107,7 +108,7 @@ func (s *schemaValidator) Update(leaseGrantTS uint64, oldVer, currVer int64, cha

// Renew the lease.
s.latestSchemaVer = currVer
leaseGrantTime := extractPhysicalTime(leaseGrantTS)
leaseGrantTime := oracle.GetTimeFromTS(leaseGrantTS)
leaseExpire := leaseGrantTime.Add(s.lease - time.Millisecond)
s.latestSchemaExpire = leaseExpire

Expand Down Expand Up @@ -187,18 +188,13 @@ func (s *schemaValidator) Check(txnTS uint64, schemaVer int64, relatedTableIDs [
}

// Schema unchanged, maybe success or the schema validator is unavailable.
t := extractPhysicalTime(txnTS)
t := oracle.GetTimeFromTS(txnTS)
if t.After(s.latestSchemaExpire) {
return ResultUnknown
}
return ResultSucc
}

func extractPhysicalTime(ts uint64) time.Time {
t := int64(ts >> 18) // 18 is for the logical time.
return time.Unix(t/1e3, (t%1e3)*1e6)
}

func (s *schemaValidator) enqueue(schemaVersion int64, relatedTableIDs []int64) {
s.deltaSchemaInfos = append(s.deltaSchemaInfos, deltaSchemaInfo{schemaVersion, relatedTableIDs})
if len(s.deltaSchemaInfos) > maxNumberOfDiffsToLoad {
Expand Down
2 changes: 1 addition & 1 deletion mysql/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ var MySQLErrName = map[uint16]string{
ErrTiKVServerBusy: "TiKV server is busy",
ErrResolveLockTimeout: "Resolve lock timeout",
ErrRegionUnavailable: "Region is unavailable",
ErrGCTooEarly: "GC life time is shorter than transaction duration",
ErrGCTooEarly: "GC life time is shorter than transaction duration, transaction starts at %v, GC safe point is %v",

ErrTxnTooLarge: "Transaction is too large",
}
4 changes: 3 additions & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ func (s *tikvStore) CheckVisibility(startTime uint64) error {
}

if startTime < cachedSafePoint {
return ErrGCTooEarly
t1 := oracle.GetTimeFromTS(startTime)
t2 := oracle.GetTimeFromTS(cachedSafePoint)
return ErrGCTooEarly.GenByArgs(t1, t2)
}

return nil
Expand Down
6 changes: 6 additions & 0 deletions store/tikv/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,9 @@ func GetPhysical(t time.Time) int64 {
func EncodeTSO(ts int64) uint64 {
return uint64(ts) << physicalShiftBits
}

// GetTimeFromTS extracts time.Time from a timestamp.
func GetTimeFromTS(ts uint64) time.Time {
ms := ExtractPhysical(ts)
return time.Unix(ms/1e3, (ms%1e3)*1e6)
}