Skip to content

Commit fd83e49

Browse files
committed
opensearch: add connector, flow tests, deps
1 parent 1d464df commit fd83e49

File tree

12 files changed

+1007
-3
lines changed

12 files changed

+1007
-3
lines changed

.github/workflows/flow.yml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,24 @@ jobs:
6262
MINIO_API_PORT_NUMBER: 9999
6363
AWS_EC2_METADATA_DISABLED: true
6464
MINIO_DEFAULT_BUCKETS: peerdb
65+
opensearch:
66+
image: opensearchproject/opensearch:2.13.0
67+
ports:
68+
- 19200:9200
69+
env:
70+
discovery.type: single-node
71+
plugins.security.ssl.http.enabled: false
72+
plugins.ml.enabled: false
73+
plugins.security.disabled: true
74+
bootstrap.memory_lock: true
75+
OPENSEARCH_INITIAL_ADMIN_PASSWORD: S3curepa55!
76+
OPENSEARCH_USERNAME: admin
77+
OPENSEARCH_PASSWORD: S3curepa55!
78+
options: >-
79+
--health-cmd "curl -f http://localhost:9200/ || exit 1"
80+
--health-interval 10s
81+
--health-timeout 10s
82+
--health-retries 20
6583
otelcol:
6684
image: otel/opentelemetry-collector-contrib:0.128.0@sha256:1ab0baba0ee3695d823c46653d8a6e8894896e668ce8bd7ebe002e948d827bc7
6785
ports:
@@ -391,6 +409,9 @@ jobs:
391409
PEERDB_CATALOG_DATABASE: postgres
392410
PEERDB_QUEUE_FORCE_TOPIC_CREATION: "true"
393411
ELASTICSEARCH_TEST_ADDRESS: http://localhost:9200
412+
OPENSEARCH_TEST_ADDRESS: http://localhost:19200
413+
OPENSEARCH_TEST_USERNAME: admin
414+
OPENSEARCH_TEST_PASSWORD: S3curepa55!
394415
CI_PG_VERSION: ${{ matrix.db-version.pg }}
395416
CI_MYSQL_VERSION: ${{ matrix.db-version.mysql }}
396417
CI_MONGO_ADMIN_URI: mongodb://admin:admin@localhost:27017/?replicaSet=rs0&authSource=admin

flow/connectors/core.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
connbigquery "github.com/PeerDB-io/peerdb/flow/connectors/bigquery"
1313
connclickhouse "github.com/PeerDB-io/peerdb/flow/connectors/clickhouse"
1414
connelasticsearch "github.com/PeerDB-io/peerdb/flow/connectors/elasticsearch"
15+
connopensearch "github.com/PeerDB-io/peerdb/flow/connectors/opensearch"
1516
conneventhub "github.com/PeerDB-io/peerdb/flow/connectors/eventhub"
1617
connkafka "github.com/PeerDB-io/peerdb/flow/connectors/kafka"
1718
connmongo "github.com/PeerDB-io/peerdb/flow/connectors/mongo"
@@ -431,6 +432,12 @@ func LoadPeer(ctx context.Context, catalogPool shared.CatalogPool, peerName stri
431432
return nil, fmt.Errorf("failed to unmarshal Elasticsearch config: %w", err)
432433
}
433434
peer.Config = &protos.Peer_ElasticsearchConfig{ElasticsearchConfig: &config}
435+
case protos.DBType_OPENSEARCH:
436+
var config protos.OpensearchConfig
437+
if err := proto.Unmarshal(peerOptions, &config); err != nil {
438+
return nil, fmt.Errorf("failed to unmarshal Opensearch config: %w", err)
439+
}
440+
peer.Config = &protos.Peer_OpensearchConfig{OpensearchConfig: &config}
434441
default:
435442
return nil, fmt.Errorf("unsupported peer type: %s", peer.Type)
436443
}
@@ -462,6 +469,8 @@ func GetConnector(ctx context.Context, env map[string]string, config *protos.Pee
462469
return connpubsub.NewPubSubConnector(ctx, env, inner.PubsubConfig)
463470
case *protos.Peer_ElasticsearchConfig:
464471
return connelasticsearch.NewElasticsearchConnector(ctx, inner.ElasticsearchConfig)
472+
case *protos.Peer_OpensearchConfig:
473+
return connopensearch.NewOpensearchConnector(ctx, inner.OpensearchConfig)
465474
default:
466475
return nil, errors.ErrUnsupported
467476
}
@@ -514,6 +523,7 @@ var (
514523
_ CDCSyncConnector = &conns3.S3Connector{}
515524
_ CDCSyncConnector = &connclickhouse.ClickHouseConnector{}
516525
_ CDCSyncConnector = &connelasticsearch.ElasticsearchConnector{}
526+
_ CDCSyncConnector = &connopensearch.OpensearchConnector{}
517527

518528
_ CDCSyncPgConnector = &connpostgres.PostgresConnector{}
519529

@@ -552,6 +562,7 @@ var (
552562
_ QRepSyncConnector = &conns3.S3Connector{}
553563
_ QRepSyncConnector = &connclickhouse.ClickHouseConnector{}
554564
_ QRepSyncConnector = &connelasticsearch.ElasticsearchConnector{}
565+
_ QRepSyncConnector = &connopensearch.OpensearchConnector{}
555566

556567
_ QRepSyncPgConnector = &connpostgres.PostgresConnector{}
557568

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
package connopensearch
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"crypto/tls"
7+
"encoding/base64"
8+
"encoding/json"
9+
"errors"
10+
"fmt"
11+
"log/slog"
12+
"maps"
13+
"net/http"
14+
"sync"
15+
"sync/atomic"
16+
"time"
17+
18+
"github.com/opensearch-project/opensearch-go/v4"
19+
"github.com/opensearch-project/opensearch-go/v4/opensearchapi"
20+
"github.com/opensearch-project/opensearch-go/v4/opensearchutil"
21+
"go.temporal.io/sdk/log"
22+
23+
metadataStore "github.com/PeerDB-io/peerdb/flow/connectors/external_metadata"
24+
"github.com/PeerDB-io/peerdb/flow/connectors/utils"
25+
"github.com/PeerDB-io/peerdb/flow/generated/protos"
26+
"github.com/PeerDB-io/peerdb/flow/internal"
27+
"github.com/PeerDB-io/peerdb/flow/model"
28+
"github.com/PeerDB-io/peerdb/flow/shared"
29+
"github.com/PeerDB-io/peerdb/flow/shared/types"
30+
)
31+
32+
// Bulk-API action names used when queueing items on the bulk indexer.
const (
	actionIndex  = "index"  // upsert a document by ID (used for inserts and updates)
	actionDelete = "delete" // remove a document by ID
)
36+
37+
// OpensearchConnector is a PeerDB destination connector for OpenSearch.
// Sync state (offsets, batch IDs) is persisted through the embedded
// Postgres-backed metadata store; the connector itself holds no other state.
type OpensearchConnector struct {
	*metadataStore.PostgresMetadata
	client    *opensearch.Client    // low-level client; used for node discovery in ConnectionActive
	apiClient *opensearchapi.Client // typed API client; handed to the bulk indexers in SyncRecords
	logger    log.Logger
}
43+
44+
func NewOpensearchConnector(ctx context.Context,
45+
config *protos.OpensearchConfig,
46+
) (*OpensearchConnector, error) {
47+
osCfg := opensearch.Config{
48+
Addresses: config.Addresses,
49+
Transport: &http.Transport{
50+
MaxIdleConnsPerHost: 4,
51+
TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS13},
52+
},
53+
}
54+
if config.AuthType == protos.ElasticsearchAuthType_BASIC {
55+
osCfg.Username = *config.Username
56+
osCfg.Password = *config.Password
57+
} else if config.AuthType == protos.ElasticsearchAuthType_APIKEY {
58+
// TODO: Add API Key support
59+
}
60+
61+
osClient, err := opensearch.NewClient(osCfg)
62+
if err != nil {
63+
return nil, fmt.Errorf("error creating opensearch connector: %w", err)
64+
}
65+
apiClient, err := opensearchapi.NewClient(opensearchapi.Config{Client: osCfg})
66+
if err != nil {
67+
return nil, fmt.Errorf("error creating opensearch API client: %w", err)
68+
}
69+
pgMetadata, err := metadataStore.NewPostgresMetadata(ctx)
70+
if err != nil {
71+
return nil, err
72+
}
73+
74+
return &OpensearchConnector{
75+
PostgresMetadata: pgMetadata,
76+
client: osClient,
77+
apiClient: apiClient,
78+
logger: internal.LoggerFromCtx(ctx),
79+
}, nil
80+
}
81+
82+
func (osc *OpensearchConnector) ConnectionActive(ctx context.Context) error {
83+
err := osc.client.DiscoverNodes()
84+
if err != nil {
85+
return fmt.Errorf("failed to check if opensearch peer is active: %w", err)
86+
}
87+
return nil
88+
}
89+
90+
func (osc *OpensearchConnector) Close() error {
91+
// stateless connector
92+
return nil
93+
}
94+
95+
// ES is queue-like, no raw table staging needed
96+
func (osc *OpensearchConnector) CreateRawTable(ctx context.Context,
97+
req *protos.CreateRawTableInput,
98+
) (*protos.CreateRawTableOutput, error) {
99+
return &protos.CreateRawTableOutput{TableIdentifier: "n/a"}, nil
100+
}
101+
102+
// we handle schema changes by not handling them since no mapping is being enforced right now
103+
func (osc *OpensearchConnector) ReplayTableSchemaDeltas(ctx context.Context, env map[string]string,
104+
flowJobName string, _ []*protos.TableMapping, schemaDeltas []*protos.TableSchemaDelta,
105+
) error {
106+
return nil
107+
}
108+
109+
func recordItemsProcessor(items model.RecordItems) ([]byte, error) {
110+
qRecordJsonMap := make(map[string]any)
111+
112+
for key, val := range items.ColToVal {
113+
if r, ok := val.(types.QValueJSON); ok { // JSON is stored as a string, fix that
114+
qRecordJsonMap[key] = json.RawMessage(
115+
shared.UnsafeFastStringToReadOnlyBytes(r.Val))
116+
} else {
117+
qRecordJsonMap[key] = val.Value()
118+
}
119+
}
120+
121+
return json.Marshal(qRecordJsonMap)
122+
}
123+
124+
// SyncRecords drains the CDC record stream from req and pushes each record
// into OpenSearch through a per-destination-table bulk indexer: inserts and
// updates are indexed (upserted by document ID), deletes remove the document.
// A background goroutine periodically persists the highest checkpoint the
// indexers have acknowledged, so long-running batches make durable progress.
// Returns per-table row counts and the batch's final checkpoint on success.
func (osc *OpensearchConnector) SyncRecords(ctx context.Context,
	req *model.SyncRecordsRequest[model.RecordItems],
) (*model.SyncResponse, error) {
	tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
	var lastSeenLSN atomic.Int64
	var numRecords int64

	// Bulk indexers are created lazily, one per destination table.
	osBulkIndexerCache := make(map[string]opensearchutil.BulkIndexer)
	bulkIndexersHaveShutdown := false
	// true if we saw errors while closing
	cacheCloser := func() bool {
		closeHasErrors := false
		if !bulkIndexersHaveShutdown {
			for osBulkIndexer := range maps.Values(osBulkIndexerCache) {
				// Close flushes any buffered items before shutting workers down.
				err := osBulkIndexer.Close(context.Background())
				if err != nil {
					osc.logger.Error("[os] failed to close bulk indexer", slog.Any("error", err))
					closeHasErrors = true
				}
				// Only items actually flushed count toward the synced total.
				numRecords += int64(osBulkIndexer.Stats().NumFlushed)
			}
			bulkIndexersHaveShutdown = true
		}
		return closeHasErrors
	}
	// Deferred as a safety net for early error returns; the happy path calls
	// cacheCloser() explicitly below and the flag makes the second call a no-op.
	defer cacheCloser()

	flushLoopDone := make(chan struct{})
	// Periodically advance the stored offset to the highest LSN acknowledged by
	// the bulk indexers, so progress survives interruption mid-batch.
	go func() {
		flushTimeout, err := internal.PeerDBQueueFlushTimeoutSeconds(ctx, req.Env)
		if err != nil {
			osc.logger.Warn("[opensearch] failed to get flush timeout, no periodic flushing", slog.Any("error", err))
			return
		}
		ticker := time.NewTicker(flushTimeout)
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-flushLoopDone:
				return
			case <-ticker.C:
				lastSeen := lastSeenLSN.Load()
				if lastSeen > req.ConsumedOffset.Load() {
					if err := osc.SetLastOffset(ctx, req.FlowJobName, model.CdcCheckpoint{ID: lastSeen}); err != nil {
						osc.logger.Warn("[os] SetLastOffset error", slog.Any("error", err))
					} else {
						shared.AtomicInt64Max(req.ConsumedOffset, lastSeen)
						osc.logger.Info("processBatch", slog.Int64("updated last offset", lastSeen))
					}
				}
			}
		}
	}()

	var docId string
	var bulkIndexFatalError error
	var bulkIndexErrors []error
	// Guards bulkIndexErrors and bulkIndexFatalError, which are written from
	// the bulk indexer's worker goroutines via the OnFailure callback.
	var bulkIndexOnFailureMutex sync.Mutex

	for record := range req.Records.GetRecords() {
		// Message records carry no row data; nothing to index.
		if _, ok := record.(*model.MessageRecord[model.RecordItems]); ok {
			continue
		}

		var bodyBytes []byte
		var err error
		action := actionIndex

		switch record.(type) {
		case *model.InsertRecord[model.RecordItems], *model.UpdateRecord[model.RecordItems]:
			bodyBytes, err = recordItemsProcessor(record.GetItems())
			if err != nil {
				osc.logger.Error("[os] failed to json.Marshal record", slog.Any("error", err))
				return nil, fmt.Errorf("[os] failed to json.Marshal record: %w", err)
			}
		case *model.DeleteRecord[model.RecordItems]:
			action = actionDelete
			// no need to supply the document since we are deleting
			bodyBytes = nil
		}

		bulkIndexer, ok := osBulkIndexerCache[record.GetDestinationTableName()]
		if !ok {
			bulkIndexer, err = opensearchutil.NewBulkIndexer(opensearchutil.BulkIndexerConfig{
				Index:  record.GetDestinationTableName(),
				Client: osc.apiClient,
				// can't really ascertain how many tables present to provide a reasonable value
				NumWorkers:    1,
				FlushInterval: 10 * time.Second,
			})
			if err != nil {
				osc.logger.Error("[os] failed to initialize bulk indexer", slog.Any("error", err))
				return nil, fmt.Errorf("[os] failed to initialize bulk indexer: %w", err)
			}
			osBulkIndexerCache[record.GetDestinationTableName()] = bulkIndexer
		}

		// Document ID: a single-column primary key is used verbatim; composite
		// keys are collapsed into a base64url-encoded tuple.
		if len(req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns) == 1 {
			qValue, err := record.GetItems().GetValueByColName(
				req.TableNameSchemaMapping[record.GetDestinationTableName()].PrimaryKeyColumns[0])
			if err != nil {
				osc.logger.Error("[os] failed to process record", slog.Any("error", err))
				return nil, fmt.Errorf("[os] failed to process record: %w", err)
			}
			docId = fmt.Sprint(qValue.Value())
		} else {
			tablePkey, err := model.RecToTablePKey(req.TableNameSchemaMapping, record)
			if err != nil {
				osc.logger.Error("[os] failed to process record", slog.Any("error", err))
				return nil, fmt.Errorf("[os] failed to process record: %w", err)
			}
			docId = base64.RawURLEncoding.EncodeToString(tablePkey.PkeyColVal[:])
		}

		if err := bulkIndexer.Add(ctx, opensearchutil.BulkIndexerItem{
			Action:     action,
			DocumentID: docId,
			Body:       bytes.NewReader(bodyBytes),
			OnSuccess: func(_ context.Context, _ opensearchutil.BulkIndexerItem, _ opensearchapi.BulkRespItem) {
				shared.AtomicInt64Max(&lastSeenLSN, record.GetCheckpointID())
				record.PopulateCountMap(tableNameRowsMapping)
			},
			// OnFailure is called for each failed operation, log and let parent handle
			OnFailure: func(ctx context.Context, item opensearchutil.BulkIndexerItem,
				res opensearchapi.BulkRespItem, err error,
			) {
				// attempt to delete a record that wasn't present, possible from no initial load
				if item.Action == actionDelete && res.Status == 404 {
					return
				}
				bulkIndexOnFailureMutex.Lock()
				defer bulkIndexOnFailureMutex.Unlock()
				if err != nil {
					bulkIndexErrors = append(bulkIndexErrors, err)
				} else {
					causeString := ""
					if res.Error.Cause.Type != "" || res.Error.Cause.Reason != "" {
						causeString = fmt.Sprintf("(caused by type:%s reason:%s)", res.Error.Cause.Type, res.Error.Cause.Reason)
					}
					cbErr := fmt.Errorf("id:%s action:%s type:%s reason:%s %s", item.DocumentID, item.Action, res.Error.Type,
						res.Error.Reason, causeString)
					bulkIndexErrors = append(bulkIndexErrors, cbErr)
					if res.Error.Type == "illegal_argument_exception" {
						bulkIndexFatalError = cbErr
					}
				}
			},
		}); err != nil {
			osc.logger.Error("[os] failed to add record to bulk indexer", slog.Any("error", err))
			return nil, fmt.Errorf("[os] failed to add record to bulk indexer: %w", err)
		}
		// NOTE(review): bulkIndexFatalError is read here without holding
		// bulkIndexOnFailureMutex while OnFailure may write it concurrently from
		// an indexer worker — confirm whether this read should be guarded.
		if bulkIndexFatalError != nil {
			osc.logger.Error("[os] fatal error while indexing record", slog.Any("error", bulkIndexFatalError))
			return nil, fmt.Errorf("[os] fatal error while indexing record: %w", bulkIndexFatalError)
		}
	}
	// "Receive on a closed channel yields the zero value after all elements in the channel are received."
	close(flushLoopDone)

	if cacheCloser() {
		osc.logger.Error("[os] failed to close bulk indexer(s)")
		return nil, errors.New("[os] failed to close bulk indexer(s)")
	}
	// Individual item failures are logged but do not fail the whole batch.
	if len(bulkIndexErrors) > 0 {
		for _, err := range bulkIndexErrors {
			osc.logger.Error("[os] failed to index record", slog.Any("err", err))
		}
	}

	lastCheckpoint := req.Records.GetLastCheckpoint()
	if err := osc.FinishBatch(ctx, req.FlowJobName, req.SyncBatchID, lastCheckpoint); err != nil {
		return nil, err
	}

	return &model.SyncResponse{
		CurrentSyncBatchID:   req.SyncBatchID,
		LastSyncedCheckpoint: lastCheckpoint,
		NumRecordsSynced:     numRecords,
		TableNameRowsMapping: tableNameRowsMapping,
		TableSchemaDeltas:    req.Records.SchemaDeltas,
	}, nil
}

0 commit comments

Comments
 (0)