Skip to content

Commit 2348baf

Browse files
Add Post-Dapr-PubSub Reaction (#226)
Signed-off-by: Aman Singh <[email protected]>
1 parent 290a7e6 commit 2348baf

37 files changed

+3187
-1
lines changed

.github/workflows/build-test.yml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,11 @@ jobs:
306306
label: 'SyncDaprStateStore',
307307
path: './reactions/dapr/sync-statestore',
308308
name: 'reaction-sync-dapr-statestore'
309+
},
310+
{
311+
label: 'PostDaprPubSub',
312+
path: './reactions/dapr/post-pubsub',
313+
name: 'reaction-post-dapr-pubsub'
309314
}
310315
]
311316
steps:

.github/workflows/draft-release.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ env:
6969
{"label": "Result", "path": "reactions/platform/result-reaction", "name": "reaction-result", "platforms": "linux/amd64,linux/arm64"},
7070
{"label": "StorageQueue", "path": "reactions/azure/storagequeue-reaction", "name": "reaction-storage-queue", "platforms": "linux/amd64,linux/arm64"},
7171
{"label": "SyncDaprStateStore", "path": "reactions/dapr/sync-statestore", "name": "reaction-sync-dapr-statestore", "platforms": "linux/amd64,linux/arm64"},
72+
{"label": "PostDaprPubSub", "path": "reactions/dapr/post-pubsub", "name": "reaction-post-dapr-pubsub", "platforms": "linux/amd64,linux/arm64"},
7273
{"label": "StoredProc", "path": "reactions/sql/storedproc-reaction", "name": "reaction-storedproc", "platforms": "linux/amd64,linux/arm64"}]'
7374

7475
jobs:

cli/installers/resources/default-reaction-providers.yaml

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,4 +247,13 @@ name: SyncDaprStateStore
247247
spec:
248248
services:
249249
reaction:
250-
image: reaction-sync-dapr-statestore
250+
image: reaction-sync-dapr-statestore
251+
---
252+
apiVersion: v1
253+
kind: ReactionProvider
254+
name: PostDaprPubSub
255+
spec:
256+
services:
257+
reaction:
258+
image: reaction-post-dapr-pubsub
259+
Lines changed: 310 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,310 @@
1+
const { describe, beforeAll, afterAll, test, expect } = require('@jest/globals');
2+
const { Client: PgClient } = require('pg');
3+
const { createClient: createRedisClient } = require('redis');
4+
const yaml = require('js-yaml');
5+
const fs = require('fs');
6+
const path = require('path');
7+
const PortForward = require('../fixtures/port-forward');
8+
const deployResources = require('../fixtures/deploy-resources');
9+
const deleteResources = require('../fixtures/delete-resources');
10+
const { waitFor } = require('../fixtures/infrastructure'); // Added import
11+
12+
const SCENARIO_DIR = __dirname;
13+
const K8S_RESOURCES_FILE = path.join(SCENARIO_DIR, 'resources.yaml');
14+
const SOURCES_FILE = path.join(SCENARIO_DIR, 'sources.yaml');
15+
const QUERIES_FILE = path.join(SCENARIO_DIR, 'queries.yaml');
16+
const REACTION_PROVIDER_FILE = path.join(SCENARIO_DIR, 'reaction-provider.yaml');
17+
const REACTIONS_FILE = path.join(SCENARIO_DIR, 'reactions.yaml');
18+
19+
const POSTGRES_SERVICE_NAME = 'pubsub-test-db';
20+
const POSTGRES_NAMESPACE = 'default';
21+
const POSTGRES_PORT = 5432;
22+
const POSTGRES_USER = 'testuser';
23+
const POSTGRES_PASSWORD = 'testpassword';
24+
const POSTGRES_DATABASE = 'testdb';
25+
26+
const DAPR_PUBSUB_REDIS_SERVICE_NAME = 'dapr-pubsub-redis-svc';
27+
const DAPR_PUBSUB_REDIS_NAMESPACE = 'default';
28+
const DAPR_PUBSUB_REDIS_PORT = 6379;
29+
30+
const PACKED_TOPIC = 'e2e-topic-packed';
31+
const UNPACKED_TOPIC = 'e2e-topic-unpacked';
32+
33+
function loadYaml(filePath) {
34+
const content = fs.readFileSync(filePath, 'utf8');
35+
return yaml.loadAll(content);
36+
}
37+
38+
async function clearRedisStream(redisClient, streamKey) {
39+
try {
40+
// XTRIM with MAXLEN 0 deletes all entries.
41+
await redisClient.xTrim(streamKey, 'MAXLEN', 0);
42+
console.log(`Cleared Redis stream: ${streamKey}`);
43+
} catch (err) {
44+
// Ignore if stream doesn't exist (error code 'ERR no such key')
45+
if (err.message && !err.message.toLowerCase().includes('no such key')) {
46+
console.error(`Error clearing Redis stream ${streamKey}:`, err);
47+
} else {
48+
console.log(`Redis stream ${streamKey} did not exist or already cleared.`);
49+
}
50+
}
51+
}
52+
53+
async function getMessagesFromRedisStream(redisClient, streamKey, lastId = '0-0') {
54+
try {
55+
const messages = await redisClient.xRange(streamKey, '-', '+');
56+
if (!messages || messages.length === 0) {
57+
return [];
58+
}
59+
return messages.map(msg => {
60+
const id = msg.id;
61+
const fields = msg.message;
62+
if (fields && fields.data) {
63+
try {
64+
return { id, data: JSON.parse(fields.data) };
65+
} catch (e) {
66+
console.warn(`Failed to parse JSON from 'data' field in message ${id} from stream ${streamKey}:`, fields.data);
67+
return { id, data: fields.data };
68+
}
69+
}
70+
console.warn(`Message ${id} from stream ${streamKey} did not have a 'data' field:`, fields);
71+
return { id, data: fields };
72+
});
73+
} catch (err) {
74+
if (err.message && err.message.toLowerCase().includes("no such key")) {
75+
return [];
76+
}
77+
console.error(`Error reading from Redis stream ${streamKey}:`, err);
78+
throw err;
79+
}
80+
}
81+
82+
83+
describe('PostDaprPubSub Reaction with Redis Stream Verification', () => {
84+
let pgClient;
85+
let redisClient;
86+
let pgPortForward;
87+
let redisPortForward;
88+
89+
const k8sResources = loadYaml(K8S_RESOURCES_FILE);
90+
const sourceResources = loadYaml(SOURCES_FILE);
91+
const queryResources = loadYaml(QUERIES_FILE);
92+
const reactionProviderResources = loadYaml(REACTION_PROVIDER_FILE);
93+
const reactionResources = loadYaml(REACTIONS_FILE);
94+
95+
const allResourceDefinitions = [
96+
...k8sResources,
97+
...sourceResources,
98+
...queryResources,
99+
...reactionProviderResources,
100+
...reactionResources,
101+
];
102+
103+
104+
beforeAll(async () => {
105+
console.log("Starting E2E test setup for PostDaprPubSub (Redis)...");
106+
try {
107+
// 1. deploy all k8s resouces first
108+
console.log("Deploying K8s resources...");
109+
await deployResources(k8sResources);
110+
111+
// 2. then wait for 15 seconds
112+
console.log("Waiting for K8s resources to stabilize...");
113+
await waitFor({ timeoutMs: 10000, description: "K8s resources to stabilize" });
114+
115+
// 3. then deploy sources.yaml
116+
console.log("Deploying Drasi Source resources...");
117+
await deployResources(sourceResources);
118+
119+
// 4. then deploy queries.yaml
120+
console.log("Deploying Drasi Query resources...");
121+
await deployResources(queryResources);
122+
123+
// 5. Then deploy reaction-provider
124+
console.log("Deploying Drasi ReactionProvider resources...");
125+
await deployResources(reactionProviderResources);
126+
127+
// 6. then deploy reaction
128+
console.log("Deploying Drasi Reaction resources...");
129+
await deployResources(reactionResources);
130+
console.log("All Drasi resources deployed.");
131+
132+
pgPortForward = new PortForward(POSTGRES_SERVICE_NAME, POSTGRES_PORT, POSTGRES_NAMESPACE);
133+
const localPgPort = await pgPortForward.start();
134+
pgClient = new PgClient({
135+
host: 'localhost',
136+
port: localPgPort,
137+
user: POSTGRES_USER,
138+
password: POSTGRES_PASSWORD,
139+
database: POSTGRES_DATABASE,
140+
});
141+
await pgClient.connect();
142+
console.log("Connected to PostgreSQL via port forward.");
143+
144+
redisPortForward = new PortForward(DAPR_PUBSUB_REDIS_SERVICE_NAME, DAPR_PUBSUB_REDIS_PORT, DAPR_PUBSUB_REDIS_NAMESPACE);
145+
const localRedisPort = await redisPortForward.start();
146+
redisClient = createRedisClient({ url: `redis://localhost:${localRedisPort}` });
147+
await redisClient.connect();
148+
console.log("Connected to Dapr Pub/Sub Redis via port forward.");
149+
150+
console.log("Waiting for 15 more seconds after all setup...");
151+
await waitFor({ timeoutMs: 15000, description: "all of the setup to stabilize" });
152+
153+
} catch (error) {
154+
console.error("Error during beforeAll setup:", error);
155+
if (pgPortForward) pgPortForward.stop();
156+
if (redisPortForward) redisPortForward.stop();
157+
if (pgClient) await pgClient.end().catch(console.error);
158+
if (redisClient) await redisClient.quit().catch(console.error);
159+
await deleteResources(allResourceDefinitions).catch(err => console.error("Cleanup failed during error handling:", err));
160+
throw error;
161+
}
162+
}, 300000); // 5 minutes timeout for setup
163+
164+
afterAll(async () => {
165+
console.log("Starting E2E test teardown...");
166+
if (pgClient) await pgClient.end().catch(err => console.error("Error closing PG client:", err));
167+
if (redisClient) await redisClient.quit().catch(err => console.error("Error quitting Redis client:", err));
168+
169+
if (pgPortForward) pgPortForward.stop();
170+
if (redisPortForward) redisPortForward.stop();
171+
172+
console.log("Attempting to delete Drasi and K8s resources...");
173+
await deleteResources(allResourceDefinitions).catch(err => console.error("Error during deleteResources:", err));
174+
console.log("Teardown complete.");
175+
}, 300000); // 5 minutes timeout for teardown
176+
177+
test('PACKED: should publish a packed ChangeEvent to the correct Redis Stream on INSERT', async () => {
178+
await clearRedisStream(redisClient, PACKED_TOPIC);
179+
180+
const newProductName = `Test Product Packed ${Date.now()}`;
181+
const newProductPrice = 99.99;
182+
await pgClient.query(
183+
"INSERT INTO product (name, description, price) VALUES ($1, 'Packed Test Desc', $2)",
184+
[newProductName, newProductPrice]
185+
);
186+
187+
const receivedMessages = await waitFor({
188+
actionFn: () => getMessagesFromRedisStream(redisClient, PACKED_TOPIC),
189+
predicateFn: (messages) => messages && messages.length >= 1,
190+
timeoutMs: 10000,
191+
pollIntervalMs: 1000,
192+
description: `packed message for product "${newProductName}" to appear in Redis stream ${PACKED_TOPIC}`
193+
});
194+
195+
expect(receivedMessages).toBeDefined();
196+
expect(receivedMessages.length).toEqual(1);
197+
198+
const cloudEvent = receivedMessages[0].data;
199+
expect(cloudEvent).toBeDefined();
200+
expect(cloudEvent.topic).toBe(PACKED_TOPIC);
201+
202+
const drasiEvent = cloudEvent.data;
203+
expect(drasiEvent).toBeDefined();
204+
expect(drasiEvent.payload).toBeDefined();
205+
expect(drasiEvent.payload.after).toBeDefined();
206+
expect(drasiEvent.payload.after.name).toBe(newProductName);
207+
expect(drasiEvent.op).toBe('i');
208+
expect(parseFloat(drasiEvent.payload.after.price)).toBe(newProductPrice);
209+
}, 20000);
210+
211+
test('UNPACKED: should publish individual unpacked change notifications on INSERT', async () => {
212+
await clearRedisStream(redisClient, UNPACKED_TOPIC);
213+
214+
const newProductName = `Test Product Unpacked ${Date.now()}`;
215+
const newProductPrice = 49.50;
216+
await pgClient.query(
217+
"INSERT INTO product (name, description, price) VALUES ($1, 'Unpacked Test Desc', $2)",
218+
[newProductName, newProductPrice]
219+
);
220+
221+
const receivedMessages = await waitFor({
222+
actionFn: () => getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC),
223+
predicateFn: (messages) => messages && messages.length >= 1,
224+
timeoutMs: 10000,
225+
pollIntervalMs: 1000,
226+
description: `unpacked message for product "${newProductName}" to appear in Redis stream ${UNPACKED_TOPIC}`
227+
});
228+
229+
expect(receivedMessages).toBeDefined();
230+
expect(receivedMessages.length).toEqual(1);
231+
232+
const cloudEvent = receivedMessages[0].data;
233+
expect(cloudEvent).toBeDefined();
234+
expect(cloudEvent.topic).toBe(UNPACKED_TOPIC);
235+
236+
const drasiEvent = cloudEvent.data;
237+
expect(drasiEvent).toBeDefined();
238+
expect(drasiEvent.op).toBe('i'); // Insert operation
239+
expect(drasiEvent.payload).toBeDefined();
240+
expect(drasiEvent.payload.source).toBeDefined();
241+
expect(drasiEvent.payload.source.queryId).toBe('product-updates-unpacked');
242+
expect(drasiEvent.payload.after).toBeDefined();
243+
expect(drasiEvent.payload.after.name).toBe(newProductName);
244+
expect(parseFloat(drasiEvent.payload.after.price)).toBe(newProductPrice);
245+
expect(drasiEvent.payload.before).toBeUndefined();
246+
}, 20000);
247+
248+
test('UNPACKED: should publish individual unpacked change notifications on UPDATE', async () => {
249+
// Ensure a product exists to update.
250+
const productNameToUpdate = `ProductToUpdate ${Date.now()}`;
251+
const initialDescription = "Initial Description";
252+
const initialPrice = 50.00;
253+
await pgClient.query(
254+
"INSERT INTO product (name, description, price) VALUES ($1, $2, $3)",
255+
[productNameToUpdate, initialDescription, initialPrice]
256+
);
257+
258+
await waitFor({
259+
actionFn: () => getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC),
260+
predicateFn: (messages) => messages && messages.length >= 1,
261+
timeoutMs: 10000,
262+
pollIntervalMs: 1000,
263+
description: `propagation of initial insert event for "${productNameToUpdate}" to appear in Redis stream ${UNPACKED_TOPIC}`
264+
});
265+
266+
await clearRedisStream(redisClient, UNPACKED_TOPIC); // Clear before update
267+
268+
const updatedDescription = 'High-performance laptop - Updated Model';
269+
await pgClient.query(
270+
"UPDATE product SET description = $1 WHERE name = $2",
271+
[updatedDescription, productNameToUpdate]
272+
);
273+
274+
const receivedMessages = await waitFor({
275+
actionFn: async () => {
276+
const allMessages = await getMessagesFromRedisStream(redisClient, UNPACKED_TOPIC);
277+
// Filter for an 'update' (op: 'u') event
278+
return allMessages.filter(msg =>
279+
msg.data &&
280+
msg.data.data && // Drasi event level
281+
msg.data.data.op === 'u'
282+
);
283+
},
284+
predicateFn: (filteredUpdateMessages) => filteredUpdateMessages && filteredUpdateMessages.length === 1,
285+
timeoutMs: 10000,
286+
pollIntervalMs: 1000,
287+
description: `unpacked update message for product "${productNameToUpdate}" in Redis stream ${UNPACKED_TOPIC}`
288+
});
289+
290+
expect(receivedMessages).toBeDefined();
291+
expect(receivedMessages.length).toEqual(1);
292+
293+
const cloudEvent = receivedMessages[0].data;
294+
expect(cloudEvent).toBeDefined();
295+
expect(cloudEvent.topic).toBe(UNPACKED_TOPIC);
296+
297+
const drasiEvent = cloudEvent.data;
298+
expect(drasiEvent).toBeDefined();
299+
expect(drasiEvent.op).toBe('u'); // Update operation
300+
expect(drasiEvent.payload).toBeDefined();
301+
expect(drasiEvent.payload.source).toBeDefined();
302+
expect(drasiEvent.payload.source.queryId).toBe('product-updates-unpacked');
303+
expect(drasiEvent.payload.after).toBeDefined();
304+
expect(drasiEvent.payload.after.name).toBe(productNameToUpdate);
305+
expect(drasiEvent.payload.after.description).toBe(updatedDescription);
306+
expect(drasiEvent.payload.before).toBeDefined();
307+
expect(drasiEvent.payload.before.name).toBe(productNameToUpdate);
308+
expect(drasiEvent.payload.before.description).toBe(initialDescription);
309+
}, 20000);
310+
});
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
apiVersion: v1
2+
kind: ContinuousQuery
3+
name: product-updates-packed
4+
spec:
5+
mode: query
6+
sources:
7+
subscriptions:
8+
- id: pubsub-pg-source
9+
nodes:
10+
- sourceLabel: product
11+
query: >
12+
MATCH
13+
(p:product)
14+
RETURN
15+
p.product_id AS product_id,
16+
p.name AS name,
17+
p.description AS description,
18+
p.price AS price
19+
---
20+
apiVersion: v1
21+
kind: ContinuousQuery
22+
name: product-updates-unpacked
23+
spec:
24+
mode: query
25+
sources:
26+
subscriptions:
27+
- id: pubsub-pg-source
28+
nodes:
29+
- sourceLabel: product
30+
query: >
31+
MATCH
32+
(p:product)
33+
RETURN
34+
p.product_id AS product_id,
35+
p.name AS name,
36+
p.description AS description,
37+
p.price AS price
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
apiVersion: v1
2+
kind: ReactionProvider
3+
name: PostDaprPubSub
4+
spec:
5+
services:
6+
reaction:
7+
image: reaction-post-dapr-pubsub

0 commit comments

Comments
 (0)