
Python

Patterns Python nodes give you the full power of the Python ecosystem -- you can ingest data from APIs with requests, manipulate data with pandas, or build models with PyTorch. The Patterns Protocol provides methods for working with both batch ETL pipelines and real-time streaming flows in a unified framework, and it manages data storage and transport on your behalf so you can focus on the problem at hand.

All Patterns nodes store and retrieve data from Streams and Tables. These are represented as Python objects that you can import into your script to interact with Patterns data.

Patterns provides additional objects to help you build robust pipelines. State can be used to store lightweight state for a given node -- for instance, to checkpoint progress against an external API. Parameters can be used to make parts of your script easily configurable from the UI.
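
For orientation, a minimal node using these objects might look like the sketch below; the connection names ("orders", "big_orders") and the "total" column are hypothetical, chosen only for illustration.

from patterns import Table

orders = Table("orders")                    # hypothetical input table connection
big_orders = Table("big_orders", mode="w")  # hypothetical output table connection

df = orders.read(as_format="dataframe")     # read the full table as a pandas DataFrame
big_orders.write(df[df["total"] >= 100])    # write the filtered rows downstream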

Here is the full API available to Python scripts:

from patterns import (
    Table,
    Parameter,
    State,
    Stream,
)
from pandas import DataFrame


# Best practice to declare your tables and streams at the top of your script
input_stream = Stream("input_stream")
input_table = Table("input_table")

output_stream = Stream("output_stream", mode="w")
output_table = Table("output_table", mode="w")

state = State()

pint = Parameter("pint", type=int, default=0)
pstr = Parameter("pstr", type=str, default="str")
pbool = Parameter("pbool", type=bool, default=True)
plist = Parameter("plist", type=list, default=None)


records = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
df = DataFrame(records)


# Input stream
for record in input_stream:
    print(record)
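
# Sketch: a common streaming pattern is to transform each record and forward it to an
# output stream. The "a"/"b" keys come from the example records above and are assumed
# to be numeric; a real node would consume the stream in only one of these loops.
for record in input_stream:
    record["total"] = record.get("a", 0) + record.get("b", 0)
    output_stream.write(record)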


# Input table
itr = input_table.read() # Read ALL table data as records (list of dictionaries)
itdf = input_table.read(as_format="dataframe") # Read ALL table data as a pandas DataFrame
for chunk in input_table.read(chunksize=10): # Read records into memory in chunks
    print(chunk)
for chunk in input_table.read(as_format="dataframe", chunksize=10):
    print(chunk)
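
# Sketch: chunked reads keep memory bounded for large tables; here each chunk is
# transformed and appended to the output table (assumes a numeric "a" column).
for chunk in input_table.read(as_format="dataframe", chunksize=1000):
    chunk["a_doubled"] = chunk["a"] * 2
    output_table.write(chunk)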

sql = "select * from {{ input_table }}"
itr = input_table.read_sql(sql) # Read records returned by the given SQL. The SQL should use Jinja templating to refer to Table names
itdf = input_table.read_sql(sql, as_format="dataframe") # Or as dataframe
for chunk in input_table.read_sql(sql, chunksize=10):
    print(chunk)
for chunk in input_table.read_sql(sql, as_format="dataframe", chunksize=10):
    print(chunk)
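
# Sketch: read_sql is useful for pushing filters and aggregations down to the database;
# the "a" column here comes from the example records above.
totals = input_table.read_sql(
    "select a, count(*) as n from {{ input_table }} group by a",
    as_format="dataframe",
)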


# Output stream
output_stream.write(records) # Write a list of dictionaries
output_stream.write(records[0]) # Write a single dictionary (record)
output_stream.write(df) # Write a pandas dataframe
output_stream.signal_reset() # Reset the stream (clears all records in the stream)


# Output table
output_table.write(records)
output_table.write(records[0])
output_table.write(df)
output_table.signal_reset() # Reset the table (creates a new, empty version)
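
# Sketch: pairing signal_reset with a write gives a simple "full refresh" -- replace
# the table's contents with a fresh snapshot instead of appending to it.
output_table.signal_reset()
output_table.write(df)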


# State
state.set_value("k", "v")
state.get_value("k", "default")
state.get_datetime("k") # Convenience for parsing datetime formats
state.should_continue() # Check if the function execution has hit its time limit and should stop gracefully
state.request_new_run() # Request a new run of the function once the current one completes (often used in tandem with should_continue)
state.signal_reset() # Reset all state
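
# Sketch: the State methods combine into a checkpointing loop for long-running imports --
# store a cursor, stop gracefully before the time limit, and request a new run to resume.
# fetch_page() is a hypothetical call to an external API, used only for illustration.
cursor = state.get_value("cursor", 0)
more_pages = True
while more_pages and state.should_continue():
    page = fetch_page(cursor)
    more_pages = bool(page)
    if page:
        output_table.write(page)
        cursor += 1
        state.set_value("cursor", cursor)  # checkpoint progress after each page
if more_pages:
    state.request_new_run()  # ran out of time, not data -- resume in a fresh run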

Reference

Table(
    name: str,
    description: str = None,
    schema: str = None,
    required: bool = True,
    mode: str = "r",
)
Stream(
    name: str,
    description: str = None,
    schema: str = None,
    required: bool = True,
    mode: str = "r",
)
Parameter(
    name: str,
    type: str = None,
    description: str = None,
    default: Any,
)

# Where type is one of:
class ParameterType(str, Enum):
    Text = "text"
    Boolean = "bool"
    Integer = "int"
    Float = "float"
    Date = "date"
    DateTime = "datetime"
    List = "list"
    Schema = "schema"
State()
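
For example, a date parameter declared with one of these type names might look like the following sketch (the parameter name and description are hypothetical):

start_date = Parameter(
    "start_date",
    type="datetime",
    description="Earliest record to import",
    default=None,
)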