Working with Python Nodes

Any valid Python script is a valid Python node in Patterns:

print("hello world")

To do interesting things, though, you’ll want to work with Stream and Table stores to read and write data and build dynamic data pipelines and applications.

Working with data

Using data in Python nodes is as simple as declaring a Stream or Table store. To read from a store we declare it in read-only mode (the default). To write to a store, we declare it in write mode.

from patterns import Stream, Table

# An input (readable) Stream
stream = Stream("messages")

# An output (writeable) Table
table = Table("historical_messages", "w")

Our Python node now references a Stream store named messages and a Table store named historical_messages. If these store nodes don’t exist in our App yet, we’ll need to create them before we run our Python node.

Now that they’re declared, we can read new, unprocessed messages from our Stream with consume_records(), augment each record, and then append them to our Table:

from datetime import datetime
from patterns import Stream, Table

# An input (readable) Stream
stream = Stream("messages")

# An output (writeable) Table
table = Table("historical_messages", "w")

records = []
for record in stream.consume_records():
    record["consumed_at"] = datetime.now()
    records.append(record)

table.write(records)
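The consume-and-augment loop is plain Python, so you can exercise the same logic locally with an in-memory list standing in for the Stream (the stand-in list below is illustrative, not part of the Patterns API):

```python
from datetime import datetime, timezone

# Stand-in for stream.consume_records(): a plain list of dicts
incoming = [{"text": "hello"}, {"text": "world"}]

records = []
for record in incoming:
    # Same augmentation as the node: stamp each record on consumption
    record["consumed_at"] = datetime.now(timezone.utc)
    records.append(record)

print(len(records), "consumed_at" in records[0])
```

Because the node body has no Patterns-specific control flow, testing it this way before wiring up the actual stores is often the quickest feedback loop.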

Tracking state

Sometimes it’s useful to track small amounts of metadata across runs of a node. For this, Patterns provides the State object. This object also lets you control aspects of the execution of the node, like requesting a subsequent run of the node, or checking if the node should stop gracefully:

from datetime import datetime
from patterns import State, Stream, Table

# An input (readable) Stream
stream = Stream("messages")

# An output (writeable) Table
table = Table("historical_messages", "w")

# The state for this node
state = State()

# Get a state value, defaulting to 0
record_count = state.get_value("record_count", 0)

records = []
for record in stream.consume_records():
    record_count += 1
    record["record_count"] = record_count
    record["consumed_at"] = datetime.now()
    records.append(record)

    # Check if our node is about to hit its execution time limit
    if not state.should_continue():
        break

table.write(records)

# Set a new state value
state.set_value("record_count", record_count)
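The persistence contract here is simple: a value stored with set_value survives to the next run and comes back through get_value. A minimal in-memory stand-in (hypothetical, not the real State class) makes that behavior easy to verify outside Patterns:

```python
# Hypothetical in-memory stand-in for patterns.State, for local testing only
class FakeState:
    def __init__(self):
        self._values = {}

    def get_value(self, key, default=None):
        return self._values.get(key, default)

    def set_value(self, key, value):
        self._values[key] = value


state = FakeState()

# First "run": count three records, then persist the total
record_count = state.get_value("record_count", 0)
for record in [{"a": 1}, {"a": 2}, {"a": 3}]:
    record_count += 1
state.set_value("record_count", record_count)

# A second "run" resumes from the persisted count
record_count = state.get_value("record_count", 0)
print(record_count)  # 3
```

In Patterns the real State is persisted between executions by the platform; the stand-in only mimics the get/set interface within a single process.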

Using third party libraries

Patterns comes with common Python data packages installed, and has native support for pandas:

from patterns import Table
from sklearn.linear_model import LinearRegression

# An input (readable) Table
messages = Table("historical_messages")

df = messages.read(as_format="dataframe")  # Read the whole table into memory

regr = LinearRegression()
model = regr.fit(
df[["record_count"]], df["consumed_at"].apply(lambda dt: dt.timestamp())
)

print(model.coef_)
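Everything after the read() call is ordinary scikit-learn, so the fit-and-inspect step can be sanity-checked on synthetic data without Patterns at all (the linear timestamps below are fabricated for illustration):

```python
import numpy as np
from sklearn.linear_model import LinearRegression

# Synthetic stand-in for the table: timestamps that grow linearly
# with record_count, one record per minute
X = np.arange(1, 11).reshape(-1, 1)   # record_count 1..10
y = 1_700_000_000 + 60.0 * X.ravel()  # Unix timestamps, 60 s apart

model = LinearRegression().fit(X, y)
print(model.coef_[0])                 # seconds per record, ~60.0

# Predict the timestamp of a future record
predicted = model.predict([[20]])[0]
print(predicted - 1_700_000_000)      # ~1200.0
```

Since the fake data is exactly linear, the recovered coefficient matches the 60-second spacing; against real table data you would expect noisier estimates.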

Using parameters

Parameters allow App users to configure and augment nodes and functionality without editing the code. They are used extensively for Marketplace components, but can be useful for any App:

from datetime import datetime
from patterns import Parameter, State, Stream, Table

# An input (readable) Stream
stream = Stream("messages")

# An output (writeable) Table
table = Table("historical_messages", "w")

# The state for this node
state = State()

# Parameter
add_record_count = Parameter(
    "add_record_count",
    type=bool,
    default=True,
    description="If true, augments records with cumulative record count",
)

# Get a state value, defaulting to 0
record_count = state.get_value("record_count", 0)

records = []
for record in stream.consume_records():
    record_count += 1
    if add_record_count:
        record["record_count"] = record_count
    record["consumed_at"] = datetime.now()
    records.append(record)

    # Check if our node is about to hit its execution time limit
    if not state.should_continue():
        break

table.write(records)

# Set a new state value
state.set_value("record_count", record_count)
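From the node body's point of view, a Parameter behaves like an ordinary value, so the branching it controls can be tested locally by passing the flag as a plain argument (the process helper below is an illustrative stand-in, not part of the Patterns API):

```python
from datetime import datetime, timezone

def process(incoming, add_record_count, start_count=0):
    # Mirrors the node body: optional count augmentation plus a timestamp
    record_count = start_count
    records = []
    for record in incoming:
        record_count += 1
        if add_record_count:
            record["record_count"] = record_count
        record["consumed_at"] = datetime.now(timezone.utc)
        records.append(record)
    return records, record_count

with_count, total = process([{"x": 1}, {"x": 2}], add_record_count=True)
without_count, _ = process([{"x": 3}], add_record_count=False)
print(total, "record_count" in with_count[0], "record_count" in without_count[0])
```

When the flag is off, records pass through with only the timestamp added; App users flip the same switch through the Parameter's UI without touching the code.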

For more technical details, refer to the Python API reference.