Skip to main content

Webhooks and Streaming

Patterns has native support for stream processing. A common automation is to have a webhook ingesting real-time events from some external system, processing these events with a series of python nodes, and then sending a response, either to a communication channel like Slack or Discord, or to trigger an action in another external system.

Streams

Processing a Table as a stream is simple in Patterns:

transactions = Table("transactions")
transactions_stream = transactions.as_stream(order_by="processed_at")

for tx in transactions_stream:
# Do something w the transaction
...

In this example, each transaction will be processed exactly once, in order of the processed_at field, no matter how many times this python is run. The node accomplishes this by tracking the latest value processed in its state. This enables real-time and incremental processing of data.

All Tables have associated Schemas that can provide a default or natural ordering to records so that there's no need to specify the order_by argument. Webhooks, for example, provide a unique monotonic id as their default ordering, so you can do this:

webhook_events = Table("webhook_events")

for event in webhook_events.as_stream():
# Do something w the event
...

and each event will be processed exactly once in the order the events were received.

danger

Note that the order_by field on the stream should be strictly monotonic -- that is each value is unique and ordered in creation time. If not, then there can't be any guarantee that each record will be processed once and only once if a node's execution is interrupted for any reason (time-out or other error). Ordering by a timestamp like created_at will generally work but if there are many records with the same timestamp, the stream has no way to checkpoint its progress within the group, and so will risk skipping or re-processing records if interrupted mid-group. It is thus recommended to use unique values to order streams.

Webhooks

Webhooks accept data via POST request with a JSON payload that is a single record or an array of records:

curl -X POST https://api-production.patterns.app/api/app/webhooks/... \
-H 'Content-Type: application/json' \
-d '{"a":"hello","b":1}'

or an array:

curl -X POST https://api-production.patterns.app/api/app/webhooks/... \
-H 'Content-Type: application/json' \
-d '[{"a":"hello","b":1},{"a":"bye","b":2}]'