Python Reference

Patterns python nodes give you the full power of the python ecosystem -- you can ingest data from APIs with requests, manipulate data with pandas, or build models with pytorch. The Patterns Protocol provides methods for working with both ETL aggregate pipelines and real-time streaming flows in a unified framework, and manages data storage and transport on your behalf, so you can focus on the problem at hand.

All Patterns nodes store and retrieve data from Tables. Tables are a thin abstraction over database tables that provide a stable reference across table versions and are the primary way to interact with data in Patterns code.

Patterns provides additional objects to help you build robust pipelines. State can be used to store lightweight state for the given node, for instance to checkpoint progress against an external API. Parameters can be used to make parts of your script easily configurable from the UI or by the end user.

Here is the full API available to Python scripts:

from datetime import date, datetime

from patterns import (
    Table,
    Parameter,
    State,
)
from dcp.utils.pandas import assert_dataframes_are_almost_equal
from pandas import DataFrame


input_table = Table("input_table")
output_table = Table("out_table", mode="w")
upsert_table = Table("upsert_table", mode="w")

state = State()

pstr = Parameter("ppstr", type=str, default="str")
pbool = Parameter("ppbool", type=bool, default=True)
pint = Parameter("ppint", type=int, default=0)
pfloat = Parameter("ppfloat", type=float, default=0.0)
pdate = Parameter(
    "ppdate", type=date, default="2000-01-01"
)  # default=date(2000, 1, 1)
pdatetime = Parameter(
    "ppdatetime", type=datetime, default="2000-01-01 00:00:01"
)  # default=datetime(2000, 1, 1)
plist = Parameter("pplist", type=list, default="a1")


records = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
df = DataFrame(records)


# Input table
itr = input_table.read()
assert itr == records, itr
itdf = input_table.read_dataframe()
assert_dataframes_are_almost_equal(itdf, df)
for chunk in input_table.read(chunksize=10):
    assert records == chunk
for chunk in input_table.read(as_format="dataframe", chunksize=10):
    assert_dataframes_are_almost_equal(chunk, df)

sql = "select * from {{ input_table }}"
itr = input_table.read_sql(sql)
assert itr == records, itr
itdf = input_table.read_sql(sql, as_format="dataframe")
assert_dataframes_are_almost_equal(itdf, df)
for chunk in input_table.read_sql(sql, chunksize=10):
    assert chunk == records
for chunk in input_table.read_sql(sql, as_format="dataframe", chunksize=10):
    assert_dataframes_are_almost_equal(chunk, df)


# Streaming tables
stream = input_table.as_stream()  # By default, will stream in order records were written
for r, record in zip(records, stream):
    assert r == record
stream.rewind()
stream = input_table.as_stream(order_by="a")  # Again after rewind
for r, record in zip(records, stream.consume_records()):
    assert r == record
stream.seek(1)
assert len(list(stream)) == 1


# Output table
output_table.append(records)
output_table.append(records[0])
output_table.append(df)
output_table.truncate()
output_table.append(records)
output_table.reset()


# Upsert table
upsert_table.init(unique_on="a")
upsert_table.upsert(records)
upsert_table.upsert(records)
upsert_table.upsert([{"a": 1, "b": 0}, {"a": 4, "b": 4}])


# State
state.get()
state.set({"k": 1})
state.set_value("k", "v")
state.get_value("k", "v")
state.should_continue()
state.request_new_run()
state.reset()

Python API Reference

Table

class Table()

__init__

def __init__(
    name: str,
    mode: str = "r",
    description: str = None,
    schema: str = None,
    required: bool = True
)

A Table is a thin abstraction over a database table that provides a stable reference across versions of the table.

Arguments:

  • name - The Patterns name for the table. The actual database table on disk will include this name and a hash.
  • mode - Whether to use the table in "read" mode ("r") or "write" mode ("w")
  • description - An optional short description of this table
  • schema - An optional explicit Schema for this table. If not provided the schema will be inferred, or can be set with the table's init method.
  • required - Whether this table is a required table for the operation of the node, or is optional.
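
For example, a node might declare its inputs and outputs like the following (a minimal sketch; the table names and the "CleanEvent" schema name are illustrative):

from patterns import Table

# Required input table, read mode by default
events = Table("events", description="Raw event records")

# Optional input; operations are no-ops if it isn't connected in the graph
reference = Table("reference_data", required=False)

# Write-mode output with an explicit schema name
clean_events = Table("clean_events", mode="w", schema="CleanEvent")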

read

def read(
    as_format: str = "records",
    chunksize: int | None = None
) -> List[dict] | DataFrame | Iterator[List[dict]] | Iterator[DataFrame]

Reads records from this table.

Arguments:

  • as_format - Format to return records in. Defaults to list of dicts ('records'). Set to 'dataframe' to get pandas dataframe (equivalent to read_dataframe)
  • chunksize - If specified, returns an iterator of the requested format in chunks of given size

read_dataframe

def read_dataframe(
    chunksize: int | None = None
) -> DataFrame | Iterator[DataFrame]

Returns records as a pandas dataframe. Equivalent to .read(as_format='dataframe')

Arguments:

  • chunksize - If specified, returns an iterator of the dataframes of given size

read_sql

def read_sql(
    sql: str,
    as_format: str = "records",
    chunksize: int | None = None
) -> List[dict] | DataFrame | Iterator[List[dict]] | Iterator[DataFrame]

Reads records resulting from the given sql expression, in the same manner as read.

To reference tables in the sql, you can get their current (fully qualified and quoted) sql name by referencing .sql_name or, equivalently, taking their str() representation:

my_table = Table("my_table")
my_table.read_sql(f'select * from {my_table} limit 10')

Arguments:

  • sql - The sql select statement to execute
  • as_format - Format to return records in. Defaults to list of dicts ('records'). Set to 'dataframe' to get pandas dataframe.
  • chunksize - If specified, returns an iterator of the requested format in chunks of given size

as_stream

def as_stream(order_by: str = None, starting_value: Any = None) -> Stream

Returns a Stream over the given table that will consume each record in the table exactly once, in order.

Progress along the stream is stored in the node's state. A table may have multiple simultaneous streams with different orderings. The stream is ordered by the order_by parameter if provided; otherwise it defaults to the schema's strictly_monotonic_ordering if defined, or its created_ordering if defined. If none of those orderings are defined, an exception is thrown.

By default, all Tables written to by a python or webhook node are augmented with a patterns_id that provides a strictly_monotonic_ordering for the Table.

Arguments:

  • order_by - Optional, the field to order the stream by. If not provided defaults to schema-defined orderings
  • starting_value - Optional, value on the order by field at which to start the stream

Returns:

Stream object.
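
For example, to consume a table in a given order and resume from a known cursor value (a sketch; the "updated_at" field and the process function are illustrative):

from patterns import Table

orders = Table("orders")

# Stream records ordered by "updated_at", starting strictly after the given value
stream = orders.as_stream(order_by="updated_at", starting_value="2023-01-01")
for record in stream.consume_records():
    process(record)  # hypothetical per-record processing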

reset

def reset()

Resets the table.

No data is deleted on disk, but the active version of the table is reset to None.

is_connected

@property
def is_connected() -> bool

Returns true if this table port is connected to a store in the graph.

Operations on unconnected tables are no-ops and return dummy objects.
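
A sketch of guarding an optional input with is_connected (names are illustrative):

from patterns import Table

reference = Table("reference_data", required=False)

if reference.is_connected:
    lookups = reference.read()
else:
    lookups = []  # fall back to an empty lookup set when the port is unconnected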

sql_name

@property
def sql_name() -> str | None

The fully qualified and quoted sql name of the active table version. The table may or may not exist on disk yet.

schema

@property
def schema() -> Schema | None

The Schema of the active table version. May be None

record_count

@property
def record_count() -> int | None

The record count of the active table version. May be None

exists

@property
def exists() -> bool

True if the table has been created on disk.

init

def init(
    schema: Schema | str | dict | None = None,
    schema_hints: dict[str, str] | None = None,
    unique_on: str | list[str] | None = None,
    add_created: str | None = None,
    add_monotonic_id: str | None = "patterns_id",
    auto_indexes: bool = True
)

Provides properties for this table that are used when a table version is first created on disk.

Arguments:

  • schema - A CommonModel Schema object or str name, or a dictionary of field names to field types
  • schema_hints - A dictionary of field names to CommonModel field types that are used to override any inferred types. e.g. {"field1": "Text", "field2": "Integer"}
  • unique_on - A field name or list of field names that records should be unique on. Used by components to operate efficiently and correctly on the table.
  • add_created - If specified, is the field name that an "auto_now" timestamp will be added to each record when append or upsert is called. This field will be the default streaming order for the table (by automatically filling the created_ordering role on the associated Schema), but only if add_monotonic_id is NOT specified and the associated schema defines no monotonic ordering.
  • add_monotonic_id - If specified, is the field name that a unique, strictly monotonically increasing base32 string (a ULID) will be added to each record when append or upsert is called. This field will be the default streaming order for the table (by automatically filling the strictly_monotonic_ordering role on the associated Schema). By default it is set to patterns_id.
  • auto_indexes - If true (the default), an index is automatically created on new table versions for the unique_on property
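
A sketch of a typical init call for an upsert-style table (field names and types are illustrative):

from patterns import Table

customers = Table("customers", mode="w")
customers.init(
    schema_hints={"customer_id": "Text", "lifetime_value": "Float"},
    unique_on="customer_id",
    add_created="created_at",
)
customers.upsert({"customer_id": "c1", "lifetime_value": 42.0})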

append

def append(records: DataFrame | List[dict] | dict)

Appends the records to the end of this table.

If this is the first write to this table then any schema provided is used to create the table, otherwise the schema is inferred from the passed in records.

Records are buffered and written to disk in batches. To force an immediate write, call table.flush().

To replace a table with a new (empty) version and append from there, call table.reset() or table.replace(new_records).

Arguments:

  • records - May be a list of records (list of dicts with str keys), a single record (dict), or a pandas dataframe.

upsert

def upsert(records: DataFrame | List[dict] | dict)

Upserts the records into this table, inserting new rows or updating if unique key conflicts.

Unique fields must be provided by the Schema or passed to init. If this is the first write to this table then any schema provided is used to create the table, otherwise the schema is inferred from the passed in records.

Records are buffered and written to disk in batches. To force an immediate write, call table.flush().

Arguments:

  • records - May be a list of records (list of dicts with str keys), a single record (dict), or a pandas dataframe.

replace

def replace(records: DataFrame | List[dict])

Replaces the current table version (if any) with a new one containing just records.

Equivalent to table.reset(); table.append(records).

Arguments:

  • records - May be a list of records (list of dicts with str keys) or a pandas dataframe.

truncate

def truncate()

Truncates this table, preserving the table and schema on disk, but deleting all rows.

Unlike reset, which sets the active TableVersion to a new version, this action is destructive and cannot be undone.

execute_sql

def execute_sql(sql: str)

Executes the given sql against the database this table is stored on.

The sql is inspected to determine if it creates new tables or only modifies them, and appropriate events are recorded. The sql should ONLY create or update THIS table. Creating or updating other tables will result in incorrect event propagation.

To reference tables in the sql, you can get their current (fully qualified and quoted) sql name by referencing .sql_name or, equivalently, taking their str() representation

my_table = Table("my_table", "w")
my_table.execute_sql(f'create table {my_table} as select 1 as a, 2 as b')

Arguments:

  • sql - Any valid sql statement that creates, inserts, updates, or otherwise alters this table.

reset

def reset()

Resets this table to point to a new (null) TableVersion with no Schema or data.

Schema and data of the previous version still exist on disk until garbage collected according to the table's retention policy.

flush

def flush()

Flushes any buffered records to disk.

Calls to table.append and table.upsert are buffered and flushed periodically and at the end of an execution. Use this method to force an immediate write.
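
For example, a node fetching pages from an external API might flush after each page so progress is persisted incrementally (a sketch; fetch_pages is a hypothetical generator):

from patterns import Table

output = Table("api_records", mode="w")

for page in fetch_pages():  # hypothetical generator yielding lists of records
    output.append(page)
    output.flush()  # force the buffered records to disk after each page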

Stream

class Stream()

A stateful view of a Table that supports consuming the table in a one-record-at-a-time manner in a given ordering, preserving progress across executions for the given node.

Example

table = Table("my_table")
stream = table.as_stream(order_by="id")
for record in stream.consume_records():
    print(record)

# Rewind and the stream will consume from the beginning again
stream.rewind()
for record in stream.consume_records():
    print(record)

stream.seek(42)
for record in stream.consume_records():
    print(record)  # only values *greater* than 42

consume_records

def consume_records(with_metadata: bool = False) -> Iterator[dict]

Iterates over records in this stream one at a time.

When a record is yielded it is marked as consumed, regardless of what happens after. If you want to recover from errors and have the option to re-process records, you can use rollback and checkpoint explicitly in a try / except block.
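
A sketch of that checkpoint / rollback pattern (handle is a hypothetical processing function):

from patterns import Table

stream = Table("input_table").as_stream()

try:
    for record in stream.consume_records():
        handle(record)  # hypothetical per-record processing
        stream.checkpoint()  # persist progress after each successful record
except Exception:
    stream.rollback()  # roll back to the last checkpoint so records can be re-processed
    raise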

__iter__

def __iter__() -> Iterator[dict]

Equivalent to consume_records

checkpoint

def checkpoint()

Saves the stream state (which records have been consumed from the iterator) to disk.

rollback

def rollback()

Rolls back stream to beginning of execution or last checkpoint.

rewind

def rewind()

Resets the stream to consume from the beginning again

seek

def seek(value: Any)

Seeks to the given value (of the order_by field).

Stream will consume values strictly greater than the given value, not including any record equal to the given value.

order_by_field

@property
def order_by_field() -> str

Returns the ordering field for this stream

State

class State()

State is a wrapper around a Table that supports quickly storing and retrieving single values from the database.

set

def set(state: dict)

Replaces the whole state dict with the provided one

set_value

def set_value(key: str, value: Any)

Sets the given value for the given key on this node's state.

get

def get() -> dict

Gets the current state dict

get_value

def get_value(key: str, default: Any = None) -> Any

Gets latest value from state for this node for the given key.

Arguments:

  • key - key for state value
  • default - default value if key is not present in state

Returns:

value from state

get_datetime

def get_datetime(key: str, default: datetime = None) -> datetime | None

Gets latest value from state for given key and tries to cast to a python datetime.

Arguments:

  • key - key for state
  • default - default datetime if key is not present in state

Returns:

datetime from state or None
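
For example, a node syncing from an external API might keep its high-water mark in state (a sketch; fetch_since and the "last_synced_at" key are illustrative):

from datetime import datetime
from patterns import State, Table

state = State()
output_table = Table("synced_records", mode="w")

last_synced = state.get_datetime("last_synced_at", default=datetime(2020, 1, 1))
new_records = fetch_since(last_synced)  # hypothetical call to an external API
output_table.append(new_records)
state.set_value("last_synced_at", datetime.utcnow().isoformat())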

should_continue

def should_continue(
    pct_of_limit: float = None,
    seconds_till_limit: int = None
) -> bool

Returns False if execution is near its hard time limit (10 minutes typically), otherwise returns True.

Used to exit gracefully from long-running jobs, typically in conjunction with request_new_run. Defaults to triggering at 80% of the limit or 120 seconds before the hard limit, whichever is greater.

Arguments:

  • pct_of_limit - percent of time limit to trigger at
  • seconds_till_limit - seconds before time limit to trigger at

request_new_run

def request_new_run(
    trigger_downstream: bool = True,
    wait_atleast_seconds: int = None
)

Requests a new run from the server for this node, to be started once the current execution finishes.

Often used in conjunction with should_continue to run long jobs over multiple executions safely.

The requested run can be delayed with wait_atleast_seconds to space out the executions.

Arguments:

  • trigger_downstream - Whether new run should trigger downstream nodes too
  • wait_atleast_seconds - Time to wait until starting the new run
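
A sketch of a long backfill split across multiple executions using should_continue and request_new_run (fetch_batch and the cursor handling are illustrative):

from patterns import State, Table

state = State()
output_table = Table("backfill_records", mode="w")
cursor = state.get_value("cursor")

while True:
    batch, cursor = fetch_batch(cursor)  # hypothetical paginated API call
    if not batch:
        break  # backfill complete
    output_table.append(batch)
    state.set_value("cursor", cursor)
    if not state.should_continue():
        # Near the execution time limit: request another run to resume from the saved cursor
        state.request_new_run()
        break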

reset

def reset()

Resets (clears) the state for this node.

Parameter

class Parameter()

__init__

def __init__(
    description: str = None,
    type: Type[T] = str,
    default: Any = "MISSING"
) -> T

Parameters let a python script take values from the end user and/or UI easily.

Allowed parameter types:

  • str
  • int
  • float
  • bool
  • datetime
  • date
  • list
  • Connection

Arguments:

  • description - Description / help text
  • type - should be the actual python type, e.g. type=str
  • default - default value. If not set explicitly, the parameter is assumed optional. May be set to None

TableVersion

class TableVersion()

A specific version of a Table, representing an actual database table that may or may not be stored on disk yet.

A Table may have many TableVersions, one or zero of which will be active at any given time.

name

@property
def name() -> str

The unqualified name of the table.

storage

@property
def storage()

The dcp Storage object this table is stored on.

schema

@property
def schema() -> Schema | None

The realized schema of this TableVersion. None if it does not exist on disk.

record_count

@property
def record_count() -> int | None

The realized record count of this TableVersion. None if it does not exist on disk.

exists

@property
def exists() -> bool

True if this version exists on disk.

Python packages installed

The following python packages are installed and available for use in python scripts. Please contact us if you would like to see a package added; additions can typically be made same-day.

Lifetimes = "^0.11.3"
PySheets = "^1.0.3"
boto3 = "^1.24.25"
discord-py = "^2.1.0"
dvc = "^2.31.0"
dvc-gs = "^2.19.1"
faunadb = {git = "https://github.com/robertcsapo/faunadb-python.git", rev = "bee4f6d"}
firebolt-sqlalchemy = "^0.5.1"
google-cloud-bigquery-storage = "^2.13.0"
google-cloud-profiler = "^3.0.8"
keras = "^2.8.0"
matplotlib = "^3.5.2"
memory-profiler = "^0.60.0"
mysqlclient = "^2.1.1"
nltk = "^3.7"
notion-tools = {git = "https://github.com/patchbio/notion-tools.git", rev = "f4c789d389b2ef3bce744883e40a8c40bda6373b"}
openai = "^0.23.1"
openpyxl = "^3.0.9"
pyrr = "^0.10.3"
scikit-image = "^0.19.2"
scikit-learn = "^1.1.0"
scipy = "^1.8.0"
slack-bolt = "^1.15.1"
snowflake-sqlalchemy = "^1.3.4"
sqlalchemy-bigquery = "^1.5.0"
sqlalchemy-redshift = "^0.8.9"
sshtunnel = "^0.4.0"
statsmodels = "^0.13.2"
textblob = "^0.17.1"
torch = { version = "^1.11.0", markers = "sys_platform == 'linux'" }
tweepy = "^4.10.1"
web3 = "^5.30.0"
xgboost = "^1.6.1"