transform

Astro (Certified)

Execute an explicit SELECT SQL statement. Data returned from this SQL is inserted into a temporary table which can be used by other downstream tasks.

Last Updated: Dec. 16, 2021

Access Instructions

Install the Astro provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.

Parameters

python_callable (Optional[Callable]): Function to be decorated.
multiple_outputs (Optional[bool]): If set to True, the decorated function's return value will be unrolled to multiple XCom values. A dict will unroll to XCom values with its keys as XCom keys.
conn_id (str): The ID of the configured Airflow Connection to use to connect to a database.
autocommit (bool): If True, each SQL command is automatically committed.
parameters (Optional[dict or iterable]): The parameters to render the SQL query with.
database (Optional[str]): The name of the database to use when executing the SQL.
schema (Optional[str]): The name of the schema to use when executing the SQL.
warehouse (Optional[str]): The name of the warehouse to use when executing the SQL.

Documentation

With your data in a SQL system, it's time to start transforming it! The @transform SQL decorator is your "ELT" system. Each step of the transform pipeline creates a new table from the SELECT statement and lets tasks pass those tables to one another as if they were native Python objects.

You will notice that the functions use a custom templating system. Wrapping a value in single curly braces (like {customer_table}) indicates that the value needs to be rendered as a SQL table. The SQL decorator also treats values in double curly braces as Airflow Jinja templates.

Please note that this is NOT an f-string. Building SQL with f-strings risks security breaches via SQL injection. For security, users MUST explicitly identify tables in the function parameters by annotating the value as a Table. Only then will the SQL decorator treat the value as a table.

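Example:

# Import paths are assumed from the Astro provider package; adjust them for your installed version.
from astro.sql import transform
from astro.sql.table import Table


@transform
def get_orders():
    ...


@transform
def get_customers():
    ...


@transform
def join_orders_and_customers(orders_table: Table, customer_table: Table):
    """Join `orders_table` and `customer_table` to create a simple 'feature' dataset."""
    # Customers are the left side of the join, so customers without orders
    # are kept and get recent_purchase = 0 (purchase_count is NULL for them).
    return """SELECT c.customer_id, c.source, c.region, c.member_since,
        CASE WHEN purchase_count IS NULL THEN 0 ELSE 1 END AS recent_purchase
        FROM {customer_table} c LEFT OUTER JOIN {orders_table} p ON c.customer_id = p.customer_id"""


# `dag` is assumed to be an Airflow DAG instance defined elsewhere in the file.
with dag:
    orders = get_orders()
    customers = get_customers()
    join_orders_and_customers(orders, customers)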
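
To make the templating rule more concrete, here is a minimal, self-contained sketch of the idea that only parameters annotated as a Table are substituted into single-brace placeholders, while double-brace Jinja expressions are left untouched. This is not the Astro provider's implementation: the `Table` dataclass, `render_tables`, and the identifier check below are hypothetical stand-ins written purely for illustration.

```python
import re
from dataclasses import dataclass
from typing import get_type_hints


@dataclass(frozen=True)
class Table:
    """Hypothetical stand-in for a table handle (not the provider's Table class)."""
    name: str


# Matches {name} but not {{ name }} or {{name}}, so Jinja templates are left alone.
_PLACEHOLDER = re.compile(r"(?<!\{)\{(\w+)\}(?!\})")
# Assumed rule for a safe identifier: dotted names made of letters, digits, underscores.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*")


def render_tables(func, **kwargs):
    """Call func and fill {placeholders} only for parameters annotated as Table."""
    hints = get_type_hints(func)
    sql = func(**kwargs)

    def substitute(match):
        field = match.group(1)
        value = kwargs.get(field)
        # Refuse anything the author did not explicitly declare as a Table.
        if hints.get(field) is not Table or not isinstance(value, Table):
            raise TypeError(f"{field!r} is not declared as a Table")
        # Refuse names that could smuggle extra SQL into the statement.
        if not _IDENTIFIER.fullmatch(value.name):
            raise ValueError(f"unsafe table name: {value.name!r}")
        return value.name

    return _PLACEHOLDER.sub(substitute, sql)


def recent_orders(orders_table: Table):
    return "SELECT * FROM {orders_table} WHERE ds = '{{ ds }}'"


print(render_tables(recent_orders, orders_table=Table("analytics.orders")))
```

Unlike an f-string, which would interpolate any value into the SQL text, this sketch rejects placeholders whose parameter is not typed as a Table, which is the behavior the documentation above describes as the security requirement.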
