Merged

Commits (56)
All commits are by Tang8330.

7d7e5b0  Update. (Aug 8, 2025)
d3b62b9  WIP. (Aug 8, 2025)
9c3c7b8  WIP. (Aug 8, 2025)
8cfc683  WIP. (Aug 8, 2025)
3ba86a3  Update. (Aug 8, 2025)
479f4ca  WIP. (Aug 8, 2025)
4e7fec0  WIP. (Aug 8, 2025)
ba8b35f  WPI. (Aug 8, 2025)
9f1472d  Update. (Aug 8, 2025)
32a6be5  WIP. (Aug 8, 2025)
0bf35e6  Update. (Aug 8, 2025)
bb98935  Update. (Aug 8, 2025)
bb2a29a  Fix tests. (Aug 8, 2025)
6bd7758  Merge branch 'master' into more-kafka-work (Aug 8, 2025)
938f253  Merge branch 'master' into more-kafka-work (Aug 9, 2025)
950df68  WIP. (Aug 9, 2025)
a687f26  WIP. (Aug 9, 2025)
02ec8b5  WIP. (Aug 9, 2025)
d3ac263  WIP. (Aug 9, 2025)
918ea55  WIp. (Aug 9, 2025)
2703836  Merge branch 'master' into more-kafka-work (Aug 12, 2025)
6653d01  Fix test. (Aug 13, 2025)
5b44680  Update. (Aug 13, 2025)
cfed1d8  Update. (Aug 13, 2025)
e2efb4f  Clean. (Aug 13, 2025)
b16fe48  Move up (Aug 13, 2025)
097d5e5  Update. (Aug 13, 2025)
d769c21  WIP. (Aug 13, 2025)
057870e  WIP. (Aug 13, 2025)
6295ef5  Update. (Aug 13, 2025)
f04ed4c  Update. (Aug 13, 2025)
ab5eac5  Update. (Aug 16, 2025)
cf016f9  More. (Aug 16, 2025)
4b77bbe  Another guardrail. (Aug 16, 2025)
e551321  Comment. (Aug 16, 2025)
e190a78  Merge branch 'master' into more-kafka-work (Aug 22, 2025)
6240387  Update. (Aug 22, 2025)
0c5ec2a  Update. (Aug 22, 2025)
838e1c8  New line. (Aug 22, 2025)
150a7a4  Update. (Aug 22, 2025)
ccebd1c  Extra line. (Aug 22, 2025)
2cd6bc9  Should Lock = True. (Aug 25, 2025)
c298c15  WIP. (Aug 25, 2025)
d7e255e  Update. (Aug 25, 2025)
306f5e7  WIP. (Aug 25, 2025)
4c5e65d  WIP. (Aug 25, 2025)
5e453e0  Merge branch 'master' into more-kafka-work (Sep 1, 2025)
5ed14f7  Update. (Sep 1, 2025)
7e1723d  Update. (Sep 1, 2025)
5e332bc  Update. (Sep 1, 2025)
65d6e15  Update. (Sep 2, 2025)
95259cf  Merge branch 'refactor-integration-test' into more-kafka-work (Sep 2, 2025)
14b29a8  Update. (Sep 2, 2025)
20f37a3  Merge branch 'master' into more-kafka-work (Sep 2, 2025)
1b6d970  Merge branch 'master' into more-kafka-work (Sep 7, 2025)
cd4736e  CBC. (Sep 7, 2025)
89 changes: 88 additions & 1 deletion lib/kafkalib/consumer.go
@@ -2,11 +2,98 @@ package kafkalib

import (
	"context"
	"fmt"
	"sync"

	"github.com/segmentio/kafka-go"
)

type ctxKey string

func BuildContextKey(topic string) ctxKey {
return ctxKey(fmt.Sprintf("consumer-%s", topic))
}

type Consumer interface {
Close() (err error)
ReadMessage(ctx context.Context) (kafka.Message, error)
FetchMessage(ctx context.Context) (kafka.Message, error)
CommitMessages(ctx context.Context, msgs ...kafka.Message) error
}

type ConsumerProvider struct {
mu sync.Mutex
Consumer
GroupID string
}

func (c *ConsumerProvider) LockAndProcess(ctx context.Context, lock bool, do func() error) error {
	// Locking is optional so callers that already hold the mutex can skip
	// it; sync.Mutex is not reentrant.
	if lock {
		c.mu.Lock()
		defer c.mu.Unlock()
	}

if err := do(); err != nil {
return fmt.Errorf("failed to process: %w", err)
}

return nil
}

func InjectConsumerProvidersIntoContext(ctx context.Context, cfg *Kafka) (context.Context, error) {
kafkaConn := NewConnection(cfg.EnableAWSMSKIAM, cfg.DisableTLS, cfg.Username, cfg.Password, DefaultTimeout)
dialer, err := kafkaConn.Dialer(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka dialer: %w", err)
}

for _, topicConfig := range cfg.TopicConfigs {
kafkaCfg := kafka.ReaderConfig{
GroupID: cfg.GroupID,
Dialer: dialer,
Topic: topicConfig.Topic,
Brokers: cfg.BootstrapServers(true),
}

ctx = context.WithValue(ctx, BuildContextKey(topicConfig.Topic), &ConsumerProvider{Consumer: kafka.NewReader(kafkaCfg), GroupID: cfg.GroupID})
}

return ctx, nil
}

func GetConsumerFromContext(ctx context.Context, topic string) (*ConsumerProvider, error) {
value := ctx.Value(BuildContextKey(topic))
consumer, ok := value.(*ConsumerProvider)
if !ok {
return nil, fmt.Errorf("consumer not found for topic %q, got: %T", topic, value)
}

return consumer, nil
}

func (c *ConsumerProvider) CommitMessage(ctx context.Context, msg kafka.Message) error {
	return c.Consumer.CommitMessages(ctx, msg)
}

func (c *ConsumerProvider) FetchMessageAndProcess(ctx context.Context, do func(kafka.Message) error) error {
	msg, err := c.Consumer.FetchMessage(ctx)
[Inline review comment (Contributor Author): "Peek?"]

if err != nil {
return NewFetchMessageError(err)
}

	// Hold the provider lock while processing so a concurrent flush
	// (via LockAndProcess) cannot run over the same consumer.
	c.mu.Lock()
	defer c.mu.Unlock()

if err := do(msg); err != nil {
return fmt.Errorf("failed to process message: %w", err)
}

return nil
}
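Taken together, consumer.go sets up a simple lifecycle: build one reader per configured topic at startup, stash each ConsumerProvider in the context under a typed key, then retrieve it wherever messages are fetched or committed. A minimal usage sketch, assuming a kafkalib.Kafka config whose TopicConfigs include a topic named "orders"; the topic name, the consumeOrders function, and the loop shape are illustrative, not from the PR:

package example

import (
	"context"
	"log"

	"github.com/artie-labs/transfer/lib/kafkalib"
	"github.com/segmentio/kafka-go"
)

func consumeOrders(ctx context.Context, cfg *kafkalib.Kafka) error {
	// Build one reader per configured topic and stash each provider in the context.
	ctx, err := kafkalib.InjectConsumerProvidersIntoContext(ctx, cfg)
	if err != nil {
		return err
	}

	// Pull the provider for one topic back out of the context.
	provider, err := kafkalib.GetConsumerFromContext(ctx, "orders")
	if err != nil {
		return err
	}

	// Fetch, process, and commit; the processing callback runs under the
	// provider's internal lock.
	for {
		err := provider.FetchMessageAndProcess(ctx, func(msg kafka.Message) error {
			log.Printf("partition=%d offset=%d", msg.Partition, msg.Offset)
			return provider.CommitMessage(ctx, msg)
		})
		if err != nil {
			return err
		}
	}
}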
22 changes: 22 additions & 0 deletions lib/kafkalib/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package kafkalib

import "fmt"

type FetchMessageError struct {
Err error
}

func NewFetchMessageError(err error) FetchMessageError {
return FetchMessageError{
Err: err,
}
}

func (e FetchMessageError) Error() string {
return fmt.Sprintf("failed to fetch message: %v", e.Err)
}

func IsFetchMessageError(err error) bool {
_, ok := err.(FetchMessageError)
return ok
}
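One caveat on IsFetchMessageError: a bare type assertion does not see through wrapping, so a FetchMessageError that has been wrapped upstream (for example via fmt.Errorf with %w) would not be recognized. If wrapping is possible on the call path, errors.As is the idiomatic check. A sketch of that variant; the Unwrap method and isFetchMessageErrorDeep are hypothetical additions, not part of the PR:

package kafkalib

import "errors"

// Unwrap exposes the underlying error to errors.Is / errors.As.
// (Hypothetical addition; not part of this PR.)
func (e FetchMessageError) Unwrap() error {
	return e.Err
}

// isFetchMessageErrorDeep reports whether err is, or wraps, a FetchMessageError.
func isFetchMessageErrorDeep(err error) bool {
	var fetchErr FetchMessageError
	return errors.As(err, &fetchErr)
}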
5 changes: 5 additions & 0 deletions main.go
@@ -14,6 +14,7 @@ import (
	"github.com/artie-labs/transfer/lib/cryptography"
	"github.com/artie-labs/transfer/lib/destination"
	"github.com/artie-labs/transfer/lib/destination/utils"
	"github.com/artie-labs/transfer/lib/kafkalib"
	"github.com/artie-labs/transfer/lib/logger"
	"github.com/artie-labs/transfer/lib/telemetry/metrics"
	"github.com/artie-labs/transfer/models"
@@ -85,6 +86,10 @@ func main() {
slog.Info("Starting...", slog.String("version", version))

inMemDB := models.NewMemoryDB()
ctx, err = kafkalib.InjectConsumerProvidersIntoContext(ctx, settings.Config.Kafka)
if err != nil {
logger.Fatal("Failed to inject consumer providers into context", slog.Any("err", err))
}

var wg sync.WaitGroup
wg.Add(1)
2 changes: 0 additions & 2 deletions models/event/event.go
@@ -278,8 +278,6 @@ func (e *Event) Save(cfg config.Config, inMemDB *models.DatabaseData, tc kafkali

// Does the table exist?
td := inMemDB.GetOrCreateTableData(e.tableID, tc.Topic)
td.Lock()
defer td.Unlock()
if td.Empty() {
cols := &columns.Columns{}
if e.columns != nil {
4 changes: 0 additions & 4 deletions models/memory.go
@@ -16,7 +16,6 @@ type TableData struct {
tableID cdc.TableID
*optimization.TableData
lastFlushTime time.Time
sync.Mutex
}

func (t *TableData) GetTableID() cdc.TableID {
@@ -69,7 +68,6 @@ func (d *DatabaseData) GetOrCreateTableData(tableID cdc.TableID, topic string) *

if _, ok := d.tableData[tableID]; !ok {
table := &TableData{
Mutex: sync.Mutex{},
topic: topic,
tableID: tableID,
}
@@ -81,8 +79,6 @@
}

func (d *DatabaseData) ClearTableConfig(tableID cdc.TableID) {
d.Lock()
defer d.Unlock()
d.tableData[tableID].Wipe()
}

15 changes: 0 additions & 15 deletions processes/consumer/configs.go
@@ -1,11 +1,8 @@
package consumer

import (
	"context"
	"log/slog"
	"sync"

	"github.com/artie-labs/transfer/lib/artie"
	"github.com/artie-labs/transfer/lib/cdc"
	"github.com/artie-labs/transfer/lib/kafkalib"
)
@@ -38,15 +35,3 @@ type TopicConfigFormatter struct {
tc kafkalib.TopicConfig
cdc.Format
}

func commitOffset(ctx context.Context, topic string, partitionsToOffset map[int]artie.Message) error {
for _, msg := range partitionsToOffset {
if err := topicToConsumer.Get(topic).CommitMessages(ctx, msg.GetMessage()); err != nil {
return err
}

slog.Info("Successfully committed Kafka offset", slog.String("topic", topic), slog.Int("partition", msg.Partition()), slog.Int64("offset", msg.GetMessage().Offset))
}

return nil
}
120 changes: 70 additions & 50 deletions processes/consumer/flush.go
@@ -10,6 +10,7 @@ import (
	"github.com/artie-labs/transfer/lib/cdc"
	"github.com/artie-labs/transfer/lib/config"
	"github.com/artie-labs/transfer/lib/destination"
	"github.com/artie-labs/transfer/lib/kafkalib"
	"github.com/artie-labs/transfer/lib/retry"
	"github.com/artie-labs/transfer/lib/stringutil"
	"github.com/artie-labs/transfer/lib/telemetry/metrics/base"
@@ -42,6 +43,16 @@ func Flush(ctx context.Context, inMemDB *models.DatabaseData, dest destination.B
}
}

topicsToConsumerProvider := make(map[string]*kafkalib.ConsumerProvider)
for topic := range topicToTables {
consumer, err := kafkalib.GetConsumerFromContext(ctx, topic)
if err != nil {
return fmt.Errorf("failed to get consumer from context: %w", err)
}

topicsToConsumerProvider[topic] = consumer
}

// Flush will take everything in memory and call the destination to create temp tables.
var wg sync.WaitGroup
for topic, tables := range topicToTables {
@@ -50,61 +61,68 @@
continue
}

for _, tableData := range tables {
wg.Add(1)
go func(_tableData *models.TableData) {
defer wg.Done()

if args.CoolDown != nil && _tableData.ShouldSkipFlush(*args.CoolDown) {
slog.Debug("Skipping flush because we are currently in a flush cooldown", slog.String("tableID", _tableData.GetTableID().String()))
return
}

retryCfg, err := retry.NewJitterRetryConfig(1_000, 30_000, 15, retry.AlwaysRetry)
if err != nil {
slog.Error("Failed to create retry config", slog.Any("err", err))
return
}

_tableData.Lock()
defer _tableData.Unlock()
if _tableData.Empty() {
return
}

action := "merge"
if _tableData.Mode() == config.History {
action = "append"
}

start := time.Now()
tags := map[string]string{
"mode": _tableData.Mode().String(),
"table": _tableData.GetTableID().Table,
"database": _tableData.TopicConfig().Database,
"schema": _tableData.TopicConfig().Schema,
"reason": args.Reason,
}

what, err := retry.WithRetriesAndResult(retryCfg, func(_ int, _ error) (string, error) {
return flush(ctx, dest, _tableData, action, inMemDB.ClearTableConfig)
})

if err != nil {
slog.Error(fmt.Sprintf("Failed to %s", action), slog.Any("err", err), slog.String("tableID", _tableData.GetTableID().String()))
}

tags["what"] = what
metricsClient.Timing("flush", time.Since(start), tags)
}(tableData)
consumer, ok := topicsToConsumerProvider[topic]
if !ok {
return fmt.Errorf("consumer not found for topic %q", topic)
}

		if err := consumer.LockAndProcess(ctx, args.Topic == "", func() error {
for _, tableData := range tables {
wg.Add(1)
go func(_tableData *models.TableData) {
defer wg.Done()

if args.CoolDown != nil && _tableData.ShouldSkipFlush(*args.CoolDown) {
slog.Debug("Skipping flush because we are currently in a flush cooldown", slog.String("tableID", _tableData.GetTableID().String()))
return
}

retryCfg, err := retry.NewJitterRetryConfig(1_000, 30_000, 15, retry.AlwaysRetry)
if err != nil {
slog.Error("Failed to create retry config", slog.Any("err", err))
return
}

if _tableData.Empty() {
return
}

action := "merge"
if _tableData.Mode() == config.History {
action = "append"
}

start := time.Now()
tags := map[string]string{
"mode": _tableData.Mode().String(),
"table": _tableData.GetTableID().Table,
"database": _tableData.TopicConfig().Database,
"schema": _tableData.TopicConfig().Schema,
"reason": args.Reason,
}

what, err := retry.WithRetriesAndResult(retryCfg, func(_ int, _ error) (string, error) {
return flush(ctx, dest, _tableData, action, inMemDB.ClearTableConfig, consumer)
})

if err != nil {
slog.Error(fmt.Sprintf("Failed to %s", action), slog.Any("err", err), slog.String("tableID", _tableData.GetTableID().String()))
}

tags["what"] = what
metricsClient.Timing("flush", time.Since(start), tags)
}(tableData)
[Inline review comment: Bug: Concurrency Issues in TableData Handling. The removal of sync.Mutex from models.TableData and of d.Lock() from ClearTableConfig introduced two race conditions: ClearTableConfig now accesses the DatabaseData.tableData map concurrently without protection, and multiple flush goroutines can concurrently access and modify the same TableData instance without synchronization.]
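A minimal sketch of one possible fix for the flagged map race, assuming DatabaseData still embeds the sync.Mutex it had before this change; illustrative only, not part of the PR:

func (d *DatabaseData) ClearTableConfig(tableID cdc.TableID) {
	d.Lock()
	defer d.Unlock()
	// Re-guard the shared tableData map so this wipe cannot race with
	// concurrent readers and writers of the map.
	d.tableData[tableID].Wipe()
}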

}

			return nil
		}); err != nil {
			return fmt.Errorf("failed to process topic %q: %w", topic, err)
		}
	}

wg.Wait()
return nil
}

func flush(ctx context.Context, dest destination.Baseline, _tableData *models.TableData, action string, clearTableConfig func(cdc.TableID)) (string, error) {
func flush(ctx context.Context, dest destination.Baseline, _tableData *models.TableData, action string, clearTableConfig func(cdc.TableID), consumer *kafkalib.ConsumerProvider) (string, error) {
// This is added so that we have a new temporary table suffix for each merge / append.
_tableData.ResetTempTableSuffix()

Expand All @@ -122,8 +140,10 @@ func flush(ctx context.Context, dest destination.Baseline, _tableData *models.Ta
}

if commitTransaction {
if err = commitOffset(ctx, _tableData.TopicConfig().Topic, _tableData.PartitionsToLastMessage); err != nil {
return "commit_fail", fmt.Errorf("failed to commit kafka offset: %w", err)
for _, msg := range _tableData.PartitionsToLastMessage {
if err = consumer.CommitMessage(ctx, msg.GetMessage()); err != nil {
return "commit_fail", fmt.Errorf("failed to commit kafka offset: %w", err)
}
}

slog.Info(fmt.Sprintf("%s success, clearing memory...", stringutil.CapitalizeFirstLetter(action)), slog.String("tableID", _tableData.GetTableID().String()))
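For context on the commit loop in flush above: as its name suggests, PartitionsToLastMessage keeps one message per partition (the newest one fetched), and kafka-go records the committed message's offset plus one as the group's next offset, so committing only each partition's tail acknowledges everything fetched before it. A standalone sketch of that pattern; commitTails is a hypothetical helper, not in the PR:

package example

import (
	"context"
	"fmt"

	"github.com/artie-labs/transfer/lib/kafkalib"
	"github.com/segmentio/kafka-go"
)

// commitTails commits the newest fetched message for each partition, which
// implicitly acknowledges all earlier messages on that partition.
func commitTails(ctx context.Context, consumer *kafkalib.ConsumerProvider, tails map[int]kafka.Message) error {
	for partition, msg := range tails {
		if err := consumer.CommitMessage(ctx, msg); err != nil {
			return fmt.Errorf("failed to commit partition %d: %w", partition, err)
		}
	}
	return nil
}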
7 changes: 0 additions & 7 deletions processes/consumer/flush_suite_test.go
@@ -12,12 +12,6 @@ import (
	"github.com/stretchr/testify/suite"
)

func SetKafkaConsumer(_topicToConsumer map[string]kafkalib.Consumer) {
topicToConsumer = &TopicToConsumer{
topicToConsumer: _topicToConsumer,
}
}

type FlushTestSuite struct {
suite.Suite
fakeConsumer *mocks.FakeConsumer
@@ -58,7 +52,6 @@ func (f *FlushTestSuite) SetupTest() {
f.baseline = f.fakeBaseline
f.db = models.NewMemoryDB()
f.fakeConsumer = &mocks.FakeConsumer{}
SetKafkaConsumer(map[string]kafkalib.Consumer{"foo": f.fakeConsumer})
}

func TestFlushTestSuite(t *testing.T) {
6 changes: 5 additions & 1 deletion processes/consumer/flush_test.go
@@ -85,6 +85,10 @@ func (f *FlushTestSuite) TestShouldFlush() {
}

func (f *FlushTestSuite) TestMemoryConcurrency() {
provider := kafkalib.NewTopicsToConsumerProviderForTest("test-group")
assert.NoError(f.T(), provider.Add("foo", f.fakeConsumer))
ctx := provider.InjectIntoContext(f.T().Context())

tableIDs := []cdc.TableID{
cdc.NewTableID("public", "dusty"),
cdc.NewTableID("public", "snowflake"),
@@ -128,7 +132,7 @@ }
}

f.fakeBaseline.MergeReturns(true, nil)
assert.NoError(f.T(), Flush(f.T().Context(), f.db, f.baseline, metrics.NullMetricsProvider{}, Args{}))
assert.NoError(f.T(), Flush(ctx, f.db, f.baseline, metrics.NullMetricsProvider{}, Args{}))
assert.Equal(f.T(), f.fakeConsumer.CommitMessagesCallCount(), len(tableIDs)) // Commit 3 times because 3 topics.

for i := range f.fakeConsumer.CommitMessagesCallCount() {