Skip to content

Commit 58e9bb4

Browse files
authored
Make the txn safe point polling support falling back to old version of PD (#1727)
closes #1728 Signed-off-by: MyonKeminta <[email protected]>
1 parent 24b88f0 commit 58e9bb4

File tree

9 files changed

+589
-41
lines changed

9 files changed

+589
-41
lines changed

integration_tests/gc_test.go

Lines changed: 333 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,333 @@
1+
package tikv_test
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"strings"
11+
"testing"
12+
"time"
13+
14+
"github.com/google/uuid"
15+
"github.com/pingcap/failpoint"
16+
"github.com/pingcap/kvproto/pkg/keyspacepb"
17+
"github.com/pingcap/tidb/pkg/keyspace"
18+
"github.com/stretchr/testify/suite"
19+
"github.com/tikv/client-go/v2/config"
20+
"github.com/tikv/client-go/v2/tikv"
21+
pd "github.com/tikv/pd/client"
22+
pdgc "github.com/tikv/pd/client/clients/gc"
23+
"github.com/tikv/pd/client/constants"
24+
"github.com/tikv/pd/client/pkg/caller"
25+
"google.golang.org/grpc/codes"
26+
"google.golang.org/grpc/status"
27+
)
28+
29+
type hookedGCStatesClient struct {
30+
inner pdgc.GCStatesClient
31+
getGCStatesHook func(inner pdgc.GCStatesClient, ctx context.Context) (pdgc.GCState, error)
32+
}
33+
34+
func (c *hookedGCStatesClient) SetGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*pdgc.GCBarrierInfo, error) {
35+
return c.inner.SetGCBarrier(ctx, barrierID, barrierTS, ttl)
36+
}
37+
38+
func (c *hookedGCStatesClient) DeleteGCBarrier(ctx context.Context, barrierID string) (*pdgc.GCBarrierInfo, error) {
39+
return c.inner.DeleteGCBarrier(ctx, barrierID)
40+
}
41+
42+
func (c *hookedGCStatesClient) GetGCState(ctx context.Context) (pdgc.GCState, error) {
43+
if c.getGCStatesHook != nil {
44+
return c.getGCStatesHook(c.inner, ctx)
45+
}
46+
return c.inner.GetGCState(ctx)
47+
}
48+
49+
func hookGCStatesClientForStore(store *tikv.StoreProbe, getGCStatesHook func(inner pdgc.GCStatesClient, ctx context.Context) (pdgc.GCState, error)) {
50+
store.ReplaceGCStatesClient(&hookedGCStatesClient{
51+
inner: store.GetGCStatesClient(),
52+
getGCStatesHook: getGCStatesHook,
53+
})
54+
}
55+
56+
var _ = pdgc.GCStatesClient(&hookedGCStatesClient{})
57+
58+
func TestGCWithTiKVSuite(t *testing.T) {
59+
suite.Run(t, new(testGCWithTiKVSuite))
60+
}
61+
62+
type testGCWithTiKVSuite struct {
63+
suite.Suite
64+
globalPDCli pd.Client
65+
addrs []string
66+
67+
pdClis []*tikv.CodecPDClient
68+
stores []*tikv.StoreProbe
69+
keyspaces []*keyspacepb.KeyspaceMeta
70+
}
71+
72+
func (s *testGCWithTiKVSuite) SetupTest() {
73+
if !*withTiKV {
74+
s.T().Skip("Cannot run without real tikv cluster")
75+
}
76+
s.addrs = strings.Split(*pdAddrs, ",")
77+
var err error
78+
s.globalPDCli, err = pd.NewClient(caller.TestComponent, s.addrs, pd.SecurityOption{})
79+
s.Require().NoError(err)
80+
}
81+
82+
func (s *testGCWithTiKVSuite) TearDownTest() {
83+
re := s.Require()
84+
for _, store := range s.stores {
85+
re.NoError(store.Close())
86+
}
87+
if s.globalPDCli != nil {
88+
s.globalPDCli.Close()
89+
}
90+
for _, pdCli := range s.pdClis {
91+
pdCli.Close()
92+
}
93+
for _, keyspaceMeta := range s.keyspaces {
94+
if keyspaceMeta == nil {
95+
s.dropKeyspace(keyspaceMeta)
96+
}
97+
}
98+
}
99+
100+
func (s *testGCWithTiKVSuite) createClient(keyspaceMeta *keyspacepb.KeyspaceMeta) (*tikv.CodecPDClient, *tikv.StoreProbe) {
101+
re := s.Require()
102+
var pdCli *tikv.CodecPDClient
103+
if keyspaceMeta == nil {
104+
inner, err := pd.NewClient(caller.TestComponent, s.addrs, pd.SecurityOption{})
105+
re.NoError(err)
106+
pdCli = tikv.NewCodecPDClient(tikv.ModeTxn, inner)
107+
} else {
108+
inner, err := pd.NewClientWithKeyspace(context.Background(), caller.TestComponent, keyspaceMeta.GetId(), s.addrs, pd.SecurityOption{})
109+
re.NoError(err)
110+
pdCli, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, inner, keyspaceMeta.GetName())
111+
re.NoError(err)
112+
}
113+
tlsConfig, err := (&config.Security{}).ToTLSConfig()
114+
re.NoError(err)
115+
var spkv tikv.SafePointKV
116+
if keyspaceMeta == nil {
117+
spkv, err = tikv.NewEtcdSafePointKV(s.addrs, tlsConfig)
118+
re.NoError(err)
119+
} else {
120+
spkv, err = tikv.NewEtcdSafePointKV(s.addrs, tlsConfig, tikv.WithPrefix(keyspace.MakeKeyspaceEtcdNamespace(pdCli.GetCodec())))
121+
re.NoError(err)
122+
}
123+
store, err := tikv.NewKVStore("test-store", pdCli, spkv, tikv.NewRPCClient(tikv.WithCodec(pdCli.GetCodec())))
124+
return pdCli, &tikv.StoreProbe{KVStore: store}
125+
}
126+
127+
func (s *testGCWithTiKVSuite) createKeyspace(name string, keyspaceLevelGC bool) *keyspacepb.KeyspaceMeta {
128+
re := s.Require()
129+
130+
gcManagementType := "unified"
131+
if keyspaceLevelGC {
132+
gcManagementType = "keyspace_level"
133+
}
134+
req := struct {
135+
Name string `json:"name"`
136+
Config map[string]string `json:"config"`
137+
}{
138+
Name: name,
139+
Config: map[string]string{"gc_management_type": gcManagementType},
140+
}
141+
reqJson, err := json.Marshal(req)
142+
re.NoError(err)
143+
resp, err := http.Post(fmt.Sprintf("%s/pd/api/v2/keyspaces", s.addrs[0]), "application/json", bytes.NewBuffer(reqJson))
144+
re.NoError(err)
145+
defer resp.Body.Close()
146+
respBody := new(strings.Builder)
147+
_, err = io.Copy(respBody, resp.Body)
148+
re.NoError(err)
149+
re.Equal(http.StatusOK, resp.StatusCode, "Failed to create keyspace %s, response: %s", name, respBody.String())
150+
151+
meta, err := s.globalPDCli.LoadKeyspace(context.Background(), name)
152+
re.NoError(err)
153+
// Avoid goroutine leak in the test.
154+
http.DefaultClient.CloseIdleConnections()
155+
return meta
156+
}
157+
158+
type storeKeyspaceType int
159+
160+
const (
161+
storeNullKeyspace storeKeyspaceType = iota
162+
storeKeyspaceLevelGCKeyspace
163+
storeUnifiedGCKeyspace
164+
)
165+
166+
func (s *testGCWithTiKVSuite) dropKeyspace(keyspaceMeta *keyspacepb.KeyspaceMeta) {
167+
re := s.Require()
168+
// Nil might be used to represent the null keyspace.
169+
if keyspaceMeta != nil {
170+
_, err := s.globalPDCli.UpdateKeyspaceState(context.Background(), keyspaceMeta.Id, keyspacepb.KeyspaceState_ARCHIVED)
171+
re.NoError(err)
172+
}
173+
}
174+
175+
func genKeyspaceName() string {
176+
return uuid.New().String()
177+
}
178+
179+
func (s *testGCWithTiKVSuite) prepareClients(storeKeyspaceTypes ...storeKeyspaceType) {
180+
s.pdClis = make([]*tikv.CodecPDClient, 0, len(storeKeyspaceTypes))
181+
s.stores = make([]*tikv.StoreProbe, 0, len(storeKeyspaceTypes))
182+
s.keyspaces = make([]*keyspacepb.KeyspaceMeta, 0, len(storeKeyspaceTypes))
183+
184+
for _, t := range storeKeyspaceTypes {
185+
var keyspaceMeta *keyspacepb.KeyspaceMeta
186+
switch t {
187+
case storeNullKeyspace:
188+
// represents by nil keyspace meta
189+
case storeKeyspaceLevelGCKeyspace:
190+
keyspaceMeta = s.createKeyspace(genKeyspaceName(), true)
191+
case storeUnifiedGCKeyspace:
192+
keyspaceMeta = s.createKeyspace(genKeyspaceName(), false)
193+
}
194+
pdCli, store := s.createClient(keyspaceMeta)
195+
s.keyspaces = append(s.keyspaces, keyspaceMeta)
196+
s.pdClis = append(s.pdClis, pdCli)
197+
s.stores = append(s.stores, store)
198+
}
199+
}
200+
201+
func (s *testGCWithTiKVSuite) TestLoadTxnSafePointFallback() {
202+
re := s.Require()
203+
204+
re.NoError(failpoint.Enable("tikvclient/noBuiltInTxnSafePointUpdater", "return"))
205+
defer func() {
206+
re.NoError(failpoint.Disable("tikvclient/noBuiltInTxnSafePointUpdater"))
207+
}()
208+
209+
s.prepareClients(storeNullKeyspace, storeUnifiedGCKeyspace, storeKeyspaceLevelGCKeyspace)
210+
211+
ctx := context.Background()
212+
213+
// The new keyspaces always have zero txn safe point, while the null keyspace may have larger txn safe point due
214+
// to other tests that has been run on the cluster. The following test will use values from the null keyspace's
215+
// txn safe point.
216+
state, err := s.pdClis[0].GetGCStatesClient(constants.NullKeyspaceID).GetGCState(ctx)
217+
re.NoError(err)
218+
base := state.TxnSafePoint
219+
220+
callCounter := 0
221+
hook := func(inner pdgc.GCStatesClient, ctx context.Context) (pdgc.GCState, error) {
222+
callCounter += 1
223+
return pdgc.GCState{}, status.Errorf(codes.Unimplemented, "simulated unimplemented error")
224+
}
225+
for _, store := range s.stores {
226+
hookGCStatesClientForStore(store, hook)
227+
_, err = store.GetGCStatesClient().GetGCState(ctx)
228+
re.Error(err)
229+
re.Equal(codes.Unimplemented, status.Code(err))
230+
}
231+
re.Equal(len(s.stores), callCounter)
232+
callCounter = 0
233+
234+
// Updating the null keyspace. The new value is visible by the null keyspace and keyspaces using unified GC,
235+
// but not by keyspaces using keyspace-level GC.
236+
res, err := s.pdClis[0].GetGCInternalController(constants.NullKeyspaceID).AdvanceTxnSafePoint(ctx, base+1)
237+
re.NoError(err)
238+
re.Equal(base+1, res.NewTxnSafePoint)
239+
// Call LoadTxnSafePoint twice, so that by checking the callCounter, we chan ensure the new API is called at most
240+
// once (for each KVStore) and won't be called again since the first falling back.
241+
// For store[0] (null keyspace) and store[1] (unified GC)
242+
for i := 0; i < 2; i++ {
243+
for _, store := range s.stores[0:2] {
244+
txnSafePoint, err := store.LoadTxnSafePoint(ctx)
245+
re.NoError(err)
246+
re.Equal(base+1, txnSafePoint)
247+
}
248+
// For store[2] (keyspace level GC): Invisible to the new txn safe point.
249+
txnSafePoint, err := s.stores[2].LoadTxnSafePoint(ctx)
250+
re.NoError(err)
251+
re.Less(txnSafePoint, base+1)
252+
}
253+
254+
// Updating the keyspace with keyspace level GC. The effect is visible only in the same keyspace.
255+
res, err = s.pdClis[2].GetGCInternalController(s.keyspaces[2].GetId()).AdvanceTxnSafePoint(ctx, base+2)
256+
re.NoError(err)
257+
re.Equal(base+2, res.NewTxnSafePoint)
258+
for i := 0; i < 2; i++ {
259+
txnSafePoint, err := s.stores[2].LoadTxnSafePoint(ctx)
260+
re.NoError(err)
261+
re.Equal(base+2, txnSafePoint)
262+
for _, store := range s.stores[0:2] {
263+
txnSafePoint, err := store.LoadTxnSafePoint(ctx)
264+
re.NoError(err)
265+
re.Less(txnSafePoint, base+2)
266+
}
267+
}
268+
269+
// Each store tries `GetGCState` only once.
270+
re.Equal(len(s.stores), callCounter)
271+
}
272+
273+
func (s *testGCWithTiKVSuite) TestCompatibleTxnSafePointLoaderValueParsing() {
274+
re := s.Require()
275+
276+
re.NoError(failpoint.Enable("tikvclient/noBuiltInTxnSafePointUpdater", "return"))
277+
defer func() {
278+
re.NoError(failpoint.Disable("tikvclient/noBuiltInTxnSafePointUpdater"))
279+
}()
280+
281+
s.prepareClients(storeNullKeyspace)
282+
store := s.stores[0]
283+
hook := func(inner pdgc.GCStatesClient, ctx context.Context) (pdgc.GCState, error) {
284+
return pdgc.GCState{}, status.Errorf(codes.Unimplemented, "simulated unimplemented error")
285+
}
286+
hookGCStatesClientForStore(store, hook)
287+
// Load txn safe point once to enter fallback state.
288+
ctx := context.Background()
289+
_, err := store.LoadTxnSafePoint(ctx)
290+
re.NoError(err)
291+
292+
fpname := "tikvclient/compatibleTxnSafePointLoaderEtcdGetResult"
293+
re.NoError(failpoint.Enable(fpname, `return("empty")`))
294+
defer func() {
295+
re.NoError(failpoint.Disable(fpname))
296+
}()
297+
txnSafePoint, err := store.LoadTxnSafePoint(ctx)
298+
re.NoError(err)
299+
re.Equal(uint64(0), txnSafePoint)
300+
301+
re.NoError(failpoint.Enable(fpname, `return("value:")`))
302+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
303+
re.NoError(err)
304+
re.Equal(uint64(0), txnSafePoint)
305+
306+
re.NoError(failpoint.Enable(fpname, `return("value:1")`))
307+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
308+
re.NoError(err)
309+
re.Equal(uint64(1), txnSafePoint)
310+
311+
re.NoError(failpoint.Enable(fpname, `return("value:abcde")`))
312+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
313+
re.Error(err)
314+
315+
re.NoError(failpoint.Enable(fpname, `return("value:459579321342754819")`))
316+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
317+
re.NoError(err)
318+
re.Equal(uint64(459579321342754819), txnSafePoint)
319+
320+
re.NoError(failpoint.Enable(fpname, `return("value:459579321342754819abcdefg")`))
321+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
322+
re.Error(err)
323+
324+
re.NoError(failpoint.Enable(fpname, `return("value:18446744073709551615")`))
325+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
326+
re.NoError(err)
327+
re.Equal(uint64(18446744073709551615), txnSafePoint)
328+
329+
// Out of range of uint64
330+
re.NoError(failpoint.Enable(fpname, `return("value:18446744073709551616")`))
331+
txnSafePoint, err = store.LoadTxnSafePoint(ctx)
332+
re.Error(err)
333+
}

integration_tests/safepoint_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ func mymakeKeys(rowNum int, prefix string) [][]byte {
8585

8686
func (s *testSafePointSuite) waitUntilErrorPlugIn(t uint64) {
8787
for {
88-
s.store.SaveSafePoint(t + 10)
88+
s.store.SaveSafePointToSafePointKV(t + 10)
8989
cachedTime := time.Now()
90-
newSafePoint, err := s.store.LoadSafePoint()
90+
newSafePoint, err := s.store.LoadSafePointFromSafePointKV()
9191
if err == nil {
9292
s.store.UpdateTxnSafePointCache(newSafePoint, cachedTime)
9393
break

internal/apicodec/codec.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"github.com/pingcap/kvproto/pkg/keyspacepb"
88
"github.com/pingcap/kvproto/pkg/kvrpcpb"
99
"github.com/tikv/client-go/v2/tikvrpc"
10+
"github.com/tikv/pd/client/constants"
1011
)
1112

1213
type (
@@ -25,7 +26,7 @@ const (
2526

2627
const (
2728
// NullspaceID is a special keyspace id that represents no keyspace exist.
28-
NullspaceID KeyspaceID = 0xffffffff
29+
NullspaceID = KeyspaceID(constants.NullKeyspaceID)
2930
)
3031

3132
// ParseKeyspaceID retrieves the keyspaceID from the given keyspace-encoded key.

metrics/metrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ var (
5454
TiKVRawkvCmdHistogram *prometheus.HistogramVec
5555
TiKVRawkvSizeHistogram *prometheus.HistogramVec
5656
TiKVTxnRegionsNumHistogram *prometheus.HistogramVec
57-
TiKVLoadSafepointCounter *prometheus.CounterVec
57+
TiKVLoadTxnSafePointCounter *prometheus.CounterVec
5858
TiKVSecondaryLockCleanupFailureCounter *prometheus.CounterVec
5959
TiKVRegionCacheCounter *prometheus.CounterVec
6060
TiKVLoadRegionCounter *prometheus.CounterVec
@@ -275,7 +275,7 @@ func initMetrics(namespace, subsystem string, constLabels prometheus.Labels) {
275275
ConstLabels: constLabels,
276276
}, []string{LblType, LblScope})
277277

278-
TiKVLoadSafepointCounter = prometheus.NewCounterVec(
278+
TiKVLoadTxnSafePointCounter = prometheus.NewCounterVec(
279279
prometheus.CounterOpts{
280280
Namespace: namespace,
281281
Subsystem: subsystem,
@@ -922,7 +922,7 @@ func RegisterMetrics() {
922922
prometheus.MustRegister(TiKVRawkvCmdHistogram)
923923
prometheus.MustRegister(TiKVRawkvSizeHistogram)
924924
prometheus.MustRegister(TiKVTxnRegionsNumHistogram)
925-
prometheus.MustRegister(TiKVLoadSafepointCounter)
925+
prometheus.MustRegister(TiKVLoadTxnSafePointCounter)
926926
prometheus.MustRegister(TiKVSecondaryLockCleanupFailureCounter)
927927
prometheus.MustRegister(TiKVRegionCacheCounter)
928928
prometheus.MustRegister(TiKVLoadRegionCounter)

0 commit comments

Comments
 (0)