Overview of SQL upsert loader

In this post I present a proof-of-concept version of a multiprocessed data loader that I use extensively in my data integration pipelines.

A couple of problems it attempts to address:

  1. Ability to operate on any update flat file associated with a SQL table, with minor configuration.
  2. Ability to perform upsert operations on datasets without a primary key column, as long as a combination key can be identified from multiple columns to dedup the records (a minimal sketch of this logic follows below).
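
Before walking through the example, here is the core idea of point 2 in isolation: a minimal sketch of an upsert keyed on a combination of columns. The helper name upsert_row and its shape are illustrative, not the loader's actual internals.

import sqlite3

def upsert_row(conn, table, key_cols, row):
    # row is a mapping of column name -> value (e.g. a dict or a pandas Series).
    where = ' AND '.join('{0} = ?'.format(c) for c in key_cols)
    key_vals = [row[c] for c in key_cols]

    # Does a record with this combination key already exist?
    cur = conn.execute('SELECT COUNT(*) FROM {0} WHERE {1}'.format(table, where), key_vals)
    if cur.fetchone()[0]:
        # Yes: update the non-key columns in place.
        non_key = [c for c in row.keys() if c not in key_cols]
        sets = ', '.join('{0} = ?'.format(c) for c in non_key)
        conn.execute('UPDATE {0} SET {1} WHERE {2}'.format(table, sets, where),
                     [row[c] for c in non_key] + key_vals)
    else:
        # No: insert a brand new record.
        cols = list(row.keys())
        conn.execute('INSERT INTO {0} ({1}) VALUES ({2})'.format(
                         table, ', '.join(cols), ', '.join('?' for _ in cols)),
                     [row[c] for c in cols])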

The example below is constructed using a SQLite database, but the solution is agnostic to the underlying SQL engine.

We have a table called "stocks" with columns 'date', 'trans', 'symbol', 'qty', and 'price'.

In [1]:
import os
import random
import datetime
import pandas as pd
import sqlite3

import sql_loader
from sql_loader import SQLite_Upsert_Loader_Class

SQL_LOADER_HOME = '~/temp/sql_loader_home'
DB_NAME = 'sql_loader_1.db'
db_path = os.path.expanduser('{0}/{1}'.format(SQL_LOADER_HOME, DB_NAME))
In [2]:
def get_df(sql_query):
    # Helper: run a query and return the result as a pandas dataframe.
    conn = sql_loader.get_db_connection()
    df = pd.read_sql_query(sql_query, conn)
    return df
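
sql_loader's internals are not shown in this post; for reproducibility, get_db_connection can be assumed to be a thin wrapper around sqlite3.connect, along these lines:

# Assumed implementation of sql_loader.get_db_connection (not the actual module code).
def get_db_connection(path=db_path):
    return sqlite3.connect(path)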
In [3]:
stocks_sql_query = '''
SELECT * FROM stocks
'''

1.

A snapshot of the table before we run the loader

  • Below are the records in our table "stocks"; it currently has 7 records.
  • There is no primary key in this table.
  • Columns "date", "trans", and "symbol" together can be used to create a combination key (an assumed schema sketch follows this list).
In [4]:
get_df(stocks_sql_query)
Out[4]:
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.14
1 2006-03-28 BUY IBM 1000.0 45.00
2 2006-04-05 BUY MSFT 1000.0 72.00
3 2006-04-06 SELL IBM 500.0 53.00
4 2006-03-28 BUY RHAT 1000.0 45.00
5 2006-04-05 BUY RHAT 1000.0 72.00
6 2006-04-06 SELL RHAT 1001.0 1001.00

2.

Let's look at the flat file that we need to integrate into our table "stocks"

  • The flat file is read in from Excel and has 12 records.
  • Records 0, 1, 2, and 3 are exact duplicates of rows already in the table.
  • Records 4, 5, and 6 have updated values.
  • The rest of the records are new and will need to be inserted; a quick pandas check of this classification follows the output below.
In [5]:
excel_filename = os.path.expanduser('{0}/{1}'.format(SQL_LOADER_HOME, 'stocks_update_1.xlsx'))
df = pd.read_excel(excel_filename)
In [6]:
df
Out[6]:
date trans symbol qty price
0 2006-01-05 BUY RHAT 100 35.140000
1 2006-03-28 BUY IBM 1000 45.000000
2 2006-04-05 BUY MSFT 1000 72.000000
3 2006-04-06 SELL IBM 500 53.000000
4 2006-03-28 BUY RHAT 200 10.000000
5 2006-04-05 BUY RHAT 300 20.000000
6 2006-04-06 SELL RHAT 400 30.000000
7 2006-08-01 BUY DATA 660 81.288268
8 2006-08-02 SELL AMZN 960 22.046658
9 2006-08-03 BUY BRK 500 85.700459
10 2006-08-04 BUY GOOG 970 77.123425
11 2006-08-05 BUY FBK 950 44.813605
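
Before running the loader, we can independently check this classification with a pandas merge on the combination key. This is just a sanity check, assuming dates are stored in the table as ISO-formatted text:

# Align dtypes: dates come back from SQLite as text, while read_excel parses timestamps.
current = get_df(stocks_sql_query)
current['date'] = pd.to_datetime(current['date'])

# Left-merge the incoming file against the current table on the combination key.
merged = df.merge(current, on=['date', 'trans', 'symbol'],
                  how='left', suffixes=('', '_db'), indicator=True)

new_rows = merged[merged['_merge'] == 'left_only']    # no match: to be inserted
matched = merged[merged['_merge'] == 'both']          # match: to be updated
changed = matched[(matched['qty'] != matched['qty_db']) |
                  (matched['price'] != matched['price_db'])]

print(len(new_rows), len(matched), len(changed))      # expect 5, 7, 3 for this file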

3.

Configure the SQLite_Upsert_Loader_Class

  • All we need to give the loader are a few configuration variables:
    • table_name - the table to upsert the records into.
    • primary_keys - the columns used to dedup the records and decide whether an update or an insert needs to be performed.
    • df - the variable referencing the pandas dataframe.
    • dry_run - flag to do a dry run, which skips the commit step (an example of this workflow appears at the end of this step).
In [8]:
kwargs = {
    'table_name': 'stocks',
    'primary_keys': ['date', 'trans', 'symbol'],
    'df': df,
    'dry_run': False,
}
stocks_loader = SQLite_Upsert_Loader_Class(**kwargs)
stocks_loader.execute()
2020-11-29 00:56:21,558 root sql_loader.py:236 - INFO - stocks selects:12 inserts:5 updates:7
  • The loader logs a summary of the operations it performed:
  • selects:12 - it operated on all 12 records in the flat file.
  • inserts:5 - it inserted the 5 new records.
  • updates:7 - it updated the 7 records that matched on the combination key (this includes the 4 exact duplicates, which are rewritten with identical values).
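
When the target table matters, a safer workflow is to do a dry run first and only commit once the summary looks right. A sketch of that workflow, using the same configuration as above:

# First pass: preview the operation; dry_run=True skips the commit step.
preview_loader = SQLite_Upsert_Loader_Class(
    table_name='stocks',
    primary_keys=['date', 'trans', 'symbol'],
    df=df,
    dry_run=True,
)
preview_loader.execute()  # logs the selects/inserts/updates summary without committing

# Second pass: if the counts look right, rerun with dry_run=False to commit.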

4.

Let's look at the updated table

  • Notice that the values for records 0, 1, 2, and 3 remain unchanged.
  • The values for records 4, 5, and 6 have been updated to reflect the new values.
  • The rest of the records have been added to the table.
In [9]:
get_df(stocks_sql_query)
Out[9]:
date trans symbol qty price
0 2006-01-05 BUY RHAT 100.0 35.140000
1 2006-03-28 BUY IBM 1000.0 45.000000
2 2006-04-05 BUY MSFT 1000.0 72.000000
3 2006-04-06 SELL IBM 500.0 53.000000
4 2006-03-28 BUY RHAT 200.0 10.000000
5 2006-04-05 BUY RHAT 300.0 20.000000
6 2006-04-06 SELL RHAT 400.0 30.000000
7 2006-08-01 BUY DATA 660.0 81.288268
8 2006-08-02 SELL AMZN 960.0 22.046658
9 2006-08-03 BUY BRK 500.0 85.700459
10 2006-08-04 BUY GOOG 970.0 77.123425
11 2006-08-05 BUY FBK 950.0 44.813605

5.

Food for thought

  • The production version that I use has a function that can execute the major portion of the operation in parallel, which is crucial for large-data use cases.
  • It leverages Python's multiprocessing library to create a Pool of workers and operate on chunks of the records in parallel (sketched below).
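
The parallel implementation is not included in this POC. Below is a minimal sketch of the pattern, reusing the hypothetical upsert_row helper from earlier. Note that each worker must open its own connection, since SQLite connections cannot be shared across processes; SQLite also serializes writers, so the speedup mostly materializes against server databases.

import sqlite3
import numpy as np
from multiprocessing import Pool

def upsert_chunk(chunk):
    # Hypothetical worker: open a private connection and upsert one slice of the dataframe.
    conn = sqlite3.connect(db_path)
    for _, row in chunk.iterrows():
        upsert_row(conn, 'stocks', ['date', 'trans', 'symbol'], row)
    conn.commit()
    conn.close()
    return len(chunk)

if __name__ == '__main__':
    chunks = np.array_split(df, 4)          # one slice per worker process
    with Pool(processes=4) as pool:
        counts = pool.map(upsert_chunk, chunks)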