Working with Python Nodes
Any valid Python script is a valid Python node in Patterns:

```python
print("hello world")
```
To do interesting things, though, you’ll want to work with Table stores to read and write data and build dynamic data pipelines and applications.
Working with data

Using data in Python nodes is as simple as declaring a Table store. To read from a store, declare it in read-only mode (the default). To write to a store, declare it in write mode.
```python
from patterns import Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")
```
Now our Python node references a table named `messages` and another named `messages_with_metadata`. If these store nodes don't exist in our App yet, we'll need to create them before we run our Python node.

With the tables declared, we can create a stream view over the input Table, read new, unprocessed messages with `consume_records()`, augment each record, and append it to the output Table:
```python
import random
from datetime import datetime

from patterns import Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

# Use a stream to process each record incrementally just once
stream = messages.as_stream(order_by="timestamp")
for record in stream.consume_records():
    record["processed_at"] = datetime.now()
    record["random"] = random.randint(0, 100)
    messages_with_metadata.append(record)
```
All records written in Python are augmented by default with a `patterns_id` field, a strictly increasing value (a ULID). This value uniquely identifies the record across the entire Patterns platform and provides a default ordering for streaming the table. To disable this automatic field, call `table.init(add_monotonic_id=None)` before writing to the table.
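For example, a minimal sketch that disables the field on a writeable table (the `output` table name is illustrative):

```python
from patterns import Table

output = Table("output", "w")

# Disable the automatic patterns_id field before the first write
output.init(add_monotonic_id=None)

output.append({"message": "this record has no patterns_id"})
```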
Tracking state

Sometimes it's useful to track small amounts of metadata across runs of a node. For this, Patterns provides the `State` object. This object also lets you control aspects of the node's execution, like requesting a subsequent run or checking whether the node should stop gracefully:
```python
import random
from datetime import datetime

from patterns import State, Table

# An input (readable) Table
messages = Table("messages")

# An output (writeable) Table
messages_with_metadata = Table("messages_with_metadata", "w")

# Use a stream to process each record incrementally just once
stream = messages.as_stream(order_by="timestamp")

# The state for this node
state = State()

# Get a state value, defaulting to 0
record_count = state.get_value("record_count", 0)

for record in stream.consume_records():
    record_count += 1
    record["processed_at"] = datetime.now()
    record["random"] = random.randint(0, 100)
    record["count"] = record_count
    messages_with_metadata.append(record)

    # Check if our node is about to hit its execution time limit
    if not state.should_continue():
        # Request a subsequent run to continue processing where we left off
        state.request_new_run()
        break

# Set a new state value
state.set_value("record_count", record_count)
```
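The `State` object is also useful on its own, without any tables. A minimal sketch that remembers when the node last ran (the `last_run_at` key is illustrative):

```python
from datetime import datetime

from patterns import State

state = State()

# Read the timestamp recorded by the previous run, defaulting to None
last_run_at = state.get_value("last_run_at", None)

if last_run_at is None:
    print("First run")
else:
    print(f"Last ran at {last_run_at}")

# Record this run's timestamp for the next execution
state.set_value("last_run_at", datetime.now().isoformat())
```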
Using third-party libraries

Patterns comes with common Python data packages installed and has native support for pandas:
```python
from patterns import Table
from sklearn.linear_model import LinearRegression

# An input (readable) Table
messages = Table("historical_messages")

df = messages.read_dataframe()  # Read the whole table into memory

regr = LinearRegression()
model = regr.fit(
    df[["record_count"]], df["consumed_at"].apply(lambda dt: dt.timestamp())
)
print(model.coef_)
```
You can also specify dependencies yourself; see Adding Python Dependencies below.
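Since `read_dataframe()` returns a pandas DataFrame, you can also transform a table with pandas and write the results back record by record. A sketch, assuming hypothetical `scores` and `scores_normalized` tables with a numeric `score` column:

```python
from patterns import Table

scores = Table("scores")
scores_normalized = Table("scores_normalized", "w")

df = scores.read_dataframe()

# Derive a normalized column with pandas
df["score_normalized"] = (df["score"] - df["score"].mean()) / df["score"].std()

# Write the transformed rows back as records
for record in df.to_dict("records"):
    scores_normalized.append(record)
```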
Using Parameters and Secrets

You can use the `Parameter` type to configure a node without editing its code:
```python
import requests

from patterns import Parameter, Table

messages = Table("messages", mode="w")

count = Parameter(
    "ingestion_count",
    type=int,
    default=100,
    description="The number of messages to request from the API",
)
api_key = Parameter(
    "example_api_key",
    description="The API key for the example service",
)

response = requests.get(
    "https://www.example.com/api/messages",
    # Pass the ingestion_count parameter through to the API
    params={"count": count},
    headers={"API_KEY": api_key},
)
messages.append(response.json())
```
See the Python API reference for details on using Parameters in your code, or the documentation on configuring parameters with secrets.
Sharing Code Between Python Nodes

If you have code that you want to reuse in multiple Python nodes, you can write it in a separate file and import it as a module in your nodes. You'll also need to create an empty file named `__init__.py`.
`common.py`:

```python
def shared_function():
    return "hello"
```

`__init__.py` can be empty. The node then imports the shared module:

```python
from patterns import Table

from . import common

output = Table("output", mode="w")

value = common.shared_function()
output.append({"shared": value})
```
See the Python Docs for more information on how imports work in Python.
Sharing code with the devkit

When using the devkit, any files in your app directory are included in the app, so you can import any file in that directory from your nodes.
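For example, an app directory might look like this (file names other than `__init__.py` and `graph.yml` are illustrative):

```
my_app/
├── __init__.py   # can be empty
├── common.py     # shared helpers
├── node.py       # imports common via `from . import common`
└── graph.yml
```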
Sharing code in Patterns Studio

To create a Python file that isn't associated with a node, use the Add File button in the sidebar.
Adding Python Dependencies

You can install any package available on PyPI by specifying it in a `requirements.txt` file, then adding the following line to the top of your `graph.yml` file:

```yaml
requirements_file: requirements.txt
```
Requirements are downloaded every time a Python node runs, so adding many dependencies can make your executions take longer.
Example

If we want to install Faker and Beautiful Soup, first we create a `requirements.txt` with the package requirements:

```
beautifulsoup4==4.11.2
Faker==16.6.1
```
Then point to the file in your `graph.yml`:

```yaml
title: My App
requirements_file: requirements.txt  # add this line
functions:
  - node_file: node.py
    id: a8he6c
```
Now you can use those requirements in your Python nodes:

```python
from faker import Faker

fake = Faker()
print(fake.text())
```
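The same goes for Beautiful Soup. A minimal sketch that parses a page title (the URL is a placeholder):

```python
import requests
from bs4 import BeautifulSoup

# Fetch a page and parse it with the installed beautifulsoup4 package
html = requests.get("https://www.example.com").text
soup = BeautifulSoup(html, "html.parser")
print(soup.title.text)
```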