Intro
Data Model
AWS
Scripts
Results
What was Learned
This post was in collaboration with Jonathan Adams.
As data scientists, our job is mainly focused on using data to solve business problems, not engineering the environment we work in. However, a colleague and I were keen to learn more about the nuances of this work because we were suddenly handed some data engineering responsibilities. We each needed to learn these skills for our jobs and saw it as a chance to build our online portfolios.
Instead of using a commonly accessible dataset, we decided to simulate what a real company's data looks like. This made the project more fun and gave us a data structure we were familiar with. We called the fake company JJ Furniture (JJ stands for Julian and Jon) and we created artificial transaction, customer, and product data.
This post (Part 1) is about data engineering and our learnings. Part 2 will be data science oriented. Thanks for reading and I hope you enjoy!
Data engineers use two types of databases for their two main tasks:
- loading in raw data from source systems (OLTP: writing data | recording system | AWS RDS)
- designing tables that benefit end-users (OLAP: reading data | analytical system | AWS Redshift)
The below image is the usual data model that teams follow. They load their raw transactional data into an OLTP (online transactional processing) database. This is best for writing data - actions like insert, update, and delete. Next is ETL (extract, transform, load). Here, you design a star schema and load the results into an OLAP (online analytical processing) database. This is best for reading data, i.e., SQL queries. End-users who do analytics and build dashboards benefit from this system.
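To make the distinction concrete, here is a rough sketch of the two workload shapes, written in the same SQL-strings-in-Python style we use later in this post (the table and column names here are only illustrative, not our actual schema):
# OLTP workload: many small, row-level writes
oltp_write = "INSERT INTO transactions (customer_id, product_id, sale_amount) VALUES (42, 7, 599.99)"
# OLAP workload: a few scan-heavy reads that aggregate over many rows
olap_read = "SELECT state, SUM(sale_amount) AS total_sales FROM sales GROUP BY state"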
For our project, we wanted to create raw source data that somewhat imitated reality. This type of data is usually messy and needs wrangling. We also tried to design it the way real data engineers would. Generally, this consists of having small tables that you later join together in an ETL process. This makes it easier for the engineer to keep track of and manipulate tables. Here is the OLTP database we designed.
Businesses create data through transactions and need to store it somewhere. That’s where the OLTP database comes in. End-users should never see this database.
We used the Amazon Web Services free tier to create our OLTP database. In AWS, it's called RDS (relational database service). It has row-wise storage, which is optimized for writing data. Conversely, the OLAP model in AWS is the familiar Redshift. It stores data column-wise to minimize query processing time.
For this basic project, all we had to do was make an AWS account, go to the service called RDS, and click “Create database”.
We followed the steps the website took us through to create the RDS instance. One of the decision points was which database engine to use - we chose Postgres because it's common and familiar. Next, we needed a SQL client to connect to it. DBeaver is a good option for Macs, so we used that.
This is where we encountered friction. Thankfully, others had experienced the same errors and written good posts online on how to overcome them. The main issues were:
- “Connection timeout” error. Solution: make a security group with Amazon EC2. This was not an intuitive design by Amazon.
- Our database name didn’t work for the connection. Solution: use the default name “postgres”, not what you named your database on the Amazon RDS website…geez.
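Before wiring up DBeaver, a quick way to sanity-check the endpoint, security group, and that default “postgres” database name is a few lines of psycopg2 (the host and password below are placeholders for your own RDS values):
import psycopg2

conn = psycopg2.connect(
    host='your-instance.xxxxxxxxxx.us-east-1.rds.amazonaws.com',  # your RDS endpoint (placeholder here)
    port=5432,
    user='postgres',
    password='your-password',
    dbname='postgres'  # the default database name, not what you typed into the RDS console
)
print(conn.closed)  # 0 means the connection is open
conn.close()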
Here are screenshots of the solutions we found online that saved us:
We finally achieved a successful connection thanks to the above two posts and were glad to begin the next stage: writing Python code to simulate our fake furniture company's data, and learning how to automate table creation and data insertion into the database we had just made.
These four scripts work in concert with one another to connect to the database, drop/create empty tables, and then create/insert data. Jon audited a data engineering Coursera class and we re-purposed some of that code for the first three scripts listed below. The fourth we created from scratch.
import psycopg2
import pandas as pd
from configparser import ConfigParser


class PostgresConnector():
    '''Used for creating a connection to a Postgres database.
    Has methods for pulling data and performing database
    updates.'''

    def __init__(self, config_file_path='config.ini', section='', encoding='utf-16'):
        '''Initialize the connection to the Postgres database.
        This requires a config file with a named section
        containing the following fields:
        HOST, PORT, USER, PASSWORD, DATABASE'''
        cf = ConfigParser()
        cf.read(config_file_path, encoding=encoding)
        self.db_host = cf.get(section, 'HOST')
        self.db_port = cf.getint(section, 'PORT')
        self.db_user = cf.get(section, 'USER')
        self.db_pwd = cf.get(section, 'PASSWORD')
        self.db_name = cf.get(section, 'DATABASE')
        self._connect = psycopg2.connect(host=self.db_host,
                                         port=self.db_port,
                                         user=self.db_user,
                                         password=self.db_pwd,
                                         dbname=self.db_name)

    def queryall(self, sql, params=None, return_df=True):
        '''Executes a SQL query and returns the fetched dataset'''
        cursor = self._connect.cursor()
        result, columns = None, None
        try:
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            result = cursor.fetchall()
            columns = [desc[0] for desc in cursor.description]
        except Exception as e:
            print(e)
        finally:
            cursor.close()
        if return_df and result is not None:
            return pd.DataFrame(result, columns=columns)
        else:
            return result

    def update(self, sql, params=None):
        '''Executes SQL statements that modify the database,
        such as insert and update (CRUD operations)'''
        cursor = self._connect.cursor()
        try:
            if params:
                cursor.execute(sql, params)
            else:
                cursor.execute(sql)
            self._connect.commit()
        except Exception as e:
            print(e)
        finally:
            cursor.close()

    def close(self):
        '''Close the connection to the Postgres database'''
        if self._connect:
            self._connect.close()
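For reference, the config.ini this class reads looks roughly like this - the section name matches what we pass in later, and the values below are placeholders (keep this file out of version control):
[jj-furniture]
HOST = your-instance.xxxxxxxxxx.us-east-1.rds.amazonaws.com
PORT = 5432
USER = postgres
PASSWORD = your-password
DATABASE = postgres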
# DROP TABLES
drop_customer_table = "DROP TABLE IF EXISTS customers"
drop_product_table = "DROP TABLE IF EXISTS products"
drop_material_table = "DROP TABLE IF EXISTS materials"
drop_color_table = "DROP TABLE IF EXISTS colors"
drop_description_table = "DROP TABLE IF EXISTS descriptions"
drop_transactions_table = "DROP TABLE IF EXISTS transactions"
# CREATE TABLES
create_customer_table = """
CREATE TABLE IF NOT EXISTS customers
(
customer_id SERIAL PRIMARY KEY,
first_name VARCHAR(200),
last_name VARCHAR(200),
email_address VARCHAR(200),
dob DATE,
gender VARCHAR(100),
street_address VARCHAR(1000),
state VARCHAR(20),
date_created DATE,
create_source VARCHAR(100)
)
"""
create_product_table = """
CREATE TABLE IF NOT EXISTS products
(
product_id SERIAL PRIMARY KEY,
material_id INT,
color_id INT,
description_id INT,
pieces INT,
cost NUMERIC
)
"""
create_material_table = """
CREATE TABLE IF NOT EXISTS materials
(
material_id SERIAL PRIMARY KEY,
material_desc VARCHAR(1000)
)
"""
create_color_table = """
CREATE TABLE IF NOT EXISTS colors
(
color_id SERIAL PRIMARY KEY,
color_desc VARCHAR(1000)
)
"""
create_description_table = """
CREATE TABLE IF NOT EXISTS descriptions
(
description_id SERIAL PRIMARY KEY,
product_description VARCHAR(1000)
)
"""
create_transactions_table = """
CREATE TABLE IF NOT EXISTS transactions
(
id SERIAL PRIMARY KEY,
transaction_id INT,
transaction_date DATE,
customer_id INT,
product_id INT,
line_item_number SMALLINT,
sale_amount NUMERIC(20,2),
quantity SMALLINT,
sale_or_return INT
)
"""
# INSERT QUERIES
insert_customer_table = """
INSERT INTO customers (first_name, last_name, email_address,
dob, gender, street_address, state, date_created, create_source)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
insert_product_table = """
INSERT INTO products (material_id, color_id, description_id, pieces, cost)
VALUES (%s, %s, %s, %s, %s)
"""
insert_material_table = """
INSERT INTO materials (material_desc)
VALUES (%s)
"""
insert_color_table = """
INSERT INTO colors (color_desc)
VALUES (%s)
"""
insert_description_table = """
INSERT INTO descriptions (product_description)
VALUES (%s)
"""
insert_transactions_table = """
INSERT INTO transactions (transaction_id, transaction_date, customer_id, product_id, line_item_number, sale_amount, quantity, sale_or_return)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
# QUERY LISTS
drop_table_queries = [drop_customer_table, drop_product_table, drop_material_table, drop_color_table, drop_description_table, drop_transactions_table]
create_table_queries = [create_customer_table, create_product_table, create_material_table, create_color_table, create_description_table, create_transactions_table]
insert_table_queries = [insert_customer_table, insert_product_table, insert_material_table, insert_color_table, insert_description_table, insert_transactions_table]
import psycopg2
from sql_queries import create_table_queries, drop_table_queries
import configparser
ENCODING='utf-16'
def drop_tables(cur, conn):
    """
    Loops through the list of drop table queries
    and drops the tables listed in sql_queries.py
    """
    for query in drop_table_queries:
        cur.execute(query)
        conn.commit()


def create_tables(cur, conn):
    """
    Loops through the list of create table queries
    and creates the tables listed in sql_queries.py
    """
    for query in create_table_queries:
        cur.execute(query)
        conn.commit()


def main(args):
    """
    Drops and creates tables as outlined in sql_queries.py
    You will need the following information in your config.ini file:
    HOST
    USER
    PASSWORD
    DATABASE
    """
    config = configparser.ConfigParser()
    config.read(args, encoding=ENCODING)
    HOST = config.get('jj-furniture', 'host')
    USER = config.get('jj-furniture', 'user')
    PASSWORD = config.get('jj-furniture', 'password')
    DATABASE = config.get('jj-furniture', 'database')
    try:
        conn = psycopg2.connect(host=HOST, user=USER, password=PASSWORD, dbname=DATABASE)
        cur = conn.cursor()
        # drop_tables(cur, conn)
        create_tables(cur, conn)
    except Exception as e:
        print(e)
    finally:
        conn.close()


if __name__ == "__main__":
    import sys
    main(sys.argv[1])
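The script takes the path to the config file as its only command-line argument, so (assuming the file sits next to the script) it can be run with something like: python create_tables.py config.ini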
import sys
import os
import configparser
import pandas as pd
import numpy as np
import datetime as dt
from faker import Faker
from sqlalchemy import create_engine
# importing custom modules from upper directories
script_dir = os.path.abspath('')
mymodule_dir = os.path.join( script_dir, '..', '..', 'database', 'table_setup' )
sys.path.append( mymodule_dir )
import sql_queries
mymodule_dir = os.path.join( script_dir, '..', '..', 'database', 'connection' )
sys.path.append( mymodule_dir )
import postgres_db
# connect to database
encoding = 'utf-16'
config_file_path = os.path.join( script_dir, '..', '..', 'database', 'connection' )
config_file_path += '/config.ini'
db_conn = postgres_db.PostgresConnector(config_file_path = config_file_path,
section = 'jj-furniture',
encoding = encoding)
# insert data: colors
color_list = ['grey', 'dark grey', 'red', 'blue', 'green', 'black', 'brown', 'beige', 'white', 'yellow']
for color in color_list:
    db_conn.update(sql_queries.insert_color_table, [color])

# insert data: descriptions
description_list = ['sofa', 'love-seat', 'dinette set', 'mattress', 'bedframe', 'nightstand', 'chest', 'dresser', 'chair', 'desk', 'desk chair', 'bookcase', 'coffee table', 'end table']
for description in description_list:
    db_conn.update(sql_queries.insert_description_table, [description])

# insert data: materials
material_list = ['fabric', 'leather', 'wood', 'metal', 'plastic']
for material in material_list:
    db_conn.update(sql_queries.insert_material_table, [material])
# insert data: products
def get_ids(table_name):
    query = '''
        select {}_id
        from {}
    '''
    results = db_conn.queryall(query.format(table_name[:-1], table_name))
    return results[results.columns[0]].values.tolist()
material_ids = get_ids('materials')
color_ids = get_ids('colors')
description_ids = get_ids('descriptions')
products = [
{
'material_id': np.random.choice(material_ids),
'color_id': np.random.choice(color_ids),
'description_id': np.random.choice(description_ids),
'pieces': np.random.randint(low = 1, high = 8, size = 1)[0],
'cost': np.round(np.random.lognormal(mean = np.log(100), sigma = 1, size = 1)[0],2) # make sure to give same cost to manufacturer
}
for x in range(200)
]
df_products = pd.DataFrame(products)
for i, row in df_products.iterrows():
    if i % 100 == 0:
        print(i)
    db_conn.update(sql_queries.insert_product_table,
                   (row['material_id'],
                    row['color_id'],
                    row['description_id'],
                    row['pieces'],
                    row['cost'])
                   )
# insert data: customers
fake = Faker()
store_type = ['Store', 'Web']
num_customers = 10000
customers = [
{
'first_name': fake.first_name(),
'last_name': fake.last_name(),
'email_address': fake.email(),
'dob': fake.date_between(start_date ='-70y', end_date = '-18y'),
'gender': fake.profile()['sex'],
'street_address': fake.street_address(),
'state': fake.random_element(elements = ('NC', 'SC', 'TN', 'FL', 'GA', 'MS', 'AL')),
'date_created': fake.date_between(start_date ='-12y'),
'create_source': np.random.choice(store_type, p = [.8, .2]) # can give probability of selection to Store Type
}
for x in range(1, num_customers+1)
]
df_customers = pd.DataFrame(customers)
for i, row in df_customers.iterrows():
    if i % (num_customers // 4) == 0:  # print progress every quarter of the rows
        print(i)
    db_conn.update(sql_queries.insert_customer_table,
                   (row['first_name'],
                    row['last_name'],
                    row['email_address'],
                    row['dob'],
                    row['gender'],
                    row['street_address'],
                    row['state'],
                    row['date_created'],
                    row['create_source'])
                   )
# insert data: transactions
def get_cost(product):
    query = '''
        select cost
        from products
        where product_id = {}
    '''
    return float(db_conn.queryall(query.format(product))['cost'][0])
customer_id_list = get_ids('customers')
product_id_list = get_ids('products')
num_trans = 25000
transaction_id = 1
years = list(range(2010, 2023))
weights = list(range(1, len(years) + 1))
weights = [x/sum(weights) for x in weights]
for trans in range(num_trans):
    if transaction_id % (num_trans // 4) == 0:  # print progress every quarter of the transactions
        print(transaction_id)
    num_items = np.random.randint(low = 1, high = 5)
    year = np.random.choice(years, size = None, p = weights)
    transaction_date = fake.date_between_dates(date_start = dt.datetime.strptime(str(year) + '-01-01', '%Y-%m-%d'), date_end = dt.datetime.strptime(str(year) + '-12-31', '%Y-%m-%d'))
    customer_id = np.random.choice(customer_id_list).item()
    for item in range(num_items):
        product_id = np.random.choice(product_id_list).item()
        line_item_number = item + 1
        sale_or_return = 0
        sale_amount = round(get_cost(product_id) * np.random.uniform(low = 1.43, high = 4), 2)
        quantity = np.random.choice([1,2,3], p = [.8, .15, .05]).item()
        db_conn.update(sql_queries.insert_transactions_table,
                       (transaction_id,
                        transaction_date,
                        customer_id,
                        product_id,
                        line_item_number,
                        sale_amount,
                        quantity,
                        sale_or_return))
    transaction_id += 1
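As a final sanity check - not part of the scripts above, just a couple of illustrative queries - we can count the rows that landed and then close the connection:
# illustrative check that the inserts landed, then clean up the connection
print(db_conn.queryall('select count(*) from customers'))
print(db_conn.queryall('select count(*) from transactions'))
db_conn.close()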
We achieved our goal of creating business-like raw datasets and loading them into a database. The below image shows our tables: colors, descriptions, materials, products, and transactions, along with a subset example of our fake customers table. The next step would be to design a star schema, create an ETL process, and load those results into a Redshift database. This is what data scientists and analysts would use.
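As a rough preview of that ETL step (a sketch only - we did not build this here), the small OLTP tables would be joined into wider dimension tables, for example a product dimension:
# sketch of a denormalized product dimension for a future star schema (not built in this post)
create_dim_product = """
    CREATE TABLE IF NOT EXISTS dim_product AS
    SELECT p.product_id,
           d.product_description,
           m.material_desc,
           c.color_desc,
           p.pieces,
           p.cost
    FROM products p
    JOIN descriptions d ON p.description_id = d.description_id
    JOIN materials m ON p.material_id = m.material_id
    JOIN colors c ON p.color_id = c.color_id
"""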
This is an overview of what we learned doing this project:
- what an RDS is
- how to set up an RDS in AWS with Postgres
- connecting using the DBeaver SQL client
- better GitHub practices
- OLAP vs OLTP
- writing Python scripts for automating table creation and inserting rows
Things we didn’t do but know need to happen:
- automating data quality checks
- slowly changing dimensions
- designing fact and dimension tables (ETL)
- data security
- group permissions
- omitting private data (PII)
This was a fun project and we feel more in tune with the challenges data engineers face - mainly, what it takes to architect a data management solution that is efficient, secure, and makes end-users happy. We are mindful that real-world data is a lot messier and larger than the watered-down example we used here.