Skip to content

Commit a8a858b

Browse files
authored
topsql: add pubsub datasink (#30860)
1 parent fe1aaf2 commit a8a858b

File tree

7 files changed

+537
-22
lines changed

7 files changed

+537
-22
lines changed

server/rpc_server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import (
3333
"github.com/pingcap/tidb/util"
3434
"github.com/pingcap/tidb/util/logutil"
3535
"github.com/pingcap/tidb/util/memory"
36+
"github.com/pingcap/tidb/util/topsql"
3637
"go.uber.org/zap"
3738
"google.golang.org/grpc"
3839
"google.golang.org/grpc/keepalive"
@@ -64,6 +65,7 @@ func NewRPCServer(config *config.Config, dom *domain.Domain, sm util.SessionMana
6465
}
6566
diagnosticspb.RegisterDiagnosticsServer(s, rpcSrv)
6667
tikvpb.RegisterTikvServer(s, rpcSrv)
68+
topsql.RegisterPubSubServer(s)
6769
return s
6870
}
6971

sessionctx/variable/tidb_vars.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -832,5 +832,5 @@ type TopSQL struct {
832832

833833
// TopSQLEnabled reports whether the Top SQL feature is enabled.
834834
func TopSQLEnabled() bool {
835-
return TopSQLVariable.Enable.Load() && config.GetGlobalConfig().TopSQL.ReceiverAddress != ""
835+
return TopSQLVariable.Enable.Load()
836836
}

util/topsql/reporter/mock/pubsub.go

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package mock
16+
17+
import (
18+
"fmt"
19+
"net"
20+
21+
"github.com/pingcap/tidb/util/logutil"
22+
"go.uber.org/zap"
23+
"google.golang.org/grpc"
24+
)
25+
26+
type mockPubSubServer struct {
27+
addr string
28+
listen net.Listener
29+
grpcServer *grpc.Server
30+
}
31+
32+
// NewMockPubSubServer creates a mock publisher server.
33+
func NewMockPubSubServer() (*mockPubSubServer, error) {
34+
addr := "127.0.0.1:0"
35+
lis, err := net.Listen("tcp", addr)
36+
if err != nil {
37+
return nil, err
38+
}
39+
server := grpc.NewServer()
40+
41+
return &mockPubSubServer{
42+
addr: fmt.Sprintf("127.0.0.1:%d", lis.Addr().(*net.TCPAddr).Port),
43+
listen: lis,
44+
grpcServer: server,
45+
}, nil
46+
}
47+
48+
func (svr *mockPubSubServer) Serve() {
49+
err := svr.grpcServer.Serve(svr.listen)
50+
if err != nil {
51+
logutil.BgLogger().Warn("[top-sql] mock pubsub server serve failed", zap.Error(err))
52+
}
53+
}
54+
55+
func (svr *mockPubSubServer) Server() *grpc.Server {
56+
return svr.grpcServer
57+
}
58+
59+
func (svr *mockPubSubServer) Address() string {
60+
return svr.addr
61+
}
62+
63+
func (svr *mockPubSubServer) Stop() {
64+
if svr.grpcServer != nil {
65+
svr.grpcServer.Stop()
66+
}
67+
}

util/topsql/reporter/pubsub.go

Lines changed: 267 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
// Copyright 2021 PingCAP, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package reporter
16+
17+
import (
18+
"context"
19+
"errors"
20+
"time"
21+
22+
"github.com/pingcap/tidb/util"
23+
"github.com/pingcap/tidb/util/logutil"
24+
"github.com/pingcap/tipb/go-tipb"
25+
"go.uber.org/zap"
26+
)
27+
28+
// TopSQLPubSubService implements tipb.TopSQLPubSubServer.
29+
//
30+
// If a client subscribes to TopSQL records, the TopSQLPubSubService is responsible
31+
// for registering an associated DataSink to the reporter. Then the DataSink sends
32+
// data to the client periodically.
33+
type TopSQLPubSubService struct {
34+
dataSinkRegisterer DataSinkRegisterer
35+
}
36+
37+
// NewTopSQLPubSubService creates a new TopSQLPubSubService.
38+
func NewTopSQLPubSubService(dataSinkRegisterer DataSinkRegisterer) *TopSQLPubSubService {
39+
return &TopSQLPubSubService{dataSinkRegisterer: dataSinkRegisterer}
40+
}
41+
42+
var _ tipb.TopSQLPubSubServer = &TopSQLPubSubService{}
43+
44+
// Subscribe registers dataSinks to the reporter and redirects data received from reporter
45+
// to subscribers associated with those dataSinks.
46+
func (ps *TopSQLPubSubService) Subscribe(_ *tipb.TopSQLSubRequest, stream tipb.TopSQLPubSub_SubscribeServer) error {
47+
ds := newPubSubDataSink(stream, ps.dataSinkRegisterer)
48+
if err := ps.dataSinkRegisterer.Register(ds); err != nil {
49+
return err
50+
}
51+
return ds.run()
52+
}
53+
54+
type pubSubDataSink struct {
55+
ctx context.Context
56+
cancel context.CancelFunc
57+
58+
stream tipb.TopSQLPubSub_SubscribeServer
59+
sendTaskCh chan sendTask
60+
61+
// for deregister
62+
registerer DataSinkRegisterer
63+
}
64+
65+
func newPubSubDataSink(stream tipb.TopSQLPubSub_SubscribeServer, registerer DataSinkRegisterer) *pubSubDataSink {
66+
ctx, cancel := context.WithCancel(stream.Context())
67+
68+
return &pubSubDataSink{
69+
ctx: ctx,
70+
cancel: cancel,
71+
72+
stream: stream,
73+
sendTaskCh: make(chan sendTask, 1),
74+
75+
registerer: registerer,
76+
}
77+
}
78+
79+
var _ DataSink = &pubSubDataSink{}
80+
81+
func (ds *pubSubDataSink) TrySend(data *ReportData, deadline time.Time) error {
82+
select {
83+
case ds.sendTaskCh <- sendTask{data: data, deadline: deadline}:
84+
return nil
85+
case <-ds.ctx.Done():
86+
return ds.ctx.Err()
87+
default:
88+
ignoreReportChannelFullCounter.Inc()
89+
return errors.New("the channel of pubsub dataSink is full")
90+
}
91+
}
92+
93+
func (ds *pubSubDataSink) OnReporterClosing() {
94+
ds.cancel()
95+
}
96+
97+
func (ds *pubSubDataSink) run() error {
98+
defer func() {
99+
ds.registerer.Deregister(ds)
100+
ds.cancel()
101+
}()
102+
103+
for {
104+
select {
105+
case task := <-ds.sendTaskCh:
106+
ctx, cancel := context.WithDeadline(ds.ctx, task.deadline)
107+
var err error
108+
109+
start := time.Now()
110+
go util.WithRecovery(func() {
111+
defer cancel()
112+
err = ds.doSend(ctx, task.data)
113+
114+
if err != nil {
115+
reportAllDurationFailedHistogram.Observe(time.Since(start).Seconds())
116+
} else {
117+
reportAllDurationSuccHistogram.Observe(time.Since(start).Seconds())
118+
}
119+
}, nil)
120+
121+
// When the deadline is exceeded, the closure inside `go util.WithRecovery` above may not notice that
122+
// immediately because it can be blocked by `stream.Send`.
123+
// In order to clean up resources as quickly as possible, we let that closure run in an individual goroutine,
124+
// and wait for timeout here.
125+
<-ctx.Done()
126+
127+
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
128+
logutil.BgLogger().Warn(
129+
"[top-sql] pubsub datasink failed to send data to subscriber due to deadline exceeded",
130+
zap.Time("deadline", task.deadline),
131+
)
132+
return ctx.Err()
133+
}
134+
135+
if err != nil {
136+
logutil.BgLogger().Warn(
137+
"[top-sql] pubsub datasink failed to send data to subscriber",
138+
zap.Error(err),
139+
)
140+
return err
141+
}
142+
case <-ds.ctx.Done():
143+
return ds.ctx.Err()
144+
}
145+
}
146+
}
147+
148+
func (ds *pubSubDataSink) doSend(ctx context.Context, data *ReportData) error {
149+
if err := ds.sendCPUTime(ctx, data.CPUTimeRecords); err != nil {
150+
return err
151+
}
152+
if err := ds.sendSQLMeta(ctx, data.SQLMetas); err != nil {
153+
return err
154+
}
155+
return ds.sendPlanMeta(ctx, data.PlanMetas)
156+
}
157+
158+
func (ds *pubSubDataSink) sendCPUTime(ctx context.Context, records []tipb.CPUTimeRecord) (err error) {
159+
if len(records) == 0 {
160+
return
161+
}
162+
163+
start := time.Now()
164+
sentCount := 0
165+
defer func() {
166+
topSQLReportRecordCounterHistogram.Observe(float64(sentCount))
167+
if err != nil {
168+
reportRecordDurationFailedHistogram.Observe(time.Since(start).Seconds())
169+
} else {
170+
reportRecordDurationSuccHistogram.Observe(time.Since(start).Seconds())
171+
}
172+
}()
173+
174+
cpuRecord := &tipb.TopSQLSubResponse_Record{}
175+
r := &tipb.TopSQLSubResponse{RespOneof: cpuRecord}
176+
177+
for i := range records {
178+
cpuRecord.Record = &records[i]
179+
if err = ds.stream.Send(r); err != nil {
180+
return
181+
}
182+
sentCount += 1
183+
184+
select {
185+
case <-ctx.Done():
186+
err = ctx.Err()
187+
return
188+
default:
189+
}
190+
}
191+
192+
return
193+
}
194+
195+
func (ds *pubSubDataSink) sendSQLMeta(ctx context.Context, sqlMetas []tipb.SQLMeta) (err error) {
196+
if len(sqlMetas) == 0 {
197+
return
198+
}
199+
200+
start := time.Now()
201+
sentCount := 0
202+
defer func() {
203+
topSQLReportSQLCountHistogram.Observe(float64(sentCount))
204+
if err != nil {
205+
reportSQLDurationFailedHistogram.Observe(time.Since(start).Seconds())
206+
} else {
207+
reportSQLDurationSuccHistogram.Observe(time.Since(start).Seconds())
208+
}
209+
}()
210+
211+
sqlMeta := &tipb.TopSQLSubResponse_SqlMeta{}
212+
r := &tipb.TopSQLSubResponse{RespOneof: sqlMeta}
213+
214+
for i := range sqlMetas {
215+
sqlMeta.SqlMeta = &sqlMetas[i]
216+
if err = ds.stream.Send(r); err != nil {
217+
return
218+
}
219+
sentCount += 1
220+
221+
select {
222+
case <-ctx.Done():
223+
err = ctx.Err()
224+
return
225+
default:
226+
}
227+
}
228+
229+
return
230+
}
231+
232+
func (ds *pubSubDataSink) sendPlanMeta(ctx context.Context, planMetas []tipb.PlanMeta) (err error) {
233+
if len(planMetas) == 0 {
234+
return
235+
}
236+
237+
start := time.Now()
238+
sentCount := 0
239+
defer func() {
240+
topSQLReportPlanCountHistogram.Observe(float64(sentCount))
241+
if err != nil {
242+
reportPlanDurationFailedHistogram.Observe(time.Since(start).Seconds())
243+
} else {
244+
reportPlanDurationSuccHistogram.Observe(time.Since(start).Seconds())
245+
}
246+
}()
247+
248+
planMeta := &tipb.TopSQLSubResponse_PlanMeta{}
249+
r := &tipb.TopSQLSubResponse{RespOneof: planMeta}
250+
251+
for i := range planMetas {
252+
planMeta.PlanMeta = &planMetas[i]
253+
if err = ds.stream.Send(r); err != nil {
254+
return
255+
}
256+
sentCount += 1
257+
258+
select {
259+
case <-ctx.Done():
260+
err = ctx.Err()
261+
return
262+
default:
263+
}
264+
}
265+
266+
return
267+
}

0 commit comments

Comments
 (0)