Apply Schema on the lists from files
Let us understand how to apply a schema while processing data from files.
- In many cases, data files do not contain metadata such as column names and data types.
- The metadata may be supplied in separate files. It is also common for metadata to be available via database tables or REST-based schema registries.
- We need to make sure that the metadata (schema) is applied to the data as part of data processing.
In this case, the data files are available under /data/retail_db, and the JSON file with the metadata is available at schemas/retail_db/retail.json.
In [1]:
!ls -ltr /data/retail_db
total 20128
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 categories
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 customers
-rw-rw-r-- 1 itversity itversity 1748 Mar 8 02:04 create_db_tables_pg.sql
-rw-rw-r-- 1 itversity itversity 10303297 Mar 8 02:04 create_db.sql
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 departments
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 order_items
-rw-rw-r-- 1 itversity itversity 10297372 Mar 8 02:04 load_db_tables_pg.sql
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 orders
drwxrwxr-x 2 itversity itversity 24 Mar 8 02:04 products
In [2]:
!ls -ltr schemas/retail_db/retail.json
-rw-rw-r-- 1 itversity itversity 2083 Mar 8 02:04 schemas/retail_db/retail.json
In [3]:
!cat schemas/retail_db/retail.json
{
  "categories": [
    {"column_name": "category_id", "data_type": "int"},
    {"column_name": "category_department_id", "data_type": "int"},
    {"column_name": "category_name", "data_type": "str"}
  ],
  "customers": [
    {"column_name": "customer_id", "data_type": "int"},
    {"column_name": "customer_fname", "data_type": "str"},
    {"column_name": "customer_lname", "data_type": "str"},
    {"column_name": "customer_email", "data_type": "str"},
    {"column_name": "customer_password", "data_type": "str"},
    {"column_name": "customer_street", "data_type": "str"},
    {"column_name": "customer_city", "data_type": "str"},
    {"column_name": "customer_state", "data_type": "str"},
    {"column_name": "customer_zipcode", "data_type": "str"}
  ],
  "departments": [
    {"column_name": "department_id", "data_type": "int"},
    {"column_name": "department_name", "data_type": "str"}
  ],
  "order_items": [
    {"column_name": "order_item_id", "data_type": "int"},
    {"column_name": "order_item_order_id", "data_type": "int"},
    {"column_name": "order_item_product_id", "data_type": "int"},
    {"column_name": "order_item_quantity", "data_type": "int"},
    {"column_name": "order_item_subtotal", "data_type": "float"},
    {"column_name": "order_item_product_price", "data_type": "float"}
  ],
  "orders": [
    {"column_name": "order_id", "data_type": "int"},
    {"column_name": "order_date", "data_type": "str"},
    {"column_name": "order_customer_id", "data_type": "int"},
    {"column_name": "order_status", "data_type": "int"}
  ],
  "products": [
    {"column_name": "product_id", "data_type": "int"},
    {"column_name": "product_category_id", "data_type": "int"},
    {"column_name": "product_name", "data_type": "str"},
    {"column_name": "product_description", "data_type": "str"},
    {"column_name": "product_price", "data_type": "float"},
    {"column_name": "product_image", "data_type": "str"}
  ]
}
In [4]:
!ls -ltr /data/retail_db/orders
total 2932
-rw-rw-r-- 1 itversity itversity 2999944 Mar 8 02:04 part-00000
In [5]:
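# Read orders data into a list of strings (one string per line)
orders_path = '/data/retail_db/orders/part-00000'
# The with block closes the file once the data is read
with open(orders_path) as orders_file:
    orders = orders_file.read().splitlines()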
In [6]:
orders[:10]
Out[6]:
['1,2013-07-25 00:00:00.0,11599,CLOSED',
 '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
 '3,2013-07-25 00:00:00.0,12111,COMPLETE',
 '4,2013-07-25 00:00:00.0,8827,CLOSED',
 '5,2013-07-25 00:00:00.0,11318,COMPLETE',
 '6,2013-07-25 00:00:00.0,7130,COMPLETE',
 '7,2013-07-25 00:00:00.0,4530,COMPLETE',
 '8,2013-07-25 00:00:00.0,2911,PROCESSING',
 '9,2013-07-25 00:00:00.0,5657,PENDING_PAYMENT',
 '10,2013-07-25 00:00:00.0,5648,PENDING_PAYMENT']
In [7]:
# Load schemas into a dict using json
import json
with open('schemas/retail_db/retail.json') as schema_file:
    retail_schemas = json.load(schema_file)
In [8]:
retail_schemas
Out[8]:
{'categories': [{'column_name': 'category_id', 'data_type': 'int'},
  {'column_name': 'category_department_id', 'data_type': 'int'},
  {'column_name': 'category_name', 'data_type': 'str'}],
 'customers': [{'column_name': 'customer_id', 'data_type': 'int'},
  {'column_name': 'customer_fname', 'data_type': 'str'},
  {'column_name': 'customer_lname', 'data_type': 'str'},
  {'column_name': 'customer_email', 'data_type': 'str'},
  {'column_name': 'customer_password', 'data_type': 'str'},
  {'column_name': 'customer_street', 'data_type': 'str'},
  {'column_name': 'customer_city', 'data_type': 'str'},
  {'column_name': 'customer_state', 'data_type': 'str'},
  {'column_name': 'customer_zipcode', 'data_type': 'str'}],
 'departments': [{'column_name': 'department_id', 'data_type': 'int'},
  {'column_name': 'department_name', 'data_type': 'str'}],
 'order_items': [{'column_name': 'order_item_id', 'data_type': 'int'},
  {'column_name': 'order_item_order_id', 'data_type': 'int'},
  {'column_name': 'order_item_product_id', 'data_type': 'int'},
  {'column_name': 'order_item_quantity', 'data_type': 'int'},
  {'column_name': 'order_item_subtotal', 'data_type': 'float'},
  {'column_name': 'order_item_product_price', 'data_type': 'float'}],
 'orders': [{'column_name': 'order_id', 'data_type': 'int'},
  {'column_name': 'order_date', 'data_type': 'str'},
  {'column_name': 'order_customer_id', 'data_type': 'int'},
  {'column_name': 'order_status', 'data_type': 'int'}],
 'products': [{'column_name': 'product_id', 'data_type': 'int'},
  {'column_name': 'product_category_id', 'data_type': 'int'},
  {'column_name': 'product_name', 'data_type': 'str'},
  {'column_name': 'product_description', 'data_type': 'str'},
  {'column_name': 'product_price', 'data_type': 'float'},
  {'column_name': 'product_image', 'data_type': 'str'}]}
In [9]:
# Get the schema for relevant data set
retail_schemas['orders']
Out[9]:
[{'column_name': 'order_id', 'data_type': 'int'},
 {'column_name': 'order_date', 'data_type': 'str'},
 {'column_name': 'order_customer_id', 'data_type': 'int'},
 {'column_name': 'order_status', 'data_type': 'int'}]
In [10]:
# Fetch the column names
columns = list(map(lambda col: col['column_name'], retail_schemas['orders']))
In [11]:
columns
Out[11]:
['order_id', 'order_date', 'order_customer_id', 'order_status']
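To see what applying the column names to a record means, here is a minimal sketch that pairs the columns with the values of a single record using zip. The record is copied from the orders output above; order and order_dict are names introduced only for this illustration. Splitting on a comma is an assumption that holds only when no field contains a comma, which is one reason to rely on the csv module for real files.

```python
# Column names as fetched from the orders schema
columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

# One record copied from the orders data (illustrative variable name)
order = '1,2013-07-25 00:00:00.0,11599,CLOSED'

# Pair each column name with the value at the same position
# Assumption: no field contains a comma, so a plain split is safe here
order_dict = dict(zip(columns, order.split(',')))
print(order_dict)
```

All the values are still strings at this point; only the column names have been applied.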
In [12]:
import csv
In [13]:
csv.DictReader?
Init signature:
csv.DictReader(
    f,
    fieldnames=None,
    restkey=None,
    restval=None,
    dialect='excel',
    *args,
    **kwds,
)
Docstring:      <no docstring>
File:           /usr/local/lib/python3.8/csv.py
Type:           type
Subclasses:
In [14]:
# Create a DictReader object using the file and the column names
# Iterating over it yields one dict per record. The keys in the dicts are from columns
csv_reader = csv.DictReader(open(orders_path), fieldnames=columns)
In [15]:
csv_reader
Out[15]:
<csv.DictReader at 0x7f43b7e77070>
In [16]:
list(csv_reader)[:10]
Out[16]:
[{'order_id': '1', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '11599', 'order_status': 'CLOSED'},
 {'order_id': '2', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '256', 'order_status': 'PENDING_PAYMENT'},
 {'order_id': '3', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '12111', 'order_status': 'COMPLETE'},
 {'order_id': '4', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '8827', 'order_status': 'CLOSED'},
 {'order_id': '5', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '11318', 'order_status': 'COMPLETE'},
 {'order_id': '6', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '7130', 'order_status': 'COMPLETE'},
 {'order_id': '7', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '4530', 'order_status': 'COMPLETE'},
 {'order_id': '8', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '2911', 'order_status': 'PROCESSING'},
 {'order_id': '9', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '5657', 'order_status': 'PENDING_PAYMENT'},
 {'order_id': '10', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '5648', 'order_status': 'PENDING_PAYMENT'}]
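csv.DictReader accepts any iterable that yields strings, so a list of lines such as orders can be passed in place of a file object. The reader is also an iterator, which means it can be consumed only once. The sketch below shows both points using two records copied from the output above; first_pass and second_pass are names used only for this illustration.

```python
import csv

# Two records copied from the orders data
orders = [
    '1,2013-07-25 00:00:00.0,11599,CLOSED',
    '2,2013-07-25 00:00:00.0,256,PENDING_PAYMENT',
]
columns = ['order_id', 'order_date', 'order_customer_id', 'order_status']

# A list of strings works as the input, not only a file object
csv_reader = csv.DictReader(orders, fieldnames=columns)

# The first pass reads all the records
first_pass = list(csv_reader)
# The reader is exhausted now, so a second pass returns no records
second_pass = list(csv_reader)

print(len(first_pass), len(second_pass))
```

If the records are needed more than once, it is safer to keep the list built in the first pass rather than the reader itself.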
In [17]:
folder_name = '/data/retail_db/orders'
In [18]:
import os
file_names = os.listdir(folder_name)
In [19]:
file_names
Out[19]:
['part-00000']
In [20]:
# Lists can be concatenated using +. This is useful to combine the records from multiple files
l1 = [1, 2, 3]
In [21]:
l2 = [4, 5]
In [22]:
l1 + l2
Out[22]:
[1, 2, 3, 4, 5]
In [23]:
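import os
import json
import csv

def get_dicts(base_folder, data_set_name, schema_file):
    # os.listdir returns names in arbitrary order, so sort them for a predictable result
    file_names = sorted(os.listdir(f'{base_folder}/{data_set_name}'))
    # Load the schemas and fetch the column names of the data set
    with open(schema_file) as schema_fp:
        retail_schemas = json.load(schema_fp)
    columns = list(map(lambda col: col['column_name'], retail_schemas[data_set_name]))
    data = []
    for file_name in file_names:
        file_path = f'{base_folder}/{data_set_name}/{file_name}'
        # Read the file as dicts and add them to the records read so far
        with open(file_path) as data_fp:
            csv_reader = csv.DictReader(data_fp, fieldnames=columns)
            data += list(csv_reader)
    return data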
In [24]:
data = get_dicts('/data/retail_db', 'orders', 'schemas/retail_db/retail.json')
In [25]:
len(data)
Out[25]:
68883
In [26]:
data[:10]
Out[26]:
[{'order_id': '1', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '11599', 'order_status': 'CLOSED'},
 {'order_id': '2', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '256', 'order_status': 'PENDING_PAYMENT'},
 {'order_id': '3', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '12111', 'order_status': 'COMPLETE'},
 {'order_id': '4', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '8827', 'order_status': 'CLOSED'},
 {'order_id': '5', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '11318', 'order_status': 'COMPLETE'},
 {'order_id': '6', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '7130', 'order_status': 'COMPLETE'},
 {'order_id': '7', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '4530', 'order_status': 'COMPLETE'},
 {'order_id': '8', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '2911', 'order_status': 'PROCESSING'},
 {'order_id': '9', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '5657', 'order_status': 'PENDING_PAYMENT'},
 {'order_id': '10', 'order_date': '2013-07-25 00:00:00.0', 'order_customer_id': '5648', 'order_status': 'PENDING_PAYMENT'}]
In [27]:
data = get_dicts('/data/retail_db', 'order_items', 'schemas/retail_db/retail.json')
In [28]:
len(data)
Out[28]:
172198
In [29]:
data[:10]
Out[29]:
[{'order_item_id': '1', 'order_item_order_id': '1', 'order_item_product_id': '957', 'order_item_quantity': '1', 'order_item_subtotal': '299.98', 'order_item_product_price': '299.98'},
 {'order_item_id': '2', 'order_item_order_id': '2', 'order_item_product_id': '1073', 'order_item_quantity': '1', 'order_item_subtotal': '199.99', 'order_item_product_price': '199.99'},
 {'order_item_id': '3', 'order_item_order_id': '2', 'order_item_product_id': '502', 'order_item_quantity': '5', 'order_item_subtotal': '250.0', 'order_item_product_price': '50.0'},
 {'order_item_id': '4', 'order_item_order_id': '2', 'order_item_product_id': '403', 'order_item_quantity': '1', 'order_item_subtotal': '129.99', 'order_item_product_price': '129.99'},
 {'order_item_id': '5', 'order_item_order_id': '4', 'order_item_product_id': '897', 'order_item_quantity': '2', 'order_item_subtotal': '49.98', 'order_item_product_price': '24.99'},
 {'order_item_id': '6', 'order_item_order_id': '4', 'order_item_product_id': '365', 'order_item_quantity': '5', 'order_item_subtotal': '299.95', 'order_item_product_price': '59.99'},
 {'order_item_id': '7', 'order_item_order_id': '4', 'order_item_product_id': '502', 'order_item_quantity': '3', 'order_item_subtotal': '150.0', 'order_item_product_price': '50.0'},
 {'order_item_id': '8', 'order_item_order_id': '4', 'order_item_product_id': '1014', 'order_item_quantity': '4', 'order_item_subtotal': '199.92', 'order_item_product_price': '49.98'},
 {'order_item_id': '9', 'order_item_order_id': '5', 'order_item_product_id': '957', 'order_item_quantity': '1', 'order_item_subtotal': '299.98', 'order_item_product_price': '299.98'},
 {'order_item_id': '10', 'order_item_order_id': '5', 'order_item_product_id': '365', 'order_item_quantity': '5', 'order_item_subtotal': '299.95', 'order_item_product_price': '59.99'}]
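The dicts returned by get_dicts carry the column names from the schema, but every value is still a string. The data_type entries in the schema can be applied as well. Here is a minimal sketch, assuming the data_type values int, float and str map directly to the Python built-ins of the same name. TYPE_CASTERS and apply_data_types are hypothetical names introduced for this illustration; the schema and records are copied from the order_items output above.

```python
# Hypothetical mapping from data_type values in the schema to Python built-ins
TYPE_CASTERS = {'int': int, 'float': float, 'str': str}

def apply_data_types(records, schema):
    """Return new dicts with each value cast as per the schema (illustrative helper)."""
    # Build a lookup of column name to the function used for casting
    casters = {col['column_name']: TYPE_CASTERS[col['data_type']] for col in schema}
    return [
        {name: casters[name](value) for name, value in record.items()}
        for record in records
    ]

# Schema for order_items, copied from retail.json
order_items_schema = [
    {'column_name': 'order_item_id', 'data_type': 'int'},
    {'column_name': 'order_item_order_id', 'data_type': 'int'},
    {'column_name': 'order_item_product_id', 'data_type': 'int'},
    {'column_name': 'order_item_quantity', 'data_type': 'int'},
    {'column_name': 'order_item_subtotal', 'data_type': 'float'},
    {'column_name': 'order_item_product_price', 'data_type': 'float'},
]

# First record from the order_items data
records = [
    {'order_item_id': '1', 'order_item_order_id': '1', 'order_item_product_id': '957',
     'order_item_quantity': '1', 'order_item_subtotal': '299.98',
     'order_item_product_price': '299.98'},
]

typed_records = apply_data_types(records, order_items_schema)
print(typed_records[0])
```

Note that the schema shown earlier declares order_status in orders as int, while the data contains values such as CLOSED. A strict cast like this one would raise ValueError for orders until that entry is declared as str.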