dataframe

AstroCertified

Convert a Table object into a Pandas DataFrame or persist a DataFrame result to a database table.


Last Updated: Dec. 16, 2021

Access Instructions

Install the Astro provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.

Parameters

python_callable (Optional[Callable]): Function to be decorated.
multiple_outputs (Optional[bool]): If set to True, the decorated function's return value will be unrolled to multiple XCom values. A dict will unroll to XCom values with its keys as XCom keys.
conn_id (str): The ID of the configured Airflow Connection to use to connect to a database.
parameters (Optional[dict or iterable]): The parameters to render the SQL query with.
database (Optional[str]): The name of the database to use when executing the SQL.
schema (Optional[str]): The name of the schema to use when executing the SQL.
warehouse (Optional[str]): The name of the warehouse to use when executing the SQL.

Documentation

Your pipeline might call for procedures that would be too complex or impossible in SQL, such as building a model from a feature set or applying a windowing function that Pandas is better suited for. The dataframe function moves your data into a Pandas DataFrame and back to your database as needed. At runtime, the operator loads any Table object into a Pandas DataFrame. If the task returns a DataFrame, downstream TaskFlow API tasks can work with it and continue in Python. To write the function's return value back to your database, pass a Table in the reserved output_table parameter. Because this parameter is reserved, you cannot use it as an argument name in your function definition.

Example:
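from datetime import datetime

import pandas as pd
from airflow import DAG
from astro import dataframe
from astro.sql import transform
from astro.sql.table import Table


@dataframe
def get_dataframe():
    # Build a small DataFrame in Python.
    return pd.DataFrame({"numbers": [1, 2, 3], "colors": ["red", "white", "blue"]})


@transform
def sample_pg(input_table: Table):
    # Query the table that was written by the upstream task.
    return "SELECT * FROM {input_table}"


# The DAG id, start date, and schedule here are placeholders; use your own DAG.
with DAG("dataframe_example", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # output_table is the reserved parameter that persists the returned DataFrame.
    my_df = get_dataframe(
        output_table=Table(
            table_name="my_df_table", conn_id="postgres_conn", database="pagila"
        )
    )
    pg_df = sample_pg(my_df)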
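
As an illustration of the kind of windowing step that is easier in Pandas than in SQL, here is a minimal sketch of a function body that could be decorated with dataframe. The decorator is left out so the snippet runs with Pandas alone, and the function name add_rolling_mean and the rolling_mean column are hypothetical names chosen for this example, not part of the Astro package.

```python
import pandas as pd


def add_rolling_mean(df: pd.DataFrame, window: int = 2) -> pd.DataFrame:
    """Return a copy of df with a rolling mean of the 'numbers' column."""
    result = df.copy()
    # min_periods=1 lets the first rows produce a value instead of NaN.
    result["rolling_mean"] = (
        result["numbers"].rolling(window=window, min_periods=1).mean()
    )
    return result


# Same sample data as the example above.
df = pd.DataFrame({"numbers": [1, 2, 3], "colors": ["red", "white", "blue"]})
out = add_rolling_mean(df)
print(out)
```

In a DAG, a function like this would presumably receive its input as a DataFrame loaded from a Table and could be called with output_table to write the result back, as described above.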
