[
Apply Schema on the dataframe from files¶
Let us understand how to apply schema while creating the data frame.
- In many cases, data files might not contain the metadata such as column names, data types, etc.
- We might get the data metadata in the form of separate files. Also, it is common that metadata is available via Database Tables or REST based schema registries.
- We need to make sure that the metadata (schema) is applied on the data as part of data processing.
In this case data files are available under /data/retail_db, the json file with metadata is available under schemas/retail_db/retail.json.
In [1]:
!ls -ltr /data/retail_db
total 20128 drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 categories drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 customers -rw-rw-r-- 1 itversity itversity 1748 Mar 8 02:04 create_db_tables_pg.sql -rw-rw-r-- 1 itversity itversity 10303297 Mar 8 02:04 create_db.sql drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 departments drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 order_items -rw-rw-r-- 1 itversity itversity 10297372 Mar 8 02:04 load_db_tables_pg.sql drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 orders drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 products
In [2]:
!ls -ltr schemas/retail_db/retail.json
-rw-rw-r-- 1 itversity itversity 2083 Mar 8 02:04 schemas/retail_db/retail.json
In [3]:
!cat schemas/retail_db/retail.json
{ "categories": [ {"column_name": "category_id", "data_type": "int"}, {"column_name": "category_department_id", "data_type": "int"}, {"column_name": "category_name", "data_type": "str"} ], "customers": [ {"column_name": "customer_id", "data_type": "int"}, {"column_name": "customer_fname", "data_type": "str"}, {"column_name": "customer_lname", "data_type": "str"}, {"column_name": "customer_email", "data_type": "str"}, {"column_name": "customer_password", "data_type": "str"}, {"column_name": "customer_street", "data_type": "str"}, {"column_name": "customer_city", "data_type": "str"}, {"column_name": "customer_state", "data_type": "str"}, {"column_name": "customer_zipcode", "data_type": "str"} ], "departments": [ {"column_name": "department_id", "data_type": "int"}, {"column_name": "department_name", "data_type": "str"} ], "order_items": [ {"column_name": "order_item_id", "data_type": "int"}, {"column_name": "order_item_order_id", "data_type": "int"}, {"column_name": "order_item_product_id", "data_type": "int"}, {"column_name": "order_item_quantity", "data_type": "int"}, {"column_name": "order_item_subtotal", "data_type": "float"}, {"column_name": "order_item_product_price", "data_type": "float"} ], "orders": [ {"column_name": "order_id", "data_type": "int"}, {"column_name": "order_date", "data_type": "str"}, {"column_name": "order_customer_id", "data_type": "int"}, {"column_name": "order_status", "data_type": "int"} ], "products": [ {"column_name": "product_id", "data_type": "int"}, {"column_name": "product_category_id", "data_type": "int"}, {"column_name": "product_name", "data_type": "str"}, {"column_name": "product_description", "data_type": "str"}, {"column_name": "product_price", "data_type": "float"}, {"column_name": "product_image", "data_type": "str"} ] }
In [4]:
!ls -ltr /data/retail_db/orders
total 2932 -rw-rw-r-- 1 itversity itversity 2999944 Mar 8 02:04 part-00000
In [5]:
# Read orders data into list of strings
orders_path = '/data/retail_db/orders/part-00000'
orders = open(orders_path). \
read(). \
splitlines()
In [6]:
orders[:10]
Out[6]:
['1,2013-07-25 00:00:00.0,11599,CLOSED', '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT', '3,2013-07-25 00:00:00.0,12111,COMPLETE', '4,2013-07-25 00:00:00.0,8827,CLOSED', '5,2013-07-25 00:00:00.0,11318,COMPLETE', '6,2013-07-25 00:00:00.0,7130,COMPLETE', '7,2013-07-25 00:00:00.0,4530,COMPLETE', '8,2013-07-25 00:00:00.0,2911,PROCESSING', '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT', '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']
In [8]:
# Load schemas into dict using json
import json
retail_schemas = json.load(open('schemas/retail_db/retail.json'))
In [9]:
retail_schemas
Out[9]:
{'categories': [{'column_name': 'category_id', 'data_type': 'int'}, {'column_name': 'category_department_id', 'data_type': 'int'}, {'column_name': 'category_name', 'data_type': 'str'}], 'customers': [{'column_name': 'customer_id', 'data_type': 'int'}, {'column_name': 'customer_fname', 'data_type': 'str'}, {'column_name': 'customer_lname', 'data_type': 'str'}, {'column_name': 'customer_email', 'data_type': 'str'}, {'column_name': 'customer_password', 'data_type': 'str'}, {'column_name': 'customer_street', 'data_type': 'str'}, {'column_name': 'customer_city', 'data_type': 'str'}, {'column_name': 'customer_state', 'data_type': 'str'}, {'column_name': 'customer_zipcode', 'data_type': 'str'}], 'departments': [{'column_name': 'department_id', 'data_type': 'int'}, {'column_name': 'department_name', 'data_type': 'str'}], 'order_items': [{'column_name': 'order_item_id', 'data_type': 'int'}, {'column_name': 'order_item_order_id', 'data_type': 'int'}, {'column_name': 'order_item_product_id', 'data_type': 'int'}, {'column_name': 'order_item_quantity', 'data_type': 'int'}, {'column_name': 'order_item_subtotal', 'data_type': 'float'}, {'column_name': 'order_item_product_price', 'data_type': 'float'}], 'orders': [{'column_name': 'order_id', 'data_type': 'int'}, {'column_name': 'order_date', 'data_type': 'str'}, {'column_name': 'order_customer_id', 'data_type': 'int'}, {'column_name': 'order_status', 'data_type': 'int'}], 'products': [{'column_name': 'product_id', 'data_type': 'int'}, {'column_name': 'product_category_id', 'data_type': 'int'}, {'column_name': 'product_name', 'data_type': 'str'}, {'column_name': 'product_description', 'data_type': 'str'}, {'column_name': 'product_price', 'data_type': 'float'}, {'column_name': 'product_image', 'data_type': 'str'}]}
In [10]:
# Get the schema for relevant data set
retail_schemas['orders']
Out[10]:
[{'column_name': 'order_id', 'data_type': 'int'}, {'column_name': 'order_date', 'data_type': 'str'}, {'column_name': 'order_customer_id', 'data_type': 'int'}, {'column_name': 'order_status', 'data_type': 'int'}]
In [11]:
# Fetch the column names
columns = list(map(lambda col: col['column_name'], retail_schemas['orders']))
In [12]:
columns
Out[12]:
['order_id', 'order_date', 'order_customer_id', 'order_status']
In [13]:
import pandas as pd
In [14]:
pd.read_csv('/data/retail_db/orders/part-00000', names=columns)
Out[14]:
order_id | order_date | order_customer_id | order_status | |
---|---|---|---|---|
0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |
3 | 4 | 2013-07-25 00:00:00.0 | 8827 | CLOSED |
4 | 5 | 2013-07-25 00:00:00.0 | 11318 | COMPLETE |
… | … | … | … | … |
68878 | 68879 | 2014-07-09 00:00:00.0 | 778 | COMPLETE |
68879 | 68880 | 2014-07-13 00:00:00.0 | 1117 | COMPLETE |
68880 | 68881 | 2014-07-19 00:00:00.0 | 2518 | PENDING_PAYMENT |
68881 | 68882 | 2014-07-22 00:00:00.0 | 10000 | ON_HOLD |
68882 | 68883 | 2014-07-23 00:00:00.0 | 5533 | COMPLETE |
68883 rows × 4 columns
In [15]:
pd.DataFrame(map(lambda rec: rec.split(','), orders), columns=columns)
Out[15]:
order_id | order_date | order_customer_id | order_status | |
---|---|---|---|---|
0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |
3 | 4 | 2013-07-25 00:00:00.0 | 8827 | CLOSED |
4 | 5 | 2013-07-25 00:00:00.0 | 11318 | COMPLETE |
… | … | … | … | … |
68878 | 68879 | 2014-07-09 00:00:00.0 | 778 | COMPLETE |
68879 | 68880 | 2014-07-13 00:00:00.0 | 1117 | COMPLETE |
68880 | 68881 | 2014-07-19 00:00:00.0 | 2518 | PENDING_PAYMENT |
68881 | 68882 | 2014-07-22 00:00:00.0 | 10000 | ON_HOLD |
68882 | 68883 | 2014-07-23 00:00:00.0 | 5533 | COMPLETE |
68883 rows × 4 columns
In [16]:
import os
import json
import csv
import pandas as pd
def get_df(base_folder, data_set_name, schema_file):
file_names = os.listdir(f'{base_folder}/{data_set_name}')
retail_schemas = json.load(open(schema_file))
columns = list(map(lambda col: col['column_name'], retail_schemas[data_set_name]))
data = []
for file_name in file_names:
file_path = f'{base_folder}/{data_set_name}/{file_name}'
raw_data = open(file_path)
data += list(raw_data)
return pd.DataFrame(map(lambda rec: rec.split(','), data), columns=columns)
In [20]:
orders = get_df('/data/retail_db', 'orders', 'schemas/retail_db/retail.json')
In [21]:
order_items = get_df('/data/retail_db', 'order_items', 'schemas/retail_db/retail.json')
In [22]:
customers = get_df('/data/retail_db', 'customers', 'schemas/retail_db/retail.json')
]