Batch Loading of Data
Let us understand how to load data into a database table in batches. We will perform the load using multiple approaches to understand which one works best.
- Approach 1: Insert and commit each record. Every commit in a database incurs a considerable amount of overhead.
- Approach 2: Insert one record at a time, but commit at the end.
- Approach 3: Insert all records at once and commit at the end.
- Approach 4: Insert records in chunks or batches and commit per chunk or batch (see the sketch below).
We should follow the fourth approach when dealing with huge amounts of data, as it facilitates restartability and recoverability.
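To make the trade-offs concrete before walking through them, here is an illustrative sketch of where the commit sits in each approach. It assumes a DB-API style connection and cursor, an INSERT query, and the data as a list of row tuples; the actual functions used in this notebook are defined cell by cell below.

def approach_1(connection, cursor, query, data):
    for rec in data:
        cursor.execute(query, rec)
        connection.commit()  # one round trip and one commit per row

def approach_2(connection, cursor, query, data):
    for rec in data:
        cursor.execute(query, rec)  # still one round trip per row
    connection.commit()  # but only one commit at the end

def approach_3(connection, cursor, query, data):
    cursor.executemany(query, data)  # everything in one call
    connection.commit()

def approach_4(connection, cursor, query, data, batch_size=10000):
    for i in range(0, len(data), batch_size):
        cursor.executemany(query, data[i:i + batch_size])
        connection.commit()  # commit per manageable batch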
In [15]:
%run 02_function_get_database_connection.ipynb
In [16]:
def get_cursor(connection):
    return connection.cursor()
In [17]:
%run 06_reading_data_from_file.ipynb
In [27]:
orders.head(3)
Out[27]:
|  | order_id | order_date | order_customer_id | order_status |
|---|---|---|---|---|
| 0 | 1 | 2013-07-25 00:00:00.0 | 11599 | CLOSED |
| 1 | 2 | 2013-07-25 00:00:00.0 | 256 | PENDING_PAYMENT |
| 2 | 3 | 2013-07-25 00:00:00.0 | 12111 | COMPLETE |
In [28]:
order_items.head(3)
Out[28]:
|  | order_item_id | order_item_order_id | order_item_product_id | order_item_quantity | order_item_subtotal | order_item_product_price |
|---|---|---|---|---|---|---|
| 0 | 1 | 1 | 957 | 1 | 299.98 | 299.98 |
| 1 | 2 | 2 | 1073 | 1 | 199.99 | 199.99 |
| 2 | 3 | 2 | 502 | 5 | 250.00 | 50.00 |
In [29]:
query = ("""INSERT INTO orders
(order_id, order_date, order_customer_id, order_status)
VALUES
(%s, %s, %s, %s)""")
{note}
Inserting and committing one row in each iteration. A commit is quite expensive, as each one forces the database to flush the transaction to disk.
In [30]:
def load_orders(connection, cursor, query, data):
    for rec in data:
        cursor.execute(query, rec)
        connection.commit()
In [31]:
cursor = get_cursor(retail_connection)
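Before running the load, it can help to see exactly what will be sent to the database after the %s placeholders are bound. This inspection step is not part of the original flow; psycopg2's cursor.mogrify renders the parameterized query against a sample record (here, the first row from the orders preview above).

rec = [1, '2013-07-25 00:00:00.0', 11599, 'CLOSED']
# mogrify returns the bound query as bytes
print(cursor.mogrify(query, rec).decode('utf-8'))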
In [23]:
%%time
load_orders(retail_connection, cursor, query, orders.values.tolist()[:10000])
---------------------------------------------------------------------------
UndefinedTable                            Traceback (most recent call last)
File <timed eval>:1, in <module>

Input In [21], in load_orders(connection, cursor, query, data)
      1 def load_orders(connection, cursor, query, data):
      2     for rec in data:
----> 3         cursor.execute(query, rec)
      4         connection.commit()

UndefinedTable: relation "orders" does not exist
LINE 1: INSERT INTO orders
                    ^
In [10]:
cursor.execute('TRUNCATE TABLE orders')
---------------------------------------------------------------------------
InFailedSqlTransaction                    Traceback (most recent call last)
Input In [10], in <cell line: 1>()
----> 1 cursor.execute('TRUNCATE TABLE orders')

InFailedSqlTransaction: current transaction is aborted, commands ignored until end of transaction block
In [24]:
retail_connection.commit()
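The UndefinedTable errors above mean the orders table does not exist in this database yet; it is presumably created in one of the earlier notebooks. In case you need to recreate it, here is a minimal DDL sketch with column types inferred from the data preview above, not taken from the course schema.

# Hypothetical DDL; column types are guesses based on the preview
cursor = get_cursor(retail_connection)
cursor.execute("""CREATE TABLE IF NOT EXISTS orders (
    order_id INT PRIMARY KEY,
    order_date TIMESTAMP,
    order_customer_id INT,
    order_status VARCHAR(45)
)""")
retail_connection.commit()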
{note}
Inserting one row at a time but committing only at the end. Even though this is much faster than the previous approach, it still transfers one record at a time between the Python engine and the database engine.
We can tune it further by leveraging batch inserts.
In [32]:
def load_orders(connection, cursor, query, data):
    for rec in data:
        cursor.execute(query, rec)
    connection.commit()
In [33]:
cursor = get_cursor(retail_connection)
In [34]:
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
---------------------------------------------------------------------------
UndefinedTable                            Traceback (most recent call last)
File <timed eval>:2, in <module>

Input In [32], in load_orders(connection, cursor, query, data)
      1 def load_orders(connection, cursor, query, data):
      2     for rec in data:
----> 3         cursor.execute(query, rec)
      4     connection.commit()

UndefinedTable: relation "orders" does not exist
LINE 1: INSERT INTO orders
                    ^
In [ ]:
cursor.execute('TRUNCATE TABLE orders')
{note}
All the records will be inserted as part of one batch insert operation. If there is a lot of data to insert, this approach can run into issues such as running out of memory.
Also, if the job fails in the middle, all the data transferred thus far is lost. Hence it is better to group the records into batches of manageable size and insert as well as commit per batch.
In [ ]:
def load_orders(connection, cursor, query, data):
    cursor.executemany(query, data)
    connection.commit()
In [ ]:
cursor = get_cursor(retail_connection)
In [ ]:
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
{note}
You might not see a significant difference in performance, as our database runs on the same server as the code that inserts the data.
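Also note that in psycopg2, executemany is effectively a loop of single-row statements under the hood, so it saves little over the previous approach. psycopg2.extras.execute_values expands each page of rows into one multi-row INSERT and is usually much faster. A sketch, assuming the same orders data (load_orders_ev and ev_query are illustrative names, not part of the course code):

from psycopg2.extras import execute_values

# execute_values expands the single %s into (row), (row), ... per page
ev_query = """INSERT INTO orders
    (order_id, order_date, order_customer_id, order_status)
VALUES %s"""

def load_orders_ev(connection, cursor, data, page_size=10000):
    execute_values(cursor, ev_query, data, page_size=page_size)
    connection.commit()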
In [ ]:
cursor.execute('TRUNCATE TABLE orders')
In [ ]:
len(orders.values.tolist())
In [ ]:
list(range(0, len(orders.values.tolist()), 10000))
In [ ]:
def load_orders(connection, cursor, query, data, batch_size=10000):
    for i in range(0, len(data), batch_size):
        cursor.executemany(query, data[i:i+batch_size])
        connection.commit()
In [ ]:
cursor = get_cursor(retail_connection)
In [ ]:
%%time
# Inserting all orders
load_orders(retail_connection, cursor, query, orders.values.tolist())
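Because each chunk is committed independently, a failed run loses at most the batch that was in flight. Here is a minimal restartability sketch; the batch checkpointing shown is hypothetical and not part of the course code:

def load_orders_restartable(connection, cursor, query, data,
                            batch_size=10000, start_batch=0):
    for batch_no, i in enumerate(range(0, len(data), batch_size)):
        if batch_no < start_batch:
            continue  # skip batches committed by a previous run
        cursor.executemany(query, data[i:i + batch_size])
        connection.commit()  # persist batch_no durably here to enable restarts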
In [ ]:
%load_ext sql
In [ ]:
%env DATABASE_URL=postgresql://itversity_retail_user:retail_password@localhost:5432/itversity_retail_db
In [ ]:
%%sql
SELECT count(1) FROM orders