-
Notifications
You must be signed in to change notification settings - Fork 50
Description
Abstract
This is a proposal for a new Drasi Reaction, Drasi.Reactions.DaprWorkflows
, to integrate Drasi Continuous Query (CQ) results with Dapr Workflows. This reaction will enable users to automate Dapr workflow operations (start, raise event, restart, terminate) based on real-time data changes detected by Drasi. The goal is to simplify the creation of change-driven architectures where Dapr Workflows act upon insights derived by Drasi.
This RFC aims to gather feedback on the proposed design and configuration for the Dapr Workflows Reaction.
Motivation
Users combining Drasi for change detection with Dapr Workflows for process automation currently require custom integration logic. A dedicated reaction would standardize this integration, reducing boilerplate code and providing a configurable bridge between these two systems.
Proposed Solution
The Drasi.Reactions.DaprWorkflows
reaction will monitor Drasi CQ outputs and, based on user configuration, interact with the Dapr workflow engine.
Core Capabilities:
- Workflow Lifecycle Management:
- Start Dapr workflows for new items added to a CQ's result set.
- Interact with running workflows (raise events, restart, terminate) for updated CQ items.
- Terminate workflows for deleted CQ items.
- Data Transformation: Allow JMESPath transformation of Drasi change data before passing it to workflow operations.
- Instance ID Management: Support random instance IDs or derive them from a CQ result key field.
- Stateful Tracking (Optional): Utilize a Dapr state store to map CQ result keys to workflow instance IDs for reliable targeting of existing workflows. This will be required for raising events to, restarting or terminating previously launched workflows.
Configuration Schema
Reaction-Level Configuration
StateStoreName
(string, optional):- Description: Name of the Dapr state store component for tracking workflow instances.
- Purpose: Enables reliable targeting of existing workflow instances for operations like
RaiseEvent
,RestartWorkflow
, orTerminateWorkflow
.
Per-Query Configuration (JSON)
{
"KeyField": "orderId", // Required: Field in CQ result for unique identification.
"WorkflowName": "OrderProcessingWorkflow", // Required: Target Dapr Workflow name.
"AddedAction": { // Optional: Action for new CQ items.
"Action": "StartWorkflow", // "None", "StartWorkflow"
"InstanceIdStrategy": "ResultKey", // "Random", "ResultKey"
"InputTransform": {
"TransformType": "JMESPath", // "None", "JMESPath"
"Expression": "{order_details: @, customer_id: customer.id}"
}
},
"UpdatedAction": { // Optional: Action for updated CQ items.
"Action": "RaiseEvent", // "None", "RaiseEvent", "RestartWorkflow", "TerminateWorkflow"
"EventName": "OrderUpdatedEvent", // Required if Action is "RaiseEvent"
"InputTransform": {
"TransformType": "JMESPath",
"Expression": "{update_payload: changes}"
}
},
"DeletedAction": { // Optional: Action for deleted CQ items.
"Action": "TerminateWorkflow", // "None", "TerminateWorkflow"
"InputTransform": {
"TransformType": "JMESPath",
"Expression": "{reason: 'Item removed from source'}"
}
}
}
Detailed Breakdown of Per-Query Options:
-
KeyField
(string, required)- Description: The field in each Drasi CQ result item that uniquely identifies it.
- Example:
"orderId"
,"deviceId"
-
WorkflowName
(string, required)- Description: The registered name/type of the Dapr Workflow to interact with.
- Example:
"OrderProcessingWorkflow"
,"ShipmentTrackerWorkflow"
-
AddedAction
(object, optional, default:Action: "None"
)- Configures what happens when a new item appears in the CQ results.
Action
(string):"None"
: Take no action."StartWorkflow"
: Start a new Dapr workflow instance.
InstanceIdStrategy
(string, default:"Random"
, applicable ifAction
is"StartWorkflow"
):"Random"
: The reaction generates a random GUID as the workflow instance ID."ResultKey"
: The reaction uses the value of theKeyField
from the CQ result item as the workflow instance ID. This allows for idempotent start operations if the same item is processed again and for easier correlation.
InputTransform
(object, optional):- Defines how to transform the Drasi
ChangeEvent
data before sending it as input to the workflow. TransformType
:"None"
or"JMESPath"
.Expression
: The JMESPath expression ifTransformType
is"JMESPath"
. The input to the JMESPath expression is the entire DrasiChangeEvent
object for the added item.
- Defines how to transform the Drasi
-
UpdatedAction
(object, optional, default:Action: "None"
)- Configures what happens when an existing item in the CQ results is updated.
Action
(string):"None"
: Take no action."RaiseEvent"
: Raise an event to an existing workflow instance. RequiresStateStoreName
to be configured at the reaction level to locate existing workflow instances."RestartWorkflow"
: Attempt to restart an existing workflow instance. RequiresStateStoreName
to locate existing workflow instances."TerminateWorkflow"
: Terminate an existing workflow instance. RequiresStateStoreName
to locate existing workflow instances.
EventName
(string, required ifAction
is"RaiseEvent"
): The name of the event to be raised on the workflow instance.InputTransform
(object, optional):- Similar to
AddedAction.InputTransform
, but transforms theChangeEvent
for the updated item.
- Similar to
-
DeletedAction
(object, optional, default:Action: "None"
)- Configures what happens when an item is removed from the CQ results.
Action
(string):"None"
: Take no action."TerminateWorkflow"
: Terminate an existing workflow instance. RequiresStateStoreName
(to locate existing workflow instances).
InputTransform
(object, optional):- Similar to
AddedAction.InputTransform
, but transforms theChangeEvent
for the deleted item.
- Similar to
Example Reaction YAML
kind: Reaction
apiVersion: v1
name: my-order-workflow-trigger
spec:
kind: DaprWorkflows
properties:
StateStoreName: "mystatestore" # Optional: Dapr state store for instance tracking
queries:
orderProcessingQuery: |
{
"KeyField": "orderId",
"WorkflowName": "OrderProcessingWorkflow",
"AddedAction": {
"Action": "StartWorkflow",
"InstanceIdStrategy": "ResultKey",
"InputTransform": {
"TransformType": "JMESPath",
"Expression": "{ \"order_details\": \"data\", \"customer_id\": \"data.customer.id\" }"
}
},
"UpdatedAction": {
"Action": "RaiseEvent",
"EventName": "OrderDetailsChanged",
"InputTransform": {
"TransformType": "JMESPath",
"Expression": "{ \"updated_fields\": \"data.changes\" }"
}
},
"DeletedAction": {
"Action": "TerminateWorkflow",
"InputTransform": {
"TransformType": "JMESPath",
"Expression": "{ \"termination_reason\": \"Order removed from system\" }"
}
}
}
inventoryUpdateQuery: |
{
"KeyField": "productId",
"WorkflowName": "InventoryUpdateWorkflow",
"AddedAction": {
"Action": "StartWorkflow",
"InstanceIdStrategy": "Random",
"InputTransform": {
"TransformType": "None"
}
},
"UpdatedAction": {
"Action": "None"
},
"DeletedAction": {
"Action": "None"
}
}