merge

AstroCertified

Merge data into an existing table in situations where there may be conflicts. This function adds data to a table with either an "update" or "ignore" strategy. The "ignore" strategy does not add values that conflict, while the "update" strategy overwrites the older values.

View on GitHub

Last Updated: Dec. 16, 2021

Access Instructions

Install the Astro provider package into your Airflow environment.

Import the module into your DAG file and instantiate it with your desired params.

Parameters

target_tableRequiredastro.sql.table.TableThe primary table that will be are merged into.
merge_tableRequiredastro.sql.table.TableThe Table containing the data to be merged to ``target_table``.
merge_keysRequireddictA dictionary of what fields to compare when determining conflicts.``{'foo': 'bar'}`` would be equivalent to ``main_table.foo=merge_table.bar``
target_columnsRequiredList[str]The columns that will be merged into. Order of the columns matters and needs to be same length as ``merge_columns``.
merge_columnsRequiredList[str]The columns used to perform the merge.
conflict_strategyRequiredstrMode to apply when handling merge conflicts should they arise. The possible values are 'ignore' (ignore all new values present in ``merge_table`` but not in ``target_table``) and 'update' (overwrite the ``target_table``).

Documentation

To merge data into an existing table in situations where there MIGHT be conflicts, the merge function adds data to a table with either an "update" or "ignore" strategy. The "ignore" strategy does not add values that conflict, while the "update" strategy overwrites the older values. This function only handles basic merge statements. Use the run_raw_sql decorator for complex statements. Note that the merge_keys parameter is a list in Postgres, but a map in Snowflake. This syntax decision was unavoidable due to the differences in how Postgres and Snowflake handle conflict resolution. Also note that * inserts are disabled for the merge function.

Examples:

Postgres

a = merge(
target_table=MAIN_TABLE,
merge_table=MERGE_TABLE,
merge_keys=["list", "sell"],
target_columns=["list", "sell", "taxes"],
merge_columns=["list", "sell", "age"],
conn_id="postgres_conn",
conflict_strategy="update",
database="pagila",
)

Snowflake

a = merge(
target_table=MAIN_TABLE,
merge_table=MERGE_TABLE,
merge_keys={"list": "list", "sell": "sell"},
target_columns=["list", "sell"],
merge_columns=["list", "sell"],
conn_id="snowflake_conn",
database="DWH_LEGACY",
conflict_strategy="ignore",
)

Was this page helpful?