diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml index 5217653f..c3deedec 100644 --- a/.github/workflows/build-test.yml +++ b/.github/workflows/build-test.yml @@ -306,6 +306,11 @@ jobs: label: 'SyncDaprStateStore', path: './reactions/dapr/sync-statestore', name: 'reaction-sync-dapr-statestore' + }, + { + label: 'PostDaprPubSub', + path: './reactions/dapr/post-pubsub', + name: 'reaction-post-dapr-pubsub' } ] steps: diff --git a/.github/workflows/draft-release.yml b/.github/workflows/draft-release.yml index 0fb3f8f6..b15a8719 100644 --- a/.github/workflows/draft-release.yml +++ b/.github/workflows/draft-release.yml @@ -69,6 +69,7 @@ env: {"label": "Result", "path": "reactions/platform/result-reaction", "name": "reaction-result", "platforms": "linux/amd64,linux/arm64"}, {"label": "StorageQueue", "path": "reactions/azure/storagequeue-reaction", "name": "reaction-storage-queue", "platforms": "linux/amd64,linux/arm64"}, {"label": "SyncDaprStateStore", "path": "reactions/dapr/sync-statestore", "name": "reaction-sync-dapr-statestore", "platforms": "linux/amd64,linux/arm64"}, + {"label": "PostDaprPubSub", "path": "reactions/dapr/post-pubsub", "name": "reaction-post-dapr-pubsub", "platforms": "linux/amd64,linux/arm64"}, {"label": "StoredProc", "path": "reactions/sql/storedproc-reaction", "name": "reaction-storedproc", "platforms": "linux/amd64,linux/arm64"}]' jobs: diff --git a/cli/installers/resources/default-reaction-providers.yaml b/cli/installers/resources/default-reaction-providers.yaml index 313d1fef..3d84271c 100644 --- a/cli/installers/resources/default-reaction-providers.yaml +++ b/cli/installers/resources/default-reaction-providers.yaml @@ -247,4 +247,13 @@ name: SyncDaprStateStore spec: services: reaction: - image: reaction-sync-dapr-statestore \ No newline at end of file + image: reaction-sync-dapr-statestore +--- +apiVersion: v1 +kind: ReactionProvider +name: PostDaprPubSub +spec: + services: + reaction: + image: reaction-post-dapr-pubsub + diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/post-pubsub-redis.test.js b/e2e-tests/06-post-dapr-pubsub-scenario/post-pubsub-redis.test.js new file mode 100644 index 00000000..b31fe3a2 --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/post-pubsub-redis.test.js @@ -0,0 +1,310 @@ +const { describe, beforeAll, afterAll, test, expect } = require('@jest/globals'); +const { Client: PgClient } = require('pg'); +const { createClient: createRedisClient } = require('redis'); +const yaml = require('js-yaml'); +const fs = require('fs'); +const path = require('path'); +const PortForward = require('../fixtures/port-forward'); +const deployResources = require('../fixtures/deploy-resources'); +const deleteResources = require('../fixtures/delete-resources'); +const { waitFor } = require('../fixtures/infrastructure'); // Added import + +const SCENARIO_DIR = __dirname; +const K8S_RESOURCES_FILE = path.join(SCENARIO_DIR, 'resources.yaml'); +const SOURCES_FILE = path.join(SCENARIO_DIR, 'sources.yaml'); +const QUERIES_FILE = path.join(SCENARIO_DIR, 'queries.yaml'); +const REACTION_PROVIDER_FILE = path.join(SCENARIO_DIR, 'reaction-provider.yaml'); +const REACTIONS_FILE = path.join(SCENARIO_DIR, 'reactions.yaml'); + +const POSTGRES_SERVICE_NAME = 'pubsub-test-db'; +const POSTGRES_NAMESPACE = 'default'; +const POSTGRES_PORT = 5432; +const POSTGRES_USER = 'testuser'; +const POSTGRES_PASSWORD = 'testpassword'; +const POSTGRES_DATABASE = 'testdb'; + +const DAPR_PUBSUB_REDIS_SERVICE_NAME = 'dapr-pubsub-redis-svc'; +const DAPR_PUBSUB_REDIS_NAMESPACE = 'default'; +const DAPR_PUBSUB_REDIS_PORT = 6379; + +const PACKED_TOPIC = 'e2e-topic-packed'; +const UNPACKED_TOPIC = 'e2e-topic-unpacked'; + +function loadYaml(filePath) { + const content = fs.readFileSync(filePath, 'utf8'); + return yaml.loadAll(content); +} + +async function clearRedisStream(redisClient, streamKey) { + try { + // XTRIM with MAXLEN 0 deletes all entries. + await redisClient.xTrim(streamKey, 'MAXLEN', 0); + console.log(`Cleared Redis stream: ${streamKey}`); + } catch (err) { + // Ignore if stream doesn't exist (error code 'ERR no such key') + if (err.message && !err.message.toLowerCase().includes('no such key')) { + console.error(`Error clearing Redis stream ${streamKey}:`, err); + } else { + console.log(`Redis stream ${streamKey} did not exist or already cleared.`); + } + } +} + +async function getMessagesFromRedisStream(redisClient, streamKey, lastId = '0-0') { + try { + const messages = await redisClient.xRange(streamKey, '-', '+'); + if (!messages || messages.length === 0) { + return []; + } + return messages.map(msg => { + const id = msg.id; + const fields = msg.message; + if (fields && fields.data) { + try { + return { id, data: JSON.parse(fields.data) }; + } catch (e) { + console.warn(`Failed to parse JSON from 'data' field in message ${id} from stream ${streamKey}:`, fields.data); + return { id, data: fields.data }; + } + } + console.warn(`Message ${id} from stream ${streamKey} did not have a 'data' field:`, fields); + return { id, data: fields }; + }); + } catch (err) { + if (err.message && err.message.toLowerCase().includes("no such key")) { + return []; + } + console.error(`Error reading from Redis stream ${streamKey}:`, err); + throw err; + } +} + + +describe('PostDaprPubSub Reaction with Redis Stream Verification', () => { + let pgClient; + let redisClient; + let pgPortForward; + let redisPortForward; + + const k8sResources = loadYaml(K8S_RESOURCES_FILE); + const sourceResources = loadYaml(SOURCES_FILE); + const queryResources = loadYaml(QUERIES_FILE); + const reactionProviderResources = loadYaml(REACTION_PROVIDER_FILE); + const reactionResources = loadYaml(REACTIONS_FILE); + + const allResourceDefinitions = [ + ...k8sResources, + ...sourceResources, + ...queryResources, + ...reactionProviderResources, + ...reactionResources, + ]; + + + beforeAll(async () => { + console.log("Starting E2E test setup for PostDaprPubSub (Redis)..."); + try { + // 1. deploy all k8s resouces first + console.log("Deploying K8s resources..."); + await deployResources(k8sResources); + + // 2. then wait for 15 seconds + console.log("Waiting for K8s resources to stabilize..."); + await waitFor({ timeoutMs: 10000, description: "K8s resources to stabilize" }); + + // 3. then deploy sources.yaml + console.log("Deploying Drasi Source resources..."); + await deployResources(sourceResources); + + // 4. then deploy queries.yaml + console.log("Deploying Drasi Query resources..."); + await deployResources(queryResources); + + // 5. Then deploy reaction-provider + console.log("Deploying Drasi ReactionProvider resources..."); + await deployResources(reactionProviderResources); + + // 6. then deploy reaction + console.log("Deploying Drasi Reaction resources..."); + await deployResources(reactionResources); + console.log("All Drasi resources deployed."); + + pgPortForward = new PortForward(POSTGRES_SERVICE_NAME, POSTGRES_PORT, POSTGRES_NAMESPACE); + const localPgPort = await pgPortForward.start(); + pgClient = new PgClient({ + host: 'localhost', + port: localPgPort, + user: POSTGRES_USER, + password: POSTGRES_PASSWORD, + database: POSTGRES_DATABASE, + }); + await pgClient.connect(); + console.log("Connected to PostgreSQL via port forward."); + + redisPortForward = new PortForward(DAPR_PUBSUB_REDIS_SERVICE_NAME, DAPR_PUBSUB_REDIS_PORT, DAPR_PUBSUB_REDIS_NAMESPACE); + const localRedisPort = await redisPortForward.start(); + redisClient = createRedisClient({ url: `redis://localhost:${localRedisPort}` }); + await redisClient.connect(); + console.log("Connected to Dapr Pub/Sub Redis via port forward."); + + console.log("Waiting for 15 more seconds after all setup..."); + await waitFor({ timeoutMs: 15000, description: "all of the setup to stabilize" }); + + } catch (error) { + console.error("Error during beforeAll setup:", error); + if (pgPortForward) pgPortForward.stop(); + if (redisPortForward) redisPortForward.stop(); + if (pgClient) await pgClient.end().catch(console.error); + if (redisClient) await redisClient.quit().catch(console.error); + await deleteResources(allResourceDefinitions).catch(err => console.error("Cleanup failed during error handling:", err)); + throw error; + } + }, 300000); // 5 minutes timeout for setup + + afterAll(async () => { + console.log("Starting E2E test teardown..."); + if (pgClient) await pgClient.end().catch(err => console.error("Error closing PG client:", err)); + if (redisClient) await redisClient.quit().catch(err => console.error("Error quitting Redis client:", err)); + + if (pgPortForward) pgPortForward.stop(); + if (redisPortForward) redisPortForward.stop(); + + console.log("Attempting to delete Drasi and K8s resources..."); + await deleteResources(allResourceDefinitions).catch(err => console.error("Error during deleteResources:", err)); + console.log("Teardown complete."); + }, 300000); // 5 minutes timeout for teardown + + test('PACKED: should publish a packed ChangeEvent to the correct Redis Stream on INSERT', async () => { + await clearRedisStream(redisClient, PACKED_TOPIC); + + const newProductName = `Test Product Packed ${Date.now()}`; + const newProductPrice = 99.99; + await pgClient.query( + "INSERT INTO product (name, description, price) VALUES ($1, 'Packed Test Desc', $2)", + [newProductName, newProductPrice] + ); + + const receivedMessages = await waitFor({ + actionFn: () => getMessagesFromRedisStream(redisClient, PACKED_TOPIC), + predicateFn: (messages) => messages && messages.length >= 1, + timeoutMs: 10000, + pollIntervalMs: 1000, + description: `packed message for product "${newProductName}" to appear in Redis stream ${PACKED_TOPIC}` + }); + + expect(receivedMessages).toBeDefined(); + expect(receivedMessages.length).toEqual(1); + + const cloudEvent = receivedMessages[0].data; + expect(cloudEvent).toBeDefined(); + expect(cloudEvent.topic).toBe(PACKED_TOPIC); + + const drasiEvent = cloudEvent.data; + expect(drasiEvent).toBeDefined(); + expect(drasiEvent.payload).toBeDefined(); + expect(drasiEvent.payload.after).toBeDefined(); + expect(drasiEvent.payload.after.name).toBe(newProductName); + expect(drasiEvent.op).toBe('i'); + expect(parseFloat(drasiEvent.payload.after.price)).toBe(newProductPrice); + }, 20000); + + test('UNPACKED: should publish individual unpacked change notifications on INSERT', async () => { + await clearRedisStream(redisClient, UNPACKED_TOPIC); + + const newProductName = `Test Product Unpacked ${Date.now()}`; + const newProductPrice = 49.50; + await pgClient.query( + "INSERT INTO product (name, description, price) VALUES ($1, 'Unpacked Test Desc', $2)", + [newProductName, newProductPrice] + ); + + const receivedMessages = await waitFor({ + actionFn: () => getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC), + predicateFn: (messages) => messages && messages.length >= 1, + timeoutMs: 10000, + pollIntervalMs: 1000, + description: `unpacked message for product "${newProductName}" to appear in Redis stream ${UNPACKED_TOPIC}` + }); + + expect(receivedMessages).toBeDefined(); + expect(receivedMessages.length).toEqual(1); + + const cloudEvent = receivedMessages[0].data; + expect(cloudEvent).toBeDefined(); + expect(cloudEvent.topic).toBe(UNPACKED_TOPIC); + + const drasiEvent = cloudEvent.data; + expect(drasiEvent).toBeDefined(); + expect(drasiEvent.op).toBe('i'); // Insert operation + expect(drasiEvent.payload).toBeDefined(); + expect(drasiEvent.payload.source).toBeDefined(); + expect(drasiEvent.payload.source.queryId).toBe('product-updates-unpacked'); + expect(drasiEvent.payload.after).toBeDefined(); + expect(drasiEvent.payload.after.name).toBe(newProductName); + expect(parseFloat(drasiEvent.payload.after.price)).toBe(newProductPrice); + expect(drasiEvent.payload.before).toBeUndefined(); + }, 20000); + + test('UNPACKED: should publish individual unpacked change notifications on UPDATE', async () => { + // Ensure a product exists to update. + const productNameToUpdate = `ProductToUpdate ${Date.now()}`; + const initialDescription = "Initial Description"; + const initialPrice = 50.00; + await pgClient.query( + "INSERT INTO product (name, description, price) VALUES ($1, $2, $3)", + [productNameToUpdate, initialDescription, initialPrice] + ); + + await waitFor({ + actionFn: () => getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC), + predicateFn: (messages) => messages && messages.length >= 1, + timeoutMs: 10000, + pollIntervalMs: 1000, + description: `propagation of initial insert event for "${productNameToUpdate}" to appear in Redis stream ${UNPACKED_TOPIC}` + }); + + await clearRedisStream(redisClient, UNPACKED_TOPIC); // Clear before update + + const updatedDescription = 'High-performance laptop - Updated Model'; + await pgClient.query( + "UPDATE product SET description = $1 WHERE name = $2", + [updatedDescription, productNameToUpdate] + ); + + const receivedMessages = await waitFor({ + actionFn: async () => { + const allMessages = await getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC); + // Filter for an 'update' (op: 'u') event + return allMessages.filter(msg => + msg.data && + msg.data.data && // Drasi event level + msg.data.data.op === 'u' + ); + }, + predicateFn: (filteredUpdateMessages) => filteredUpdateMessages && filteredUpdateMessages.length === 1, + timeoutMs: 10000, + pollIntervalMs: 1000, + description: `unpacked update message for product "${productNameToUpdate}" in Redis stream ${UNPACKED_TOPIC}` + }); + + expect(receivedMessages).toBeDefined(); + expect(receivedMessages.length).toEqual(1); + + const cloudEvent = receivedMessages[0].data; + expect(cloudEvent).toBeDefined(); + expect(cloudEvent.topic).toBe(UNPACKED_TOPIC); + + const drasiEvent = cloudEvent.data; + expect(drasiEvent).toBeDefined(); + expect(drasiEvent.op).toBe('u'); // Update operation + expect(drasiEvent.payload).toBeDefined(); + expect(drasiEvent.payload.source).toBeDefined(); + expect(drasiEvent.payload.source.queryId).toBe('product-updates-unpacked'); + expect(drasiEvent.payload.after).toBeDefined(); + expect(drasiEvent.payload.after.name).toBe(productNameToUpdate); + expect(drasiEvent.payload.after.description).toBe(updatedDescription); + expect(drasiEvent.payload.before).toBeDefined(); + expect(drasiEvent.payload.before.name).toBe(productNameToUpdate); + expect(drasiEvent.payload.before.description).toBe(initialDescription); + }, 20000); +}); \ No newline at end of file diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/queries.yaml b/e2e-tests/06-post-dapr-pubsub-scenario/queries.yaml new file mode 100644 index 00000000..12dc243e --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/queries.yaml @@ -0,0 +1,37 @@ +apiVersion: v1 +kind: ContinuousQuery +name: product-updates-packed +spec: + mode: query + sources: + subscriptions: + - id: pubsub-pg-source + nodes: + - sourceLabel: product + query: > + MATCH + (p:product) + RETURN + p.product_id AS product_id, + p.name AS name, + p.description AS description, + p.price AS price +--- +apiVersion: v1 +kind: ContinuousQuery +name: product-updates-unpacked +spec: + mode: query + sources: + subscriptions: + - id: pubsub-pg-source + nodes: + - sourceLabel: product + query: > + MATCH + (p:product) + RETURN + p.product_id AS product_id, + p.name AS name, + p.description AS description, + p.price AS price \ No newline at end of file diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/reaction-provider.yaml b/e2e-tests/06-post-dapr-pubsub-scenario/reaction-provider.yaml new file mode 100644 index 00000000..ac17a7be --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/reaction-provider.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ReactionProvider +name: PostDaprPubSub +spec: + services: + reaction: + image: reaction-post-dapr-pubsub \ No newline at end of file diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/reactions.yaml b/e2e-tests/06-post-dapr-pubsub-scenario/reactions.yaml new file mode 100644 index 00000000..481d045b --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/reactions.yaml @@ -0,0 +1,20 @@ +kind: Reaction +apiVersion: v1 +name: post-dapr-pubsub +spec: + kind: PostDaprPubSub + queries: + product-updates-packed: > + { + "pubsubName": "drasitest-pubsub", + "topicName": "e2e-topic-packed", + "outputFormat": "Packed", + "skipControlSignals": false + } + product-updates-unpacked: > + { + "pubsubName": "drasitest-pubsub", + "topicName": "e2e-topic-unpacked", + "outputFormat": "Unpacked", + "skipControlSignals": true + } \ No newline at end of file diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/resources.yaml b/e2e-tests/06-post-dapr-pubsub-scenario/resources.yaml new file mode 100644 index 00000000..17fcac99 --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/resources.yaml @@ -0,0 +1,164 @@ +# PostgreSQL ConfigMap for Initialization +apiVersion: v1 +kind: ConfigMap +metadata: + name: pubsub-test-db-init + namespace: default +data: + init.sql: | + CREATE ROLE replication_group; + CREATE ROLE replication_user REPLICATION LOGIN; + GRANT replication_group TO testuser; + GRANT replication_group TO replication_user; + + CREATE TABLE product ( + product_id SERIAL PRIMARY KEY, + name VARCHAR(255) NOT NULL, + description TEXT, + price DECIMAL(10, 2) + ); + + ALTER TABLE product OWNER TO replication_group; + + INSERT INTO product (name, description, price) VALUES + ('Laptop', 'High-performance laptop', 1200.00), + ('Mouse', 'Ergonomic wireless mouse', 25.00); + + -- Create a publication for the product table + CREATE PUBLICATION product_publication FOR TABLE product; + + -- Create a replication slot + SELECT pg_create_logical_replication_slot('product_slot', 'pgoutput'); +--- +# PostgreSQL Secret for Credentials +apiVersion: v1 +kind: Secret +metadata: + name: pubsub-test-db-credentials + namespace: default +type: Opaque +stringData: + POSTGRES_USER: testuser + POSTGRES_PASSWORD: testpassword +--- +# PostgreSQL StatefulSet +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: pubsub-test-db + namespace: default +spec: + serviceName: pubsub-test-db + replicas: 1 + selector: + matchLabels: + app: pubsub-test-db + template: + metadata: + labels: + app: pubsub-test-db + spec: + containers: + - name: postgres + image: postgres:15-alpine + ports: + - containerPort: 5432 + envFrom: + - secretRef: + name: pubsub-test-db-credentials + env: + - name: POSTGRES_DB + value: testdb + args: + - -c + - wal_level=logical + - -c + - max_replication_slots=5 + - -c + - max_wal_senders=10 + volumeMounts: + - name: product-db-data + mountPath: /var/lib/postgresql/data + - name: init-script + mountPath: /docker-entrypoint-initdb.d + resources: + limits: + cpu: "1" + memory: "1Gi" + requests: + cpu: "0.5" + memory: "512Mi" + volumes: + - name: product-db-data + emptyDir: {} + - name: init-script + configMap: + name: pubsub-test-db-init +--- +# PostgreSQL Service +apiVersion: v1 +kind: Service +metadata: + name: pubsub-test-db + namespace: default +spec: + ports: + - port: 5432 + selector: + app: pubsub-test-db +--- +# Redis Deployment for Dapr Pub/Sub +apiVersion: apps/v1 +kind: Deployment +metadata: + name: dapr-pubsub-redis +spec: + replicas: 1 + selector: + matchLabels: + app: dapr-pubsub-redis + template: + metadata: + labels: + app: dapr-pubsub-redis + spec: + containers: + - name: redis + image: redis:7-alpine + ports: + - containerPort: 6379 + resources: + limits: + cpu: "0.5" + memory: "512Mi" + requests: + cpu: "0.1" + memory: "256Mi" +--- +# Redis Service for Dapr Pub/Sub +apiVersion: v1 +kind: Service +metadata: + name: dapr-pubsub-redis-svc + namespace: default +spec: + ports: + - port: 6379 + targetPort: 6379 + selector: + app: dapr-pubsub-redis +--- +# Dapr Pub/Sub Component in drasi-system Namespace +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: drasitest-pubsub + namespace: drasi-system +spec: + type: pubsub.redis + version: v1 + metadata: + - name: redisHost + value: dapr-pubsub-redis-svc.default.svc.cluster.local:6379 + - name: redisPassword + value: "" \ No newline at end of file diff --git a/e2e-tests/06-post-dapr-pubsub-scenario/sources.yaml b/e2e-tests/06-post-dapr-pubsub-scenario/sources.yaml new file mode 100644 index 00000000..cf9728f1 --- /dev/null +++ b/e2e-tests/06-post-dapr-pubsub-scenario/sources.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Source +name: pubsub-pg-source +spec: + kind: PostgreSQL + properties: + host: pubsub-test-db.default.svc.cluster.local + port: 5432 + user: testuser + password: testpassword + database: testdb + ssl: false + tables: + - public.product \ No newline at end of file diff --git a/e2e-tests/fixtures/infrastructure.js b/e2e-tests/fixtures/infrastructure.js index 80648010..612baa6c 100644 --- a/e2e-tests/fixtures/infrastructure.js +++ b/e2e-tests/fixtures/infrastructure.js @@ -40,6 +40,7 @@ const images = [ "drasi-project/reaction-storedproc", "drasi-project/reaction-gremlin", "drasi-project/reaction-sync-dapr-statestore", + "drasi-project/reaction-post-dapr-pubsub", ]; async function loadDrasiImages(clusterName) { diff --git a/reactions/dapr/post-pubsub/Dockerfile b/reactions/dapr/post-pubsub/Dockerfile new file mode 100644 index 00000000..78277ea3 --- /dev/null +++ b/reactions/dapr/post-pubsub/Dockerfile @@ -0,0 +1,31 @@ +# Build stage +FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src + +# Copy solution and project files +COPY post-dapr-pubsub.sln ./ +COPY Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj ./Drasi.Reactions.PostDaprPubSub/ + +# Restore dependencies +RUN dotnet restore "./Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj" + +# Copy only the source code +COPY Drasi.Reactions.PostDaprPubSub/ ./Drasi.Reactions.PostDaprPubSub/ + +# Build the reaction project +WORKDIR /src/Drasi.Reactions.PostDaprPubSub +RUN dotnet publish "./Drasi.Reactions.PostDaprPubSub.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +# Final stage/image +FROM mcr.microsoft.com/dotnet/aspnet:9.0 AS final +WORKDIR /app +COPY --from=build /app/publish . + +# Set log levels for reaction in production +ENV Logging__LogLevel__Default="Information" +ENV Logging__LogLevel__Microsoft="Warning" +ENV Logging__LogLevel__Microsoft_Hosting_Lifetime="Information" +ENV Logging__LogLevel__Drasi_Reactions_PostDaprPubSub="Debug" + +ENTRYPOINT ["dotnet", "Drasi.Reactions.PostDaprPubSub.dll"] \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Dockerfile.debug b/reactions/dapr/post-pubsub/Dockerfile.debug new file mode 100644 index 00000000..33085784 --- /dev/null +++ b/reactions/dapr/post-pubsub/Dockerfile.debug @@ -0,0 +1,32 @@ +# Build stage +FROM mcr.microsoft.com/dotnet/sdk:9.0 AS build +ARG BUILD_CONFIGURATION=Release +WORKDIR /src + +# Copy solution and project files +COPY post-dapr-pubsub.sln ./ +COPY Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj ./Drasi.Reactions.PostDaprPubSub/ + +# Restore dependencies +RUN dotnet restore "./Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj" + +# Copy only the source code +COPY Drasi.Reactions.PostDaprPubSub/ ./Drasi.Reactions.PostDaprPubSub/ + +# Build the reaction project +WORKDIR /src/Drasi.Reactions.PostDaprPubSub +RUN dotnet publish "./Drasi.Reactions.PostDaprPubSub.csproj" -c $BUILD_CONFIGURATION -o /app/publish /p:UseAppHost=false + +# Final stage/image +FROM ubuntu:25.04 AS final +RUN apt-get update && apt-get install -y bash curl dotnet-runtime-8.0 aspnetcore-runtime-8.0 && rm -rf /var/lib/apt/lists/* +WORKDIR /app +COPY --from=build /app/publish . + +# Set log levels for reaction in debug environment +ENV Logging__LogLevel__Default="Debug" +ENV Logging__LogLevel__Microsoft="Information" +ENV Logging__LogLevel__Microsoft_Hosting_Lifetime="Information" +ENV Logging__LogLevel__Drasi_Reactions_PostDaprPubSub="Debug" + +ENTRYPOINT ["dotnet", "Drasi.Reactions.PostDaprPubSub.dll"] \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeFormatterTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeFormatterTests.cs new file mode 100644 index 00000000..09d83250 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeFormatterTests.cs @@ -0,0 +1,185 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Microsoft.Extensions.Logging; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.PostDaprPubSub.Services; +using System.Text.Json; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class ChangeFormatterTests +{ + [Fact] + public void DrasiChangeFormatter_FormatEmptyChangeEvent_ShouldReturnEmptyCollection() + { + // Arrange + var formatter = new DrasiChangeFormatter(); + var evt = new ChangeEvent + { + QueryId = "test-query", + Sequence = 1, + AddedResults = Array.Empty>(), + UpdatedResults = Array.Empty(), + DeletedResults = Array.Empty>() + }; + + // Act + var result = formatter.Format(evt); + + // Assert + Assert.Empty(result); + } + + [Fact] + public void DrasiChangeFormatter_FormatAddedResults_ShouldReturnCorrectFormat() + { + // Arrange + var formatter = new DrasiChangeFormatter(); + var addedItem = new Dictionary { { "id", "123" }, { "name", "test" } }; + var evt = new ChangeEvent + { + QueryId = "test-query", + Sequence = 1, + SourceTimeMs = 1000, + AddedResults = new[] { addedItem }, + UpdatedResults = Array.Empty(), + DeletedResults = Array.Empty>() + }; + + // Act + var result = formatter.Format(evt).ToList(); + + // Assert + Assert.Single(result); + var json = result[0].GetRawText(); + Assert.Contains("\"op\":\"i\"", json); + Assert.Contains("\"queryId\":\"test-query\"", json); + Assert.Contains("\"id\":\"123\"", json); + Assert.Contains("\"name\":\"test\"", json); + } + + [Fact] + public void DrasiChangeFormatter_FormatUpdatedResults_ShouldReturnCorrectFormat() + { + // Arrange + var formatter = new DrasiChangeFormatter(); + var beforeItem = new Dictionary { { "id", "123" }, { "name", "before" } }; + var afterItem = new Dictionary { { "id", "123" }, { "name", "after" } }; + var updatedElement = new UpdatedResultElement { Before = beforeItem, After = afterItem }; + + var evt = new ChangeEvent + { + QueryId = "test-query", + Sequence = 1, + SourceTimeMs = 1000, + AddedResults = Array.Empty>(), + UpdatedResults = new[] { updatedElement }, + DeletedResults = Array.Empty>() + }; + + // Act + var result = formatter.Format(evt).ToList(); + + // Assert + Assert.Single(result); + var json = result[0].GetRawText(); + Assert.Contains("\"op\":\"u\"", json); + Assert.Contains("\"queryId\":\"test-query\"", json); + Assert.Contains("\"before\":{", json); + Assert.Contains("\"after\":{", json); + Assert.Contains("\"name\":\"before\"", json); + Assert.Contains("\"name\":\"after\"", json); + } + + [Fact] + public void DrasiChangeFormatter_FormatDeletedResults_ShouldReturnCorrectFormat() + { + // Arrange + var formatter = new DrasiChangeFormatter(); + var deletedItem = new Dictionary { { "id", "123" }, { "name", "test" } }; + var evt = new ChangeEvent + { + QueryId = "test-query", + Sequence = 1, + SourceTimeMs = 1000, + AddedResults = Array.Empty>(), + UpdatedResults = Array.Empty(), + DeletedResults = new[] { deletedItem } + }; + + // Act + var result = formatter.Format(evt).ToList(); + + // Assert + Assert.Single(result); + var json = result[0].GetRawText(); + Assert.Contains("\"op\":\"d\"", json); + Assert.Contains("\"queryId\":\"test-query\"", json); + Assert.Contains("\"before\":{", json); + Assert.Contains("\"id\":\"123\"", json); + Assert.DoesNotContain("\"after\":{", json); + } + + [Fact] + public void DrasiChangeFormatter_FormatMultipleResults_ShouldReturnCorrectCount() + { + // Arrange + var formatter = new DrasiChangeFormatter(); + var addedItem = new Dictionary { { "id", "123" }, { "name", "test" } }; + var deletedItem = new Dictionary { { "id", "456" }, { "name", "deleted" } }; + var beforeItem = new Dictionary { { "id", "789" }, { "name", "before" } }; + var afterItem = new Dictionary { { "id", "789" }, { "name", "after" } }; + var updatedElement = new UpdatedResultElement { Before = beforeItem, After = afterItem }; + + var evt = new ChangeEvent + { + QueryId = "test-query", + Sequence = 1, + SourceTimeMs = 1000, + AddedResults = new[] { addedItem }, + UpdatedResults = new[] { updatedElement }, + DeletedResults = new[] { deletedItem } + }; + + // Act + var result = formatter.Format(evt).ToList(); + + // Assert + Assert.Equal(3, result.Count); + Assert.Contains(result, r => r.GetRawText().Contains("\"op\":\"i\"")); + Assert.Contains(result, r => r.GetRawText().Contains("\"op\":\"u\"")); + Assert.Contains(result, r => r.GetRawText().Contains("\"op\":\"d\"")); + } + + [Fact] + public void ChangeFormatterFactory_GetFormatter_ShouldReturnDrasiFormatter() + { + // Arrange + var serviceProvider = new Mock(); + var drasiFormatter = new DrasiChangeFormatter(); + + serviceProvider.Setup(s => s.GetService(typeof(DrasiChangeFormatter))).Returns(drasiFormatter); + + var factory = new ChangeFormatterFactory(serviceProvider.Object); + + // Act + var drasiResult = factory.GetFormatter(); + + // Assert + Assert.Same(drasiFormatter, drasiResult); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeHandlerTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeHandlerTests.cs new file mode 100644 index 00000000..4e03c09a --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ChangeHandlerTests.cs @@ -0,0 +1,201 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Dapr.Client; +using Microsoft.Extensions.Logging; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.PostDaprPubSub.Services; +using System.Text.Json; +using Dapr; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class ChangeHandlerTests +{ + private readonly Mock _mockDaprClient; + private readonly Mock _mockFormatterFactory; + private readonly Mock> _mockLogger; + private readonly ChangeHandler _handler; + + public ChangeHandlerTests() + { + _mockDaprClient = new Mock(); + _mockFormatterFactory = new Mock(); + _mockLogger = new Mock>(); + + _handler = new ChangeHandler( + _mockDaprClient.Object, + _mockFormatterFactory.Object, + _mockLogger.Object + ); + } + + [Fact] + public async Task HandleChange_NullConfig_ThrowsArgumentNullException() + { + // Arrange + var evt = new ChangeEvent { QueryId = "test-query" }; + + // Act & Assert + await Assert.ThrowsAsync(() => _handler.HandleChange(evt, null)); + } + + [Fact] + public async Task HandleChange_PackedFormat_PublishesPackedEvent() + { + // Arrange + var evt = new ChangeEvent { + QueryId = "test-query", + AddedResults = new[] { new Dictionary { { "id", "1" } } } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Packed + }; + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _handler.HandleChange(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny() + ), Times.Once); + } + + [Fact] + public async Task HandleChange_UnpackedFormat_PublishesUnpackedEvents() + { + // Arrange + var evt = new ChangeEvent { + QueryId = "test-query", + AddedResults = new[] { new Dictionary { { "id", "1" } } } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Unpacked + }; + + var mockFormatter = new Mock(); + var formattedElements = new[] { + JsonDocument.Parse("{\"test\":\"value\"}").RootElement + }; + + mockFormatter.Setup(f => f.Format(evt)).Returns(formattedElements); + _mockFormatterFactory.Setup(ff => ff.GetFormatter()).Returns(mockFormatter.Object); + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _handler.HandleChange(evt, config); + + // Assert + _mockFormatterFactory.Verify(ff => ff.GetFormatter(), Times.Once); + mockFormatter.Verify(f => f.Format(evt), Times.Once); + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny() + ), Times.Once); + } + + [Fact] + public async Task HandleChange_PublishFails_ThrowsException() + { + // Arrange + var evt = new ChangeEvent { QueryId = "test-query" }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Packed + }; + + var exception = new DaprException("Test error"); + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .ThrowsAsync(exception); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => _handler.HandleChange(evt, config)); + Assert.Same(exception, ex); + } + + [Fact] + public async Task HandleChange_MultipleUnpackedEvents_PublishesEachEvent() + { + // Arrange + var evt = new ChangeEvent { + QueryId = "test-query", + AddedResults = new[] { + new Dictionary { { "id", "1" } }, + new Dictionary { { "id", "2" } } + } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Unpacked + }; + + var mockFormatter = new Mock(); + var formattedElements = new[] { + JsonDocument.Parse("{\"id\":\"1\"}").RootElement, + JsonDocument.Parse("{\"id\":\"2\"}").RootElement + }; + + mockFormatter.Setup(f => f.Format(evt)).Returns(formattedElements); + _mockFormatterFactory.Setup(ff => ff.GetFormatter()).Returns(mockFormatter.Object); + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _handler.HandleChange(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny() + ), Times.Exactly(2)); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ControlSignalHandlerTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ControlSignalHandlerTests.cs new file mode 100644 index 00000000..c1204426 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ControlSignalHandlerTests.cs @@ -0,0 +1,221 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Dapr.Client; +using Microsoft.Extensions.Logging; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.PostDaprPubSub.Services; +using System.Text.Json; +using Dapr; +using Drasi.Reactions.PostDaprPubSub; // Added for QueryConfig + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class ControlSignalHandlerTests +{ + private readonly Mock _mockDaprClient; + private readonly Mock> _mockLogger; + private readonly ControlSignalHandler _handler; + + public ControlSignalHandlerTests() + { + _mockDaprClient = new Mock(); + _mockLogger = new Mock>(); + + _handler = new ControlSignalHandler( + _mockDaprClient.Object, + _mockLogger.Object + ); + } + + [Fact] + public async Task HandleControlSignal_NullConfig_ThrowsArgumentNullException() + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = ControlSignalKind.Running } + }; + + // Act & Assert + await Assert.ThrowsAsync(() => _handler.HandleControlSignal(evt, null!)); + } + + [Fact] + public async Task HandleControlSignal_SkipControlSignals_DoesNotPublish() + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = ControlSignalKind.Running } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + SkipControlSignals = true + }; + + // Act + await _handler.HandleControlSignal(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + It.IsAny(), + It.IsAny(), + It.IsAny(), + It.IsAny() + ), Times.Never); + } + + [Fact] + public async Task HandleControlSignal_PackedFormat_PublishesPackedEvent() + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = ControlSignalKind.Running } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Packed, + SkipControlSignals = false + }; + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _handler.HandleControlSignal(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny() + ), Times.Once); + } + + [Fact] + public async Task HandleControlSignal_UnpackedFormat_PublishesUnpackedEvent() + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = ControlSignalKind.Running } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Unpacked, + SkipControlSignals = false + }; + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _handler.HandleControlSignal(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.Is(je => je.GetRawText().Contains("\"kind\":\"running\"")), // Check for unpacked structure + It.IsAny() + ), Times.Once); + } + + [Fact] + public async Task HandleControlSignal_PublishFails_ThrowsException() + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = ControlSignalKind.Running } + }; + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Packed, + SkipControlSignals = false + }; + + var exception = new DaprException("Test error"); + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .ThrowsAsync(exception); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => _handler.HandleControlSignal(evt, config)); + Assert.Same(exception, ex); + } + + [Fact] + public async Task HandleControlSignal_DifferentControlSignalTypes_FormatsCorrectly() + { + // Arrange and run test for each control signal type + foreach (ControlSignalKind signalKind in Enum.GetValues(typeof(ControlSignalKind))) + { + // Arrange + var evt = new ControlEvent { + QueryId = "test-query", + ControlSignal = new ControlSignalClass { Kind = signalKind } + }; + + var config = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic", + Format = OutputFormat.Unpacked, + SkipControlSignals = false + }; + + _mockDaprClient.Setup(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.IsAny(), + It.IsAny())) + .Returns(Task.CompletedTask); + + // Reset call counts before each run + _mockDaprClient.Invocations.Clear(); + + // Act + await _handler.HandleControlSignal(evt, config); + + // Assert + _mockDaprClient.Verify(dc => dc.PublishEventAsync( + config.PubsubName, + config.TopicName, + It.Is(je => je.GetRawText().Contains($"\"kind\":\"{JsonNamingPolicy.CamelCase.ConvertName(signalKind.ToString())}\"")), + It.IsAny() + ), Times.Once, $"Failed for control signal type: {signalKind}"); + } + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/DaprInitializationServiceTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/DaprInitializationServiceTests.cs new file mode 100644 index 00000000..7121baba --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/DaprInitializationServiceTests.cs @@ -0,0 +1,109 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Dapr.Client; +using Microsoft.Extensions.Logging; +using Dapr; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class DaprInitializationServiceTests +{ + private readonly Mock _mockDaprClient; + private readonly Mock> _mockLogger; + private readonly Mock _mockErrorStateHandler; + private readonly DaprInitializationService _service; + + public DaprInitializationServiceTests() + { + _mockDaprClient = new Mock(); + _mockLogger = new Mock>(); + _mockErrorStateHandler = new Mock(); + + _service = new DaprInitializationService( + _mockDaprClient.Object, + _mockLogger.Object, + _mockErrorStateHandler.Object + ); + } + + [Fact] + public async Task WaitForDaprSidecarAsync_Success_LogsInfoMessage() + { + // Arrange + _mockDaprClient.Setup(d => d.WaitForSidecarAsync(It.IsAny())) + .Returns(Task.CompletedTask); + + // Act + await _service.WaitForDaprSidecarAsync(CancellationToken.None); + + // Assert + _mockDaprClient.Verify(d => d.WaitForSidecarAsync(It.IsAny()), Times.Once); + _mockErrorStateHandler.Verify(e => e.Terminate(It.IsAny()), Times.Never); + } + + [Fact] + public async Task WaitForDaprSidecarAsync_DaprException_TerminatesAndRethrows() + { + // Arrange + var exception = new DaprException("Dapr sidecar not available"); + _mockDaprClient.Setup(d => d.WaitForSidecarAsync(It.IsAny())) + .ThrowsAsync(exception); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => + _service.WaitForDaprSidecarAsync(CancellationToken.None)); + + Assert.Same(exception, ex); + _mockErrorStateHandler.Verify(e => e.Terminate(It.Is(s => + s.Contains("Dapr sidecar is not available"))), Times.Once); + } + + [Fact] + public async Task WaitForDaprSidecarAsync_OtherException_TerminatesAndRethrows() + { + // Arrange + var exception = new Exception("Unexpected error"); + _mockDaprClient.Setup(d => d.WaitForSidecarAsync(It.IsAny())) + .ThrowsAsync(exception); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => + _service.WaitForDaprSidecarAsync(CancellationToken.None)); + + Assert.Same(exception, ex); + _mockErrorStateHandler.Verify(e => e.Terminate(It.Is(s => + s.Contains("Unexpected error while waiting for Dapr sidecar"))), Times.Once); + } + + [Fact] + public async Task WaitForDaprSidecarAsync_Cancelled_DoesNotCallTerminate() + { + // Arrange + var cts = new CancellationTokenSource(); + cts.Cancel(); + var cancellationToken = cts.Token; + + _mockDaprClient.Setup(d => d.WaitForSidecarAsync(cancellationToken)) + .ThrowsAsync(new OperationCanceledException()); + + // Act & Assert + await Assert.ThrowsAsync(() => + _service.WaitForDaprSidecarAsync(cancellationToken)); + + _mockErrorStateHandler.Verify(e => e.Terminate(It.IsAny()), Times.Never); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/Drasi.Reactions.PostDaprPubSub.Tests.csproj b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/Drasi.Reactions.PostDaprPubSub.Tests.csproj new file mode 100644 index 00000000..d6cb48f6 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/Drasi.Reactions.PostDaprPubSub.Tests.csproj @@ -0,0 +1,30 @@ + + + + net9.0 + enable + enable + + false + true + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ErrorStateHandlerTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ErrorStateHandlerTests.cs new file mode 100644 index 00000000..7f5c68b2 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/ErrorStateHandlerTests.cs @@ -0,0 +1,50 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Microsoft.Extensions.Logging; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class ErrorStateHandlerTests +{ + [Fact] + public void Terminate_CallsTerminateWithError() + { + // This test would typically use reflection or a wrapper to test that the static method is called. + // Since we can't easily mock static methods in C#, this test is more of a placeholder. + // In a real-world scenario, we'd need to refactor to make the code more testable. + + // Arrange + var handler = new ErrorStateHandler(); + var errorMessage = "Test error message"; + + // We're not actually testing anything here since we can't easily verify the static method call + // without introducing additional complexity or using a mocking framework like Typemock Isolator. + + // This is more of a documentation to show the intent of the test. + // In a proper test setup, we'd verify that Reaction.TerminateWithError is called + // with the expected error message. + + // Act - in a real test, we'd catch an exception or use a test fixture + // handler.Terminate(errorMessage); + + // Assert - in a real test, we'd verify the static method was called + // Assert.True(...); + + // For now, we just assert that the class exists and doesn't throw when used + Assert.NotNull(handler); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigTests.cs new file mode 100644 index 00000000..13447f93 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigTests.cs @@ -0,0 +1,122 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.ComponentModel.DataAnnotations; +using Xunit; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class QueryConfigTests +{ + [Fact] + public void QueryConfig_DefaultValues_ShouldBeCorrect() + { + // Arrange, Act + var config = new QueryConfig(); + + // Assert + Assert.Equal("drasi-pubsub", config.PubsubName); + Assert.Equal(string.Empty, config.TopicName); + Assert.Equal(OutputFormat.Unpacked, config.Format); // Default is unpacked + Assert.False(config.SkipControlSignals); + } + + [Fact] + public void ValidateConfig_ValidConfiguration_ShouldHaveNoErrors() + { + // Arrange + var config = new QueryConfig + { + PubsubName = "test-pubsub", + TopicName = "test-topic" + }; + + var validationContext = new ValidationContext(config); + var validationResults = new List(); + + // Act + var isValid = Validator.TryValidateObject(config, validationContext, validationResults, true); + + // Assert + Assert.True(isValid); + Assert.Empty(validationResults); + } + + [Fact] + public void ValidateConfig_EmptyPubsubName_ShouldHaveError() + { + // Arrange + var config = new QueryConfig + { + PubsubName = "", + TopicName = "test-topic" + }; + + var validationContext = new ValidationContext(config); + var validationResults = new List(); + + // Act + var isValid = Validator.TryValidateObject(config, validationContext, validationResults, true); + + // Assert + Assert.False(isValid); + Assert.Single(validationResults); + Assert.Contains(validationResults, r => r.MemberNames.Contains("PubsubName")); + } + + [Fact] + public void ValidateConfig_EmptyTopicName_ShouldHaveError() + { + // Arrange + var config = new QueryConfig + { + PubsubName = "test-pubsub", + TopicName = "" + }; + + var validationContext = new ValidationContext(config); + var validationResults = new List(); + + // Act + var isValid = Validator.TryValidateObject(config, validationContext, validationResults, true); + + // Assert + Assert.False(isValid); + Assert.Single(validationResults); + Assert.Contains(validationResults, r => r.MemberNames.Contains("TopicName")); + } + + [Fact] + public void QueryConfig_FormatMode_DefaultIsUnpacked() + { + // Arrange, Act + var config = new QueryConfig(); + + // Assert + Assert.Equal(OutputFormat.Unpacked, config.Format); + } + + [Fact] + public void QueryConfig_FormatMode_CanBeSetToPacked() + { + // Arrange, Act + var config = new QueryConfig + { + Format = OutputFormat.Packed + }; + + // Assert + Assert.Equal(OutputFormat.Packed, config.Format); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigValidationServiceTests.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigValidationServiceTests.cs new file mode 100644 index 00000000..5da35cca --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub.Tests/QueryConfigValidationServiceTests.cs @@ -0,0 +1,163 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Xunit; +using Moq; +using Microsoft.Extensions.Logging; +using Drasi.Reaction.SDK.Services; +using System.Collections.Generic; + +namespace Drasi.Reactions.PostDaprPubSub.Tests; + +public class QueryConfigValidationServiceTests +{ + private readonly Mock> _mockLogger; + private readonly Mock _mockQueryConfigService; + private readonly Mock _mockErrorStateHandler; + private readonly QueryConfigValidationService _service; + + public QueryConfigValidationServiceTests() + { + _mockLogger = new Mock>(); + _mockQueryConfigService = new Mock(); + _mockErrorStateHandler = new Mock(); + + _service = new QueryConfigValidationService( + _mockLogger.Object, + _mockQueryConfigService.Object, + _mockErrorStateHandler.Object + ); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_NoQueries_LogsWarningAndSucceeds() + { + // Arrange + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(new List()); + + // Act + await _service.ValidateQueryConfigsAsync(CancellationToken.None); + + // Assert + _mockErrorStateHandler.Verify(e => e.Terminate(It.IsAny()), Times.Never); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_NullQueryConfig_TerminatesWithError() + { + // Arrange + var queryNames = new List { "test-query" }; + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(queryNames); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("test-query")).Returns((QueryConfig?)null); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => + _service.ValidateQueryConfigsAsync(CancellationToken.None)); + + Assert.Contains("test-query", ex.Message); + _mockErrorStateHandler.Verify(e => e.Terminate(It.Is(s => s.Contains("test-query"))), Times.Once); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_InvalidConfig_TerminatesWithError() + { + // Arrange + var queryNames = new List { "test-query" }; + var invalidConfig = new QueryConfig { + PubsubName = "", // Empty - invalid + TopicName = "test-topic" + }; + + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(queryNames); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("test-query")).Returns(invalidConfig); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => + _service.ValidateQueryConfigsAsync(CancellationToken.None)); + + Assert.Contains("test-query", ex.Message); + Assert.Contains("PubsubName", ex.Message); + _mockErrorStateHandler.Verify(e => e.Terminate(It.Is(s => + s.Contains("test-query") && s.Contains("PubsubName"))), Times.Once); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_ValidConfig_Succeeds() + { + // Arrange + var queryNames = new List { "test-query" }; + var validConfig = new QueryConfig { + PubsubName = "test-pubsub", + TopicName = "test-topic" + }; + + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(queryNames); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("test-query")).Returns(validConfig); + + // Act + await _service.ValidateQueryConfigsAsync(CancellationToken.None); + + // Assert + _mockErrorStateHandler.Verify(e => e.Terminate(It.IsAny()), Times.Never); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_MultipleQueries_ValidatesAll() + { + // Arrange + var queryNames = new List { "query1", "query2" }; + var validConfig1 = new QueryConfig { + PubsubName = "pubsub1", + TopicName = "topic1" + }; + var validConfig2 = new QueryConfig { + PubsubName = "pubsub2", + TopicName = "topic2" + }; + + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(queryNames); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("query1")).Returns(validConfig1); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("query2")).Returns(validConfig2); + + // Act + await _service.ValidateQueryConfigsAsync(CancellationToken.None); + + // Assert + _mockErrorStateHandler.Verify(e => e.Terminate(It.IsAny()), Times.Never); + _mockQueryConfigService.Verify(q => q.GetQueryConfig("query1"), Times.Once); + _mockQueryConfigService.Verify(q => q.GetQueryConfig("query2"), Times.Once); + } + + [Fact] + public async Task ValidateQueryConfigsAsync_FirstQueryInvalid_StopsValidationAndTerminates() + { + // Arrange + var queryNames = new List { "query1", "query2" }; + var invalidConfig = new QueryConfig { + PubsubName = "", // Empty - invalid + TopicName = "topic1" + }; + + _mockQueryConfigService.Setup(q => q.GetQueryNames()).Returns(queryNames); + _mockQueryConfigService.Setup(q => q.GetQueryConfig("query1")).Returns(invalidConfig); + + // Act & Assert + var ex = await Assert.ThrowsAsync(() => + _service.ValidateQueryConfigsAsync(CancellationToken.None)); + + // Second query should not be checked after first fails + _mockQueryConfigService.Verify(q => q.GetQueryConfig("query2"), Times.Never); + _mockErrorStateHandler.Verify(e => e.Terminate(It.Is(s => s.Contains("query1"))), Times.Once); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj new file mode 100644 index 00000000..7ce5d6ea --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Drasi.Reactions.PostDaprPubSub.csproj @@ -0,0 +1,18 @@ + + + + Exe + net9.0 + enable + enable + + + + + + + + + + + \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/ErrorStateHandler.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/ErrorStateHandler.cs new file mode 100644 index 00000000..d1fe35e0 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/ErrorStateHandler.cs @@ -0,0 +1,39 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Drasi.Reactions.PostDaprPubSub; + +/// +/// Interface for handling terminal error states. +/// +public interface IErrorStateHandler +{ + /// + /// Terminate the reaction with an error message. + /// + /// Error message explaining the termination reason + void Terminate(string message); +} + +/// +/// Implementation of the error state handler. +/// +public class ErrorStateHandler : IErrorStateHandler +{ + /// + public void Terminate(string message) + { + Reaction.SDK.Reaction.TerminateWithError(message); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Models/Unpacked.generated.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Models/Unpacked.generated.cs new file mode 100644 index 00000000..25927358 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Models/Unpacked.generated.cs @@ -0,0 +1,601 @@ +// +// +// To parse this JSON data, add NuGet 'System.Text.Json' then do one of these: +// +// using Drasi.Reactions.PostDaprPubSub.Models.Unpacked; +// +// var changeNotification = ChangeNotification.FromJson(jsonString); +// var changePayload = ChangePayload.FromJson(jsonString); +// var changeSource = ChangeSource.FromJson(jsonString); +// var controlPayload = ControlPayload.FromJson(jsonString); +// var controlSignalNotification = ControlSignalNotification.FromJson(jsonString); +// var notification = Notification.FromJson(jsonString); +// var op = Op.FromJson(jsonString); +// var reloadHeader = ReloadHeader.FromJson(jsonString); +// var reloadItem = ReloadItem.FromJson(jsonString); +// var versions = Versions.FromJson(jsonString); +#nullable enable +#pragma warning disable CS8618 +#pragma warning disable CS8601 +#pragma warning disable CS8603 + +namespace Drasi.Reactions.PostDaprPubSub.Models.Unpacked +{ + using System; + using System.Collections.Generic; + + using System.Text.Json; + using System.Text.Json.Serialization; + using System.Globalization; + + public partial class ChangeNotification + { + [JsonPropertyName("op")] + public ChangeNotificationOp Op { get; set; } + + [JsonPropertyName("payload")] + public PayloadClass Payload { get; set; } + + /// + /// The sequence number of the source change + /// + [JsonPropertyName("seq")] + public long Seq { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class PayloadClass + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("after")] + public Dictionary After { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("before")] + public Dictionary Before { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class SourceClass + { + /// + /// The ID of the query that the change originated from + /// + [JsonPropertyName("queryId")] + public string QueryId { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ChangePayload + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("after")] + public Dictionary After { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("before")] + public Dictionary Before { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class ChangeSource + { + /// + /// The ID of the query that the change originated from + /// + [JsonPropertyName("queryId")] + public string QueryId { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ControlPayload + { + [JsonPropertyName("kind")] + public string Kind { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class ControlSignalNotification + { + [JsonPropertyName("op")] + public ControlSignalNotificationOp Op { get; set; } + + [JsonPropertyName("payload")] + public ControlSignalNotificationPayload Payload { get; set; } + + /// + /// The sequence number of the control signal + /// + [JsonPropertyName("seq")] + public long Seq { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ControlSignalNotificationPayload + { + [JsonPropertyName("kind")] + public string Kind { get; set; } + + [JsonPropertyName("source")] + public SourceClass Source { get; set; } + } + + public partial class Notification + { + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("op")] + public OpEnum Op { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ReloadHeader + { + [JsonPropertyName("op")] + public ReloadHeaderOp Op { get; set; } + + /// + /// The sequence number of last known source change + /// + [JsonPropertyName("seq")] + public long Seq { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public partial class ReloadItem + { + [JsonPropertyName("op")] + public ReloadItemOp Op { get; set; } + + [JsonPropertyName("payload")] + public PayloadClass Payload { get; set; } + + [JsonIgnore(Condition = JsonIgnoreCondition.WhenWritingNull)] + [JsonPropertyName("metadata")] + public Dictionary Metadata { get; set; } + + [JsonPropertyName("ts_ms")] + public long TsMs { get; set; } + } + + public enum ChangeNotificationOp { D, I, U }; + + public enum ControlSignalNotificationOp { X }; + + public enum OpEnum { D, H, I, R, U, X }; + + public enum ReloadHeaderOp { H }; + + public enum ReloadItemOp { R }; + + public enum VersionsEnum { V1 }; + + public partial class ChangeNotification + { + public static ChangeNotification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ChangePayload + { + public static ChangePayload FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ChangeSource + { + public static ChangeSource FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ControlPayload + { + public static ControlPayload FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ControlSignalNotification + { + public static ControlSignalNotification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class Notification + { + public static Notification FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public class Op + { + public static OpEnum FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ReloadHeader + { + public static ReloadHeader FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public partial class ReloadItem + { + public static ReloadItem FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public class Versions + { + public static VersionsEnum FromJson(string json) => JsonSerializer.Deserialize(json, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + public static class Serialize + { + public static string ToJson(this ChangeNotification self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ChangePayload self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ChangeSource self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ControlPayload self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ControlSignalNotification self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this Notification self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this OpEnum self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ReloadHeader self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this ReloadItem self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + public static string ToJson(this VersionsEnum self) => JsonSerializer.Serialize(self, Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings); + } + + internal static class Converter + { + public static readonly JsonSerializerOptions Settings = new(JsonSerializerDefaults.General) + { + Converters = + { + ChangeNotificationOpConverter.Singleton, + ControlSignalNotificationOpConverter.Singleton, + OpEnumConverter.Singleton, + ReloadHeaderOpConverter.Singleton, + ReloadItemOpConverter.Singleton, + VersionsEnumConverter.Singleton, + new DateOnlyConverter(), + new TimeOnlyConverter(), + IsoDateTimeOffsetConverter.Singleton + }, + }; + } + + internal class ChangeNotificationOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ChangeNotificationOp); + + public override ChangeNotificationOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + switch (value) + { + case "d": + return ChangeNotificationOp.D; + case "i": + return ChangeNotificationOp.I; + case "u": + return ChangeNotificationOp.U; + } + throw new Exception("Cannot unmarshal type ChangeNotificationOp"); + } + + public override void Write(Utf8JsonWriter writer, ChangeNotificationOp value, JsonSerializerOptions options) + { + switch (value) + { + case ChangeNotificationOp.D: + JsonSerializer.Serialize(writer, "d", options); + return; + case ChangeNotificationOp.I: + JsonSerializer.Serialize(writer, "i", options); + return; + case ChangeNotificationOp.U: + JsonSerializer.Serialize(writer, "u", options); + return; + } + throw new Exception("Cannot marshal type ChangeNotificationOp"); + } + + public static readonly ChangeNotificationOpConverter Singleton = new ChangeNotificationOpConverter(); + } + + internal class ControlSignalNotificationOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ControlSignalNotificationOp); + + public override ControlSignalNotificationOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "x") + { + return ControlSignalNotificationOp.X; + } + throw new Exception("Cannot unmarshal type ControlSignalNotificationOp"); + } + + public override void Write(Utf8JsonWriter writer, ControlSignalNotificationOp value, JsonSerializerOptions options) + { + if (value == ControlSignalNotificationOp.X) + { + JsonSerializer.Serialize(writer, "x", options); + return; + } + throw new Exception("Cannot marshal type ControlSignalNotificationOp"); + } + + public static readonly ControlSignalNotificationOpConverter Singleton = new ControlSignalNotificationOpConverter(); + } + + internal class OpEnumConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(OpEnum); + + public override OpEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + switch (value) + { + case "d": + return OpEnum.D; + case "h": + return OpEnum.H; + case "i": + return OpEnum.I; + case "r": + return OpEnum.R; + case "u": + return OpEnum.U; + case "x": + return OpEnum.X; + } + throw new Exception("Cannot unmarshal type OpEnum"); + } + + public override void Write(Utf8JsonWriter writer, OpEnum value, JsonSerializerOptions options) + { + switch (value) + { + case OpEnum.D: + JsonSerializer.Serialize(writer, "d", options); + return; + case OpEnum.H: + JsonSerializer.Serialize(writer, "h", options); + return; + case OpEnum.I: + JsonSerializer.Serialize(writer, "i", options); + return; + case OpEnum.R: + JsonSerializer.Serialize(writer, "r", options); + return; + case OpEnum.U: + JsonSerializer.Serialize(writer, "u", options); + return; + case OpEnum.X: + JsonSerializer.Serialize(writer, "x", options); + return; + } + throw new Exception("Cannot marshal type OpEnum"); + } + + public static readonly OpEnumConverter Singleton = new OpEnumConverter(); + } + + internal class ReloadHeaderOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ReloadHeaderOp); + + public override ReloadHeaderOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "h") + { + return ReloadHeaderOp.H; + } + throw new Exception("Cannot unmarshal type ReloadHeaderOp"); + } + + public override void Write(Utf8JsonWriter writer, ReloadHeaderOp value, JsonSerializerOptions options) + { + if (value == ReloadHeaderOp.H) + { + JsonSerializer.Serialize(writer, "h", options); + return; + } + throw new Exception("Cannot marshal type ReloadHeaderOp"); + } + + public static readonly ReloadHeaderOpConverter Singleton = new ReloadHeaderOpConverter(); + } + + internal class ReloadItemOpConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(ReloadItemOp); + + public override ReloadItemOp Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "r") + { + return ReloadItemOp.R; + } + throw new Exception("Cannot unmarshal type ReloadItemOp"); + } + + public override void Write(Utf8JsonWriter writer, ReloadItemOp value, JsonSerializerOptions options) + { + if (value == ReloadItemOp.R) + { + JsonSerializer.Serialize(writer, "r", options); + return; + } + throw new Exception("Cannot marshal type ReloadItemOp"); + } + + public static readonly ReloadItemOpConverter Singleton = new ReloadItemOpConverter(); + } + + internal class VersionsEnumConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(VersionsEnum); + + public override VersionsEnum Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + if (value == "v1") + { + return VersionsEnum.V1; + } + throw new Exception("Cannot unmarshal type VersionsEnum"); + } + + public override void Write(Utf8JsonWriter writer, VersionsEnum value, JsonSerializerOptions options) + { + if (value == VersionsEnum.V1) + { + JsonSerializer.Serialize(writer, "v1", options); + return; + } + throw new Exception("Cannot marshal type VersionsEnum"); + } + + public static readonly VersionsEnumConverter Singleton = new VersionsEnumConverter(); + } + + public class DateOnlyConverter : JsonConverter + { + private readonly string serializationFormat; + public DateOnlyConverter() : this(null) { } + + public DateOnlyConverter(string? serializationFormat) + { + this.serializationFormat = serializationFormat ?? "yyyy-MM-dd"; + } + + public override DateOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return DateOnly.Parse(value!); + } + + public override void Write(Utf8JsonWriter writer, DateOnly value, JsonSerializerOptions options) + => writer.WriteStringValue(value.ToString(serializationFormat)); + } + + public class TimeOnlyConverter : JsonConverter + { + private readonly string serializationFormat; + + public TimeOnlyConverter() : this(null) { } + + public TimeOnlyConverter(string? serializationFormat) + { + this.serializationFormat = serializationFormat ?? "HH:mm:ss.fff"; + } + + public override TimeOnly Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + var value = reader.GetString(); + return TimeOnly.Parse(value!); + } + + public override void Write(Utf8JsonWriter writer, TimeOnly value, JsonSerializerOptions options) + => writer.WriteStringValue(value.ToString(serializationFormat)); + } + + internal class IsoDateTimeOffsetConverter : JsonConverter + { + public override bool CanConvert(Type t) => t == typeof(DateTimeOffset); + + private const string DefaultDateTimeFormat = "yyyy'-'MM'-'dd'T'HH':'mm':'ss.FFFFFFFK"; + + private DateTimeStyles _dateTimeStyles = DateTimeStyles.RoundtripKind; + private string? _dateTimeFormat; + private CultureInfo? _culture; + + public DateTimeStyles DateTimeStyles + { + get => _dateTimeStyles; + set => _dateTimeStyles = value; + } + + public string? DateTimeFormat + { + get => _dateTimeFormat ?? string.Empty; + set => _dateTimeFormat = (string.IsNullOrEmpty(value)) ? null : value; + } + + public CultureInfo Culture + { + get => _culture ?? CultureInfo.CurrentCulture; + set => _culture = value; + } + + public override void Write(Utf8JsonWriter writer, DateTimeOffset value, JsonSerializerOptions options) + { + string text; + + + if ((_dateTimeStyles & DateTimeStyles.AdjustToUniversal) == DateTimeStyles.AdjustToUniversal + || (_dateTimeStyles & DateTimeStyles.AssumeUniversal) == DateTimeStyles.AssumeUniversal) + { + value = value.ToUniversalTime(); + } + + text = value.ToString(_dateTimeFormat ?? DefaultDateTimeFormat, Culture); + + writer.WriteStringValue(text); + } + + public override DateTimeOffset Read(ref Utf8JsonReader reader, Type typeToConvert, JsonSerializerOptions options) + { + string? dateText = reader.GetString(); + + if (string.IsNullOrEmpty(dateText) == false) + { + if (!string.IsNullOrEmpty(_dateTimeFormat)) + { + return DateTimeOffset.ParseExact(dateText, _dateTimeFormat, Culture, _dateTimeStyles); + } + else + { + return DateTimeOffset.Parse(dateText, Culture, _dateTimeStyles); + } + } + else + { + return default(DateTimeOffset); + } + } + + + public static readonly IsoDateTimeOffsetConverter Singleton = new IsoDateTimeOffsetConverter(); + } +} +#pragma warning restore CS8618 +#pragma warning restore CS8601 +#pragma warning restore CS8603 diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Program.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Program.cs new file mode 100644 index 00000000..f02ea72b --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Program.cs @@ -0,0 +1,62 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Drasi.Reaction.SDK; +using Drasi.Reactions.PostDaprPubSub; +using Drasi.Reactions.PostDaprPubSub.Services; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +try +{ + var reaction = new ReactionBuilder() + .UseChangeEventHandler() + .UseControlEventHandler() + .UseJsonQueryConfig() + .ConfigureServices((services) => + { + // Register formatters + services.AddSingleton(); + services.AddSingleton(); + + // Register services + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + + // Register Dapr client + services.AddDaprClient(); + }) + .Build(); + + var logger = reaction.Services.GetRequiredService>(); + logger.LogInformation("Starting PostDaprPubSub reaction"); + + // Step 1. Wait for Dapr sidecar + var daprInitService = reaction.Services.GetRequiredService(); + await daprInitService.WaitForDaprSidecarAsync(CancellationToken.None); + + // Step 2. Validate query configurations + var validationService = reaction.Services.GetRequiredService(); + await validationService.ValidateQueryConfigsAsync(CancellationToken.None); + + // Step 3. Start the reaction + await reaction.StartAsync(); +} +catch (Exception ex) +{ + var errorStateHandler = new ErrorStateHandler(); + errorStateHandler.Terminate($"Fatal error starting PostDaprPubSub reaction: {ex.Message}"); + throw; +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/QueryConfig.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/QueryConfig.cs new file mode 100644 index 00000000..0ef81793 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/QueryConfig.cs @@ -0,0 +1,72 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System.ComponentModel.DataAnnotations; +using System.Text.Json.Serialization; + +namespace Drasi.Reactions.PostDaprPubSub; + +/// +/// Specifies the format mode for publishing events. +/// +public enum OutputFormat +{ + /// + /// Send individual messages for each change (default). + /// + Unpacked = 0, + + /// + /// Send the entire ChangeEvent as a single message. + /// + Packed = 1 +} + +/// +/// Configuration for the PostDaprPubSub reaction. +/// Maps Drasi queries to Dapr pubsub topics. +/// +public class QueryConfig : IValidatableObject +{ + /// + /// Name of the Dapr pubsub component to use for publishing. + /// + [Required] + [JsonPropertyName("pubsubName")] + public string PubsubName { get; set; } = "drasi-pubsub"; + + /// + /// Name of the topic to publish events to. + /// + [Required] + [JsonPropertyName("topicName")] + public string TopicName { get; set; } = string.Empty; + + /// + /// Specifies how events should be formatted when published. + /// + [JsonPropertyName("format")] + public OutputFormat Format { get; set; } = OutputFormat.Unpacked; + + /// + /// Whether to skip publishing control signals to the topic. + /// + [JsonPropertyName("skipControlSignals")] + public bool SkipControlSignals { get; set; } = false; + + public IEnumerable Validate(ValidationContext validationContext) + { + yield break; + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeFormatterFactory.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeFormatterFactory.cs new file mode 100644 index 00000000..bb397ff7 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeFormatterFactory.cs @@ -0,0 +1,47 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Microsoft.Extensions.DependencyInjection; + +namespace Drasi.Reactions.PostDaprPubSub.Services; + +/// +/// Factory for creating appropriate change formatters based on the event format. +/// +public interface IChangeFormatterFactory +{ + /// + /// Gets a formatter for the specified event format. + /// + /// An appropriate change formatter + IChangeFormatter GetFormatter(); +} + +/// +/// Implementation of the change formatter factory. +/// +public class ChangeFormatterFactory : IChangeFormatterFactory +{ + private readonly IServiceProvider _serviceProvider; + + public ChangeFormatterFactory(IServiceProvider serviceProvider) + { + _serviceProvider = serviceProvider ?? throw new ArgumentNullException(nameof(serviceProvider)); + } + + public IChangeFormatter GetFormatter() + { + return _serviceProvider.GetRequiredService(); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeHandler.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeHandler.cs new file mode 100644 index 00000000..c9ac251a --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ChangeHandler.cs @@ -0,0 +1,81 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Drasi.Reactions.PostDaprPubSub.Services; + +using Dapr.Client; +using Drasi.Reaction.SDK; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +public class ChangeHandler : IChangeEventHandler +{ + private readonly DaprClient _daprClient; + private readonly IChangeFormatterFactory _formatterFactory; + private readonly ILogger _logger; + + public ChangeHandler( + DaprClient daprClient, + IChangeFormatterFactory formatterFactory, + ILogger logger) + { + _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient)); + _formatterFactory = formatterFactory ?? throw new ArgumentNullException(nameof(formatterFactory)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task HandleChange(ChangeEvent evt, QueryConfig? config) + { + var queryId = evt.QueryId; + var queryConfig = config + ?? throw new ArgumentNullException(nameof(config), $"Query configuration is null for query {queryId}"); + + _logger.LogDebug("Processing change event for query {QueryId} with pubsub {PubsubName} and topic {TopicName}", + queryId, queryConfig.PubsubName, queryConfig.TopicName); + + if (queryConfig.Format == OutputFormat.Packed) + { + // Send the complete change event as a single message + await PublishPackedEvent(evt, queryConfig); + } + else + { + // Format and send individual events for each change + await PublishUnpackedEvents(evt, queryConfig); + } + } + + private async Task PublishPackedEvent(ChangeEvent evt, QueryConfig queryConfig) + { + var serializedEvent = JsonSerializer.Serialize(evt, ModelOptions.JsonOptions); + using var doc = JsonDocument.Parse(serializedEvent); + await _daprClient.PublishEventAsync(queryConfig.PubsubName, queryConfig.TopicName, doc.RootElement); + _logger.LogDebug("Published packed event for query {QueryId}", evt.QueryId); + } + + private async Task PublishUnpackedEvents(ChangeEvent evt, QueryConfig queryConfig) + { + var formatter = _formatterFactory.GetFormatter(); + var events = formatter.Format(evt); + + foreach (var eventData in events) + { + await _daprClient.PublishEventAsync(queryConfig.PubsubName, queryConfig.TopicName, eventData); + } + + _logger.LogDebug("Published {Count} unpacked events for query {QueryId}", + events.Count(), evt.QueryId); + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ControlSignalHandler.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ControlSignalHandler.cs new file mode 100644 index 00000000..a1ca723b --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/ControlSignalHandler.cs @@ -0,0 +1,87 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +namespace Drasi.Reactions.PostDaprPubSub.Services; + +using Dapr.Client; +using Drasi.Reaction.SDK; +using Drasi.Reaction.SDK.Models.QueryOutput; +using Drasi.Reactions.PostDaprPubSub.Models.Unpacked; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +public class ControlSignalHandler : IControlEventHandler +{ + private readonly DaprClient _daprClient; + private readonly ILogger _logger; + + public ControlSignalHandler( + DaprClient daprClient, + ILogger logger) + { + _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + } + + public async Task HandleControlSignal(ControlEvent evt, QueryConfig? config) + { + var queryId = evt.QueryId; + var queryConfig = config + ?? throw new ArgumentNullException(nameof(config), $"Query configuration is null for query {queryId}"); + + if (queryConfig.SkipControlSignals) + { + _logger.LogDebug("Skipping control signal {SignalType} for query {QueryId} (skipControlSignals=true)", + evt.ControlSignal.Kind, queryId); + return; + } + + _logger.LogDebug("Processing control signal {SignalType} for query {QueryId} with pubsub {PubsubName} and topic {TopicName}", + evt.ControlSignal.Kind, queryId, queryConfig.PubsubName, queryConfig.TopicName); + + if (queryConfig.Format == OutputFormat.Packed) + { + // Send the complete control event as a single message + var serializedEvent = JsonSerializer.Serialize(evt, ModelOptions.JsonOptions); + using var doc = JsonDocument.Parse(serializedEvent); + await _daprClient.PublishEventAsync(queryConfig.PubsubName, queryConfig.TopicName, doc.RootElement); + _logger.LogDebug("Published packed control signal for query {QueryId}", queryId); + } + else + { + // Create and send an unpacked control signal + var notification = new ControlSignalNotification + { + Op = ControlSignalNotificationOp.X, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new ControlSignalNotificationPayload() + { + Kind = JsonSerializer.Serialize(evt.ControlSignal.Kind, ModelOptions.JsonOptions).Trim('"'), + Source = new SourceClass() + { + QueryId = queryId, + TsMs = evt.SourceTimeMs + } + } + }; + + var serializedData = JsonSerializer.Serialize(notification, Converter.Settings); + using var doc = JsonDocument.Parse(serializedData); + JsonElement serializedEvent = doc.RootElement.Clone(); + + await _daprClient.PublishEventAsync(queryConfig.PubsubName, queryConfig.TopicName, serializedEvent); + _logger.LogDebug("Published unpacked control signal for query {QueryId}", queryId); + } + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DaprInitializationService.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DaprInitializationService.cs new file mode 100644 index 00000000..f9ed0e85 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DaprInitializationService.cs @@ -0,0 +1,70 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Dapr; +using Dapr.Client; +using Microsoft.Extensions.Logging; + +namespace Drasi.Reactions.PostDaprPubSub; + +public interface IDaprInitializationService +{ + Task WaitForDaprSidecarAsync(CancellationToken cancellationToken); +} + +public class DaprInitializationService : IDaprInitializationService +{ + private readonly DaprClient _daprClient; + private readonly ILogger _logger; + private readonly IErrorStateHandler _errorStateHandler; + + public DaprInitializationService( + DaprClient daprClient, + ILogger logger, + IErrorStateHandler errorStateHandler) + { + _daprClient = daprClient ?? throw new ArgumentNullException(nameof(daprClient)); + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _errorStateHandler = errorStateHandler ?? throw new ArgumentNullException(nameof(errorStateHandler)); + } + + public async Task WaitForDaprSidecarAsync(CancellationToken cancellationToken) + { + _logger.LogDebug("Waiting for Dapr sidecar to be available..."); + try + { + await _daprClient.WaitForSidecarAsync(cancellationToken); + _logger.LogInformation("Dapr sidecar is available."); + } + catch (OperationCanceledException) + { + _logger.LogWarning("Waiting for Dapr sidecar was canceled."); + throw; // Rethrow to allow the caller to handle cancellation + } + catch (DaprException ex) + { + var errorMessage = "Dapr sidecar is not available."; + _logger.LogError(ex, errorMessage); + _errorStateHandler.Terminate(errorMessage); + throw; + } + catch (Exception ex) + { + var errorMessage = "Unexpected error while waiting for Dapr sidecar."; + _logger.LogError(ex, errorMessage); + _errorStateHandler.Terminate(errorMessage); + throw; + } + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DrasiChangeFormatter.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DrasiChangeFormatter.cs new file mode 100644 index 00000000..dbf83b01 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/DrasiChangeFormatter.cs @@ -0,0 +1,101 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Drasi.Reactions.PostDaprPubSub.Models.Unpacked; +using Drasi.Reaction.SDK.Models.QueryOutput; +using System.Text.Json; + +namespace Drasi.Reactions.PostDaprPubSub.Services; + +/// +/// Formatter for Drasi native format. +/// +public class DrasiChangeFormatter : IChangeFormatter +{ + public IEnumerable Format(ChangeEvent evt) + { + var notificationList = new List(); + foreach (var inputItem in evt.AddedResults) + { + var outputItem = new ChangeNotification + { + Op = ChangeNotificationOp.I, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() + { + Source = new SourceClass() + { + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs + }, + After = inputItem + } + }; + notificationList.Add(outputItem); + } + + foreach (var inputItem in evt.UpdatedResults) + { + var outputItem = new ChangeNotification + { + Op = ChangeNotificationOp.U, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() + { + Source = new SourceClass() + { + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs + }, + Before = inputItem.Before, + After = inputItem.After + } + }; + notificationList.Add(outputItem); + } + + foreach (var inputItem in evt.DeletedResults) + { + var outputItem = new ChangeNotification + { + Op = ChangeNotificationOp.D, + TsMs = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(), + Payload = new PayloadClass() + { + Source = new SourceClass() + { + QueryId = evt.QueryId, + TsMs = evt.SourceTimeMs + }, + Before = inputItem + } + }; + notificationList.Add(outputItem); + } + + var result = new List(); + foreach (var item in notificationList) + { + var serializedDataJson = JsonSerializer.Serialize( + item, + Drasi.Reactions.PostDaprPubSub.Models.Unpacked.Converter.Settings + ); + + using var doc = JsonDocument.Parse(serializedDataJson); + JsonElement serializedEvent = doc.RootElement.Clone(); + result.Add(serializedEvent); + } + return result; + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/IChangeFormatter.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/IChangeFormatter.cs new file mode 100644 index 00000000..f8a57abc --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/IChangeFormatter.cs @@ -0,0 +1,31 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Drasi.Reaction.SDK.Models.QueryOutput; +using System.Text.Json; + +namespace Drasi.Reactions.PostDaprPubSub.Services; + +/// +/// Interface for formatting change events. +/// +public interface IChangeFormatter +{ + /// + /// Formats a change event into a collection of JSON elements. + /// + /// The change event to format + /// A collection of formatted JSON elements + IEnumerable Format(ChangeEvent evt); +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/QueryConfigValidationService.cs b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/QueryConfigValidationService.cs new file mode 100644 index 00000000..07c7cb34 --- /dev/null +++ b/reactions/dapr/post-pubsub/Drasi.Reactions.PostDaprPubSub/Services/QueryConfigValidationService.cs @@ -0,0 +1,87 @@ +// Copyright 2024 The Drasi Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Drasi.Reaction.SDK.Services; +using Microsoft.Extensions.Logging; +using System.ComponentModel.DataAnnotations; +using System.Text; + +namespace Drasi.Reactions.PostDaprPubSub; + +public interface IQueryConfigValidationService +{ + Task ValidateQueryConfigsAsync(CancellationToken cancellationToken); +} + +public class QueryConfigValidationService : IQueryConfigValidationService +{ + private readonly ILogger _logger; + private readonly IQueryConfigService _queryConfigService; + private readonly IErrorStateHandler _errorStateHandler; + + public QueryConfigValidationService( + ILogger logger, + IQueryConfigService queryConfigService, + IErrorStateHandler errorStateHandler) + { + _logger = logger ?? throw new ArgumentNullException(nameof(logger)); + _queryConfigService = queryConfigService ?? throw new ArgumentNullException(nameof(queryConfigService)); + _errorStateHandler = errorStateHandler ?? throw new ArgumentNullException(nameof(errorStateHandler)); + } + + public Task ValidateQueryConfigsAsync(CancellationToken cancellationToken) + { + _logger.LogDebug("Validating query configurations..."); + + var queryNames = _queryConfigService.GetQueryNames(); + if (!queryNames.Any()) + { + _logger.LogWarning("No queries configured."); + return Task.CompletedTask; + } + + foreach (var queryName in queryNames) + { + QueryConfig? queryConfig; + queryConfig = _queryConfigService.GetQueryConfig(queryName); + if (queryConfig == null) + { + var errorMessage = $"Query configuration for '{queryName}' is null."; + _logger.LogError(errorMessage); + _errorStateHandler.Terminate(errorMessage); + throw new InvalidProgramException(errorMessage); + } + + var validationResults = new List(); + if (!Validator.TryValidateObject(queryConfig, new ValidationContext(queryConfig), validationResults, validateAllProperties: true)) + { + var errors = new StringBuilder($"Configuration validation failed for query {queryName}:"); + foreach (var validationResult in validationResults) + { + var members = string.Join(", ", validationResult.MemberNames); + errors.AppendLine().Append($" - {validationResult.ErrorMessage}. Members: {members}"); + } + + var errorMessage = errors.ToString(); + _errorStateHandler.Terminate(errorMessage); + throw new InvalidProgramException(errorMessage); + } + + _logger.LogDebug("Validated Query configuration for '{QueryName}'.", queryName); + } + + _logger.LogInformation("Validated query configurations."); + return Task.CompletedTask; + } +} \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/Makefile b/reactions/dapr/post-pubsub/Makefile new file mode 100644 index 00000000..d6940219 --- /dev/null +++ b/reactions/dapr/post-pubsub/Makefile @@ -0,0 +1,34 @@ +.PHONY: default docker-build kind-load k3d-load test clean + +CLUSTER_NAME ?= kind +IMAGE_PREFIX ?= drasi-project +DOCKER_TAG_VERSION ?= latest +DOCKERX_OPTS ?= --load --cache-to type=inline,mode=max + +default: docker-build + +# Build the Docker image for the reaction +docker-build: + docker buildx build . --no-cache -t $(IMAGE_PREFIX)/reaction-post-dapr-pubsub:$(DOCKER_TAG_VERSION) $(DOCKERX_OPTS) -f Dockerfile + +# Load the built image into the specified Kind cluster +kind-load: + kind load docker-image $(IMAGE_PREFIX)/reaction-post-dapr-pubsub:$(DOCKER_TAG_VERSION) --name $(CLUSTER_NAME) + +# Load the built image into the specified k3d cluster +k3d-load: CLUSTER_NAME=k3s-default +k3d-load: + k3d image import $(IMAGE_PREFIX)/reaction-post-dapr-pubsub:$(DOCKER_TAG_VERSION) -c $(CLUSTER_NAME) + +# Run unit tests +test: + dotnet test post-dapr-pubsub.sln + +lint-check: + @echo "No lint checks to run yet" + +# Clean build artifacts +clean: + dotnet clean post-dapr-pubsub.sln + rm -rf */bin */obj + rm -rf Drasi.Reactions.PostDaprPubSub.Tests/TestResults \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/README.md b/reactions/dapr/post-pubsub/README.md new file mode 100644 index 00000000..8ca95cd4 --- /dev/null +++ b/reactions/dapr/post-pubsub/README.md @@ -0,0 +1,78 @@ +# Post Dapr PubSub Reaction + +This reaction forwards Drasi query results to Dapr PubSub topics. It allows mapping each Drasi query to a specific Dapr pubsub component and topic. + +## Features + +- Maps Drasi queries to Dapr PubSub topics +- Supports both packed and unpacked event formats (unpacked is default, using Drasi native format) +- Forwards both change events and control signals +- Configurable per query +- Validation of configurations at startup +- Leverages Dapr's built-in retry mechanisms for publishing reliability + +## Configuration + +The reaction is configured using JSON for each query. The configuration includes: + +| Parameter | Description | Default | Required | +|-----------|-------------|---------|----------| +| `pubsubName` | Name of the Dapr PubSub component | `drasi-pubsub` | Yes | +| `topicName` | Name of the topic to publish to | - | Yes | +| `packed` | Whether to send events in packed format (`true`) or unpacked (`false`) | `false` | No | +| `skipControlSignals` | Skip publishing control signals | `false` | No | + +### Example Configuration + +```yaml +kind: Reaction +apiVersion: v1 +name: post-dapr-pubsub +spec: + kind: PostDaprPubSub + properties: + # No global properties needed for this reaction + queries: + example-query: | + { + "pubsubName": "drasi-pubsub", + "topicName": "example-topic", + "packed": false, + "skipControlSignals": false + } + another-query: | + { + "pubsubName": "messaging", + "topicName": "data-updates", + "packed": true, + "skipControlSignals": false + } + control-signals-skipped: | + { + "pubsubName": "drasi-pubsub", + "topicName": "changes-only", + # "packed" defaults to false + "skipControlSignals": true + } +``` + +## Error Handling and Retries + +If an error occurs while attempting to publish an event to the configured Dapr Pub/Sub topic (e.g., the Dapr sidecar is temporarily unavailable or the Pub/Sub component returns an error), the reaction will throw an exception. + +Drasi's internal infrastructure, which uses a Dapr Pub/Sub component to deliver messages to reactions, will handle this exception. This typically results in the message being redelivered to the `PostDaprPubSub` reaction later, effectively retrying the publish operation. The exact retry behavior (number of retries, backoff strategy) is governed by the Dapr Pub/Sub component's configuration used internally by Drasi for reaction message delivery. This ensures that transient issues during publishing do not lead to lost messages, relying on Dapr's inherent resilience. + +## Event Formats + +### Packed vs. Unpacked + +- **Packed format**: The entire ChangeEvent or ControlEvent is sent as a single message to the topic. +- **Unpacked format (Default)**: Individual messages are created for each change or control signal. + +### Drasi Native Format (for Unpacked Events) + +The native Drasi format uses these operation types: +- Insert operations (I): For new data added +- Update operations (U): For data changes +- Delete operations (D): For data removed +- Control signals (X): For system events \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/post-dapr-pubsub.sln b/reactions/dapr/post-pubsub/post-dapr-pubsub.sln new file mode 100644 index 00000000..fbe6a60f --- /dev/null +++ b/reactions/dapr/post-pubsub/post-dapr-pubsub.sln @@ -0,0 +1,30 @@ +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 17 +VisualStudioVersion = 17.5.33424.131 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Drasi.Reactions.PostDaprPubSub", "Drasi.Reactions.PostDaprPubSub\Drasi.Reactions.PostDaprPubSub.csproj", "{8EB4E5E8-0D1A-4FEB-B267-70A9ACF6E70A}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Drasi.Reactions.PostDaprPubSub.Tests", "Drasi.Reactions.PostDaprPubSub.Tests\Drasi.Reactions.PostDaprPubSub.Tests.csproj", "{DABD4F69-337F-4FEE-ABC2-F183F0DACE8A}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {8EB4E5E8-0D1A-4FEB-B267-70A9ACF6E70A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {8EB4E5E8-0D1A-4FEB-B267-70A9ACF6E70A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {8EB4E5E8-0D1A-4FEB-B267-70A9ACF6E70A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {8EB4E5E8-0D1A-4FEB-B267-70A9ACF6E70A}.Release|Any CPU.Build.0 = Release|Any CPU + {DABD4F69-337F-4FEE-ABC2-F183F0DACE8A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DABD4F69-337F-4FEE-ABC2-F183F0DACE8A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DABD4F69-337F-4FEE-ABC2-F183F0DACE8A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DABD4F69-337F-4FEE-ABC2-F183F0DACE8A}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {06283812-D392-4A63-A3E4-43C8B6B9F63D} + EndGlobalSection +EndGlobal \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/reaction-provider.yaml b/reactions/dapr/post-pubsub/reaction-provider.yaml new file mode 100644 index 00000000..ac17a7be --- /dev/null +++ b/reactions/dapr/post-pubsub/reaction-provider.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: ReactionProvider +name: PostDaprPubSub +spec: + services: + reaction: + image: reaction-post-dapr-pubsub \ No newline at end of file diff --git a/reactions/dapr/post-pubsub/reaction.yaml b/reactions/dapr/post-pubsub/reaction.yaml new file mode 100644 index 00000000..9c32db6c --- /dev/null +++ b/reactions/dapr/post-pubsub/reaction.yaml @@ -0,0 +1,29 @@ +kind: Reaction +apiVersion: v1 +name: post-dapr-pubsub +spec: + kind: PostDaprPubSub + properties: + # No global properties needed for this reaction + queries: + example-query: | + { + "pubsubName": "drasi-pubsub", + "topicName": "example-topic", + "packed": false, + "skipControlSignals": false + } + another-query: | + { + "pubsubName": "messaging", + "topicName": "data-updates", + "packed": true, + "skipControlSignals": false + } + control-signals-skipped: | + { + "pubsubName": "drasi-pubsub", + "topicName": "changes-only", + "packed": false, + "skipControlSignals": true + } \ No newline at end of file