Incremental (Append and Deduplicate) Load with Airbyte Demo
Date:
An “Incremental Append + Deduped” load is a data integration pattern commonly used in ETL to keep the target database free of duplicate records while still reflecting the latest state of the source table. It is typically used when we want to land data directly into a staging layer.
- Append: Adding only the new or changed records from the source system to the target system.
- Deduplication: Ensuring that any duplicate records that might have been introduced during the append process are removed or handled appropriately.
Incremental Append involves extracting only the new or modified records from the source system. This is typically achieved with a cursor, usually a timestamp audit column (e.g., last_modified or updated_at), which is used to filter and extract only the records that have been created or updated since the last load. The difference (delta) between the current state and the state at the last load is what gets captured. CDC tools or techniques can also be used to identify and extract only the records that have changed since the last extraction, but they add some complexity.
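In practice the delta extraction boils down to a cursor-filtered query like the sketch below. This is only a minimal illustration, assuming a sales_order table with a CHANGEDAT cursor column (as used later in this demo) and a :last_cursor placeholder holding the highest cursor value recorded by the previous sync.
-- Pull only rows created or updated since the last successful sync.
-- :last_cursor is the highest CHANGEDAT value recorded by the previous run.
SELECT *
FROM sales_order
WHERE CHANGEDAT > :last_cursor
ORDER BY CHANGEDAT;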
Once the new and updated records are appended to the target system, deduplication ensures that no duplicate records remain. Deduplication can rely on primary keys to enforce uniqueness. Then we would use a MERGE statement to combine the new data with the existing data, handling duplicates according to the specified match conditions. Alternatively, SQL window functions can identify and remove duplicates, keeping only the latest record based on a timestamp or version column.
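As an illustration, a window-function dedup could look like the sketch below. It assumes a hypothetical sales_order_stage staging table keyed on SALESORDERID with a CHANGEDAT timestamp; this is the general technique, not what Airbyte runs internally.
-- Rank the versions of each SALESORDERID by recency and keep only the latest one.
WITH ranked AS (
    SELECT
        s.*,
        ROW_NUMBER() OVER (
            PARTITION BY SALESORDERID
            ORDER BY CHANGEDAT DESC
        ) AS rn
    FROM sales_order_stage s
)
SELECT *
FROM ranked
WHERE rn = 1;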
SCD1 (Slowly Changing Dimension Type 1) is a suitable use case for the Incremental Append + Deduped load pattern. SCD1 manages changes in a dimension table by overwriting existing records: when a change occurs in the source data, the corresponding record in the dimension table is updated with the new information. We load only the new or updated records from the source system, identified via a timestamp or some other change tracking mechanism. After loading them, we ensure the dimension table contains only the latest version of each record by removing any older versions of records that have been updated.
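In SQL terms, the SCD1 overwrite can be expressed as a MERGE like the sketch below, assuming a sales_order_stage table that already holds the deduplicated delta; only a few columns are shown, and this illustrates the pattern rather than the statement Airbyte generates.
-- SCD1 upsert: overwrite matching rows, insert new ones.
MERGE INTO sales_order tgt
USING sales_order_stage src
    ON tgt.SALESORDERID = src.SALESORDERID
WHEN MATCHED THEN UPDATE SET
    tgt.CHANGEDBY = src.CHANGEDBY,
    tgt.CHANGEDAT = src.CHANGEDAT,
    tgt.NETAMOUNT = src.NETAMOUNT,
    tgt.LIFECYCLESTATUS = src.LIFECYCLESTATUS
WHEN NOT MATCHED THEN INSERT (SALESORDERID, CREATEDBY, CREATEDAT, CHANGEDBY, CHANGEDAT, NETAMOUNT, LIFECYCLESTATUS)
    VALUES (src.SALESORDERID, src.CREATEDBY, src.CREATEDAT, src.CHANGEDBY, src.CHANGEDAT, src.NETAMOUNT, src.LIFECYCLESTATUS);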
The objective of this demo is to query a source table in Postgres, extract from it, and load the data into a destination analytics environment. Most importantly, after the initial load the pipeline should only move the delta changes. The demo covers four scenarios:
- Full Load
- Insert New Records
- Update Change Records
- Update With Delete Change
Setup
Please refer to the Sandbox demo to set up Postgres, Airbyte, and Snowflake.
Configuring the Source
Destination
Airbyte created a table in Snowflake containing the source columns along with some additional metadata columns.
Configuring the Stream Cursor
A cursor is the value used to track whether a record has already been extracted in an incremental sync. We will be using CHANGEDAT for this.
We need to configure the cursor column, which will be used to handle the delta updates.
Data Setup
We will leverage the table from the previous demo, but I have updated the table to use a primary key.
ALTER TABLE SALES_ORDER ADD PRIMARY KEY (SALESORDERID)
I used a Python script to load the CSV, using the current datetime for the CHANGEDAT and CREATEDAT columns.
Code to INSERT data into Postgres, using current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S') for the CHANGEDAT and CREATEDAT columns.
import pandas as pd
import psycopg2
from datetime import datetime
# PostgreSQL credentials
db_username = 'krisnunes'
db_password = ''
db_host = 'localhost'
db_port = '5432'
db_name = 'krisnunes'
# CSV file path
csv_file_path = '/Users/krisnunes/Study/python/DataEngineering/ETL/Collect/Ingest/Postgress/SalesOrdersTimeStamp5.csv'
# Read the CSV file into a DataFrame
df = pd.read_csv(csv_file_path)
# Create a connection to the PostgreSQL database
# Initialize to None so the finally block is safe even if the connection fails
connection = None
cursor = None
try:
connection = psycopg2.connect(
user=db_username,
password=db_password,
host=db_host,
port=db_port,
database=db_name
)
cursor = connection.cursor()
# Insert data into the table
for index, row in df.iterrows():
# Get the current datetime
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
INSERT INTO sales_order (
SALESORDERID, CREATEDBY, CREATEDAT, CHANGEDBY, CHANGEDAT,
FISCVARIANT, FISCALYEARPERIOD, NOTEID, PARTNERID, SALESORG,
CURRENCY, GROSSAMOUNT, NETAMOUNT, TAXAMOUNT,
LIFECYCLESTATUS, BILLINGSTATUS, DELIVERYSTATUS
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""", (
row['SALESORDERID'], row['CREATEDBY'], current_datetime, row['CHANGEDBY'], current_datetime,
row['FISCVARIANT'], row['FISCALYEARPERIOD'], row['NOTEID'], row['PARTNERID'], row['SALESORG'],
row['CURRENCY'], row['GROSSAMOUNT'], row['NETAMOUNT'], row['TAXAMOUNT'],
row['LIFECYCLESTATUS'], row['BILLINGSTATUS'], row['DELIVERYSTATUS']
))
connection.commit()
except Exception as error:
print(f"Error while connecting to PostgreSQL: {error}")
finally:
if connection:
cursor.close()
connection.close()
print("PostgreSQL connection is closed")
Demo Run
1. Full Load
The FULL LOAD should move all the data in the table. All 670 rows were synced by Airbyte.
Validating the change
There were also 670 records created in the Snowflake table.
Below is a sample of the data in the table.
We can see there is some metadata also added by Airbyte.
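The validation itself is just a couple of queries; a sketch, assuming the destination table ended up named SALES_ORDER (the schema and the exact Airbyte metadata column names depend on your destination settings and Airbyte version):
-- Row count should match the 670 rows in the Postgres source.
SELECT COUNT(*) FROM SALES_ORDER;
-- Sample a few rows including the Airbyte-added metadata columns.
SELECT SALESORDERID, CHANGEDAT, _AIRBYTE_EXTRACTED_AT, _AIRBYTE_RAW_ID
FROM SALES_ORDER
LIMIT 5;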
2. Insert New Records
We will be inserting 2 new records.
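For reference, the two new rows can be added with an insert along the lines of the sketch below; the SALESORDERID values and the other column values are hypothetical placeholders, and only a subset of columns is shown. The key point is that CREATEDAT and CHANGEDAT get the current timestamp so the cursor picks the rows up.
-- Hypothetical new orders; NOW() advances the CHANGEDAT cursor.
INSERT INTO sales_order (SALESORDERID, CREATEDBY, CREATEDAT, CHANGEDBY, CHANGEDAT, NETAMOUNT, LIFECYCLESTATUS)
VALUES (500000998, 'demo', NOW(), 'demo', NOW(), 100.00, 'N'),
       (500000999, 'demo', NOW(), 'demo', NOW(), 200.00, 'N');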
After running the insert query.
Running Airbyte Sync
Validating Data in Snowflake
Running a count, we see there are 2 new records added, taking the count to 672.
We can query the metadata and see that there was a new run.
We are able to get the sync id of the run and query the records which were inserted.
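A sketch of those metadata queries, assuming the _AIRBYTE_EXTRACTED_AT and _AIRBYTE_RAW_ID columns added by recent Airbyte versions (older versions use slightly different column names):
-- The latest extraction timestamp identifies the most recent sync.
SELECT MAX(_AIRBYTE_EXTRACTED_AT) FROM SALES_ORDER;
-- Pull only the records loaded by that sync.
SELECT SALESORDERID, CHANGEDAT, _AIRBYTE_EXTRACTED_AT, _AIRBYTE_RAW_ID
FROM SALES_ORDER
WHERE _AIRBYTE_EXTRACTED_AT = (SELECT MAX(_AIRBYTE_EXTRACTED_AT) FROM SALES_ORDER);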
3. Update Change Records
Updating a record which was
My original dataset had 2
Code to UPDATE data in Postgres, using current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S') for the CHANGEDAT column.
import pandas as pd
import psycopg2
from datetime import datetime
# PostgreSQL credentials
db_username = 'krisnunes'
db_password = ''
db_host = 'localhost'
db_port = '5432'
db_name = 'krisnunes'
# CSV file path
csv_file_path = '/Users/krisnunes/Study/python/DataEngineering/ETL/Collect/Ingest/Postgress/SalesOrdersTimeStamp4.csv'
# Read the CSV file into a DataFrame
df = pd.read_csv(csv_file_path)
# Create a connection to the PostgreSQL database
# Initialize to None so the finally block is safe even if the connection fails
connection = None
cursor = None
try:
connection = psycopg2.connect(
user=db_username,
password=db_password,
host=db_host,
port=db_port,
database=db_name
)
cursor = connection.cursor()
    # Update each row in the table
for index, row in df.iterrows():
# Get the current datetime
current_datetime = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
cursor.execute("""
UPDATE sales_order SET
CREATEDBY = %s,
CREATEDAT = %s,
CHANGEDBY = %s,
CHANGEDAT = %s,
FISCVARIANT = %s,
FISCALYEARPERIOD = %s,
NOTEID = %s,
PARTNERID = %s,
SALESORG = %s,
CURRENCY = %s,
GROSSAMOUNT = %s,
NETAMOUNT = %s,
TAXAMOUNT = %s,
LIFECYCLESTATUS = %s,
BILLINGSTATUS = %s,
DELIVERYSTATUS = %s
WHERE SALESORDERID = %s
""", (
row['CREATEDBY'], row['CREATEDAT'], row['CHANGEDBY'], current_datetime,
row['FISCVARIANT'], row['FISCALYEARPERIOD'], row['NOTEID'], row['PARTNERID'], row['SALESORG'],
row['CURRENCY'], row['GROSSAMOUNT'], row['NETAMOUNT'], row['TAXAMOUNT'],
row['LIFECYCLESTATUS'], row['BILLINGSTATUS'], row['DELIVERYSTATUS'],
row['SALESORDERID'] # The identifier to find the correct record
))
connection.commit()
except Exception as error:
print(f"Error while connecting to PostgreSQL: {error}")
finally:
if connection:
cursor.close()
connection.close()
print("PostgreSQL connection is closed")
We will be updating the 4 rows below.
select * from sales_order where salesorderid in (500000100,500000101,500000102,500000103, 500000000, 500000001);
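After the Airbyte sync, we can confirm on the Snowflake side that the updated rows carry the new CHANGEDAT values and that the dedup kept exactly one row per key; a sketch, again assuming the destination table is SALES_ORDER:
-- Updated rows should show the new CHANGEDAT values.
SELECT SALESORDERID, CHANGEDAT, _AIRBYTE_EXTRACTED_AT
FROM SALES_ORDER
WHERE SALESORDERID IN (500000100, 500000101, 500000102, 500000103);
-- With append + dedup, no SALESORDERID should appear more than once.
SELECT SALESORDERID, COUNT(*)
FROM SALES_ORDER
GROUP BY SALESORDERID
HAVING COUNT(*) > 1;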
4. Delete
The delete will be a soft delete: a Note column is used to indicate that the row is deleted, and the change is applied as a row update so the incremental sync picks it up.
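A sketch of that soft delete, assuming NOTEID is the column used as the delete marker and 'DELETED' is the chosen flag value (both, like the SALESORDERID shown, are assumptions for illustration); updating CHANGEDAT is what lets the incremental sync pick the change up.
-- Soft delete: flag the row and advance the cursor column so the sync sees it.
UPDATE sales_order
SET NOTEID = 'DELETED',
    CHANGEDAT = NOW()
WHERE SALESORDERID = 500000103;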