Skip to content

Commit a085125

Browse files
authored
kafka(ticdc): sarama do not retry if produce message failed to prevent out of order (#11870) (#11960)
close #11935
1 parent f4b40c7 commit a085125

File tree

2 files changed

+9
-9
lines changed

2 files changed

+9
-9
lines changed

pkg/logutil/log.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,14 +223,11 @@ func initMySQLLogger() error {
223223

224224
// initSaramaLogger hacks logger used in sarama lib
225225
func initSaramaLogger(level zapcore.Level) error {
226-
// only available less than info level
227-
if !zapcore.InfoLevel.Enabled(level) {
228-
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
229-
if err != nil {
230-
return errors.Trace(err)
231-
}
232-
sarama.Logger = logger
226+
logger, err := zap.NewStdLogAt(log.L().With(zap.String("component", "sarama")), level)
227+
if err != nil {
228+
return errors.Trace(err)
233229
}
230+
sarama.Logger = logger
234231
return nil
235232
}
236233

pkg/sink/kafka/sarama.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,18 @@ func NewSaramaConfig(ctx context.Context, o *Options) (*sarama.Config, error) {
5858
// For kafka cluster with a bad network condition, producer should not try to
5959
// waster too much time on sending a message, get response no matter success
6060
// or fail as soon as possible is preferred.
61-
config.Producer.Retry.Max = 3
62-
config.Producer.Retry.Backoff = 100 * time.Millisecond
61+
// According to the https://github.com/IBM/sarama/issues/2619,
62+
// sarama may send message out of order even set the `config.Net.MaxOpenRequest` to 1,
63+
// when the kafka cluster is unhealthy and trigger the internal retry mechanism.
64+
config.Producer.Retry.Max = 0
6365

6466
// make sure sarama producer flush messages as soon as possible.
6567
config.Producer.Flush.Bytes = 0
6668
config.Producer.Flush.Messages = 0
6769
config.Producer.Flush.Frequency = time.Duration(0)
6870
config.Producer.Flush.MaxMessages = o.MaxMessages
6971

72+
config.Net.MaxOpenRequests = 1
7073
config.Net.DialTimeout = o.DialTimeout
7174
config.Net.WriteTimeout = o.WriteTimeout
7275
config.Net.ReadTimeout = o.ReadTimeout

0 commit comments

Comments
 (0)