
Building Components

Components in Patterns are reusable graphs. They consist of one or more nodes, are given a unique slug name, and are uploaded to the publishing organization's component library. They can be simple utilities, like a SQL deduplication node, or complex flows, like an entire underwriting model and scoring system.

Quick example

For now, component development is best done locally with help from the Patterns Devkit. Install it if you haven't already (pip install patterns-devkit) and then log in (patterns login) to connect to the Patterns platform. Sign up at studio.patterns.app if you don't have an account.
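For convenience, those setup commands are:

pip install patterns-devkit
patterns login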

Node files

For our example, we'll make a simple one-node component that takes in a record stream and augments each record with a timestamp:

augment_with_timestamp.py
from datetime import datetime, timezone

from patterns import *

@node
def augment_with_timestamp(
    input_stream=InputStream,
    output_stream=OutputStream,
    timestamp_field_name=Parameter(type=str, default="timestamp"),
):
    for record in input_stream:
        record[timestamp_field_name] = datetime.now(timezone.utc)
        output_stream.write(record)

To make this node into a component we will create a graph.yml and fill in the special fields slug, version, and exposes. In addition, it is recommended to add a description_file (markdown), tags, and an icon image (which will display in the graph UI and component marketplace).

graph.yml

title: Augment w/ Timestamp
slug: augment-with-timestamp
description_file: augment-with-timestamp.md
icon: augment-icon.svg
version: 1.0.0
tags:
  - utilities
exposes:
  inputs:
    - input_stream
  outputs:
    - output_stream
  parameters:
    - timestamp_field_name
functions:
  - node_file: augment_with_timestamp.py

We can then upload and publish our component to our organization with the devkit command patterns upload path/to/graph.yml --publish-component. (You may see "graph errors" when you upload a component that has an unconnected input or unfilled parameter; these are safe to ignore, since they will be connected by users of your component.) Now if we run patterns list components we should see it included in the output.
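Putting the publish-and-verify steps together:

patterns upload path/to/graph.yml --publish-component
patterns list components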

Using components

To use the component in a different graph, we need to include it as a node with the uses field of our graph.yml file:

title: Example using component
functions:
  - node_file: generate_stream.py
  - uses: my-org/augment-with-timestamp@v0
    inputs:
      input_stream: my_stream
    parameter_values:
      timestamp_field_name: processed_at
stores:
  - stream: my_stream
  - stream: output_stream

Advanced topics

Schemas

Schemas are recommended for any component where the structure of the data is known ahead of time. Schemas allow users to get clean and documented data, and allow other components to safely interoperate. Here’s a (truncated) example schema file:

schemas/Order.yml

name: Order
description: E-commerce order
unique_on:
  - id
field_roles:
  created_ordering: created_at
  updated_ordering: updated_at
immutable: false
fields:
  id:
    type: Integer
  customer_id:
    type: Text

In our component graph.yml we can then include the schema and expose it:

title: Augment w/ Timestamp
slug: augment-with-timestamp
version: 1.0.0
exposes:
  inputs:
    - input_stream
  outputs:
    - output_stream
  parameters:
    - timestamp_field_name
  schemas:
    - Order
functions:
  - node_file: augment_with_timestamp.py
schemas:
  - schema_file: schemas/Order.yml

This schema will now be available to nodes within the component as well as usable by external graphs:

...
schemas:
  - uses: my-org/my-component@v0
    schema: Order

Advanced Components: Building importers for JSON APIs

Goals

  • Users can add a component (e.g. patterns/stripe-importer) to their graph to ingest all their historical data from a given vendor (e.g. all their transactions from Stripe)
  • The component gives them access to a back-filled, up-to-date, flattened, and deduped table output for each resource available from the vendor, as well as debug access to the raw JSON dump from the API
  • The component ingests the user's entire history of data in small chunks, saving its progress to State as it goes, backfilling and then staying up-to-date, so even if the API is slow and backfilling takes many hours or days, progress is still reliably made

The recommended pattern for standard JSON APIs is to build a function that iterates over all API objects by ascending updated time, saving the latest updated time to State after each iteration, and writing the data to a stream port. The streamed records are then passed to a standard flatten and deduplication step that turns the stream into a deduplicated table according to the Schema definition. This setup ensures that:

  • The user's data is kept up-to-date quickly and efficiently by only fetching new or updated records (we track the latest updated time fetched)
  • Old records that have been updated will be re-ingested and updated to the latest version in the end table (we re-download updated rows and upsert them in the flatten and dedupe step)
  • Data is in a clean and predictable format (we use an explicit schema)
  • The user still has debug access to the raw JSON, if needed

Here is a minimal example of an importer function (simplified for ease of understanding):

import requests

from patterns import *

# Assumed for this simplified example: a minimum start date and the orders endpoint URL
DEFAULT_MIN_DATE = "1970-01-01T00:00:00Z"
ORDERS_ENDPOINT_URL = "https://example.myshopify.com/admin/api/orders.json"


@node
def import_orders(
    shopify_orders_stream=OutputStream(schema="ShopifyOrder"),
    api_key=Parameter(type=str),
    state=State,
):
    endpoint_url = ORDERS_ENDPOINT_URL
    params = None
    while state.should_continue():
        if params is None:
            # Start (or resume) from the latest updated_at we have successfully ingested
            latest_updated_at = state.get_state_value("latest_updated_at") or DEFAULT_MIN_DATE
            params = {
                "api_key": api_key,
                "order": "updated_at asc",
                "updated_at_min": latest_updated_at,
                "status": "any",
                "limit": 250,
            }
        resp = requests.get(endpoint_url, params)
        resp_json = resp.json()
        records = resp_json["orders"]
        if len(records) == 0:
            break
        new_latest_updated_at = max(o["updated_at"] for o in records)
        for record in records:
            shopify_orders_stream.stream_record(record)
        # Save progress so a later execution can resume from here
        state.set_state_value("latest_updated_at", new_latest_updated_at)
        # Shopify has cursor-based pagination, so we can safely paginate results
        next_page = resp_json.get("next_page")
        if not next_page:
            # No more pages
            break
        # The cursor URL carries the query, so keep params empty while paginating
        endpoint_url = next_page
        params = {}

The associated component graph.yml, which uses the standard Patterns flatten and dedupe component:

title: Shopify Orders importer
slug: shopify-orders-importer
version: 1.0.0
exposes:
  outputs:
    - shopify_orders
  parameters:
    - api_key
  schemas:
    - ShopifyOrder
functions:
  - node_file: import_orders.py
  - uses: patterns/flatten@v0
    inputs:
      input_stream: shopify_orders_stream
    outputs:
      flattened_table: shopify_orders
stores:
  - stream: shopify_orders_stream
schemas:
  - schema_file: schemas/ShopifyOrder.yml

The flatten and dedupe component expects a Schema to be defined on the input_stream, so we define one in schemas/ShopifyOrder.yml and declare it and expose it in our graph.yml. To work well in the flatten and dedupe step, the Schema should specify the unique_on fields and the updated_ordering (ideally) or the created_ordering fields, so it knows which record version takes precedence (by default it takes the most recently seen version):

name: ShopifyOrder
description: Shopify order
unique_on:
unique_on:
  - id
field_roles:
  created_ordering: created_at
  updated_ordering: updated_at
immutable: false
fields:
  id:
    type: Integer
  admin_graphql_api_id:
    type: Text
  app_id:
    type: Integer
  billing_address:
    type: Json
  browser_ip:
    type: Text
  buyer_accepts_marketing:
    type: Boolean
  cancel_reason:
    type: Text
  cancelled_at:
    type: DateTime
  ...

We then repeat this pattern for other Shopify endpoints, like shopify-customers.
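As a rough sketch (not an official Patterns component), a customers importer could mirror the orders graph above. The slug, node file, output names, and ShopifyCustomer schema here are hypothetical placeholders:

title: Shopify Customers importer
slug: shopify-customers-importer
version: 1.0.0
exposes:
  outputs:
    - shopify_customers
  parameters:
    - api_key
  schemas:
    - ShopifyCustomer
functions:
  - node_file: import_customers.py
  - uses: patterns/flatten@v0
    inputs:
      input_stream: shopify_customers_stream
    outputs:
      flattened_table: shopify_customers
stores:
  - stream: shopify_customers_stream
schemas:
  - schema_file: schemas/ShopifyCustomer.yml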