8 changes: 8 additions & 0 deletions DEPS.bzl
@@ -8329,6 +8329,14 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/twmb/franz-go/pkg/kadm/com_github_twmb_franz_go_pkg_kadm-v1.11.0.zip",
],
)
go_repository(
name = "com_github_twmb_franz_go_pkg_kfake",
build_file_proto_mode = "disable_global",
importpath = "github.com/twmb/franz-go/pkg/kfake",
# TODO: mirror this repo (to fix, run `./dev generate bazel --mirror`)
sum = "h1:OdVmioEFv4chXyb9F2X4Nv1uwKqYytSQZ2iH5i/u3u4=",
version = "v0.0.0-20241015012055-0a9996b613b1",
)
go_repository(
name = "com_github_twmb_franz_go_pkg_kmsg",
build_file_proto_mode = "disable_global",
1 change: 1 addition & 0 deletions go.mod
@@ -442,6 +442,7 @@ require (
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/twitchtv/twirp v8.1.0+incompatible // indirect
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015012055-0a9996b613b1 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.9.0 // indirect
github.com/twpayne/go-kml v1.5.2 // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -2227,6 +2227,8 @@ github.com/twmb/franz-go v1.18.0 h1:25FjMZfdozBywVX+5xrWC2W+W76i0xykKjTdEeD2ejw=
github.com/twmb/franz-go v1.18.0/go.mod h1:zXCGy74M0p5FbXsLeASdyvfLFsBvTubVqctIaa5wQ+I=
github.com/twmb/franz-go/pkg/kadm v1.11.0 h1:FfeWJ0qadntFpAcQt8JzNXW4dijjytZNLrzJuzzzuxA=
github.com/twmb/franz-go/pkg/kadm v1.11.0/go.mod h1:qrhkdH+SWS3ivmbqOgHbpgVHamhaKcjH0UM+uOp0M1A=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015012055-0a9996b613b1 h1:OdVmioEFv4chXyb9F2X4Nv1uwKqYytSQZ2iH5i/u3u4=
github.com/twmb/franz-go/pkg/kfake v0.0.0-20241015012055-0a9996b613b1/go.mod h1:nkBI/wGFp7t1NJnnCeJdS4sX5atPAqwCPpDXKuI7SC8=
github.com/twmb/franz-go/pkg/kmsg v1.9.0 h1:JojYUph2TKAau6SBtErXpXGC7E3gg4vGZMv9xFU/B6M=
github.com/twmb/franz-go/pkg/kmsg v1.9.0/go.mod h1:CMbfazviCyY6HM0SXuG5t9vOwYDHRCSrJJyBAe5paqg=
github.com/twpayne/go-geom v1.4.2 h1:I2MtC83UDniuTVzGWZOyhMn9hDPrjLm0jsUbohimtZQ=
3 changes: 3 additions & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -21,6 +21,7 @@ go_library(
"enriched_source_provider.go",
"event_processing.go",
"fetch_table_bytes.go",
"kafka_admin_helpers.go",
"metrics.go",
"parallel_io.go",
"parquet.go",
@@ -182,6 +183,7 @@ go_library(
"@com_github_twmb_franz_go//pkg/kgo",
"@com_github_twmb_franz_go//pkg/kversion",
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
"@com_github_twmb_franz_go_pkg_kmsg//:kmsg",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@org_golang_google_api//impersonate",
@@ -382,6 +384,7 @@ go_test(
"@com_github_twmb_franz_go//pkg/sasl",
"@com_github_twmb_franz_go//pkg/sasl/plain",
"@com_github_twmb_franz_go_pkg_kadm//:kadm",
"@com_github_twmb_franz_go_pkg_kfake//:kfake",
"@com_google_cloud_go_pubsub//apiv1",
"@com_google_cloud_go_pubsub//apiv1/pubsubpb",
"@com_google_cloud_go_pubsub//pstest",
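The new test-only dependency wired up here is franz-go's kfake package, an in-process fake Kafka cluster that tests can run against without a real broker. Below is a minimal sketch of the pattern, assuming kfake's NewCluster/ListenAddrs API and that the fake cluster accepts CreateTopics with -1 defaults; the test name and topic are illustrative, not taken from this PR:

package changefeedccl

import (
	"context"
	"testing"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kfake"
	"github.com/twmb/franz-go/pkg/kgo"
)

func TestCreateTopicsAgainstKfake(t *testing.T) {
	// Stand up an in-process fake Kafka cluster; no external broker needed.
	cluster, err := kfake.NewCluster(kfake.NumBrokers(1))
	if err != nil {
		t.Fatal(err)
	}
	defer cluster.Close()

	client, err := kgo.NewClient(kgo.SeedBrokers(cluster.ListenAddrs()...))
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	// Create a topic with broker defaults (-1), as the changefeed path does.
	adm := kadm.NewClient(client)
	resps, err := adm.CreateTopics(context.Background(), -1, -1, nil, "test-topic")
	if err != nil {
		t.Fatal(err)
	}
	for topic, r := range resps {
		if r.Err != nil {
			t.Fatalf("creating %s: %v", topic, r.Err)
		}
	}
}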
213 changes: 195 additions & 18 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
@@ -8,8 +8,11 @@ package changefeedccl
import (
"context"
"fmt"
"maps"
"net/url"
"slices"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/backup/backupresolver"
@@ -65,6 +68,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/twmb/franz-go/pkg/kversion"
)

// featureChangefeedEnabled is used to enable and disable the CHANGEFEED feature.
@@ -1552,9 +1559,19 @@ func (b *changefeedResumer) resumeWithRetries(
maxBackoff := changefeedbase.MaxRetryBackoff.Get(&execCfg.Settings.SV)
backoffReset := changefeedbase.RetryBackoffReset.Get(&execCfg.Settings.SV)
for r := getRetry(ctx, maxBackoff, backoffReset); r.Next(); {
flowErr := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec)
flowErr := func() error {
targets, err := AllTargets(ctx, details, execCfg)
if err != nil {
return err
}
if err := maybeCreateKafkaTopics(ctx, details, targets); err != nil {
return errors.Wrap(err, "failed to create kafka topics")
}

if err := maybeUpgradePreProductionReadyExpression(ctx, jobID, details, jobExec); err != nil {
return errors.Wrap(err, "failed to upgrade pre-production ready expression")
}

if flowErr == nil {
// startedCh is normally used to signal back to the creator of the job that
// the job has started; however, in this case nothing will ever receive
// on the channel, causing the changefeed flow to block. Replace it with
@@ -1566,10 +1583,6 @@

confPoller := make(chan struct{})
g := ctxgroup.WithContext(ctx)
targets, err := AllTargets(ctx, details, execCfg)
if err != nil {
return err
}
g.GoCtx(func(ctx context.Context) error {
defer close(confPoller)
return distChangefeedFlow(ctx, jobExec, jobID, details, description, localState, startedCh, onTracingEvent, targets)
@@ -1594,21 +1607,20 @@
}
}
})
return g.Wait()
}()

flowErr = g.Wait()

if flowErr == nil {
return nil // Changefeed completed -- e.g. due to initial_scan=only mode.
}
if flowErr == nil {
return nil // Changefeed completed -- e.g. due to initial_scan=only mode.
}

if errors.Is(flowErr, replanErr) {
log.Dev.Infof(ctx, "restarting changefeed due to updated configuration")
continue
}
if errors.Is(flowErr, replanErr) {
log.Dev.Infof(ctx, "restarting changefeed due to updated configuration")
continue
}

if knobs != nil && knobs.HandleDistChangefeedError != nil {
flowErr = knobs.HandleDistChangefeedError(flowErr)
}
if knobs != nil && knobs.HandleDistChangefeedError != nil {
flowErr = knobs.HandleDistChangefeedError(flowErr)
}

// Terminate changefeed if needed.
@@ -2124,3 +2136,168 @@ func getChangefeedEventMigrator(migrateEvent bool) log.StructuredEventMigrator {
channel.CHANGEFEED,
)
}
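
// maybeCreateKafkaTopics pre-creates the changefeed's Kafka topics when the
// createKafkaTopics option is set to yes and the sink is a Kafka sink. It is a
// no-op for non-Kafka sinks and skips topics that already exist.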
func maybeCreateKafkaTopics(ctx context.Context, details jobspb.ChangefeedDetails, targets changefeedbase.Targets) error {
ctx, span := tracing.ChildSpan(ctx, "maybeCreateKafkaTopics")
defer span.Finish()

opts := changefeedbase.MakeStatementOptions(details.Opts)
createTopics, err := opts.GetCreateKafkaTopics()
if err != nil {
return err
}
if createTopics != changefeedbase.CreateKafkaTopicsYes {
return nil
}

parsedSinkURL, err := url.Parse(details.SinkURI)
if err != nil {
return err
}
if !isKafkaSink(parsedSinkURL) {
return nil
}

sinkURL := &changefeedbase.SinkURL{URL: parsedSinkURL}

// Build a TopicNamer and derive the exact set of topic names for this changefeed.
topicNamer, err := buildKafkaTopicNamer(targets, sinkURL)
if err != nil {
return err
}
topics := topicNamer.DisplayNamesSlice()
topicsSet := make(map[string]struct{}, len(topics))
for _, topic := range topics {
topicsSet[topic] = struct{}{}
}

// Build kafka client & admin using shared helper.
bootstrapBrokers := strings.Split(sinkURL.Host, `,`)
sinkOpts, err := opts.GetKafkaSinkOptions()
if err != nil {
return err
}
clientOpts, err := buildKgoConfig(ctx, sinkURL, sinkOpts.JSONConfig, nil /* netMetrics */)
if err != nil {
return err
}
client, adminClient, err := buildKafkaClients(ctx, bootstrapBrokers, false /* allowAutoTopic */, clientOpts, kafkaSinkV2Knobs{})
if err != nil {
return err
}
defer client.Close()

kadmClient := adminClient.(*kadm.Client)

// Check for topic existence first, so that clusters below the supported API
// version can still work if the topics already exist.
allExist, preExistingTopics, err := topicsAlreadyExist(ctx, kadmClient, topicsSet)
if err != nil {
return err
}
if allExist {
return nil
}

if err := ensureWeCanCreateKafkaTopics(ctx, kadmClient); err != nil {
return err
}

// Determine which topics are missing.
missing := make([]string, 0, len(topics))
for _, t := range topics {
d, ok := preExistingTopics[t]
if d.Err != nil && !errors.Is(d.Err, kerr.UnknownTopicOrPartition) {
return errors.Wrapf(d.Err, "failed to get topic details for topic %s", t)
}
if !ok || (d.Err != nil && errors.Is(d.Err, kerr.UnknownTopicOrPartition)) {
missing = append(missing, t)
}
}
if len(missing) == 0 {
return nil
}

// Validate the topics before creating them.
vresp, err := kadmClient.ValidateCreateTopics(ctx, -1, -1, nil, missing...)
if err != nil {
return err
}
for topic, r := range vresp {
if r.Err != nil && !errors.Is(r.Err, kerr.TopicAlreadyExists) {
return errors.Wrapf(r.Err, "failed to validate topic creation for topic %s", topic)
}
}

// Create the topics.
cresp, err := kadmClient.CreateTopics(ctx, -1, -1, nil, missing...)
if err != nil {
return err
}
for topic, r := range cresp {
if r.Err != nil && !errors.Is(r.Err, kerr.TopicAlreadyExists) {
return errors.Wrapf(r.Err, "failed to create topic %s", topic)
}
log.Dev.Infof(ctx, "created topic %s with (numPartitions: %d, replicationFactor: %d, configs: %+v)", topic, r.NumPartitions, r.ReplicationFactor, r.Configs)
}
return nil
}

// ensureWeCanCreateKafkaTopics checks that the admin client can create topics
// with default values (-1), returning an error if it cannot. Concretely, every
// broker must advertise support for the CreateTopics API at the v2.4.0 level
// or above, the first version to accept -1 defaults (KIP-464).
func ensureWeCanCreateKafkaTopics(ctx context.Context, adminClient *kadm.Client) error {
v24 := kversion.V2_4_0()
v24kvm, ok := v24.LookupMaxKeyVersion(kmsg.CreateTopics.Int16())
if !ok {
return errors.AssertionFailedf("v2.4.0 does not support create topics but it should")
}

vr, err := adminClient.ApiVersions(ctx)
if err != nil {
return err
}
for bid, ver := range vr {
if ver.Err != nil {
return errors.Wrapf(ver.Err, "failed to get api versions for broker %d", bid)
}
mv, ok := ver.KeyMaxVersion(kmsg.CreateTopics.Int16())
if !ok {
return errors.Errorf("broker %d does not support create topics at all", bid)
}
if mv < v24kvm {
return errors.Errorf("broker %d does not support create topics at >= v2.4.0 level: max version %d < required %d", bid, mv, v24kvm)
}
}

return nil
}

// topicsAlreadyExist checks whether the given topics already exist in the
// cluster. It returns whether all of them exist, the per-topic details, and
// any error encountered while listing them.
func topicsAlreadyExist(ctx context.Context, adminClient *kadm.Client, topicsSet map[string]struct{}) (bool, kadm.TopicDetails, error) {
tr, err := adminClient.ListTopics(ctx, slices.Collect(maps.Keys(topicsSet))...)
if err != nil {
return false, nil, err
}
preExistingTopics := make(map[string]struct{}, len(topicsSet))
for topic, r := range tr {
if r.Err != nil {
if errors.Is(r.Err, kerr.UnknownTopicOrPartition) {
// The topic does not exist yet; do not count it as pre-existing.
continue
}
return false, nil, errors.Wrapf(r.Err, "failed to list topic %s", topic)
}
preExistingTopics[topic] = struct{}{}
}
return setsAreEqual(topicsSet, preExistingTopics), tr, nil
}

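// setsAreEqual reports whether a and b contain exactly the same keys.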
func setsAreEqual(a, b map[string]struct{}) bool {
if len(a) != len(b) {
return false
}
for k := range a {
if _, ok := b[k]; !ok {
return false
}
}
return true
}
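
A note on the version gate in ensureWeCanCreateKafkaTopics: creating topics with -1 partitions and replication factor (broker-side defaults) was introduced by KIP-464 in Kafka 2.4, so the code compares each broker's advertised max CreateTopics version against the one v2.4.0 ships. A standalone sketch of that lookup, using the same kversion/kmsg calls as the patch (no broker needed):

package main

import (
	"fmt"

	"github.com/twmb/franz-go/pkg/kmsg"
	"github.com/twmb/franz-go/pkg/kversion"
)

func main() {
	// Max CreateTopics version that a v2.4.0 broker advertises. Creating
	// topics with -1 defaults (KIP-464) requires at least this version.
	want, ok := kversion.V2_4_0().LookupMaxKeyVersion(kmsg.CreateTopics.Int16())
	if !ok {
		panic("v2.4.0 should support CreateTopics")
	}
	fmt.Printf("CreateTopics must be >= v%d\n", want)

	// An older broker, e.g. v2.3.0, advertises a lower max version and
	// would be rejected by ensureWeCanCreateKafkaTopics.
	got, _ := kversion.V2_3_0().LookupMaxKeyVersion(kmsg.CreateTopics.Int16())
	fmt.Printf("a v2.3.0 broker advertises v%d\n", got)
}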