SQL Strikes Back

Competing goods and far too many observations.

sql
postgres
parquet
intermediate
Author

Daniel Kick

Published

September 5, 2024

For those that want to jump straight to the resources here are the files discussed below:

Previously I generated an absolute boat load of simulated data for modeling. Even after taking steps to remove unneeded values1 and storing as a file type with compression (parquet) the full set is still a whopping 285G.

We have several competing goods here that we’d like to balance. Ideally the data would be

As detailed in “Worse is better case study 2” our current solution sacrifices availability of all the data and (initially) query speed to get a set that fits in memory and can be moved to an HPC. Ultimately, we want to add new simulated results to this set while training which isn’t possible.

The solution of course is to bite the bullet and set up a database then write a new dataloader to get minibatches from the database. This means we’ll be moving batches on/off the gpu but we can access any records and need not read in files over and over.

The Plan:

The target workflow for this is to

  1. Initialize a database
  2. Read in metadata parquet files as separate tables (genotypes, ids)
  3. Read in each results parquet file, add the file name as a field, and append its values to a table (results)

Easy. How long could it take?

Learning PostgreSQL On The Way Down:

I considered using duckdb for it’s speed and simplicity but ultimately elected to use PostgreSQL since it’s been around longer (1996) and boasts limits way beyond what I need. Please don’t take the below as best practice or even recommendations per se as this is the first non-SQLite database I’ve administered. This is just notes on the stumbling blocks I encountered. The best thing one can do is read the official PostgreSQL tutorial. Several times I searched for the solution to a problem just to find it covered a few pages later.

Installation (in this case on ubuntu via apt) was easy but database creation takes some doing. PostgreSQL defines accounts that are separate from the host system. This includes a special account postgres with root permisions that is initially available. Using sudo -u postgres <cmd> one can run commands as if this account existed on the host machine.

We use this account to create the database and then login as root.

sudo -u postgres createdb apsimxsim
sudo -u postgres psql

Next we create a non-root account and provide permissions2 for the apsimxsim database and public schema.

postgres=# CREATE ROLE loremipsum LOGIN PASSWORD 'TotallyRealPassword';
CREATE ROLE
postgres=# ALTER ROLE loremipsum CREATEDB;
ALTER ROLE
postgres=# GRANT ALL PRIVILEGES ON DATABASE apsimxsim TO loremipsum;
GRANT
apsimxsim=# GRANT ALL ON SCHEMA public TO loremipsum;
GRANT

We can check permissions using \du (note that psql specific commands begin with \ e.g., \q).

postgres=# \du
                             List of roles
 Role name |                         Attributes
-----------+------------------------------------------------------------
 loremipsum     | Create DB
 postgres  | Superuser, Create role, Create DB, Replication, Bypass RLS

We’re not done with setup yet. We also need to change the search path for the loremipsum user so we can find our tables.

$psql apsimxsim
apsimxsim=> SET search_path TO public;

Now we should be set. We can add / drop tables from psql but we’re going to use python’s SQLAlchemy library to do most of the heavy lifting including creating these tables.

Fast forwarding a little, here’s what we’re aiming at. Ultimately the loremipsum user will own three tables with the metadata (ids, genotypes) and simulated results (results). A final quirk I’ll mention here is that it’s possible to use capitalized names but it a bit of a hasle. We’d have to select them as public."Ids" instead of public.ids. For this reason we’ll standardized all table and field names using only lowercase letters and underscores.

apsimxsim=> SELECT schemaname, tablename, tableowner FROM pg_tables WHERE tableowner='loremipsum';
 schemaname | tablename | tableowner
------------+-----------+------------
 public     | ids       | loremipsum
 public     | genotypes | loremipsum
 public     | results   | loremipsum
(3 rows)

Starting With The End In Mind

Before developing anything too complicated we should check if we can get easily get data from SQL to python. We can’t retrieve data without inserting it so in reality I’m showing things a little out of order. Bear with me and it’ll all fit together.

Thankfully getting data from SQL to python isn’t a new problem so there should be many workable solutions (e.g. discussion, asyncpg, warp_prism).

We’re going to start simple and use sqlalchemy with psycopg2 to connect to the database.

Ultimately we’ll have something like this:

import psycopg2
from   sqlalchemy import create_engine, text

engine = create_engine(
        "db_string_here"
        )

with engine.connect() as conn:
    result = conn.execute(
      text('SELECT * FROM public.ids LIMIT 1')
      )

Once we have result we can transform it into a tensor and be set. There are likely more performant ways to get data out, but as long as we can get data out we can go to the next steps.

Don’t Leak Passwords

That placeholder string up above, "db_string_here" will contain the username, password, host, and port, and database name. Even if our machine isn’t publicly accessible and this data isn’t sensitive, we still don’t want to make this information public. There are different ways to do this such as using storing this information as enviromental variables or keeping it in a separate file (excluded from version control in your .gitignore). The latter is what I have demonstrated here but if you’re working with more sensitive data please consult a security expert regarding your use case.

Here, we’ll create a file psql_details.json that will hold the information needed for "db_string_here".

{
  "user": "loremipsum", 
  "pass": "TotallyRealPassword", 
  "host": "localhost", 
  "port": "5432", 
  "name": "apsimxsim"
}

Now we can read this as a dictionary and insert these values into the connection string.

with open('./psql_details.json', 'r') as f:
    d = json.load(f)
    
engine = create_engine(
        f"postgresql+psycopg2://{d['user']}:{d['pass']}@{d['host']}:{d['port']}/{d['name']}"
        )

Preparation for Data Migration

The workflow to insert data into the database is to read in the parquet file as a pandas dataframe, make any needed changes, then write that table to the database.

The pattern will look something like this.

example = pq.read_table(parquet_path+'example.parquet').to_pandas()
example = example.rename(columns={e:e.lower().replace('.', '_') for e in list(example)})
example.to_sql(name='example', con=engine, if_exists = 'append',  schema='public')

Here the only change to the dataframe we’re making is to set all the column names to be lowercase without periods. As with using all lowercase table names, this consistency will make it easier to write SQL without worrying about quotes.

Data Migration & Ad Hoc Parallelism

When writing to the database we want to append to the table if it already exists. This means that if the script is disrupted we run the risk of writing duplicate rows when the script runs again.

If we keep track of what files have been processed, it’s a short jump to enabling parallelism. The approach we’ll take is to…

  1. Identify all files to be imported

  2. Check if a file has already been processed

  3. Check if a file is being worked on by another process

  4. Write to a log which file is going to be processed

  5. Process file

  6. Write to a log that the file has finished being processed

These steps will be in the script parquet_to_psql.py so we can easily run this from the command line.

We’ll use os.listdir to find all the files in the target directory then filter the files with re to get those that follow a naming convention sim_######_######.parquet.

# Identify files to add
parquet_path = '/path/to/parquet_files/'

# constrain to the parquets that exist in the path
existing_parquet = [e for e in os.listdir(parquet_path) if re.match('sim_\d+_\d+\.parquet', e)]

We’ll use a simple list saved to a json file to track what files have been processed already and use it to exclude these entries from our queue.

finished_parquet = [] 
if os.path.exists('./finished_parquet.json'):
    with open('./finished_parquet.json', 'r') as f:
        finished_parquet = json.load(f)

existing_parquet = [e for e in existing_parquet if e not in finished_parquet]

Now we’ll iterate over some number of the un-processed files but we’ll use the same trick to check if the parquet is being worked on by another instance of this script. The main difference between finished_parquet.json and reserved_parquet.json is that the latter we’ll update as soon as we confirm the parquet we’re considering (variable e) isn’t reserved. After processing we’ll log that the file has been processed.

num_parquets = 3 # How many files to process per run
for e in existing_parquet[0:num_parquets]: 
    # so that we can run this process in parallel we'll track which files have been reserved
    reserved_parquet = []
    if os.path.exists('./reserved_parquet.json'):
        with open('./reserved_parquet.json', 'r') as f:
            reserved_parquet = json.load(f)
            
    if e in reserved_parquet:
        pass
    else:
        # log that this entry reserved
        reserved_parquet.append(e)
        with open('./reserved_parquet.json', 'w') as f:
            json.dump(reserved_parquet, f)
            
        # Process File
        # ...

        # After completion, log that the file has been processed
        finished_parquet.append(e)
        with open('./finished_parquet.json', 'w') as f:
            json.dump(finished_parquet, f)
        print('\n')  

One additional quirk is worth noting. There are a ton of records in some of these files – enough to cause the process to stall out. To get around this we’re going to break the dataframe into blocks of 1,000,000 rows and write each separately (There’s certainly a better way to achieve this, but hey, it works.)

With all that done we can run python ./parquet_to_psql.py and it’ll take care of the migration for us!