Worse is better case study 1

Safety First with Scheduling

beginner
code
Author

Daniel Kick

Published

December 19, 2023

Slurm1 is a tool commonly available on computing clusters for scheduling jobs. For a while I’ve wanted a local installation of it so I can queue computational experiments to run while I’m sleeping or away. Trouble is, it’s never been a high enough priority to devote much time to getting it set up (or to convince a sysadmin to help me set it up). Every few months I’ll work through a tutorial until I find I’ve exhausted the time set aside for the task without a working installation. After the most recent cycle of this I resolved to build an imperfect solution instead.

Thinking through the system requirements

Let’s start with what I want my solution to do:

  1. Run jupyter notebooks

  2. Maintain a queue of jobs to run

  3. Make sure that resources (here the GPU’s memory) are free for the next job and run jobs as other jobs finish2

  4. Control the system (add jobs to be run and such)

Jupyter notebooks can be run from the command line in a virtual environment (e.g. conda run -n gpu_env jupyter execute notebook0.ipynb).
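
The same command can be launched from Python, which is how a scheduler would call it. This is a minimal sketch, assuming the environment name gpu_env from the example above; build_notebook_command and run_notebook are hypothetical helper names and are not part of the tool built later in this post.

```python
import subprocess

def build_notebook_command(notebook: str, env: str = "gpu_env") -> list:
    """Return the argument list for executing a notebook inside a conda environment."""
    # A list (rather than one string) keeps notebook names with spaces intact.
    return ["conda", "run", "-n", env, "jupyter", "execute", notebook]

def run_notebook(notebook: str, env: str = "gpu_env") -> int:
    """Run the notebook and return the exit code (0 means success)."""
    # Blocks until the notebook finishes; requires conda and jupyter on the PATH.
    return subprocess.run(build_notebook_command(notebook, env)).returncode

# Only build the command here; calling run_notebook() would actually execute it.
print(build_notebook_command("notebook0.ipynb"))
```

Because run_notebook blocks until the process exits, calling it in a loop runs notebooks one after another.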

A queue could be as simple as a list of notebooks to be run (maintained in a list or a text file), so this requirement is easy to satisfy. In the simplest conception, the jobs could be listed on the command line (separated by a ;) and each would run in turn. This would not allow for modifying the queue, however.
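
To illustrate the text-file variant, here is a small sketch of a queue kept in a file with one notebook name per line. The file name queue.txt and the helpers read_queue and pop_next are assumptions made for this example. Because the file is re-read before each job, it can be edited between jobs, which the ;-separated approach does not allow.

```python
import os
import tempfile

def read_queue(path: str) -> list:
    """Read notebook names from a text file, one per line, skipping blank lines."""
    with open(path, "r", encoding="utf-8") as f:
        return [line.strip() for line in f if line.strip()]

def pop_next(path: str):
    """Remove and return the first queued notebook, or None if the queue is empty."""
    queue = read_queue(path)
    if not queue:
        return None
    # Write back everything except the first entry.
    with open(path, "w", encoding="utf-8") as f:
        f.writelines(name + "\n" for name in queue[1:])
    return queue[0]

# Demonstration in a temporary directory so no real files are touched.
with tempfile.TemporaryDirectory() as tmp:
    queue_file = os.path.join(tmp, "queue.txt")
    with open(queue_file, "w", encoding="utf-8") as f:
        f.write("notebook0.ipynb\nnotebook1.ipynb\n")
    first = pop_next(queue_file)
    remaining = read_queue(queue_file)

print(first, remaining)
```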

Freeing up resources and starting new jobs is more challenging. Ideally, when the notebook finishes running it should release the resources, but what if an error or bug keeps the process from releasing them? In a notebook we could include os._exit(0) in the last cell to kill the process, but if the notebook runs correctly this shouldn’t be an issue. As a fail-safe I could keep an eye on which new programs are using the GPU and, if they don’t release memory, stop them with kill -9 $PID. Not a pretty solution (and if I start another process that uses the GPU, it could get killed) but it will work.
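
The fail-safe amounts to comparing two lists of PIDs: those on the GPU before any job ran and those on it after a job finished. A rough sketch of that comparison follows; the PIDs are made up and pids_to_stop is a hypothetical name. It only prints the kill commands rather than running them.

```python
def pids_to_stop(baseline_pids, current_pids):
    """Return PIDs now on the GPU that were not there at baseline, in the order seen."""
    baseline = set(baseline_pids)
    return [pid for pid in current_pids if pid not in baseline]

baseline = ["1201", "40082"]            # made-up PIDs present before any job ran
after_job = ["1201", "40082", "51234"]  # made-up PIDs present after a job finished
leftover = pids_to_stop(baseline, after_job)
print(leftover)
for pid in leftover:
    # Only print the command here; a real run would execute it.
    print(f"kill -9 {pid}")
```

Anything started on the GPU after the baseline was taken shows up in the result, which is exactly the caveat mentioned above.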

Lastly, there needs to be a way to control the system: a way to modify its state even while it’s running in the background. A simple way to do this would be to write command files to a specific location, following a naming convention.

Implementation

I’m using python for this tool since I find it’s more readable than bash. The notebooks I want to schedule are in a single directory so I’ll add a file, SchedulerControlCenter.py, there and a folder for the control files I’ll use to modify the scheduler’s state.

.
├── notebook0.ipynb
├── notebook1.ipynb
├── ...
├── SchedulerControlCenter.py
└── SchedulerFiles
    └── ctrl0.json

Control

Starting with the control files, I’d like to be able to add, remove, and move jobs in the queue, prevent a process from being closed (if I’m using the GPU while this is running), print information on the system’s state, and provide some notes to the user (in case I forget details later).

Here’s a dictionary with keys corresponding to each of these cases.

data = {
    'info'                :[],                  # Print this message
    'nvidia_base_pids_add':['40082'],           # Prevent a specific PID from being autoclosed. 
    'nvidia_base_pids_del':['40082'],           # Allow a specific PID to be autoclosed.
    'ipynb_names_read'    :[],                  # Print currently queued notebooks.
    'ipynb_names_add'     :['notebook0.ipynb'], # Add a notebook (to the end) of the queue
    'ipynb_names_next'    :['notebook0.ipynb'], # Add a notebook to the beginning of the queue (does not need to be in the queue)
    'ipynb_names_del'     :['notebook0.ipynb'], # Remove a notebook from the queue
}

To make things easy I’ll create a function to write a dictionary as json and autoname the file (so I don’t overwrite unprocessed commands).

import json, os, re

def write_ctrl_json(data: dict):
    # existing control files: 'ctrl.json' or 'ctrl<digits>.json'
    ctrl_files = [e for e in os.listdir('./SchedulerFiles/') if (re.match(r'ctrl\.json', e) or re.match(r'ctrl\d+\.json', e))]
    # find the next number to write to. In case 'ctrl.json' and 'ctrl0.json' exist, I'll write to 1
    max_num = [e.replace('ctrl', '').replace('.json', '') for e in  ctrl_files]
    max_num = [int(e) for e in max_num if e != '']
    if max_num == []: max_num = 0
    else: max_num = max(max_num)

    with open(f'./SchedulerFiles/ctrl{max_num+1}.json', 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=4)
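
The numbering rule is easier to see in isolation. The sketch below reproduces it as a pure function over a list of file names, so it can be tried without touching the disk; next_ctrl_name is a hypothetical helper written for this illustration.

```python
import re

def next_ctrl_name(existing: list) -> str:
    """Return the next unused control-file name given the names already present."""
    numbers = []
    for name in existing:
        match = re.fullmatch(r"ctrl(\d*)\.json", name)
        # 'ctrl.json' matches but has no number, so it is ignored when counting.
        if match and match.group(1):
            numbers.append(int(match.group(1)))
    # Mirrors write_ctrl_json: one past the largest number, or 1 if none are numbered.
    return f"ctrl{max(numbers, default=0) + 1}.json"

print(next_ctrl_name([]))
print(next_ctrl_name(["ctrl.json", "ctrl0.json"]))
print(next_ctrl_name(["ctrl2.json", "ctrl10.json", "notes.txt"]))
```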

The Scheduler

Setup

In practice I’ll begin by writing these control files then start the scheduler. I’ll need to parse the instructions, run the first job, check for new instructions, then run the next job and then repeat this until all jobs are complete. In between jobs, I’ll compare the processes running on the GPU and if any are not supposed to be there, stop them.

I’ll keep track of the original state (nvidia_base_pids) of the GPU using a list of the PIDs that were running on it and current state as a dictionary (nvidia_state) which makes it easy to keep additional information for future use (e.g. how much memory is being used). The queue itself (ipynb_names) can be a simple list of the files to be run. These will need to be populated either from the GPU’s current state (with _init_nvidia()) or from the control files (with _parse_ctrl_jsons()). After that, the scheduler can begin working through the queued notebooks, controlled by the main() method.

import os, subprocess, re, json

class Scheduler():
    def __init__(self):
        self.nvidia_base_pids = []
        self.nvidia_state = {}
        self._init_nvidia()
        self.ipynb_names = []
        self._parse_ctrl_jsons()
        self.main()

The GPU’s initial state needs to be recorded which I’ll do by reading all the processes running on it, and saving these PIDs.

    def _init_nvidia(self):
        self._read_nvidia()
        self.nvidia_base_pids = list(self.nvidia_state.keys())

Finding these processes takes a little doing. From the command line, nvidia-smi produces a nicely formatted text table with this information. I’ve used subprocess to capture this output, and then I’ve parsed the table to get the relevant rows and put the information from each into a dictionary (in the list running_processes). Then each dictionary is saved in self.nvidia_state under its PID.

    def _read_nvidia(self):
        x = subprocess.run("nvidia-smi", shell=True, check=True,  capture_output=True)

        # decode the captured bytes and split the table into lines
        x = x.stdout.decode().split('\n')

        # locate the rows between the '===' rule and the closing '---' rule of the Processes table
        table_blocks = [i for i in range(len(x)) if re.match('.+===+.+', x[i])]
        table_breaks = [i for i in range(len(x)) if re.match('.+---+.+', x[i])]
        process_row  = [i for i in range(len(x)) if re.match('.+ Processes: .+', x[i])]
        start_line = [i for i in table_blocks if i > process_row[0] ][0]
        end_line   = [i for i in table_breaks if i > process_row[0] ][0]

        running_processes = [x[i] for i in range(start_line+1, end_line)]
        running_processes = [dict(zip(
            ['GPU', 'GI', 'CI', 'PID', 'Type', 'ProcessName', 'GPUMem'],
            [e for e in line.strip('|').split(' ') if e != ''])) for line in running_processes]

        # start from an empty dict so processes that have exited are dropped
        self.nvidia_state = {}
        for e in running_processes:
            # skip rows without a numeric PID (e.g. 'No running processes found')
            if e.get('PID', '').isdigit():
                self.nvidia_state[e['PID']] = e
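
To make the parsing concrete, here is a standalone sketch that applies the same idea to a made-up sample of the process table, so it runs on a machine without a GPU. The sample text and the function name parse_process_table are assumptions for illustration. The real layout depends on the driver version, and a process name containing spaces would break the simple whitespace split.

```python
import re

# Made-up sample of the process table printed by nvidia-smi (layout varies by driver version).
SAMPLE = """\
+-----------------------------------------------------------------------------+
| Processes:                                                                  |
|  GPU   GI   CI        PID   Type   Process name                  GPU Memory |
|        ID   ID                                                   Usage      |
|=============================================================================|
|    0   N/A  N/A      1201      G   /usr/lib/xorg/Xorg                 45MiB |
|    0   N/A  N/A     40082      C   python                           3021MiB |
+-----------------------------------------------------------------------------+
"""

def parse_process_table(text: str) -> dict:
    """Return {PID: row dict} for the rows of the 'Processes' table."""
    lines = text.split("\n")
    header = next(i for i, line in enumerate(lines) if "Processes:" in line)
    # Rows sit between the '===' rule and the next '---' rule after the header.
    start = next(i for i, line in enumerate(lines) if i > header and re.match(r".+===+.+", line))
    end = next(i for i, line in enumerate(lines) if i > header and re.match(r".+---+.+", line))
    columns = ["GPU", "GI", "CI", "PID", "Type", "ProcessName", "GPUMem"]
    state = {}
    for line in lines[start + 1:end]:
        row = dict(zip(columns, line.strip("|").split()))
        # Ignore rows that do not carry a numeric PID.
        if row.get("PID", "").isdigit():
            state[row["PID"]] = row
    return state

state = parse_process_table(SAMPLE)
print(state["40082"])
```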

Reading controls

Now it needs to read the control files. I’ll identify all the json files in ./SchedulerFiles/ that begin with ‘ctrl’ then process each in turn3. After a file is read the method will check if any of the keys are ‘info’ and print a help message4 if so. Then it will go through each key in order and modify self.nvidia_base_pids or self.ipynb_names accordingly. After a file is processed, it will delete it so that the system doesn’t get trapped in a loop – adding the same notebooks to the queue over and over.

    def _parse_ctrl_jsons(self):
        # sorted() makes the processing order predictable (lexicographic by file name)
        ctrl_files = sorted(e for e in os.listdir('./SchedulerFiles/') if (re.match(r'ctrl\.json', e) or re.match(r'ctrl\d+\.json', e)))
        if len(ctrl_files) >= 1:
            for ctrl_file in ctrl_files:
                with open('./SchedulerFiles/'+ctrl_file, 'r') as f:
                    data = json.load(f)

                keys = tuple(data.keys())

                if 'info' in keys:
                    print("""Text omitted for space""")
                for key in keys:
                    if 'nvidia_base_pids_add' == key:
                        self.nvidia_base_pids += data[key]
                    if 'nvidia_base_pids_del' == key:
                        self.nvidia_base_pids = [e for e in self.nvidia_base_pids if e not in data[key]]
                    if 'ipynb_names_read' == key:
                        print(self.ipynb_names)
                    if 'ipynb_names_add' == key:
                        self.ipynb_names += data[key]
                    if 'ipynb_names_next' == key:
                        # technically this could be used to add files and set them to first
                        # data[key] is a list, so test membership rather than equality
                        self.ipynb_names = data[key]+[e for e in self.ipynb_names  if e not in data[key]]
                    if 'ipynb_names_del' == key:
                        self.ipynb_names = [e for e in self.ipynb_names if e not in data[key]]

                # remove the file
                os.unlink('./SchedulerFiles/'+ctrl_file)
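
The intended effect of the three queue keys can be checked without any files. This is a sketch of those semantics as a pure function over a list; apply_queue_commands is a hypothetical name and the notebook names are placeholders.

```python
def apply_queue_commands(queue: list, commands: dict) -> list:
    """Return a new queue after applying add/next/del commands in key order."""
    queue = list(queue)  # work on a copy so the caller's list is untouched
    for key, names in commands.items():
        if key == "ipynb_names_add":
            # append to the end of the queue
            queue = queue + names
        elif key == "ipynb_names_next":
            # move (or insert) the named notebooks to the front
            queue = names + [e for e in queue if e not in names]
        elif key == "ipynb_names_del":
            # drop every occurrence of the named notebooks
            queue = [e for e in queue if e not in names]
    return queue

queue = ["a.ipynb", "b.ipynb"]
queue = apply_queue_commands(queue, {"ipynb_names_add": ["c.ipynb"]})
queue = apply_queue_commands(queue, {"ipynb_names_next": ["b.ipynb"]})
queue = apply_queue_commands(queue, {"ipynb_names_del": ["a.ipynb"]})
print(queue)  # ['b.ipynb', 'c.ipynb']
```

Note that each command value is a list of names, so the filters test membership in that list.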

Running the next job

Now that there’s a way to add jobs to the queue, there needs to be a method to run them. This method will check whether there’s a notebook in the queue and whether that file exists, and if so use subprocess to run it in the appropriate conda virtual environment.

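    def _advance_queue(self):
        if len(self.ipynb_names) == 0:
            pass
        else:
            ipynb_name = self.ipynb_names.pop(0)
            if not os.path.exists(ipynb_name):
                # skip notebooks that can't be found
                pass
            else:
                # pass the command as a list so notebook names containing spaces stay intact
                process = subprocess.Popen(
                    ["conda", "run", "-n", "gpu_env", "jupyter", "execute", ipynb_name], stdout=subprocess.PIPE
                    )
                # wait for the notebook to finish; error is None because stderr isn't piped
                output, error = process.communicate()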
The main loop

Now I can use these methods to process all the items in the queue. As long as there are items to process, it will use _advance_queue() to process the one at the front of the queue. Next it will check if there are any new commands to process. Then it will check if the GPU state matches expectations. If there are any PIDs using the GPU that are not listed in the nvidia_base_pids list these will be stopped. Once the queue is exhausted, the script will stop.

    def main(self):
        while len(self.ipynb_names) > 0:
            print(f'Running {self.ipynb_names[0]}')
            self._advance_queue()
            # allow for external controls
            self._parse_ctrl_jsons()
            self._read_nvidia()
            # kill all the processes that were not running at the start. 
            for gpu_pid in [e for e in self.nvidia_state.keys() if e not in self.nvidia_base_pids]:
                subprocess.run(f'kill -9 {gpu_pid}', shell=True)
        print('No more queued ipynbs. Exiting.')

All together (and possible improvements)

This system works nicely for how quick it was to write up5. There are plenty of improvements that could be made. Suppose you wanted this to run in the background and idle until you added a new job to the queue. One could imagine changing the main() method to achieve this and extending _parse_ctrl_jsons() to get the system to stop idling and shut down. Or suppose you wanted to queue different file types or run notebooks in different environments – _advance_queue() could be extended to do this. Finally, suppose you don’t want to manually exempt PIDs that aren’t using much of the GPU’s resources. Each PID’s usage is available in the nvidia_state dictionary of dictionaries under GPUMem, so a threshold could be set.
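
As one illustration of the last idea, the sketch below filters a nvidia_state-style dictionary by memory use. It assumes GPUMem values look like '3021MiB'; the 500 MiB threshold, the example PIDs, and the name pids_over_threshold are all made up for this example.

```python
def pids_over_threshold(nvidia_state: dict, threshold_mib: int) -> list:
    """Return PIDs whose GPUMem (e.g. '3021MiB') exceeds threshold_mib."""
    heavy = []
    for pid, row in nvidia_state.items():
        mem = row.get("GPUMem", "")
        # Only handle values of the form '<digits>MiB'; anything else is skipped.
        if mem.endswith("MiB") and mem[:-3].isdigit():
            if int(mem[:-3]) > threshold_mib:
                heavy.append(pid)
    return heavy

example_state = {
    "1201": {"PID": "1201", "ProcessName": "/usr/lib/xorg/Xorg", "GPUMem": "45MiB"},
    "51234": {"PID": "51234", "ProcessName": "python", "GPUMem": "3021MiB"},
}
print(pids_over_threshold(example_state, threshold_mib=500))  # ['51234']
```

In main(), only PIDs that are both new and over the threshold would then be candidates for kill -9.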

These changes and other customizations for your use case are left as an exercise for the reader.

Edit 2023-12-20: I’ve added a background mode and option to begin the main loop directly on initialization.

import os, subprocess, re, json,  time
class Scheduler():
    def __init__(self, background_mode = False, run_main = False):
        self.background_mode = background_mode
        self.exit = False
        self.nvidia_base_pids = []
        self.nvidia_state = {}
        self._init_nvidia()
        self.ipynb_names = []
        if run_main:
            self.main()


    def _init_nvidia(self):
        self._read_nvidia()
        self.nvidia_base_pids = list(self.nvidia_state.keys())

    def _read_nvidia(self):
        x = subprocess.run("nvidia-smi", shell=True, check=True,  capture_output=True)

        # decode the captured bytes and split the table into lines
        x = x.stdout.decode().split('\n')

        # locate the rows between the '===' rule and the closing '---' rule of the Processes table
        table_blocks = [i for i in range(len(x)) if re.match('.+===+.+', x[i])]
        table_breaks = [i for i in range(len(x)) if re.match('.+---+.+', x[i])]
        process_row  = [i for i in range(len(x)) if re.match('.+ Processes: .+', x[i])]
        start_line = [i for i in table_blocks if i > process_row[0] ][0]
        end_line   = [i for i in table_breaks if i > process_row[0] ][0]

        running_processes = [x[i] for i in range(start_line+1, end_line)]
        running_processes = [dict(zip(
            ['GPU', 'GI', 'CI', 'PID', 'Type', 'ProcessName', 'GPUMem'],
            [e for e in line.strip('|').split(' ') if e != ''])) for line in running_processes]

        # start from an empty dict so processes that have exited are dropped
        self.nvidia_state = {}
        for e in running_processes:
            # skip rows without a numeric PID (e.g. 'No running processes found')
            if e.get('PID', '').isdigit():
                self.nvidia_state[e['PID']] = e
    
    def _parse_ctrl_jsons(self):
        # sorted() makes the processing order predictable (lexicographic by file name)
        ctrl_files = sorted(e for e in os.listdir('./SchedulerFiles/') if (re.match(r'ctrl\.json', e) or re.match(r'ctrl\d+\.json', e)))
        if len(ctrl_files) >= 1:
            for ctrl_file in ctrl_files:            
                with open('./SchedulerFiles/'+ctrl_file, 'r') as f:
                    data = json.load(f)

                keys = tuple(data.keys())

                if 'info' in keys:
                    print("""
This scheduling tool uses json files to modify its state while running. 
It will look for json files beginning with 'ctrl' and containing 0 or more digits in 
./SchedulerFiles/ and then run each. This json should be interpretable as a python dictionary.
Files are interpreted in the order of the keys but conflicting orders are not recommended. 
Example file:
{
    'info'                :[],                            -> Print this message
    'nvidia_base_pids_add':['40082'],                     -> Prevent a specific PID from being autoclosed. (e.g. if you're running a gpu session interactively)
    'nvidia_base_pids_del':['40082'],                     -> Allow a specific PID to be autoclosed.
    'ipynb_names_read'    :[],                            -> Print currently queued notebooks.
    'ipynb_names_add'     :['SchedulerTestScript.ipynb'], -> Add a notebook (to the end) of the queue
    'ipynb_names_next'    :['SchedulerTestScript.ipynb'], -> Add a notebook to the beginning of the queue (does not need to be in the queue)
    'ipynb_names_del'     :['SchedulerTestScript.ipynb'], -> Remove a notebook from the queue
    'background_mode'     :['True'],                      -> Set to idle if there are no notebooks in the queue
    'exit'                :[],                            -> Stop the scheduler
                          
}""")
                for key in keys:
                    if 'nvidia_base_pids_add' == key:
                        self.nvidia_base_pids += data[key]
                    if 'nvidia_base_pids_del' == key:
                        self.nvidia_base_pids = [e for e in self.nvidia_base_pids if e not in data[key]]
                    if 'ipynb_names_read' == key:
                        print(self.ipynb_names)
                    if 'ipynb_names_add' == key:
                        self.ipynb_names += data[key]
                    if 'ipynb_names_next' == key:
                        # technically this could be used to add files and set them to first
                        # data[key] is a list, so test membership rather than equality
                        self.ipynb_names = data[key]+[e for e in self.ipynb_names  if e not in data[key]]
                    if 'ipynb_names_del' == key:
                        self.ipynb_names = [e for e in self.ipynb_names if e not in data[key]]
                    if 'background_mode' == key:
                        dat = data[key][0]
                        if type(dat) == str:
                            if dat.lower() == 'true':
                                dat = True
                            elif dat.lower() == 'false':
                                dat = False
                            else:
                                print(f'{dat} not interpretable as True or False')
                        if type(dat) == bool:
                            self.background_mode = dat
                    if 'exit' == key:
                        self.exit = True

                # remove the file
                os.unlink('./SchedulerFiles/'+ctrl_file)

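    def _advance_queue(self):
        if len(self.ipynb_names) == 0:
            pass
        else:
            ipynb_name = self.ipynb_names.pop(0)
            if not os.path.exists(ipynb_name):
                # skip notebooks that can't be found
                pass
            else:
                # pass the command as a list so notebook names containing spaces stay intact
                process = subprocess.Popen(
                    ["conda", "run", "-n", "gpu_env", "jupyter", "execute", ipynb_name], stdout=subprocess.PIPE
                    )
                # wait for the notebook to finish; error is None because stderr isn't piped
                output, error = process.communicate()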

    def main(self):
        while ((len(self.ipynb_names) > 0) or (self.background_mode)):
            if ((len(self.ipynb_names) == 0) and (self.background_mode)):
                # if idling in background mode wait to check for new commands. 
                time.sleep(10)
                # While idling any new gpu PIDs should be ignored.
                self._init_nvidia()

            if self.exit: break        
            self._parse_ctrl_jsons()

            if self.exit: break
            if (len(self.ipynb_names) > 0):
                print(f'Running {self.ipynb_names[0]}')
                self._advance_queue()

                # allow for external controls
                self._parse_ctrl_jsons()
                if self.exit: break        

                self._read_nvidia()
                # kill all the processes that were not running at the start. 
                for gpu_pid in [e for e in self.nvidia_state.keys() if e not in self.nvidia_base_pids]:
                    subprocess.run(f'kill -9 {gpu_pid}', shell=True)
            print(f'Running {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')
        print(    f'Exiting {time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())}')
            

# Example usage: Start in background mode. While in background mode new gpu processes shouldn't be killed.
# shlr = Scheduler(background_mode = True, run_main=True)

Footnotes

  1. Indeed, it is named after that slurm.↩︎

  2. This is less useful than running jobs as soon as resources are available. However, it does mean that implementing the system is a lot easier. The goal is to get most of the benefit quickly, then return and replace or extend this system once my needs outgrow it.↩︎

  3. If there are over 10 control files then they might not run in the order the user expects (e.g. ctrl10 would be run between ctrl1 and ctrl2) but I don’t anticipate issuing that many commands at once.↩︎

  4. For formatting this message is omitted here. It is:

    This scheduling tool uses json files to modify its state while running. 
    It will look for json files beginning with 'ctrl' and containing 0 or more digits in 
    ./SchedulerFiles/ and then run each. This json should be interpretable as a python dictionary.
    Files are interpreted in the order of the keys but conflicting orders are not recommended. 
    Example file:
    {
        'info'                :[],                            -> Print this message
        'nvidia_base_pids_add':['40082'],                     -> Prevent a specific PID from being autoclosed. (e.g. if you're running a gpu session interactively)
        'nvidia_base_pids_del':['40082'],                     -> Allow a specific PID to be autoclosed.
        'ipynb_names_read'    :[],                            -> Print currently queued notebooks.
        'ipynb_names_add'     :['SchedulerTestScript.ipynb'], -> Add a notebook (to the end) of the queue
        'ipynb_names_next'    :['SchedulerTestScript.ipynb'], -> Add a notebook to the beginning of the queue (does not need to be in the queue)
        'ipynb_names_del'     :['SchedulerTestScript.ipynb'], -> Remove a notebook from the queue
    }
    ↩︎
  5. Writing this explanation took longer than writing the code.↩︎