Python Reference
Patterns python nodes give you the full power of the python ecosystem -- you can ingest data from APIs with requests, manipulate data with pandas, or build models with pytorch. The Patterns Protocol provides methods for working with both ETL aggregate pipelines and real-time streaming flows in a unified framework, managing data storage and transport on your behalf so you can focus on the problem at hand.
All Patterns nodes store and retrieve data from Tables. Tables are a thin abstraction over database tables that provide a stable reference across table versions and are the primary way to interact with data in Patterns code.
Patterns provides additional objects to help you build robust pipelines. State can be used to store lightweight state for the given node, for instance to checkpoint progress against an external API. Parameters can be used to make parts of your script easily configurable from the UI or by the end user.
Here is the full API available to Python scripts:
from datetime import date, datetime
from patterns import (
Table,
Parameter,
State,
)
from dcp.utils.pandas import assert_dataframes_are_almost_equal
from pandas import DataFrame
input_table = Table("input_table")
output_table = Table("out_table", mode="w")
upsert_table = Table("upsert_table", mode="w")
state = State()
pstr = Parameter("ppstr", type=str, default="str")
pbool = Parameter("ppbool", type=bool, default=True)
pint = Parameter("ppint", type=int, default=0)
pfloat = Parameter("ppfloat", type=float, default=0.0)
pdate = Parameter(
    "ppdate", type=date, default="2000-01-01"
)  # default=date(2000, 1, 1)
pdatetime = Parameter(
    "ppdatetime", type=datetime, default="2000-01-01 00:00:01"
)  # default=datetime(2000, 1, 1, 0, 0, 1)
plist = Parameter("pplist", type=list, default="a1")
records = [{"a": 1, "b": 2}, {"a": 3, "b": 4}]
df = DataFrame(records)
# Input table
itr = input_table.read()
assert itr == records, itr
itdf = input_table.read_dataframe()
assert_dataframes_are_almost_equal(itdf, df)
for chunk in input_table.read(chunksize=10):
    assert records == chunk
for chunk in input_table.read(as_format="dataframe", chunksize=10):
    assert_dataframes_are_almost_equal(chunk, df)
sql = "select * from {{ input_table }}"
itr = input_table.read_sql(sql)
assert itr == records, itr
itdf = input_table.read_sql(sql, as_format="dataframe")
assert_dataframes_are_almost_equal(itdf, df)
for chunk in input_table.read_sql(sql, chunksize=10):
    assert chunk == records
for chunk in input_table.read_sql(sql, as_format="dataframe", chunksize=10):
    assert_dataframes_are_almost_equal(chunk, df)
# Streaming tables
stream = input_table.as_stream()  # By default, will stream in order records were written
for r, record in zip(records, stream):
    assert r == record
stream.rewind()
stream = input_table.as_stream(order_by="a")  # Again after rewind
for r, record in zip(records, stream.consume_records()):
    assert r == record
stream.seek(1)
assert len(list(stream)) == 1
# Output table
output_table.append(records)
output_table.append(records[0])
output_table.append(df)
output_table.truncate()
output_table.append(records)
output_table.reset()
# Upsert table
upsert_table.init(unique_on="a")
upsert_table.upsert(records)
upsert_table.upsert(records)
upsert_table.upsert([{"a": 1, "b": 0}, {"a": 4, "b": 4}])
# State
state.get()
state.set({"k": 1})
state.set_value("k", "v")
state.get_value("k", "v")
state.should_continue()
state.request_new_run()
state.reset()
Python API Reference
Table
class Table()
__init__
def __init__(name: str,
             mode: str = "r",
             description: str = None,
             schema: str = None,
             required: bool = True)
A Table is a thin abstraction over a database table that provides a stable reference across versions of the table.
Arguments:
name - The Patterns name for the table. The actual database table on disk will include this name and a hash.
mode - Whether to use the table in "read" mode ("r") or "write" mode ("w").
description - An optional short description of this table.
schema - An optional explicit Schema for this table. If not provided, the schema will be inferred, or can be set with the table's init method.
required - Whether this table is a required table for the operation of the node, or is optional.
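For example, a node might declare one table to read from and another to write to (a minimal sketch; the table names are illustrative):
from patterns import Table

orders = Table("orders")  # read mode ("r") is the default
enriched = Table("enriched_orders", mode="w", description="Orders with computed totals")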
read
def read(
    as_format: str = "records",
    chunksize: int | None = None
) -> List[dict] | DataFrame | Iterator[List[dict]] | Iterator[DataFrame]
Reads records from this table.
Arguments:
as_format - Format to return records in. Defaults to list of dicts ('records'). Set to 'dataframe' to get a pandas dataframe (equivalent to read_dataframe).
chunksize - If specified, returns an iterator of the requested format in chunks of the given size.
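For instance, a large table can be processed in chunks instead of loading everything at once (a minimal sketch; the table name is illustrative):
from patterns import Table

events = Table("events")
# Process 1,000 records at a time as lists of dicts
for chunk in events.read(chunksize=1000):
    for record in chunk:
        ...  # handle each record
# Or load everything as a single pandas dataframe
df = events.read(as_format="dataframe")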
read_dataframe
def read_dataframe(
    chunksize: int | None = None
) -> DataFrame | Iterator[DataFrame]
Returns records as a pandas dataframe. Equivalent to .read(as_format='dataframe')
Arguments:
chunksize - If specified, returns an iterator of dataframes of the given size.
read_sql
def read_sql(
    sql: str,
    as_format: str = "records",
    chunksize: int | None = None
) -> List[dict] | DataFrame | Iterator[List[dict]] | Iterator[DataFrame]
Reads records resulting from the given sql expression, in the same manner as read.
To reference tables in the sql, you can get their current (fully qualified and quoted) sql name by referencing .sql_name or, equivalently, taking their str() representation:
my_table = Table("my_table")
my_table.read_sql(f'select * from {my_table} limit 10')
Arguments:
sql - The sql select statement to execute.
as_format - Format to return records in. Defaults to list of dicts ('records'). Set to 'dataframe' to get a pandas dataframe.
chunksize - If specified, returns an iterator of the requested format in chunks of the given size.
as_stream
def as_stream(order_by: str = None, starting_value: Any = None) -> Stream
Returns a Stream over the given table that will consume each record in the table exactly once, in order.
Progress along the stream is stored in the node's state. A table may have multiple simultaneous streams with different orderings. The stream is ordered by the order_by parameter if provided, otherwise it defaults to the schema's strictly_monotonic_ordering if defined, or its created_ordering if defined. If none of those orderings are defined, an exception is thrown.
By default, all Tables written to by a python or webhook node are augmented with a patterns_id field that provides a strictly_monotonic_ordering for the Table.
Arguments:
order_by - Optional, the field to order the stream by. If not provided, defaults to schema-defined orderings.
starting_value - Optional, value of the order_by field at which to start the stream.
Returns:
Stream object.
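For example, a stream can resume where it left off across executions, or start from a known point (a minimal sketch; the table and field names are illustrative):
from patterns import Table

events = Table("events")
# Consume only records this node has not yet seen, ordered by "id",
# skipping anything with id <= 1000 on the first run
stream = events.as_stream(order_by="id", starting_value=1000)
for record in stream.consume_records():
    ...  # each record is seen exactly once across runs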
reset
def reset()
Resets the table.
No data is deleted on disk, but the active version of the table is reset to None.
is_connected
@property
def is_connected() -> bool
Returns true if this table port is connected to a store in the graph.
Operations on unconnected tables are no-ops and return dummy objects.
sql_name
@property
def sql_name() -> str | None
The fully qualified and quoted sql name of the active table version. The table may or may not exist on disk yet.
schema
@property
def schema() -> Schema | None
The Schema of the active table version. May be None
record_count
@property
def record_count() -> int | None
The record count of the active table version. May be None
exists
@property
def exists() -> bool
True if the table has been created on disk.
init
def init(schema: Schema | str | dict | None = None,
         schema_hints: dict[str, str] | None = None,
         unique_on: str | list[str] | None = None,
         add_created: str | None = None,
         add_monotonic_id: str | None = "patterns_id",
         auto_indexes: bool = True)
Provides properties for this table that are used when a table version is first created on disk.
Arguments:
schema - A CommonModel Schema object or str name, or a dictionary of field names to field types.
schema_hints - A dictionary of field names to CommonModel field types used to override any inferred types, e.g. {"field1": "Text", "field2": "Integer"}.
unique_on - A field name or list of field names that records should be unique on. Used by components to operate efficiently and correctly on the table.
add_created - If specified, the name of a field containing an "auto_now" timestamp that will be added to each record when append or upsert is called. This field will be the default streaming order for the table (by automatically filling the created_ordering role on the associated Schema), but only if add_monotonic_id is NOT specified and the associated schema defines no monotonic ordering.
add_monotonic_id - If specified, the name of a field containing a unique, strictly monotonically increasing base32 string (a ULID) that will be added to each record when append or upsert is called. This field will be the default streaming order for the table (by automatically filling the strictly_monotonic_ordering role on the associated Schema). By default it is set to patterns_id.
auto_indexes - If true (the default), an index is automatically created on new table versions for the unique_on property.
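For example, a write table can be initialized so later upserts know the unique key and each record gets a created timestamp (a minimal sketch; the names and types are illustrative):
from patterns import Table

users = Table("users", mode="w")
users.init(
    unique_on="user_id",
    schema_hints={"user_id": "Text", "signup_count": "Integer"},
    add_created="synced_at",
)
users.upsert({"user_id": "u-1", "signup_count": 3})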
append
def append(records: DataFrame | List[dict] | dict)
Appends the records to the end of this table.
If this is the first write to this table then any schema provided is used to create the table, otherwise the schema is inferred from the passed in records.
Records are buffered and written to disk in batches. To force an immediate write, call table.flush().
To replace a table with a new (empty) version and append from there, call table.reset() or table.replace(new_records).
Arguments:
records - May be a list of records (list of dicts with str keys), a single record (dict), or a pandas dataframe.
upsert
def upsert(records: DataFrame | List[dict] | dict)
Upserts the records into this table, inserting new rows or updating if unique key conflicts.
Unique fields must be provided by the Schema or passed to init. If this is the first write to this table then any schema provided is used to create the table, otherwise the schema is inferred from the passed in records.
Records are buffered and written to disk in batches. To force an immediate write, call table.flush().
Arguments:
records - May be a list of records (list of dicts with str keys), a single record (dict), or a pandas dataframe.
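For instance, re-running a sync can safely upsert the same records; rows with a matching unique key are updated rather than duplicated (a minimal sketch; names are illustrative):
from patterns import Table

contacts = Table("contacts", mode="w")
contacts.init(unique_on="email")
contacts.upsert([{"email": "a@example.com", "plan": "free"}])
# A later run upserts the same key: the existing row is updated, not duplicated
contacts.upsert([{"email": "a@example.com", "plan": "pro"}])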
replace
def replace(records: DataFrame | List[dict])
Replaces the current table version (if any) with a new one containing just records.
Equivalent to table.reset(); table.append(records).
Arguments:
records - May be a list of records (list of dicts with str keys) or a pandas dataframe.
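For example, a node that recomputes a full snapshot each run can swap in the new data rather than appending to the old version (a minimal sketch; names are illustrative):
from patterns import Table

snapshot = Table("daily_totals", mode="w")
new_rows = [
    {"date": "2024-01-01", "total": 100},
    {"date": "2024-01-02", "total": 140},
]
# The old version is reset; the new active version contains only new_rows
snapshot.replace(new_rows)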
truncate
def truncate()
Truncates this table, preserving the table and schema on disk, but deleting all rows.
Unlike reset, which sets the active TableVersion to a new version, this action is destructive and cannot be undone.
execute_sql
def execute_sql(sql: str)
Executes the given sql against the database this table is stored on.
The sql is inspected to determine if it creates new tables or only modifies them, and appropriate events are recorded. The sql should ONLY create or update THIS table. Creating or updating other tables will result in incorrect event propagation.
To reference tables in the sql, you can get their current (fully qualified and quoted) sql name by referencing .sql_name or, equivalently, taking their str() representation:
my_table = Table("my_table", "w")
my_table.execute_sql(f'create table {my_table} as select 1 as a, 2 as b')
Arguments:
sql - Any valid sql statement that creates, inserts, updates, or otherwise alters this table.
reset
def reset()
Resets this table to point to a new (null) TableVersion with no Schema or data.
Schema and data of previous version still exist on disk until garbage collected according to the table's retention policy.
flush
def flush()
Flushes any buffered records to disk.
Calls to table.append and table.upsert are buffered and flushed periodically and at the end of an execution. Use this method to force an immediate write.
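For example, a long loop can periodically force its buffer to disk (a minimal sketch; names are illustrative):
from patterns import Table

sync_log = Table("sync_log", mode="w")
for step in range(100):
    sync_log.append({"step": step})
    if step % 10 == 0:
        sync_log.flush()  # force buffered records to disk mid-run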
Stream
class Stream()
A stateful view of a Table that supports consuming the table in a one-record-at-a-time manner in a given ordering, preserving progress across executions for the given node.
Example
table = Table("my_table")
stream = table.as_stream(order_by="id")
for record in stream.consume_records():
    print(record)
# Rewind and the stream will consume from the beginning again
stream.rewind()
for record in stream.consume_records():
    print(record)
stream.seek(42)
for record in stream.consume_records():
    print(record)  # only values *greater* than 42
consume_records
def consume_records(with_metadata: bool = False) -> Iterator[dict]
Iterates over records in this stream one at a time.
When a record is yielded it is marked as consumed, regardless of what happens after.
If you want to recover from errors and have the option to re-process records, you can use rollback and checkpoint explicitly in a try / except block.
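A minimal sketch of that pattern (the table name and handler are illustrative): progress is saved after each successfully processed record, and rolled back to the last checkpoint on failure so those records can be retried on the next run.
from patterns import Table

stream = Table("events").as_stream(order_by="id")
try:
    for record in stream.consume_records():
        handle(record)  # hypothetical processing function
        stream.checkpoint()  # persist progress through this record
except Exception:
    stream.rollback()  # un-consume records back to the last checkpoint
    raise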
__iter__
def __iter__() -> Iterator[dict]
Equivalent to consume_records
checkpoint
def checkpoint()
Saves the stream state (which records have been consumed from the iterator) to disk.
rollback
def rollback()
Rolls back the stream to the beginning of the execution or the last checkpoint.
rewind
def rewind()
Resets the stream to consume from the beginning again
seek
def seek(value: Any)
Seeks to the given value (of the order_by field).
Stream will consume values strictly greater than the given value, not including any record equal to the given value.
order_by_field
@property
def order_by_field() -> str
Returns the ordering field for this stream
State
class State()
State is a wrapper around a Table that supports quickly storing and retrieving single values from the database.
set
def set(state: dict)
Replaces the whole state dict with the provided one
set_value
def set_value(key: str, value: Any)
Sets the given value for the given key on this node's state.
get
def get() -> dict
Gets the current state dict
get_value
def get_value(key: str, default: Any = None) -> Any
Gets latest value from state for this node for the given key.
Arguments:
key - key for state value
default - default value if key is not present in state
Returns:
value from state
get_datetime
def get_datetime(key: str, default: datetime = None) -> datetime | None
Gets latest value from state for given key and tries to cast to a python datetime.
Arguments:
key - key for state
default - default datetime if key is not present in state
Returns:
datetime from state or None
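A common use of State is checkpointing progress against an external API: store the timestamp of the last record synced and resume from it on the next run (a minimal sketch; the key name and fetch_from_api call are illustrative):
from datetime import datetime, timezone
from patterns import State, Table

state = State()
output_table = Table("api_records", mode="w")

last_synced = state.get_datetime("last_synced_at")  # None on the very first run
new_records = fetch_from_api(since=last_synced)  # hypothetical API call
output_table.append(new_records)
state.set_value("last_synced_at", datetime.now(timezone.utc).isoformat())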
should_continue
def should_continue(pct_of_limit: float = None,
                    seconds_till_limit: int = None) -> bool
Returns False if execution is near its hard time limit (10 minutes typically), otherwise returns True.
Used to exit gracefully from long-running jobs, typically in conjunction with request_new_run. Defaults to 80% of the limit or 120 seconds before the hard limit, whichever is greater.
Arguments:
pct_of_limit - percent of time limit to trigger at
seconds_till_limit - seconds before time limit to trigger at
request_new_run
def request_new_run(trigger_downstream: bool = True,
                    wait_atleast_seconds: int = None)
Requests a new run from the server for this node, to be started once the current execution finishes.
Often used in conjunction with should_continue to run long jobs over multiple executions safely.
The requested run can be delayed with wait_atleast_seconds to space out the executions.
Arguments:
trigger_downstream - Whether the new run should trigger downstream nodes too
wait_atleast_seconds - Time to wait until starting the new run
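Together these support long-running jobs that span multiple executions (a minimal sketch; the table name and processing step are illustrative):
from patterns import State, Table

state = State()
stream = Table("events").as_stream(order_by="id")
for record in stream.consume_records():
    ...  # process each record
    if not state.should_continue():
        # Near the execution time limit: exit cleanly and continue in a fresh run
        state.request_new_run(trigger_downstream=False, wait_atleast_seconds=60)
        break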
reset
def reset()
Resets (clears) the state for this node.
Parameter
class Parameter()
__init__
def __init__(description: str = None,
             type: Type[T] = str,
             default: Any = "MISSING") -> T
Parameters let a python script take values from the end user and/or UI easily.
Allowed parameter types:
- str
- int
- float
- bool
- datetime
- date
- list
- Connection
Arguments:
description - Description / help text
type - Should be the actual python type, e.g. type=str
default - Default value. If not set explicitly, the parameter is assumed optional. May be set to None
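For example, a script can expose a tunable row limit and use it when querying (a minimal sketch; names are illustrative):
from patterns import Parameter, Table

row_limit = Parameter("row_limit", type=int, default=100)
events = Table("events")
rows = events.read_sql(f"select * from {events} limit {row_limit}")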
TableVersion
class TableVersion()
A specific version of a Table, representing an actual database table that may or may not be stored on disk yet.
A Table may have many TableVersions, one or zero of which will be active at any given time.
name
@property
def name() -> str
The unqualified name of the table.
storage
@property
def storage()
The dcp Storage object this table is stored on.
schema
@property
def schema() -> Schema | None
The realized schema of this TableVersion. None if it does not exist on disk.
record_count
@property
def record_count() -> int | None
The record count of this TableVersion. None if it does not exist on disk.
exists
@property
def exists() -> bool
True if this version exists on disk.
Python packages installed
The following python packages are installed for use in python scripts. Please contact us if you would like to see a package added; additions can typically be made same-day.
Lifetimes = "^0.11.3"
PySheets = "^1.0.3"
boto3 = "^1.24.25"
discord-py = "^2.1.0"
dvc = "^2.31.0"
dvc-gs = "^2.19.1"
faunadb = {git = "https://github.com/robertcsapo/faunadb-python.git", rev = "bee4f6d"}
firebolt-sqlalchemy = "^0.5.1"
google-cloud-bigquery-storage = "^2.13.0"
google-cloud-profiler = "^3.0.8"
keras = "^2.8.0"
matplotlib = "^3.5.2"
memory-profiler = "^0.60.0"
mysqlclient = "^2.1.1"
nltk = "^3.7"
notion-tools = {git = "https://github.com/patchbio/notion-tools.git", rev = "f4c789d389b2ef3bce744883e40a8c40bda6373b"}
openai = "^0.23.1"
openpyxl = "^3.0.9"
pyrr = "^0.10.3"
scikit-image = "^0.19.2"
scikit-learn = "^1.1.0"
scipy = "^1.8.0"
slack-bolt = "^1.15.1"
snowflake-sqlalchemy = "^1.3.4"
sqlalchemy-bigquery = "^1.5.0"
sqlalchemy-redshift = "^0.8.9"
sshtunnel = "^0.4.0"
statsmodels = "^0.13.2"
textblob = "^0.17.1"
torch = { version = "^1.11.0", markers = "sys_platform == 'linux'" }
tweepy = "^4.10.1"
web3 = "^5.30.0"
xgboost = "^1.6.1"