Commit e60fec1

client: split large files and rename unclear fields (#1700)
Signed-off-by: Lynn <[email protected]>
1 parent 4fd3c42 commit e60fec1

File tree: 10 files changed (+767 additions, −687 deletions)

internal/client/client.go

Lines changed: 47 additions & 347 deletions
Large diffs are not rendered by default.

internal/client/client_async.go

Lines changed: 6 additions & 6 deletions
@@ -72,10 +72,10 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
 	}
 	tikvrpc.AttachContext(req, req.Context)
 
-	// TODO(zyguan): If the client created `WithGRPCDialOptions(grpc.WithBlock())`, `getConnArray` might be blocked for
+	// TODO(zyguan): If the client created `WithGRPCDialOptions(grpc.WithBlock())`, `getConnPool` might be blocked for
 	// a while when the corresponding conn array is uninitialized. However, since tidb won't set this option, we just
-	// keep `getConnArray` synchronous for now.
-	connArray, err := c.getConnArray(addr, true)
+	// keep `getConnPool` synchronous for now.
+	connPool, err := c.getConnPool(addr, true)
 	if err != nil {
 		cb.Invoke(nil, err)
 		return

@@ -113,7 +113,7 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
 		metrics.BatchRequestDurationDone.Observe(elapsed.Seconds())
 
 		// rpc metrics
-		connArray.updateRPCMetrics(req, resp, elapsed)
+		connPool.updateRPCMetrics(req, resp, elapsed)
 
 		// tracing
 		if spanRPC != nil {

@@ -131,15 +131,15 @@ func (c *RPCClient) SendRequestAsync(ctx context.Context, addr string, req *tikv
 			resp, err = c.option.codec.DecodeResponse(req, resp)
 		}
 
-		return resp, WrapErrConn(err, connArray)
+		return resp, WrapErrConn(err, connPool)
 	})
 
 	stop = context.AfterFunc(ctx, func() {
 		logutil.Logger(ctx).Debug("async send request cancelled (context done)", zap.String("to", addr), zap.Error(ctx.Err()))
 		entry.error(ctx.Err())
 	})
 
-	batchConn := connArray.batchConn
+	batchConn := connPool.batchConn
 	if val, err := util.EvalFailpoint("mockBatchCommandsChannelFullOnAsyncSend"); err == nil {
 		mockBatchCommandsChannelFullOnAsyncSend(ctx, batchConn, cb, val)
 	}
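The TODO in the first hunk concerns `grpc.WithBlock()`: with that dial option, connection setup blocks until the channel is READY (or the context expires), so resolving the conn pool lazily on the send path could stall callers. A minimal standalone sketch of that behavior, assuming a made-up address and timeout (illustrative only, not part of this commit):

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// With grpc.WithBlock(), DialContext does not return until the
	// connection is READY or ctx expires; without it, dialing returns
	// immediately and connects in the background.
	conn, err := grpc.DialContext(ctx, "127.0.0.1:20160", // hypothetical address
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithBlock(),
	)
	if err != nil {
		log.Fatalf("dial did not become ready in time: %v", err)
	}
	defer conn.Close()
}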

internal/client/client_batch.go

Lines changed: 0 additions & 310 deletions
@@ -40,7 +40,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"math"
-	"runtime"
 	"runtime/trace"
 	"strings"
 	"sync"
@@ -264,159 +263,6 @@ type batchConnMetrics struct {
 	bestBatchSize prometheus.Observer
 }
 
-type batchConn struct {
-	// An atomic flag indicates whether the batch is idle or not.
-	// 0 for busy, others for idle.
-	idle uint32
-
-	// batchCommandsCh used for batch commands.
-	batchCommandsCh        chan *batchCommandsEntry
-	batchCommandsClients   []*batchCommandsClient
-	tikvTransportLayerLoad uint64
-	closed                 chan struct{}
-
-	reqBuilder *batchCommandsBuilder
-
-	// Notify rpcClient to check the idle flag
-	idleNotify *uint32
-	idleDetect *time.Timer
-
-	fetchMoreTimer *time.Timer
-
-	index uint32
-
-	metrics batchConnMetrics
-}
-
-func newBatchConn(connCount, maxBatchSize uint, idleNotify *uint32) *batchConn {
-	return &batchConn{
-		batchCommandsCh:        make(chan *batchCommandsEntry, maxBatchSize),
-		batchCommandsClients:   make([]*batchCommandsClient, 0, connCount),
-		tikvTransportLayerLoad: 0,
-		closed:                 make(chan struct{}),
-		reqBuilder:             newBatchCommandsBuilder(maxBatchSize),
-		idleNotify:             idleNotify,
-		idleDetect:             time.NewTimer(idleTimeout),
-	}
-}
-
-func (a *batchConn) initMetrics(target string) {
-	a.metrics.pendingRequests = metrics.TiKVBatchPendingRequests.WithLabelValues(target)
-	a.metrics.batchSize = metrics.TiKVBatchRequests.WithLabelValues(target)
-	a.metrics.sendLoopWaitHeadDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-head")
-	a.metrics.sendLoopWaitMoreDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "wait-more")
-	a.metrics.sendLoopSendDur = metrics.TiKVBatchSendLoopDuration.WithLabelValues(target, "send")
-	a.metrics.recvLoopRecvDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "recv")
-	a.metrics.recvLoopProcessDur = metrics.TiKVBatchRecvLoopDuration.WithLabelValues(target, "process")
-	a.metrics.batchSendTailLat = metrics.TiKVBatchSendTailLatency.WithLabelValues(target)
-	a.metrics.batchRecvTailLat = metrics.TiKVBatchRecvTailLatency.WithLabelValues(target)
-	a.metrics.headArrivalInterval = metrics.TiKVBatchHeadArrivalInterval.WithLabelValues(target)
-	a.metrics.batchMoreRequests = metrics.TiKVBatchMoreRequests.WithLabelValues(target)
-	a.metrics.bestBatchSize = metrics.TiKVBatchBestSize.WithLabelValues(target)
-}
-
-func (a *batchConn) isIdle() bool {
-	return atomic.LoadUint32(&a.idle) != 0
-}
-
-// fetchAllPendingRequests fetches all pending requests from the channel.
-func (a *batchConn) fetchAllPendingRequests(maxBatchSize int) (headRecvTime time.Time, headArrivalInterval time.Duration) {
-	// Block on the first element.
-	latestReqStartTime := a.reqBuilder.latestReqStartTime
-	var headEntry *batchCommandsEntry
-	select {
-	case headEntry = <-a.batchCommandsCh:
-		if !a.idleDetect.Stop() {
-			<-a.idleDetect.C
-		}
-		a.idleDetect.Reset(idleTimeout)
-	case <-a.idleDetect.C:
-		a.idleDetect.Reset(idleTimeout)
-		atomic.AddUint32(&a.idle, 1)
-		atomic.CompareAndSwapUint32(a.idleNotify, 0, 1)
-		// This batchConn to be recycled
-		return time.Now(), 0
-	case <-a.closed:
-		return time.Now(), 0
-	}
-	if headEntry == nil {
-		return time.Now(), 0
-	}
-	headRecvTime = time.Now()
-	if headEntry.start.After(latestReqStartTime) && !latestReqStartTime.IsZero() {
-		headArrivalInterval = headEntry.start.Sub(latestReqStartTime)
-	}
-	a.reqBuilder.push(headEntry)
-
-	// This loop is for trying best to collect more requests.
-	for a.reqBuilder.len() < maxBatchSize {
-		select {
-		case entry := <-a.batchCommandsCh:
-			if entry == nil {
-				return
-			}
-			a.reqBuilder.push(entry)
-		default:
-			return
-		}
-	}
-	return
-}
-
-// fetchMorePendingRequests fetches more pending requests from the channel.
-func (a *batchConn) fetchMorePendingRequests(
-	maxBatchSize int,
-	batchWaitSize int,
-	maxWaitTime time.Duration,
-) {
-	// Try to collect `batchWaitSize` requests, or wait `maxWaitTime`.
-	if a.fetchMoreTimer == nil {
-		a.fetchMoreTimer = time.NewTimer(maxWaitTime)
-	} else {
-		a.fetchMoreTimer.Reset(maxWaitTime)
-	}
-	for a.reqBuilder.len() < batchWaitSize {
-		select {
-		case entry := <-a.batchCommandsCh:
-			if entry == nil {
-				if !a.fetchMoreTimer.Stop() {
-					<-a.fetchMoreTimer.C
-				}
-				return
-			}
-			a.reqBuilder.push(entry)
-		case <-a.fetchMoreTimer.C:
-			return
-		}
-	}
-	if !a.fetchMoreTimer.Stop() {
-		<-a.fetchMoreTimer.C
-	}
-
-	// Do an additional non-block try. Here we test the length with `maxBatchSize` instead
-	// of `batchWaitSize` because trying best to fetch more requests is necessary so that
-	// we can adjust the `batchWaitSize` dynamically.
-	yielded := false
-	for a.reqBuilder.len() < maxBatchSize {
-		select {
-		case entry := <-a.batchCommandsCh:
-			if entry == nil {
-				return
-			}
-			a.reqBuilder.push(entry)
-		default:
-			if yielded {
-				return
-			}
-			// yield once to batch more requests.
-			runtime.Gosched()
-			yielded = true
-		}
-	}
-}
-
-const idleTimeout = 3 * time.Minute
-
 var (
 	// presetBatchPolicies defines a set of [turboBatchOptions] as batch policies.
 	presetBatchPolicies = map[string]turboBatchOptions{
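The removed `fetchAllPendingRequests` above is a textbook batching shape: block for the head request (with an idle timer so unused conns can be recycled), then greedily drain whatever is already queued without waiting. A self-contained sketch of that shape under simplified types (ints instead of `*batchCommandsEntry`; the real code is presumably relocated by this file split, not deleted):

package main

import (
	"fmt"
	"time"
)

// fetchBatch blocks for the first item, then collects whatever is already
// queued without blocking, up to maxBatch -- the same head-then-drain shape
// as the removed fetchAllPendingRequests.
func fetchBatch(ch <-chan int, idle *time.Timer, idleTimeout time.Duration, maxBatch int) []int {
	var batch []int
	select {
	case head := <-ch:
		// Stop-then-drain before Reset: the standard timer idiom the
		// removed code applies to idleDetect and fetchMoreTimer.
		if !idle.Stop() {
			<-idle.C
		}
		idle.Reset(idleTimeout)
		batch = append(batch, head)
	case <-idle.C:
		idle.Reset(idleTimeout)
		return nil // idle for too long: the owner may recycle this conn
	}
	for len(batch) < maxBatch {
		select {
		case v := <-ch:
			batch = append(batch, v)
		default:
			return batch // nothing else queued; send what we have
		}
	}
	return batch
}

func main() {
	ch := make(chan int, 8)
	for i := 1; i <= 3; i++ {
		ch <- i
	}
	idle := time.NewTimer(time.Minute)
	fmt.Println(fetchBatch(ch, idle, time.Minute, 4)) // [1 2 3]
}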
@@ -534,150 +380,6 @@ func (t *turboBatchTrigger) preferredBatchWaitSize(avgBatchWaitSize float64, def
 	return batchWaitSize
 }
 
-// BatchSendLoopPanicCounter is only used for testing.
-var BatchSendLoopPanicCounter int64 = 0
-
-var initBatchPolicyWarn sync.Once
-
-func (a *batchConn) batchSendLoop(cfg config.TiKVClient) {
-	defer func() {
-		if r := recover(); r != nil {
-			metrics.TiKVPanicCounter.WithLabelValues(metrics.LabelBatchSendLoop).Inc()
-			logutil.BgLogger().Error("batchSendLoop",
-				zap.Any("r", r),
-				zap.Stack("stack"))
-			atomic.AddInt64(&BatchSendLoopPanicCounter, 1)
-			logutil.BgLogger().Info("restart batchSendLoop", zap.Int64("count", atomic.LoadInt64(&BatchSendLoopPanicCounter)))
-			go a.batchSendLoop(cfg)
-		}
-	}()
-
-	trigger, ok := newTurboBatchTriggerFromPolicy(cfg.BatchPolicy)
-	if !ok {
-		initBatchPolicyWarn.Do(func() {
-			logutil.BgLogger().Warn("fallback to default batch policy due to invalid value", zap.String("value", cfg.BatchPolicy))
-		})
-	}
-	turboBatchWaitTime := trigger.turboWaitTime()
-
-	avgBatchWaitSize := float64(cfg.BatchWaitSize)
-	for {
-		sendLoopStartTime := time.Now()
-		a.reqBuilder.reset()
-
-		headRecvTime, headArrivalInterval := a.fetchAllPendingRequests(int(cfg.MaxBatchSize))
-		if a.reqBuilder.len() == 0 {
-			// the conn is closed or recycled.
-			return
-		}
-
-		// curl -X PUT -d 'return(true)' http://0.0.0.0:10080/fail/tikvclient/mockBlockOnBatchClient
-		if val, err := util.EvalFailpoint("mockBlockOnBatchClient"); err == nil {
-			if val.(bool) {
-				time.Sleep(1 * time.Hour)
-			}
-		}
-
-		if batchSize := a.reqBuilder.len(); batchSize < int(cfg.MaxBatchSize) {
-			if cfg.MaxBatchWaitTime > 0 && atomic.LoadUint64(&a.tikvTransportLayerLoad) > uint64(cfg.OverloadThreshold) {
-				// If the target TiKV is overload, wait a while to collect more requests.
-				metrics.TiKVBatchWaitOverLoad.Inc()
-				a.fetchMorePendingRequests(int(cfg.MaxBatchSize), int(cfg.BatchWaitSize), cfg.MaxBatchWaitTime)
-			} else if turboBatchWaitTime > 0 && headArrivalInterval > 0 && trigger.needFetchMore(headArrivalInterval) {
-				batchWaitSize := trigger.preferredBatchWaitSize(avgBatchWaitSize, int(cfg.BatchWaitSize))
-				a.fetchMorePendingRequests(int(cfg.MaxBatchSize), batchWaitSize, turboBatchWaitTime)
-				a.metrics.batchMoreRequests.Observe(float64(a.reqBuilder.len() - batchSize))
-			}
-		}
-		length := a.reqBuilder.len()
-		avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize
-		a.metrics.pendingRequests.Observe(float64(len(a.batchCommandsCh) + length))
-		a.metrics.bestBatchSize.Observe(avgBatchWaitSize)
-		a.metrics.headArrivalInterval.Observe(headArrivalInterval.Seconds())
-		a.metrics.sendLoopWaitHeadDur.Observe(headRecvTime.Sub(sendLoopStartTime).Seconds())
-		a.metrics.sendLoopWaitMoreDur.Observe(time.Since(sendLoopStartTime).Seconds())
-
-		a.getClientAndSend()
-
-		sendLoopEndTime := time.Now()
-		a.metrics.sendLoopSendDur.Observe(sendLoopEndTime.Sub(sendLoopStartTime).Seconds())
-		if dur := sendLoopEndTime.Sub(headRecvTime); dur > batchSendTailLatThreshold {
-			a.metrics.batchSendTailLat.Observe(dur.Seconds())
-		}
-	}
-}
-
-const (
-	SendFailedReasonNoAvailableLimit   = "concurrency limit exceeded"
-	SendFailedReasonTryLockForSendFail = "tryLockForSend fail"
-)
-
-func (a *batchConn) getClientAndSend() {
-	if val, err := util.EvalFailpoint("mockBatchClientSendDelay"); err == nil {
-		if timeout, ok := val.(int); ok && timeout > 0 {
-			time.Sleep(time.Duration(timeout * int(time.Millisecond)))
-		}
-	}
-
-	// Choose a connection by round-robbin.
-	var (
-		cli    *batchCommandsClient
-		target string
-	)
-	reasons := make([]string, 0)
-	hasHighPriorityTask := a.reqBuilder.hasHighPriorityTask()
-	for i := 0; i < len(a.batchCommandsClients); i++ {
-		a.index = (a.index + 1) % uint32(len(a.batchCommandsClients))
-		target = a.batchCommandsClients[a.index].target
-		// The lock protects the batchCommandsClient from been closed while it's in use.
-		c := a.batchCommandsClients[a.index]
-		if hasHighPriorityTask || c.available() > 0 {
-			if c.tryLockForSend() {
-				cli = c
-				break
-			} else {
-				reasons = append(reasons, SendFailedReasonTryLockForSendFail)
-			}
-		} else {
-			reasons = append(reasons, SendFailedReasonNoAvailableLimit)
-		}
-	}
-	if cli == nil {
-		logutil.BgLogger().Info("no available connections", zap.String("target", target), zap.Any("reasons", reasons))
-		metrics.TiKVNoAvailableConnectionCounter.Inc()
-		if config.GetGlobalConfig().TiKVClient.MaxConcurrencyRequestLimit == config.DefMaxConcurrencyRequestLimit {
-			// Only cancel requests when MaxConcurrencyRequestLimit feature is not enabled, to be compatible with the behavior of older versions.
-			// TODO: But when MaxConcurrencyRequestLimit feature is enabled, the requests won't be canceled and will wait until timeout.
-			// This behavior may not be reasonable, as the timeout is usually 40s or 60s, which is too long to retry in time.
-			a.reqBuilder.cancel(errors.New("no available connections"))
-		}
-		return
-	}
-	defer cli.unlockForSend()
-	available := cli.available()
-	reqSendTime := time.Now()
-	batch := 0
-	req, forwardingReqs := a.reqBuilder.buildWithLimit(available, func(id uint64, e *batchCommandsEntry) {
-		cli.batched.Store(id, e)
-		cli.sent.Add(1)
-		atomic.StoreInt64(&e.sendLat, int64(reqSendTime.Sub(e.start)))
-		if trace.IsEnabled() {
-			trace.Log(e.ctx, "rpc", "send")
-		}
-	})
-	if req != nil {
-		batch += len(req.RequestIds)
-		cli.send("", req)
-	}
-	for forwardedHost, req := range forwardingReqs {
-		batch += len(req.RequestIds)
-		cli.send(forwardedHost, req)
-	}
-	if batch > 0 {
-		a.metrics.batchSize.Observe(float64(batch))
-	}
-}
-
 type tryLock struct {
 	*sync.Cond
 	reCreating bool
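Note the smoothing in the removed `batchSendLoop`: `avgBatchWaitSize = 0.2*float64(length) + 0.8*avgBatchWaitSize` is an exponentially weighted moving average, so the preferred batch size tracks recent traffic while damping spikes. A standalone illustration of that update rule (the sample values are made up):

package main

import "fmt"

// ewma applies the same smoothing as the removed batchSendLoop:
// the newest observation gets weight 0.2, history keeps weight 0.8.
func ewma(avg, observed float64) float64 {
	return 0.2*observed + 0.8*avg
}

func main() {
	avg := 8.0 // seeded from cfg.BatchWaitSize in the original
	for _, batchLen := range []float64{1, 1, 20, 20, 20} {
		avg = ewma(avg, batchLen)
		fmt.Printf("observed=%2.0f avg=%.2f\n", batchLen, avg)
	}
}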
@@ -1127,18 +829,6 @@ func (c *batchCommandsClient) initBatchClient(forwardedHost string) error {
 	return nil
 }
 
-func (a *batchConn) Close() {
-	// Close all batchRecvLoop.
-	for _, c := range a.batchCommandsClients {
-		// After connections are closed, `batchRecvLoop`s will check the flag.
-		atomic.StoreInt32(&c.closed, 1)
-	}
-	// Don't close(batchCommandsCh) because when Close() is called, someone maybe
-	// calling SendRequest and writing batchCommandsCh, if we close it here the
-	// writing goroutine will panic.
-	close(a.closed)
-}
-
 func sendBatchRequest(
 	ctx context.Context,
 	addr string,
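The removed `Close` encodes a general Go shutdown rule that survives the refactor: never close a channel that other goroutines may still send on (here `batchCommandsCh`); close a dedicated signal channel instead and let senders select on it. A minimal sketch of the pattern with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

type pool struct {
	work   chan int      // producers send here; intentionally never closed
	closed chan struct{} // closed exactly once to announce shutdown
}

// submit refuses new work after shutdown instead of risking a
// "send on closed channel" panic in a racing producer.
func (p *pool) submit(v int) error {
	select {
	case <-p.closed:
		return errors.New("pool closed")
	case p.work <- v:
		return nil
	}
}

func (p *pool) close() { close(p.closed) }

func main() {
	p := &pool{work: make(chan int, 1), closed: make(chan struct{})}
	fmt.Println(p.submit(1)) // <nil>
	p.close()
	fmt.Println(p.submit(2)) // pool closed (work buffer is full, so closed wins)
}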
