
Working with Python Nodes

Any valid Python script is a valid Python node in Patterns:

print("hello world")

To do interesting things, though, you’ll want to work with Table stores to read and write data and build dynamic data pipelines and applications.

Working with data

Using data in Python nodes is as simple as declaring a Table store. To read from a store we declare it in read-only mode (the default). To write to a store, we declare it in write mode.

from patterns import Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

Our Python node now references a table named messages and another named messages_with_metadata. If these store nodes don't exist in our App yet, we'll need to create them before running the node.

With the tables declared, we can read new, unprocessed messages by creating a Stream view with as_stream() and iterating over consume_records(), augmenting each record and appending it to our output Table:

Python: message stream to table
import random
from datetime import datetime
from patterns import Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

# Use a stream to process each record incrementally just once
stream = messages.as_stream(order_by="timestamp")

for record in stream.consume_records():
    record["processed_at"] = datetime.now()
    record["random"] = random.randint(0, 100)
    messages_with_metadata.append(record)

By default, all records written in Python are augmented with a patterns_id field: a strictly increasing value (a ULID). This value uniquely identifies the record across the entire Patterns platform and provides a default ordering for streaming the table. To disable this automatic field, call table.init(add_monotonic_id=None) before writing to the table.
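The ordering guarantee can be illustrated with a simplified, hypothetical stand-in: a ULID-style identifier is a millisecond timestamp followed by random bits, encoded so that lexicographic order matches creation order. This sketch is not the Patterns implementation, just a demonstration of the sortable-id idea:

```python
import os
import time

# Crockford's Base32 alphabet, as used by ULIDs (no I, L, O, U)
ALPHABET = "0123456789ABCDEFGHJKMNPQRSTVWXYZ"

def encode_base32(value: int, length: int) -> str:
    """Encode an integer as a fixed-width, lexicographically sortable string."""
    chars = []
    for _ in range(length):
        chars.append(ALPHABET[value & 0x1F])
        value >>= 5
    return "".join(reversed(chars))

def ulid_like() -> str:
    """48-bit millisecond timestamp + 80 random bits, 26 characters total."""
    timestamp_ms = int(time.time() * 1000)
    randomness = int.from_bytes(os.urandom(10), "big")
    return encode_base32(timestamp_ms, 10) + encode_base32(randomness, 16)

ids = []
for _ in range(3):
    ids.append(ulid_like())
    time.sleep(0.01)  # ensure distinct timestamps

# String sort order matches creation order
assert ids == sorted(ids)
```

Because the timestamp comes first and every id has the same width, sorting the strings sorts the records by creation time, which is what makes the field usable as a default stream ordering.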

Tracking state

Sometimes it's useful to track small amounts of metadata across runs of a node. For this, Patterns provides the State object, which also lets you control aspects of the node's execution, like requesting a subsequent run or checking whether the node should stop gracefully:

import random
from datetime import datetime
from patterns import State, Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

# Use a stream to process each record incrementally just once
stream = messages.as_stream(order_by="timestamp")

# The state for this node
state = State()

# Get a state value, defaulting to 0
record_count = state.get_value("record_count", 0)

for record in stream.consume_records():
    record_count += 1
    record["processed_at"] = datetime.now()
    record["random"] = random.randint(0, 100)
    record["count"] = record_count
    messages_with_metadata.append(record)

    # Check if our node is about to hit its execution time limit
    if not state.should_continue():
        # Request a subsequent run to continue processing where we left off
        state.request_new_run()
        break

# Set a new state value
state.set_value("record_count", record_count)
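The get_value/set_value pattern above amounts to a small key-value store that survives between runs. A minimal in-memory stand-in (not the real State object, which Patterns persists between executions) shows the semantics:

```python
class FakeState:
    """Illustrative stand-in for patterns.State: a dict with defaults."""

    def __init__(self):
        self._values = {}

    def get_value(self, key, default=None):
        # Return the stored value, or the default if nothing was stored yet
        return self._values.get(key, default)

    def set_value(self, key, value):
        self._values[key] = value

# First "run": no value stored yet, so the default is returned
state = FakeState()
count = state.get_value("record_count", 0)
assert count == 0

# Process some records, then persist the counter
count += 7
state.set_value("record_count", count)

# Next "run" picks up where the last one left off
assert state.get_value("record_count", 0) == 7
```

The important property is the default in get_value: the first execution sees the default, and every later execution resumes from whatever the previous one stored.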

Using third party libraries

Patterns comes with common Python data packages pre-installed and has native support for Pandas:

from patterns import Table
from sklearn.linear_model import LinearRegression

# An input (readable) Table
messages = Table("historical_messages")

df = messages.read_dataframe()  # Read the whole table into memory

regr = LinearRegression()
model = regr.fit(
    df[["record_count"]],  # scikit-learn expects a 2-D feature matrix
    df["consumed_at"].apply(lambda dt: dt.timestamp()),
)

print(model.coef_)
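Note that scikit-learn estimators expect a 2-D feature matrix of shape (n_samples, n_features), which is why a single feature column must be passed as a one-column frame or reshaped array rather than a flat Series. A self-contained sketch with synthetic data (no Patterns tables involved):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

# Synthetic data: y = 3x + 5 plus a little noise
rng = np.random.default_rng(0)
x = rng.uniform(0, 10, size=50)
y = 3 * x + 5 + rng.normal(0, 0.1, size=50)

# X must be 2-D: one row per sample, one column per feature
X = x.reshape(-1, 1)

model = LinearRegression().fit(X, y)
print(model.coef_[0], model.intercept_)  # close to 3 and 5
```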

You can also specify dependencies yourself.

Using Parameters and Secrets

You can use the Parameter type to configure a node without editing its code.

from patterns import Table, Parameter
import requests

messages = Table("messages", mode="w")

count = Parameter(
    "ingestion_count",
    type=int,
    default=100,
    description="The number of messages to request from the API",
)

api_key = Parameter(
    "example_api_key",
    description="The API key for the example service",
)

response = requests.get(
    "https://www.example.com/api/messages",
    headers={"API_KEY": api_key},
)

messages.append(response.json())

See the Python API reference for details on using Parameters in your code, or the documentation on configuring parameters with secrets.

Sharing Code Between Python Nodes

If you have code that you want to reuse in multiple Python nodes, you can write it in a separate file and import that as a module in your nodes. You'll also need to create an empty file named __init__.py in the same directory so Python treats it as a package.

common.py
def shared_function():
    return "hello"
__init__.py
# __init__.py can be empty
python_node.py
from patterns import Table
from . import common

output = Table("output", mode="w")
value = common.shared_function()
output.append({"shared": value})

See the Python Docs for more information on how imports work in Python.
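These are standard Python package mechanics, so you can reproduce the behavior outside Patterns. This sketch writes a common.py (and an __init__.py mirroring the layout above) into a temporary directory, makes that directory importable, and imports the shared module with a plain absolute import, since the script itself is not running inside a package:

```python
import importlib
import sys
import tempfile
from pathlib import Path

# Recreate the layout: a directory containing common.py and __init__.py
tmp = Path(tempfile.mkdtemp())
(tmp / "common.py").write_text('def shared_function():\n    return "hello"\n')
(tmp / "__init__.py").write_text("")  # mirrors the doc layout

# Make the directory importable, then import the shared module
sys.path.insert(0, str(tmp))
common = importlib.import_module("common")

assert common.shared_function() == "hello"
```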

Sharing code with the devkit

When using the devkit, every file in your app directory is included in the app, so your nodes can import any module in that directory.

Sharing code in Patterns Studio

To create a Python file that isn't associated with a node, use the Add File button in the sidebar.

Adding Python Dependencies

You can install any packages available on PyPI by specifying them in a requirements.txt file, then adding the following line to the top of your graph.yml file: requirements_file: requirements.txt.

caution

Requirements are downloaded every time a Python node runs, so adding many dependencies can make your executions take longer.

Example

If we want to install Faker and Beautiful Soup, first we create a requirements.txt with the package requirements:

requirements.txt
beautifulsoup4==4.11.2 
Faker==16.6.1

Then point to the file in your graph.yml:

graph.yml
  title: My App
+ requirements_file: requirements.txt
  functions:
    - node_file: node.py
      id: a8he6c

Now you can use those requirements in your Python nodes:

node.py
from faker import Faker
fake = Faker()
print(fake.text())