Collect: Airbyte Extract And Load to both S3 and Snowflake

Lakehouse, Airflow, 2024


Set Up

Install Postgres

  • In the interest of time, I installed Postgres via an installer.

Create the sales_order table:
CREATE TABLE sales_order (
    SALESORDERID BIGINT PRIMARY KEY,
    CREATEDBY BIGINT,
    CREATEDAT TIMESTAMP,
    CHANGEDBY BIGINT,
    CHANGEDAT TIMESTAMP,
    FISCVARIANT VARCHAR,
    FISCALYEARPERIOD BIGINT,
    NOTEID VARCHAR,
    PARTNERID BIGINT,
    SALESORG VARCHAR,
    CURRENCY VARCHAR,
    GROSSAMOUNT BIGINT,
    NETAMOUNT DOUBLE PRECISION,
    TAXAMOUNT DOUBLE PRECISION,
    LIFECYCLESTATUS VARCHAR,
    BILLINGSTATUS VARCHAR,
    DELIVERYSTATUS VARCHAR
);
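
One way to run the DDL from the shell, assuming it is saved as sales_order.sql and the database is named sales (both placeholders):

psql -h localhost -p 5432 -U postgres -d sales -f sales_order.sql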

Insert data from the CSV into Postgres

import pandas as pd
import psycopg2

# PostgreSQL credentials
db_username = 'xxxxxx'
db_password = ''
db_host = 'localhost'
db_port = '5432'
db_name = 'xxxxxx'

# CSV file path
csv_file_path = '/XXXXXXXX/Study/python/DataEngineering/ETL/Collect/Ingest/Postgress/SalesOrdersTimeStamp.csv'

# Read the CSV file into a DataFrame
df = pd.read_csv(csv_file_path)

# Create a connection to the PostgreSQL database
connection = None
cursor = None
try:
    connection = psycopg2.connect(
        user=db_username,
        password=db_password,
        host=db_host,
        port=db_port,
        database=db_name
    )
    cursor = connection.cursor()

    # Insert data into the table
    for index, row in df.iterrows():
        cursor.execute("""
            INSERT INTO sales_order (
                SALESORDERID, CREATEDBY, CREATEDAT, CHANGEDBY, CHANGEDAT, 
                FISCVARIANT, FISCALYEARPERIOD, NOTEID, PARTNERID, SALESORG, 
                CURRENCY, GROSSAMOUNT, NETAMOUNT, TAXAMOUNT, 
                LIFECYCLESTATUS, BILLINGSTATUS, DELIVERYSTATUS
            ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            row['SALESORDERID'], row['CREATEDBY'], row['CREATEDAT'], row['CHANGEDBY'], row['CHANGEDAT'], 
            row['FISCVARIANT'], row['FISCALYEARPERIOD'], row['NOTEID'], row['PARTNERID'], row['SALESORG'], 
            row['CURRENCY'], row['GROSSAMOUNT'], row['NETAMOUNT'], row['TAXAMOUNT'], 
            row['LIFECYCLESTATUS'], row['BILLINGSTATUS'], row['DELIVERYSTATUS']
        ))
    connection.commit()

except Exception as error:
    print(f"Error while connecting to PostgreSQL: {error}")

finally:
    # Close the cursor and connection only if they were opened
    if cursor:
        cursor.close()
    if connection:
        connection.close()
        print("PostgreSQL connection is closed")

Install Airbyte

**clone Airbyte from GitHub**

git clone --depth=1 https://github.com/airbytehq/airbyte.git

**switch into Airbyte directory**

cd airbyte

**start Airbyte**

./run-ab-platform.sh

Using the default config

  • Host: http://localhost:8000/
  • Username: airbyte, password: password
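
Airbyte runs as a set of Docker containers, so once the script reports it is up you can sanity-check that the containers are running before opening the UI:

docker ps --filter "name=airbyte"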

S3

  • Create an IAM user with access to the bucket and generate an access key (a sample policy is sketched below).
  • Create a folder (prefix) in the bucket for Airbyte to write into.
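
A minimal IAM policy sketch for that user, assuming the bucket is named my-airbyte-landing (a placeholder); the exact actions you grant may differ, so treat this as a starting point:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::my-airbyte-landing"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
      "Resource": "arn:aws:s3:::my-airbyte-landing/*"
    }
  ]
}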


Configure Airbyte

Source: Postgres


I am using the basic update method.
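
Two notes on the source config: because Airbyte itself runs in Docker, a Postgres installed directly on the host is reached as host.docker.internal rather than localhost (on macOS and Windows with Docker Desktop), and it is good practice to connect with a dedicated read-only user instead of a superuser. A sketch of such a user, with placeholder name and password:

-- Hypothetical read-only user for the Airbyte source (name and password are placeholders)
CREATE USER airbyte_reader PASSWORD 'change_me';
GRANT USAGE ON SCHEMA public TO airbyte_reader;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO airbyte_reader;
-- Also cover tables created later
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO airbyte_reader;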


Target: S3


Running a sync


Validating in S3

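
Besides eyeballing the bucket in the console, a quick way to confirm the sync landed files is to list the prefix programmatically; a small sketch using boto3, with the bucket and prefix as placeholders:

import boto3

s3 = boto3.client("s3")

# Placeholders: use the bucket and path configured in the Airbyte destination
response = s3.list_objects_v2(Bucket="my-airbyte-landing", Prefix="airbyte/sales_order/")

for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"], obj["LastModified"])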

Configuring Snowflake

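
The Snowflake destination needs a warehouse, database, schema, role, and user it can write with. A setup sketch along the lines of what the Airbyte docs suggest; every name and the password here are placeholders:

-- Run as a role that can create these objects; adjust names to taste
CREATE WAREHOUSE IF NOT EXISTS AIRBYTE_WAREHOUSE WAREHOUSE_SIZE = 'XSMALL' AUTO_SUSPEND = 60;
CREATE DATABASE IF NOT EXISTS AIRBYTE_DATABASE;
CREATE SCHEMA IF NOT EXISTS AIRBYTE_DATABASE.AIRBYTE_SCHEMA;

CREATE ROLE IF NOT EXISTS AIRBYTE_ROLE;
CREATE USER IF NOT EXISTS AIRBYTE_USER
    PASSWORD = 'change_me'
    DEFAULT_ROLE = AIRBYTE_ROLE
    DEFAULT_WAREHOUSE = AIRBYTE_WAREHOUSE;

GRANT ROLE AIRBYTE_ROLE TO USER AIRBYTE_USER;
GRANT USAGE ON WAREHOUSE AIRBYTE_WAREHOUSE TO ROLE AIRBYTE_ROLE;
GRANT ALL PRIVILEGES ON DATABASE AIRBYTE_DATABASE TO ROLE AIRBYTE_ROLE;
GRANT ALL PRIVILEGES ON SCHEMA AIRBYTE_DATABASE.AIRBYTE_SCHEMA TO ROLE AIRBYTE_ROLE;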

Syncing into Snowflake

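
Once the sync finishes, a quick row count against the landed table confirms the load; the database and schema names below are the placeholders from the setup sketch above (Airbyte may also create its own _airbyte_* metadata tables and columns alongside it):

-- Placeholders: adjust to the database/schema configured in the destination
SELECT COUNT(*) AS row_count
FROM AIRBYTE_DATABASE.AIRBYTE_SCHEMA.SALES_ORDER;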