This article presents a proof-of-concept version of a multiprocessed data loader that I use extensively in my data integration pipelines.
A couple of the problems it attempts to address are described below.
The example below is constructed using an SQLite database, but the solution is agnostic to the underlying SQL engine.
We have a table called "stocks" with columns 'date', 'trans', 'symbol', 'qty', and 'price'.
import contextlib
import datetime
import os
import random
import sqlite3

import pandas as pd

import sql_loader
from sql_loader import SQLite_Upsert_Loader_Class
# Home directory for all sql_loader artifacts, and the database file inside it.
SQL_LOADER_HOME = '~/temp/sql_loader_home'
DB_NAME = 'sql_loader_1.db'

# Expand '~' so the path is valid for the current user.
_db_path_template = '{0}/{1}'
db_path = os.path.expanduser(_db_path_template.format(SQL_LOADER_HOME, DB_NAME))
def get_df(sql_query):
    """Run *sql_query* against the loader database and return a DataFrame.

    Parameters
    ----------
    sql_query : str
        The SQL SELECT statement to execute.

    Returns
    -------
    pandas.DataFrame
        The query result set.
    """
    # Bug fix: the original never closed the connection, leaking one DB
    # handle per call. contextlib.closing guarantees conn.close() runs
    # even if read_sql_query raises.
    with contextlib.closing(sql_loader.get_db_connection()) as conn:
        return pd.read_sql_query(sql_query, conn)
# Query used both before and after the upsert to inspect the 'stocks' table.
stocks_sql_query = '''
SELECT * FROM stocks
'''
# Bare expression: displays the pre-update table when run as a notebook cell
# (a no-op if executed as a plain script).
get_df(stocks_sql_query)
# Spreadsheet holding the rows to upsert into 'stocks'.
excel_filename = os.path.expanduser('{0}/{1}'.format(SQL_LOADER_HOME, 'stocks_update_1.xlsx'))
df = pd.read_excel(excel_filename)
# Notebook display of the rows about to be loaded.
df
# Loader configuration. NOTE(review): 'primary_keys' presumably tells the
# loader how to match existing rows (update) vs new rows (insert) — confirm
# against sql_loader. dry_run=False means changes are actually applied.
kwargs = {
'table_name': 'stocks',
'primary_keys': ['date', 'trans', 'symbol'],
'df': df,
'dry_run': False,
}
stocks_loader = SQLite_Upsert_Loader_Class(**kwargs)
# Execute the upsert, then re-query to display the updated table.
stocks_loader.execute()
get_df(stocks_sql_query)