Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion internal/destregistry/providers/destawss3/destawss3.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ type AWSS3Config struct {
Endpoint string // Optional endpoint for testing
}


// AWSS3Credentials is the credentials for an S3 destination
type AWSS3Credentials struct {
Key string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,10 @@ func TestAWSS3Publisher_Format_LegacyPatterns(t *testing.T) {
}

tests := []struct {
name string
template string
expected string
oldConfig string
name string
template string
expected string
oldConfig string
}{
{
// OLD CONFIG:
Expand Down Expand Up @@ -417,7 +417,7 @@ func TestAWSS3Publisher_Format_LegacyPatterns(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Logf("Old config: %s", tt.oldConfig)

publisher := destawss3.NewAWSS3Publisher(
nil,
"my-bucket",
Expand All @@ -430,4 +430,4 @@ func TestAWSS3Publisher_Format_LegacyPatterns(t *testing.T) {
assert.Equal(t, tt.expected, *input.Key)
})
}
}
}
40 changes: 20 additions & 20 deletions internal/destregistry/providers/destawss3/destawss3_publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (

// S3Consumer implements testsuite.MessageConsumer
type S3Consumer struct {
client *s3.Client
bucket string
msgChan chan testsuite.Message
done chan struct{}
seenKeys map[string]bool
client *s3.Client
bucket string
msgChan chan testsuite.Message
done chan struct{}
seenKeys map[string]bool
}

func NewS3Consumer(client *s3.Client, bucket string) *S3Consumer {
Expand Down Expand Up @@ -60,7 +60,7 @@ func (c *S3Consumer) consume() {

func (c *S3Consumer) pollS3() {
ctx := context.Background()

// List all objects
result, err := c.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(c.bucket),
Expand All @@ -71,7 +71,7 @@ func (c *S3Consumer) pollS3() {

for _, obj := range result.Contents {
key := *obj.Key

// Skip if we've already seen this object
if c.seenKeys[key] {
continue
Expand Down Expand Up @@ -163,7 +163,7 @@ func (s *S3PublishSuite) SetupSuite() {

// Get LocalStack endpoint
endpoint := testinfra.EnsureLocalStack()

// Set AWS environment variables for LocalStack
// The AWS SDK v2 will pick these up automatically
os.Setenv("AWS_ENDPOINT_URL_S3", endpoint)
Expand All @@ -174,30 +174,30 @@ func (s *S3PublishSuite) SetupSuite() {
os.Unsetenv("AWS_S3_ENDPOINT")
os.Unsetenv("AWS_ENDPOINT_URL")
})

// Create S3 client for test consumer
ctx := context.Background()
cfg, err := config.LoadDefaultConfig(ctx,
config.WithRegion("us-east-1"),
config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("test", "test", "")),
)
require.NoError(t, err)

s.client = s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = true // Required for LocalStack
})

// Create a unique bucket for this test
s.bucket = fmt.Sprintf("test-bucket-%s", uuid.New().String())
_, err = s.client.CreateBucket(ctx, &s3.CreateBucketInput{
Bucket: aws.String(s.bucket),
})
require.NoError(t, err)

// Create provider
provider, err := destawss3.New(testutil.Registry.MetadataLoader())
require.NoError(t, err)

// Create destination configuration
dest := testutil.DestinationFactory.Any(
testutil.DestinationFactory.WithType("aws_s3"),
Expand All @@ -213,12 +213,12 @@ func (s *S3PublishSuite) SetupSuite() {
"secret": "test",
}),
)

// Create consumer
consumer := NewS3Consumer(s.client, s.bucket)
s.consumer = consumer
// Initialize suite

// Initialize suite
s.InitSuite(testsuite.Config{
Provider: provider,
Dest: &dest,
Expand All @@ -231,11 +231,11 @@ func (s *S3PublishSuite) TearDownSuite() {
if s.consumer != nil {
s.consumer.Close()
}

// Clean up bucket
if s.client != nil && s.bucket != "" {
ctx := context.Background()

// Delete all objects first
listResult, err := s.client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: aws.String(s.bucket),
Expand All @@ -248,7 +248,7 @@ func (s *S3PublishSuite) TearDownSuite() {
})
}
}

// Delete bucket
s.client.DeleteBucket(ctx, &s3.DeleteBucketInput{
Bucket: aws.String(s.bucket),
Expand All @@ -261,4 +261,4 @@ func TestS3PublishIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}
suite.Run(t, new(S3PublishSuite))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ func TestComputeTarget(t *testing.T) {
expectedTarget: "test-namespace/my-topic",
},
{
name: "with missing name config",
config: map[string]string{},
name: "with missing name config",
config: map[string]string{},
credentials: map[string]string{
"connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234",
},
Expand Down
8 changes: 6 additions & 2 deletions internal/migrator/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,13 @@ func New(opts MigrationOpts) (*Migrator, error) {
return nil, fmt.Errorf("failed to get migration driver: %w", err)
}

m, err := migrate.NewWithSourceInstance("iofs", d, opts.databaseURL())
dbURL := opts.databaseURL()
m, err := migrate.NewWithSourceInstance("iofs", d, dbURL)
if err != nil {
return nil, fmt.Errorf("migrate.New: %w", err)
// Sanitize the error to prevent credential exposure in logs
// The original error from golang-migrate may contain the full database URL
// with credentials, which would be exposed if this error is logged.
return nil, sanitizeConnectionError(err, dbURL)
}

return &Migrator{
Expand Down
Loading