Commit dba1b32

Merge pull request #106 from amansinghoriginal/mw
Docs for middleware - parseJson, promote, decoder
2 parents 7ba25b9 + 1efe91a commit dba1b32

2 files changed

+207
-1
lines changed

.github/config/en-drasi.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,3 +140,4 @@ GTID
140140
GTIDs
141141
kube
142142
eventGridSchema
143+
ParseJson

docs/content/concepts/middleware/_index.md

Lines changed: 206 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,4 +188,209 @@ spec:
188188
RETURN
189189
s.sensorId,
190190
s.currentValue
191-
```
191+
```
192+
193+
### Promote
194+
195+
The **promote** middleware processes `SourceChange` events (`Insert` and `Update`) to copy values from deeply nested locations inside an element’s `properties` map to new top-level properties. Values are selected with JSONPath expressions, and each promoted value is written under an explicit `target_name`. This is useful for flattening complex structures or making frequently accessed data more readily available.
196+
197+
The configuration for the **promote** component is as follows:
198+
199+
| Property | Type | Description | Required | Default |
200+
|----------------------|-------------------------------------|------------------------------------------------------------------------------------------------------------|----------|---------------|
201+
| `kind` | String | Must be **promote**. | Yes | |
202+
| `name` | String | The name of this configuration, which can be referenced in a source pipeline. | Yes | |
203+
| `config` | Object | Contains the specific configuration for the promote middleware. | Yes | |
204+
| `config.mappings` | Array of *Mapping* objects | Defines the promotion rules. Must contain at least one mapping entry. | Yes | – |
205+
| `config.on_conflict` | `"overwrite"` \| `"skip"` \| `"fail"` | Specifies the action to take if a `target_name` already exists in the top-level properties of the element. | No | `"overwrite"` |
206+
| `config.on_error` | `"skip"` \| `"fail"` | Determines the behavior when a mapping encounters an error (e.g., the JSONPath selects zero or more than one item, or type conversion fails). | No | `"fail"` |
207+
208+
Each *Mapping* object in the `config.mappings` array has the following properties:
209+
210+
| Property | Type | Description | Required |
211+
|---------------|--------|--------------------------------------------------------------------------------|----------|
212+
| `path` | String | A [JSONPath](https://en.wikipedia.org/wiki/JSONPath) expression that must select exactly one value from the element's properties. | Yes |
213+
| `target_name` | String | The name of the new top-level property that will receive the selected value. | Yes |
214+
215+
#### Example
216+
217+
Here's an example of how to configure the **promote** middleware to extract user and order data to top-level properties. This configuration attempts to promote several fields. If a target property already exists, its existing value is kept, and if a JSONPath expression fails to resolve, only that mapping is skipped.
218+
219+
```yaml
spec:
  sources:
    middleware:
      - name: promote_user_and_order_data
        kind: promote
        config:
          mappings:
            - path: "$.user.id"
              target_name: "userId"
            - path: "$.user.location.city"
              target_name: "city"
            - path: "$.order.total"
              target_name: "orderTotal"
            - path: "$.metadata" # Promoting an entire object
              target_name: "meta"
          on_conflict: skip # Keep existing values if 'userId', 'city', etc. already exist
          on_error: skip # Skip mappings that error (e.g., if '$.order.total' doesn't exist)
```
238+
239+
For instance, if an incoming node has properties like this:
240+
```json
{
  "user": {
    "id": "user123",
    "location": {
      "city": "New York"
    }
  },
  "order": {
    "total": 100.50
  },
  "metadata": { "source": "api", "version": "1.1" }
}
```
254+
After processing with the `promote_user_and_order_data` middleware configured above, the node's properties would be transformed to:
255+
```json
{
  "user": {
    "id": "user123",
    "location": {
      "city": "New York"
    }
  },
  "order": {
    "total": 100.50
  },
  "metadata": { "source": "api", "version": "1.1" },
  "userId": "user123",
  "city": "New York",
  "orderTotal": 100.50,
  "meta": { "source": "api", "version": "1.1" }
}
```
273+
The `userId`, `city`, `orderTotal`, and `meta` fields are now available as top-level properties.
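
The behavior described above can be approximated in a few lines of Python. This is only an illustrative sketch, not the middleware's actual implementation: the helper names `select` and `promote` are hypothetical, only the simple `$.a.b.c` subset of JSONPath is handled, and it assumes every mapping reads from the original (pre-promotion) properties.

```python
import copy

def select(properties, path):
    """Resolve a simple dotted path such as '$.user.location.city'.

    Only the '$.a.b.c' subset of JSONPath is supported in this sketch.
    Raises KeyError when a segment is missing.
    """
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    value = properties
    for key in path[2:].split("."):
        if not isinstance(value, dict):
            raise KeyError(key)
        value = value[key]
    return value

def promote(properties, mappings, on_conflict="overwrite", on_error="fail"):
    """Return a copy of `properties` with the mapped values promoted."""
    result = copy.deepcopy(properties)
    for mapping in mappings:
        target = mapping["target_name"]
        try:
            # Assumption: paths are evaluated against the original properties.
            value = select(properties, mapping["path"])
        except (KeyError, ValueError):
            if on_error == "skip":
                continue  # skip only this mapping
            raise
        if target in result:
            if on_conflict == "skip":
                continue  # keep the existing top-level value
            if on_conflict == "fail":
                raise ValueError(f"property already exists: {target}")
        result[target] = copy.deepcopy(value)
    return result

node = {"user": {"id": "user123"}, "userId": "existing"}
rules = [
    {"path": "$.user.id", "target_name": "userId"},
    {"path": "$.order.total", "target_name": "orderTotal"},
]
print(promote(node, rules, on_conflict="skip", on_error="skip"))
# {'user': {'id': 'user123'}, 'userId': 'existing'}
```

In this run the first mapping hits a conflict and the second fails to resolve, so with both options set to `skip` the properties come back unchanged.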
274+
275+
### ParseJson
276+
277+
The **parse_json** middleware processes `SourceChange` events (specifically `Insert` and `Update`) to parse a string property containing a JSON document into a structured `ElementValue` (Object or List). This is useful when a data source provides JSON data embedded within a string field, which needs to be accessible as a structured object or array for querying.
278+
279+
The configuration for the **parse_json** component is as follows:
280+
281+
| Property | Type | Description | Required | Default |
282+
|---------------------------|-------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
283+
| `kind` | String | Must be **parse_json**. | Yes | |
284+
| `name` | String | The name of this configuration, which can be referenced in a source pipeline. | Yes | |
285+
| `config` | Object | Contains the specific configuration for the parse_json middleware. | Yes | |
286+
| `config.target_property` | String | The name of the element property containing the JSON string to be parsed. | Yes | |
287+
| `config.output_property` | String | Optional. The name of the property where the parsed `ElementValue` should be stored. If omitted or `null`, `target_property` will be overwritten. | No | `null` |
288+
| `config.on_error` | String (`"skip"` or `"fail"`) | Defines behavior when an error occurs (e.g., target property not found, value is not a string, JSON parsing fails, or conversion fails). `"skip"` logs a warning and passes the change through unchanged; `"fail"` stops processing and returns an error. | No | `"fail"` |
289+
| `config.max_json_size` | Integer (bytes) | Maximum size (in bytes) of the JSON string that will be parsed. Helps guard against unexpectedly large payloads. | No | `1_048_576` (1 MiB) |
290+
| `config.max_nesting_depth`| Integer | Maximum allowed nesting depth for objects/arrays within the JSON document. Prevents issues with excessively nested structures. | No | `20` |
291+
292+
#### Example
293+
294+
Here's an example of how to configure the **parse_json** middleware to parse a JSON string from the `raw_event_json` property and store the resulting structured object in a new property named `event_details`. If an error occurs during parsing, a warning is logged and the change is passed through unchanged.
295+
296+
```yaml
spec:
  sources:
    middleware:
      - name: parse_event_data
        kind: parse_json
        config:
          target_property: "raw_event_json"
          output_property: "event_details"
          on_error: "skip"
```
307+
308+
For example, if an incoming node has properties like:
309+
```json
{
  "id": "event001",
  "timestamp": "2025-06-01T12:00:00Z",
  "raw_event_json": "{\"user\": \"alice\", \"action\": \"login\", \"details\": {\"ip\": \"192.168.1.100\"}}"
}
```
316+
After processing with the `parse_event_data` middleware, the node's properties would be transformed to:
317+
```json
{
  "id": "event001",
  "timestamp": "2025-06-01T12:00:00Z",
  "raw_event_json": "{\"user\": \"alice\", \"action\": \"login\", \"details\": {\"ip\": \"192.168.1.100\"}}",
  "event_details": {
    "user": "alice",
    "action": "login",
    "details": {
      "ip": "192.168.1.100"
    }
  }
}
```
331+
The `event_details` property now contains the parsed JSON object, which can be queried directly.
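
To make the interaction between `output_property`, `on_error`, and the two limits concrete, here is a rough Python sketch. It is not the middleware's code: `parse_json_property` and `nesting_depth` are hypothetical names, and the way depth is counted (each enclosing object or array adds one level) is an assumption for illustration.

```python
import json

def nesting_depth(value):
    """Depth of nested objects/arrays; a scalar has depth 0 (assumed convention)."""
    if isinstance(value, dict):
        return 1 + max((nesting_depth(v) for v in value.values()), default=0)
    if isinstance(value, list):
        return 1 + max((nesting_depth(v) for v in value), default=0)
    return 0

def parse_json_property(properties, target_property, output_property=None,
                        on_error="fail", max_json_size=1_048_576,
                        max_nesting_depth=20):
    """Parse the JSON string in `target_property` into a structured value."""
    try:
        raw = properties[target_property]  # KeyError if the property is missing
        if not isinstance(raw, str):
            raise TypeError(f"{target_property} is not a string")
        if len(raw.encode("utf-8")) > max_json_size:
            raise ValueError("JSON string exceeds max_json_size")
        parsed = json.loads(raw)  # JSONDecodeError is a subclass of ValueError
        if nesting_depth(parsed) > max_nesting_depth:
            raise ValueError("JSON document exceeds max_nesting_depth")
    except (KeyError, TypeError, ValueError):
        if on_error == "skip":
            return properties  # pass the change through unchanged
        raise
    result = dict(properties)
    # When no output_property is given, the source property is overwritten.
    result[output_property or target_property] = parsed
    return result

event = {"id": "event001", "raw_event_json": '{"user": "alice", "action": "login"}'}
print(parse_json_property(event, "raw_event_json", "event_details", on_error="skip"))
```

Calling the same function without `output_property` would replace `raw_event_json` itself with the parsed object, mirroring the "If omitted or `null`" rule in the table.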
332+
333+
### Decoder
334+
335+
The **decoder** middleware processes `SourceChange` events (specifically `Insert` and `Update`) to decode a string value found in a specified property of an `Element`. It supports various common encoding formats such as Base64, Hexadecimal, URL encoding, and JSON string escapes. This is useful when data from a source is encoded for transmission or storage.
336+
337+
The configuration for the **decoder** component is as follows:
338+
339+
| Property | Type | Description | Required | Default |
340+
|---------------------------|------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|----------|---------|
341+
| `kind` | String | Must be **decoder**. | Yes | |
342+
| `name` | String | The name of this configuration, which can be referenced in a source pipeline. | Yes | |
343+
| `config` | Object | Contains the specific configuration for the decoder middleware. | Yes | |
344+
| `config.encoding_type` | String | The encoding format of the `target_property` value. Supported types: `base64`, `base64url`, `hex`, `url`, `json_escape`. | Yes | |
345+
| `config.target_property` | String | The name of the element property containing the encoded string to be decoded. | Yes | |
346+
| `config.output_property` | String | Optional. The name of the property where the decoded string should be stored. If omitted or `null`, `target_property` will be overwritten. | No | `null` |
347+
| `config.strip_quotes` | Boolean | If `true`, removes surrounding double quotes (`"`) from the `target_property` value *before* attempting to decode it. | No | `false` |
348+
| `config.on_error` | String (`"skip"` or `"fail"`) | Defines behavior when an error occurs (e.g., target property not found, value is not a string, or decoding fails). `"skip"` logs a warning and passes the change through unchanged; `"fail"` stops processing and returns an error. | No | `"fail"`|
349+
350+
#### Supported Encoding Types
351+
* **`base64`**: Standard Base64 encoding (RFC 4648).
352+
* **`base64url`**: URL-safe Base64 encoding (RFC 4648 §5), without padding.
353+
* **`hex`**: Hexadecimal encoding (e.g., `48656c6c6f`).
354+
* **`url`**: Percent-encoding (e.g., `Hello%20World`).
355+
* **`json_escape`**: Decodes JSON string escape sequences (e.g., `\"`, `\\`, `\n`, `\uXXXX`). Assumes the input is the *content* of a JSON string literal, not the literal itself including quotes.
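
As a rough illustration of what each encoding type means, the following Python sketch decodes one sample per type using only the standard library. The `decode` helper is hypothetical and is not the middleware's implementation; it also assumes the decoded bytes are UTF-8 text.

```python
import base64
import json
from urllib.parse import unquote

def decode(value, encoding_type):
    """Decode `value` according to one of the encoding types listed above."""
    if encoding_type == "base64":
        return base64.b64decode(value, validate=True).decode("utf-8")
    if encoding_type == "base64url":
        # The input is unpadded, so restore '=' padding before decoding.
        padded = value + "=" * (-len(value) % 4)
        return base64.urlsafe_b64decode(padded).decode("utf-8")
    if encoding_type == "hex":
        return bytes.fromhex(value).decode("utf-8")
    if encoding_type == "url":
        return unquote(value)
    if encoding_type == "json_escape":
        # The input is the content of a JSON string literal, so wrap it in
        # quotes and let the JSON parser resolve the escape sequences.
        return json.loads('"' + value + '"')
    raise ValueError(f"unsupported encoding_type: {encoding_type}")

print(decode("SGVsbG8gV29ybGQh", "base64"))  # Hello World!
print(decode("SGVsbG8", "base64url"))        # Hello
print(decode("48656c6c6f", "hex"))           # Hello
print(decode("Hello%20World", "url"))        # Hello World
```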
356+
357+
#### Example
358+
359+
Here's an example of how to configure the **decoder** middleware to decode a Base64-encoded string. The string is located in the `raw_user_payload` property and may be surrounded by quotes, which should be stripped. The decoded result is stored in a new `user_data` property. If an error occurs, a warning is logged and the change is passed through unchanged.
360+
361+
```yaml
spec:
  sources:
    middleware:
      - name: decode_user_data
        kind: decoder
        config:
          encoding_type: "base64"
          target_property: "raw_user_payload"
          output_property: "user_data"
          strip_quotes: true
          on_error: "skip"
```
374+
375+
For example, if an incoming node has properties like:
376+
```json
{
  "message_id": "msg123",
  "raw_user_payload": "\"SGVsbG8gV29ybGQh\""
}
```
382+
(The string `SGVsbG8gV29ybGQh` is "Hello World!" encoded in Base64.)
383+
384+
After processing with the `decode_user_data` middleware:
385+
1. `strip_quotes: true` removes the surrounding double quotes from `"SGVsbG8gV29ybGQh"` to yield `SGVsbG8gV29ybGQh`.
386+
2. This resulting string is then Base64 decoded to `Hello World!`.
387+
388+
The node's properties would be transformed to:
389+
```json
{
  "message_id": "msg123",
  "raw_user_payload": "\"SGVsbG8gV29ybGQh\"",
  "user_data": "Hello World!"
}
```
396+
The `user_data` property now contains the decoded string "Hello World!".
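
The two steps in this walkthrough (strip quotes, then Base64-decode) can be sketched in Python as follows. The function names are hypothetical and the code only mimics the configuration shown above, under the assumption that the payload decodes to UTF-8 text.

```python
import base64

def strip_surrounding_quotes(value):
    """Remove one pair of surrounding double quotes, if present."""
    if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
        return value[1:-1]
    return value

def decode_user_data(properties, target_property="raw_user_payload",
                     output_property="user_data", strip_quotes=True,
                     on_error="skip"):
    """Apply the `decode_user_data` configuration to a properties dict."""
    try:
        raw = properties[target_property]
        if not isinstance(raw, str):
            raise TypeError(f"{target_property} is not a string")
        if strip_quotes:
            raw = strip_surrounding_quotes(raw)  # step 1
        # step 2: binascii.Error and UnicodeDecodeError are both ValueErrors
        decoded = base64.b64decode(raw, validate=True).decode("utf-8")
    except (KeyError, TypeError, ValueError):
        if on_error == "skip":
            return properties  # pass the change through unchanged
        raise
    result = dict(properties)
    result[output_property or target_property] = decoded
    return result

node = {"message_id": "msg123", "raw_user_payload": '"SGVsbG8gV29ybGQh"'}
print(decode_user_data(node)["user_data"])  # Hello World!
```

With `strip_quotes=False`, the quote characters are not valid Base64, so decoding fails and, under `on_error: "skip"`, the properties are returned unchanged.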
