@@ -20,7 +20,6 @@ import (
20
20
21
21
"github.com/pingcap/log"
22
22
"github.com/pingcap/tiflow/cdc/contextutil"
23
- "github.com/pingcap/tiflow/cdc/model"
24
23
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
25
24
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
26
25
"github.com/pingcap/tiflow/pkg/causality"
@@ -47,8 +46,6 @@ type worker struct {
47
46
flushInterval time.Duration
48
47
hasPending bool
49
48
postTxnExecutedCallbacks []func ()
50
-
51
- lastSlowConflictDetectLog map [model.TableID ]time.Time
52
49
}
53
50
54
51
func newWorker (ctx context.Context , ID int , backend backend , workerCount int ) * worker {
@@ -71,8 +68,6 @@ func newWorker(ctx context.Context, ID int, backend backend, workerCount int) *w
71
68
flushInterval : backend .MaxFlushInterval (),
72
69
hasPending : false ,
73
70
postTxnExecutedCallbacks : make ([]func (), 0 , 1024 ),
74
-
75
- lastSlowConflictDetectLog : make (map [model.TableID ]time.Time ),
76
71
}
77
72
}
78
73
@@ -90,9 +85,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
90
85
zap .String ("changefeedID" , w .changefeed ),
91
86
zap .Int ("workerID" , w .ID ))
92
87
93
- cleanSlowLogHistory := time .NewTicker (time .Hour )
94
- defer cleanSlowLogHistory .Stop ()
95
-
96
88
start := time .Now ()
97
89
for {
98
90
select {
@@ -101,15 +93,6 @@ func (w *worker) run(txnCh <-chan causality.TxnWithNotifier[*txnEvent]) error {
101
93
zap .String ("changefeedID" , w .changefeed ),
102
94
zap .Int ("workerID" , w .ID ))
103
95
return nil
104
- case <- cleanSlowLogHistory .C :
105
- lastSlowConflictDetectLog := w .lastSlowConflictDetectLog
106
- w .lastSlowConflictDetectLog = make (map [model.TableID ]time.Time )
107
- now := time .Now ()
108
- for tableID , lastLog := range lastSlowConflictDetectLog {
109
- if now .Sub (lastLog ) <= time .Minute {
110
- w .lastSlowConflictDetectLog [tableID ] = lastLog
111
- }
112
- }
113
96
case txn := <- txnCh :
114
97
// we get the data from txnCh.out until no more data here or reach the state that can be flushed.
115
98
// If no more data in txnCh.out, and also not reach the state that can be flushed,
@@ -166,24 +149,8 @@ func (w *worker) onEvent(txn *txnEvent, postTxnExecuted func()) bool {
166
149
return false
167
150
}
168
151
169
- conflictDetectTime := txn .conflictResolved .Sub (txn .start ).Seconds ()
170
- w .metricConflictDetectDuration .Observe (conflictDetectTime )
152
+ w .metricConflictDetectDuration .Observe (txn .conflictResolved .Sub (txn .start ).Seconds ())
171
153
w .metricQueueDuration .Observe (time .Since (txn .start ).Seconds ())
172
-
173
- // Log tables which conflict detect time larger than 1 minute.
174
- if conflictDetectTime > float64 (60 ) {
175
- now := time .Now ()
176
- // Log slow conflict detect tables every minute.
177
- if lastLog , ok := w .lastSlowConflictDetectLog [txn .Event .TableInfo .ID ]; ! ok || now .Sub (lastLog ) > time .Minute {
178
- log .Warn ("Transaction dmlSink finds a slow transaction in conflict detector" ,
179
- zap .String ("changefeedID" , w .changefeed ),
180
- zap .Int ("workerID" , w .ID ),
181
- zap .Int64 ("TableID" , txn .Event .TableInfo .ID ),
182
- zap .Float64 ("seconds" , conflictDetectTime ))
183
- w .lastSlowConflictDetectLog [txn .Event .Table .TableID ] = now
184
- }
185
- }
186
-
187
154
w .metricTxnWorkerHandledRows .Add (float64 (len (txn .Event .Rows )))
188
155
w .postTxnExecutedCallbacks = append (w .postTxnExecutedCallbacks , postTxnExecuted )
189
156
return w .backend .OnTxnEvent (txn .TxnCallbackableEvent )
0 commit comments