Commit af53b26: option-and-thinking

1 parent 080d8ad

File tree

3 files changed (+37, -5 lines)

pkg/ccl/changefeedccl/changefeedbase/options.go

Lines changed: 23 additions & 1 deletion
@@ -114,6 +114,7 @@ const (
     OptLaggingRangesPollingInterval       = `lagging_ranges_polling_interval`
     OptIgnoreDisableChangefeedReplication = `ignore_disable_changefeed_replication`
     OptEncodeJSONValueNullAsObject        = `encode_json_value_null_as_object`
+    OptCreateKafkaTopics                  = `create_kafka_topics`
     // TODO(#142273): look into whether we want to add headers to pub/sub, and other
     // sinks as well (eg cloudstorage, webhook, ..). Currently it's kafka-only.
     OptHeadersJSONColumnName = `headers_json_column_name`
@@ -406,6 +407,7 @@ var ChangefeedOptionExpectValues = map[string]OptionPermittedValues{
     OptLaggingRangesPollingInterval:       durationOption,
     OptIgnoreDisableChangefeedReplication: flagOption,
     OptEncodeJSONValueNullAsObject:        flagOption,
+    OptCreateKafkaTopics:                  enum("yes", "no", "auto").orEmptyMeans("auto"),
     OptEnrichedProperties:                 csv(string(EnrichedPropertySource), string(EnrichedPropertySchema)),
     OptHeadersJSONColumnName:              stringOption,
 }
@@ -428,7 +430,7 @@ var CommonOptions = makeStringSet(OptCursor, OptEndTime, OptEnvelope,
 var SQLValidOptions map[string]struct{} = nil

 // KafkaValidOptions is options exclusive to Kafka sink
-var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName)
+var KafkaValidOptions = makeStringSet(OptAvroSchemaPrefix, OptConfluentSchemaRegistry, OptKafkaSinkConfig, OptHeadersJSONColumnName, OptCreateKafkaTopics)

 // CloudStorageValidOptions is options exclusive to cloud storage sink
 var CloudStorageValidOptions = makeStringSet(OptCompression)
@@ -660,6 +662,26 @@ func (s StatementOptions) GetEndTime() string {
     return s.m[OptEndTime]
 }

+type CreateKafkaTopics string
+
+const (
+    CreateKafkaTopicsAuto CreateKafkaTopics = "auto"
+    CreateKafkaTopicsYes  CreateKafkaTopics = "yes"
+    CreateKafkaTopicsNo   CreateKafkaTopics = "no"
+)
+
+func (s StatementOptions) GetCreateKafkaTopics() (CreateKafkaTopics, error) {
+    if _, ok := s.m[OptCreateKafkaTopics]; !ok {
+        return CreateKafkaTopicsAuto, nil
+    }
+
+    rawVal, err := s.getEnumValue(OptCreateKafkaTopics)
+    if err != nil {
+        return CreateKafkaTopicsAuto, err
+    }
+    return CreateKafkaTopics(rawVal), nil
+}
+
 func (s StatementOptions) getEnumValue(k string) (string, error) {
     enumOptions := ChangefeedOptionExpectValues[k]
     rawVal, present := s.m[k]

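For context on how the new option resolves: create_kafka_topics is an enum of yes, no, and auto; an empty value maps to auto via orEmptyMeans("auto"), and GetCreateKafkaTopics falls back to auto when the option is absent entirely, so it would presumably be set with something like WITH create_kafka_topics = 'no' on the changefeed statement. The standalone Go sketch below only mirrors that resolution logic; resolveCreateKafkaTopics and the plain map standing in for StatementOptions.m are hypothetical, not code from this commit.

package main

import "fmt"

// CreateKafkaTopics mirrors the enum added in options.go.
type CreateKafkaTopics string

const (
    CreateKafkaTopicsAuto CreateKafkaTopics = "auto"
    CreateKafkaTopicsYes  CreateKafkaTopics = "yes"
    CreateKafkaTopicsNo   CreateKafkaTopics = "no"
)

// resolveCreateKafkaTopics is a hypothetical stand-in for
// StatementOptions.GetCreateKafkaTopics: a missing option and an empty value
// both resolve to "auto"; anything outside the enum is rejected.
func resolveCreateKafkaTopics(opts map[string]string) (CreateKafkaTopics, error) {
    raw, ok := opts["create_kafka_topics"]
    if !ok || raw == "" { // absent, or empty per orEmptyMeans("auto")
        return CreateKafkaTopicsAuto, nil
    }
    switch v := CreateKafkaTopics(raw); v {
    case CreateKafkaTopicsAuto, CreateKafkaTopicsYes, CreateKafkaTopicsNo:
        return v, nil
    default:
        return CreateKafkaTopicsAuto, fmt.Errorf("invalid value %q for create_kafka_topics", raw)
    }
}

func main() {
    fmt.Println(resolveCreateKafkaTopics(nil))                                            // auto <nil>
    fmt.Println(resolveCreateKafkaTopics(map[string]string{"create_kafka_topics": ""}))   // auto <nil>
    fmt.Println(resolveCreateKafkaTopics(map[string]string{"create_kafka_topics": "no"})) // no <nil>
}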
pkg/ccl/changefeedccl/sink.go

Lines changed: 5 additions & 1 deletion
@@ -250,9 +250,13 @@ func getSink(
     case isKafkaSink(u):
         return validateOptionsAndMakeSink(changefeedbase.KafkaValidOptions, func() (Sink, error) {
             if KafkaV2Enabled.Get(&serverCfg.Settings.SV) {
+                createTopics, err := opts.GetCreateKafkaTopics()
+                if err != nil {
+                    return nil, err
+                }
                 return makeKafkaSinkV2(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(),
                     numSinkIOWorkers(serverCfg), newCPUPacerFactory(ctx, serverCfg), timeutil.DefaultTimeSource{},
-                    serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{})
+                    serverCfg.Settings, metricsBuilder, kafkaSinkV2Knobs{}, createTopics)
             } else {
                 return makeKafkaSink(ctx, &changefeedbase.SinkURL{URL: u}, AllTargets(feedCfg), opts.GetKafkaConfigJSON(), serverCfg.Settings, metricsBuilder)
             }

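One consequence of this wiring: the option is consumed only on the v2 path, so when KafkaV2Enabled is off the setting is still accepted (it is listed in KafkaValidOptions) but makeKafkaSink never sees it. The sketch below is a simplified, hypothetical model of that dispatch; chooseKafkaSink is not a real function in this package, and the error it returns for an explicit setting on the v1 path is one possible behavior, not something this commit implements.

package main

import (
    "errors"
    "fmt"
)

type CreateKafkaTopics string

const CreateKafkaTopicsAuto CreateKafkaTopics = "auto"

// chooseKafkaSink is a hypothetical, simplified model of the dispatch in
// getSink: create_kafka_topics is honored only when the v2 sink is enabled.
// Rejecting an explicit setting on the v1 path is illustrative only.
func chooseKafkaSink(kafkaV2Enabled bool, createTopics CreateKafkaTopics) (string, error) {
    if kafkaV2Enabled {
        return fmt.Sprintf("kafka v2 sink (create_kafka_topics=%s)", createTopics), nil
    }
    if createTopics != CreateKafkaTopicsAuto {
        return "", errors.New("create_kafka_topics is only supported by the v2 kafka sink")
    }
    return "kafka v1 sink", nil
}

func main() {
    fmt.Println(chooseKafkaSink(true, "no"))   // kafka v2 sink (create_kafka_topics=no) <nil>
    fmt.Println(chooseKafkaSink(false, "yes")) // "" create_kafka_topics is only supported by the v2 kafka sink
}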
pkg/ccl/changefeedccl/sink_kafka_v2.go

Lines changed: 9 additions & 3 deletions
@@ -47,6 +47,7 @@ type kafkaSinkClientV2 struct {
     recordResize func(numRecords int64)

     topicsForConnectionCheck []string
+    createTopicsManually     bool

     // we need to fetch and keep track of this ourselves since kgo doesnt expose metadata to us
     metadataMu struct {
@@ -68,6 +69,7 @@ func newKafkaSinkClientV2(
     knobs kafkaSinkV2Knobs,
     mb metricsRecorderBuilder,
     topicsForConnectionCheck []string,
+    createTopics changefeedbase.CreateKafkaTopics,
 ) (*kafkaSinkClientV2, error) {
     bootstrapBrokers := strings.Split(bootstrapAddrsStr, `,`)

@@ -82,8 +84,6 @@ func newKafkaSinkClientV2(
         kgo.ProducerBatchMaxBytes(256 << 20), // 256MiB
         kgo.BrokerMaxWriteBytes(1 << 30),     // 1GiB

-        kgo.AllowAutoTopicCreation(),
-
         kgo.RecordRetries(5),
         // This applies only to non-produce requests, ie the ListTopics call.
         kgo.RequestRetries(5),
@@ -95,6 +95,10 @@ func newKafkaSinkClientV2(
         }),
     }

+    if createTopics == changefeedbase.CreateKafkaTopicsAuto {
+        baseOpts = append(baseOpts, kgo.AllowAutoTopicCreation())
+    }
+
     recordResize := func(numRecords int64) {}
     if m := mb(requiresResourceAccounting); m != nil { // `m` can be nil in tests.
         baseOpts = append(baseOpts, kgo.WithHooks(&kgoMetricsAdapter{throttling: m.getKafkaThrottlingMetrics(settings)}))
@@ -128,6 +132,7 @@ func newKafkaSinkClientV2(
         includeErrorDetails:      changefeedbase.KafkaV2ErrorDetailsEnabled.Get(&settings.SV),
         recordResize:             recordResize,
         topicsForConnectionCheck: topicsForConnectionCheck,
+        createTopicsManually:     createTopics == changefeedbase.CreateKafkaTopicsYes,
     }
     c.metadataMu.allTopicPartitions = make(map[string][]int32)

@@ -361,6 +366,7 @@ func makeKafkaSinkV2(
     settings *cluster.Settings,
     mb metricsRecorderBuilder,
     knobs kafkaSinkV2Knobs,
+    createTopics changefeedbase.CreateKafkaTopics,
 ) (Sink, error) {
     batchCfg, retryOpts, err := getSinkConfigFromJson(jsonConfig, sinkJSONConfig{
         // Defaults from the v1 sink - flush immediately.
@@ -400,7 +406,7 @@ func makeKafkaSinkV2(
     }

     topicsForConnectionCheck := topicNamer.DisplayNamesSlice()
-    client, err := newKafkaSinkClientV2(ctx, clientOpts, batchCfg, u.Host, settings, knobs, mb, topicsForConnectionCheck)
+    client, err := newKafkaSinkClientV2(ctx, clientOpts, batchCfg, u.Host, settings, knobs, mb, topicsForConnectionCheck, createTopics)
     if err != nil {
         return nil, err
     }

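With this change the v2 client only asks the broker for auto topic creation when the option resolves to auto, and it records createTopicsManually for the yes case, but nothing in this diff acts on that flag yet. Purely as an illustration of what a follow-up might do, here is a minimal sketch of explicit topic creation using franz-go's kadm admin package; ensureTopics, the choice of kadm, and the broker-default partition and replication settings are all assumptions rather than part of this commit.

package kafkatopicsketch

import (
    "context"
    "errors"
    "fmt"

    "github.com/twmb/franz-go/pkg/kadm"
    "github.com/twmb/franz-go/pkg/kerr"
    "github.com/twmb/franz-go/pkg/kgo"
)

// ensureTopics is a hypothetical sketch of the "yes" path: create the
// changefeed's topics explicitly through the admin API instead of relying on
// broker-side auto-creation.
func ensureTopics(ctx context.Context, client *kgo.Client, topics []string) error {
    adm := kadm.NewClient(client)
    // -1 for partitions and replication factor asks the broker to apply its
    // defaults (supported on modern Kafka versions).
    resps, err := adm.CreateTopics(ctx, -1, -1, nil, topics...)
    if err != nil {
        return err
    }
    for _, r := range resps {
        // An already-existing topic is fine; anything else is a real failure.
        if r.Err != nil && !errors.Is(r.Err, kerr.TopicAlreadyExists) {
            return fmt.Errorf("creating topic %s: %w", r.Topic, r.Err)
        }
    }
    return nil
}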