Managing MySQL databases can be costly and time-consuming. If you’re working with databases containing static data, an effective alternative is to convert your database tables into individual Parquet files. By storing these files and querying them directly with Python, you keep your existing querying capabilities while gaining improved query performance, lower costs, and serverless infrastructure. In this blog post, we’ll walk through a set of Python functions that let you export specific tables from your database into Parquet files.
Step 1: Import Necessary Libraries
To get started, we’ll import the necessary libraries for our function.
# Standard library imports
import json
import os
import subprocess
import sys
import time
import warnings

# Threading and concurrency
import concurrent.futures

# Date and time
import datetime as dt
import pytz

# Data processing and database
import numpy as np
import pandas as pd
import pyarrow
import pyarrow.parquet as pq
import sqlalchemy
from sqlalchemy import create_engine

# Timezone setup
central = pytz.timezone('US/Central')

# Warnings configuration
warnings.filterwarnings("ignore", category=FutureWarning, module="pyarrow")
Step 2: Create a Credentials File and a MySQL Connection
The first code snippet shows the format for a credentials file. I recommend saving it as a JSON file, separate from your code, so that your credentials are not exposed.

The second snippet sets the path to the credentials file as an environment variable and then defines a function that creates a SQLAlchemy connection engine. This engine is used to read data from the MySQL database into a Pandas DataFrame. The base_folder variable is where the Parquet files will be saved.
# Example format to store credentials in a separate file
# (JSON does not support comments -- drop the "#" lines in the real file)
{
    "host": "127.0.0.1",
    "username": "user",
    "password": "password",
    "port": 3306,
    "database": "database"
    # "ssl_ca": "path_to_file"  # Optional, only if using SSL
}
# Import the creds path as an environment variable
os.environ["CREDS_PATH"] = "/path_to_file/creds.json"

base_folder = "path_to_folder"  # Path to save parquet files

def create_sqlalchemy_engine():
    creds_path = os.environ.get("CREDS_PATH")
    with open(creds_path, 'r') as file:
        creds = json.load(file)

    # Construct the connection string (URI) for SQLAlchemy
    connection_string = (
        f"mysql+pymysql://{creds['username']}:{creds['password']}@"
        f"{creds['host']}:{creds['port']}/{creds['database']}"
    )

    if 'ssl_ca' in creds:
        ssl_args = {'ssl_ca': creds['ssl_ca']}
        engine = create_engine(connection_string, connect_args=ssl_args)
    else:
        engine = create_engine(connection_string)

    return engine
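Before moving on, it can be worth confirming that the engine actually connects. Here is a minimal sketch of a sanity check; the SELECT 1 query and the variable names are illustrative and not part of the export pipeline itself.

# Quick connection sanity check (illustrative only)
engine = create_sqlalchemy_engine()
test_df = pd.read_sql("SELECT 1 AS ok", engine)
print(test_df)  # Expect a one-row DataFrame with a single "ok" column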
Step 3: Define Functions to Read the Database
The first function lets you pass a SQL query as a string and get the results back as a Pandas DataFrame. The advantage of using the read_sql function from Pandas is that it preserves the integrity of the database schema in the resulting DataFrame.
def run_sql_query(query, mysql_connection, params=None):
    """
    Execute an SQL query and return the results as a pandas DataFrame.

    :param query: SQL query string
    :param mysql_connection: SQLAlchemy engine (or connection) for the MySQL database
    :param params: Optional parameters for the query
    :return: Resulting data as a pandas DataFrame
    """
    try:
        # Use pandas.read_sql() to fetch data directly into a DataFrame
        df = pd.read_sql(query, mysql_connection, params=params)
    except Exception as e:
        print(f"Error executing query: {e}")
        return None
    return df
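As a quick usage sketch, you can pull a small sample from any table. This assumes mysql_connection is the engine returned by create_sqlalchemy_engine() (created in Step 4 below) and that a table named customers exists in your database; both names are illustrative.

# Hypothetical example: fetch a small sample from a "customers" table
sample_df = run_sql_query("SELECT * FROM customers LIMIT 10", mysql_connection)
if sample_df is not None:
    print(sample_df.dtypes)  # The inferred dtypes mirror the MySQL column types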
The next function is a small helper that builds a list of every table in the database so we can iterate through them later.
def retrieve_all_tables(mysql_connection):
    # Use pandas.read_sql() to directly fetch data into a DataFrame
    query = "SHOW TABLES"
    df = pd.read_sql(query, mysql_connection)

    # The DataFrame has a single column containing the table names
    tables_lst = df.iloc[:, 0].tolist()

    return tables_lst
Finally, the main function here will read the table data incrementally and store it in a Parquet file format. Some things to point out:
- The data is streamed to Parquet by leveraging LIMIT and OFFSET, so large tables will not cause an out-of-memory issue in Pandas. Lowering the chunk_size parameter further reduces the amount of data held in memory at once.
- Each table is stored in a directory named after the table with a .parquet suffix, and the individual chunk files live inside that directory. You can point Pandas at the directory to read all of the data, for example pd.read_parquet("table.parquet").
- The set_dtype_for_null_columns function explicitly sets the column types for columns that are entirely null. Null columns are otherwise inferred as floats, which causes problems if data later in the table has a different data type (see the short illustration after this list).
- The function stops automatically once it has read all the data in the table.
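To see why the all-null handling matters, here is a small self-contained sketch with made-up data (not from any real database) showing how Pandas infers a float dtype for an all-null chunk, which then conflicts with a later chunk of the same column:

# Illustrative only: the same logical column seen in two different chunks
chunk_1 = pd.DataFrame({"status": [np.nan, np.nan, np.nan]})    # all NULLs in this chunk
chunk_2 = pd.DataFrame({"status": ["active", "inactive", None]})

print(chunk_1["status"].dtype)  # float64 -- Pandas has nothing better to infer from NaN
print(chunk_2["status"].dtype)  # object

# Written as-is, the two chunks produce Parquet files whose schemas disagree
# (double vs. string), so reading the whole table directory back would fail.
# set_dtype_for_null_columns avoids this by looking up the real MySQL type
# for all-null columns and casting the chunk before it is written.

With that context, here are the dtype helper and the main export function: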
def set_dtype_for_null_columns(mysql_connection, df, table_name):
    """
    Explicitly set the data type for columns that are all null.
    """
    type_map = {
        'bigint': 'Int64',
        'datetime': 'datetime64[ns]',
        'tinyint': 'Int8',
        'varchar': 'str',
        'smallint': 'Int16',
        'int': 'Int32',
        'float': 'float32',
        'text': 'str',
        'enum': 'str',
        'char': 'str',
        'longtext': 'str',
        'mediumtext': 'str',
        'timestamp': 'datetime64[ns]',
        'double': 'float64',
        'mediumint': 'Int32',
        'bit': 'object'
    }

    # Pandas infers the schema from a sample of the data, so if a column is mostly
    # nulls the schema can still be interpreted incorrectly
    df_sample = df.head(1000)
    all_null_columns = df_sample.columns[df_sample.isnull().all()].tolist()

    if not all_null_columns:
        return df

    # Fetch the column types for all null columns in one query
    # Replace 'brazenconnect' with the name of your own database schema
    col_placeholders = ', '.join(['%s'] * len(all_null_columns))
    query = f"""
        SELECT COLUMN_NAME, DATA_TYPE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = 'brazenconnect'
          AND TABLE_NAME = %s
          AND COLUMN_NAME IN ({col_placeholders})
    """
    params = tuple([table_name] + all_null_columns)
    information_schema_df = run_sql_query(query, mysql_connection, params)

    # Map column names to their corresponding data types
    column_to_dtype_map = information_schema_df.set_index('COLUMN_NAME')['DATA_TYPE'].map(type_map).to_dict()

    for col, dtype in column_to_dtype_map.items():
        df[col] = df[col].astype(dtype)

    return df


def mysql_to_parquet(table_name, mysql_connection, base_folder, chunk_size=500_000, offset_start=0, max_iterations=None):
    # Create the directory for the table if it doesn't exist
    table_folder = os.path.join(base_folder, f"{table_name}.parquet")
    if not os.path.exists(table_folder):
        os.makedirs(table_folder)

    # Fetch the table schema once
    schema_query = f'DESCRIBE {table_name}'
    schema_df = pd.read_sql(schema_query, mysql_connection)

    offset = offset_start
    print(f"Starting to process {table_name} at {dt.datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')}.")

    # Row count to monitor progress
    row_count_query = f"SELECT COUNT(*) FROM {table_name}"
    row_count_df = pd.read_sql(row_count_query, mysql_connection)
    row_count = row_count_df.iat[0, 0]

    iteration_count = 0
    while True:
        # If we've reached max_iterations, break out of the loop
        if max_iterations is not None and iteration_count >= max_iterations:
            break

        start_time = time.time()

        # Construct the SQL query with LIMIT and OFFSET
        query = f"SELECT * FROM {table_name} LIMIT {chunk_size} OFFSET {offset}"

        # Read data into a Pandas DataFrame
        df_chunk = pd.read_sql(query, mysql_connection)

        # If no data is returned, we're done
        if df_chunk.empty:
            break

        # Explicitly set dtypes for columns that are all null in this chunk
        df_chunk = set_dtype_for_null_columns(mysql_connection, df_chunk, table_name)

        # Convert bit(1) columns to boolean
        bit_cols = schema_df[schema_df['Type'] == 'bit(1)']['Field'].tolist()
        for col in bit_cols:
            if col in df_chunk.columns:
                df_chunk[col] = df_chunk[col].apply(lambda x: x == b'\x01')

        # Save the chunk to a separate Parquet file
        parquet_chunk_path = os.path.join(table_folder, f"{table_name}_{offset}_to_{offset + chunk_size}.parquet")
        df_chunk.to_parquet(parquet_chunk_path, index=False)

        end_time = time.time()
        elapsed_time = end_time - start_time
        progress = min((offset + chunk_size) / row_count * 100, 100)
        print(f"Finished processing {table_name} from {offset:,} to {offset + chunk_size:,} at "
              f"{dt.datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')}. "
              f"The progress is {progress:.2f}% complete. Took {elapsed_time:.2f} seconds to process.")

        # Increment the offset
        offset += chunk_size

        # Increment the iteration count
        iteration_count += 1
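Once a table has been exported, Pandas (or PyArrow) can read the whole directory of chunk files back as a single DataFrame. A minimal sketch, assuming a hypothetical table named users was exported into base_folder:

# Read every chunk file in the table's directory back as one DataFrame
users_path = os.path.join(base_folder, "users.parquet")  # "users" is a hypothetical table name
users_df = pd.read_parquet(users_path)
print(len(users_df), "rows read back from", users_path)

# For very large tables, the same directory can also be opened with PyArrow;
# pass columns=[...] to read only a subset of columns
users_table = pq.read_table(users_path)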
Step 4: Run the Function
Now you can start exporting data! First, create the connection engine and build a list of every table in the database. Then loop through each table and export its data; progress is printed as you go.
# The code above can be saved in a separate Python file in the same directory and imported
from mysql_to_parquet import *

mysql_connection = create_sqlalchemy_engine()  # Open the connection
# mysql_connection.dispose()  # Close the connection (dispose of the engine) when finished

# List of MySQL tables
tables_lst = retrieve_all_tables(mysql_connection)
print(len(tables_lst))

for table in tables_lst:
    mysql_to_parquet(table, mysql_connection, base_folder)
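Because mysql_to_parquet accepts offset_start and max_iterations, you can also resume a partially exported table or cap a run without re-exporting everything. A hedged sketch; the table name orders and the numbers are purely illustrative:

# Resume a hypothetical "orders" table from row 2,000,000, processing at most 4 chunks
mysql_to_parquet(
    "orders",
    mysql_connection,
    base_folder,
    chunk_size=500_000,
    offset_start=2_000_000,
    max_iterations=4,
)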
The loop above processes each table one by one. To speed things up, here is how you can run the tables concurrently:
import concurrent.futures
from functools import partial

# Bind the connection and output folder so only the table name changes per call
export_table = partial(mysql_to_parquet, mysql_connection=mysql_connection, base_folder=base_folder)

# Using ThreadPoolExecutor to export several tables at once
with concurrent.futures.ThreadPoolExecutor() as executor:
    results = list(executor.map(export_table, tables_lst))
Step 5: Check the Files
Lastly, as a simple validation step, compare the row count of each database table with the row count of the corresponding Parquet files to confirm data consistency.
If the database is static, the counts will match exactly. If the database is active, you will see some differences, but this check still gives you a programmatic way to confirm that nothing went wrong while iterating through the tables.
def get_table_count(table):
    temp_df = run_sql_query(
        f'SELECT "{table}" as table_name, COUNT(*) as row_count FROM {table}',
        mysql_connection
    )
    return temp_df

# Using ThreadPoolExecutor to run the count queries in parallel
# You can adjust max_workers based on your needs
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = list(executor.map(get_table_count, tables_lst))

# Concatenate the results into a single DataFrame
mysql_row_counts_df = pd.concat(results, ignore_index=True)

# Get the row counts for the parquet files
parquet_row_counts = {}
for table_name in tables_lst:
    directory = os.path.join(base_folder, f"{table_name}.parquet")
    files = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.parquet')]
    parquet_row_count = sum(pq.read_metadata(f).num_rows for f in files)
    parquet_row_counts[table_name] = parquet_row_count

parquet_row_counts_df = (
    pd.DataFrame.from_dict(parquet_row_counts, orient='index')
    .reset_index()
    .rename(columns={'index': 'table_name', 0: 'row_count'})
)

# Merge the DataFrames
merged_df = pd.merge(mysql_row_counts_df, parquet_row_counts_df, on='table_name', how='outer', suffixes=('_mysql', '_parquet'))

# Find the difference in row counts
merged_df['row_count_diff'] = merged_df['row_count_mysql'] - merged_df['row_count_parquet']

# View the results
pd.set_option('display.max_rows', None)
merged_df.sort_values('row_count_diff')
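To surface only the tables that need a second look, you can filter for non-zero differences. This just builds on the merged_df created above:

# Show only tables where the MySQL and Parquet row counts disagree
mismatched_df = merged_df[merged_df['row_count_diff'] != 0]
print(f"{len(mismatched_df)} of {len(merged_df)} tables have mismatched row counts")
mismatched_df.sort_values('row_count_diff', ascending=False)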
Google Colab Notebook
A Google Colab Notebook with the code can be found here.
Final Thoughts
Check out more Python tricks in this Colab Notebook or in my recent Python Posts.
Thanks for reading!