Project author: basnijholt

Project description:
Run many functions (adaptively) on many cores (>10k) using mpi4py.futures, ipyparallel, or dask-mpi. :tada:

Language: Python
Repository: git://github.com/basnijholt/adaptive-scheduler.git
Created: 2019-05-03T17:46:40Z
Project home: https://github.com/basnijholt/adaptive-scheduler

License: BSD 3-Clause "New" or "Revised" License

Asynchronous Job Scheduler for Adaptive :rocket:


This is an asynchronous job scheduler for Adaptive, designed to run many adaptive.Learners on many cores (>10k-100k) using mpi4py.futures, ipyparallel, loky, concurrent.futures.ProcessPoolExecutor, or dask.distributed.

:thinking: What is this?

Adaptive Scheduler is designed to address the challenge of executing a large number of adaptive.Learners in parallel, even at the scale of 1k-100k cores.
Traditional engines like ipyparallel and distributed can struggle at such high core counts because a single central process has to communicate with every worker.

This library schedules a separate job for each adaptive.Learner, and manages the creation and execution of these jobs.
This ensures that your calculations will run even if the cluster is currently fully occupied (the jobs will simply wait in the queue).
The approach allows for nearly limitless core usage, whether you allocate 10 nodes for a single job or 1 core for a single job while scheduling hundreds of jobs.

The computation is designed for maximum locality.
If a job crashes, it will automatically reschedule a new one and continue the calculation from where it left off, thanks to Adaptive’s periodic saving functionality.
Even if the central “job manager” fails, the jobs will continue to run, although no new jobs will be scheduled.
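
The resume mechanism can be seen in miniature with Adaptive’s own save and load methods; here is a minimal standalone sketch, independent of the scheduler machinery:

```python
import adaptive

# First "job": compute some points, saving progress to disk
learner = adaptive.Learner1D(lambda x: x**2, bounds=(-1, 1))
adaptive.runner.simple(learner, goal=lambda l: l.npoints >= 50)
learner.save("learner.pickle")  # adaptive-scheduler triggers saves like this periodically

# Rescheduled "job" after a crash: reload and continue where it left off
restarted = adaptive.Learner1D(lambda x: x**2, bounds=(-1, 1))
restarted.load("learner.pickle")
assert restarted.npoints >= 50  # previous progress is preserved
```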

:dart: Design Goals

  1. Needs to be able to run efficiently on >30k cores.
  2. Works seamlessly with the Adaptive package.
  3. Minimal load on the file system.
  4. Removes all boilerplate of working with a scheduler:
    • Writes job script.
    • (Re)submits job scripts.
  5. Handles random crashes (or node evictions) with minimal data loss.
  6. Preserves Python kernel and variables inside a job (in contrast to submitting jobs for every parameter).
  7. Separates the simulation definition code from the code that runs the simulation.
  8. Maximizes computation locality, jobs continue to run when the main process dies.

:test_tube: How does it work?

You create a bunch of learners and corresponding fnames so they can be loaded, like:

```python
import adaptive
from functools import partial


def h(x, pow, a):
    return a * x**pow


combos = adaptive.utils.named_product(
    pow=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    a=[0.1, 0.5],
)  # returns list of dicts, Cartesian product of all values

learners = [
    adaptive.Learner1D(partial(h, **combo), bounds=(-1, 1)) for combo in combos
]
fnames = [f"data/{combo}" for combo in combos]
```
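
For reference, named_product takes the Cartesian product of the keyword-argument values and returns it as a list of dicts; a smaller call makes the structure clear (the ordering shown assumes the usual itertools.product behavior):

```python
import adaptive

adaptive.utils.named_product(pow=[0, 1], a=[0.1, 0.5])
# [{'pow': 0, 'a': 0.1}, {'pow': 0, 'a': 0.5},
#  {'pow': 1, 'a': 0.1}, {'pow': 1, 'a': 0.5}]
```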

Then you start a process that creates and submits as many job-scripts as there are learners, like:

```python
import adaptive_scheduler


def goal(learner):
    return learner.npoints > 200


scheduler = adaptive_scheduler.scheduler.SLURM(cores=10)  # every learner gets this many cores
run_manager = adaptive_scheduler.server_support.RunManager(
    scheduler,
    learners,
    fnames,
    goal=goal,
    log_interval=30,  # write info such as npoints, cpu_usage, time, etc. to the job log file
    save_interval=300,  # save the data every 300 seconds
)
run_manager.start()
```

That’s it! You can run run_manager.info(), which displays an interactive ipywidget showing the number of running, pending, and finished jobs, buttons to cancel your jobs, and other useful information.

Widget demo

:mag: But how does it really work?

The adaptive_scheduler.server_support.RunManager basically does the following:

  • You need to create N learners and fnames (like in the section above).
  • Then a “job manager” writes and submits min(N, max_simultaneous_jobs) job scripts, but doesn’t know which learner each job is going to run!
  • That is the responsibility of the “database manager”, which keeps a database of job_id <--> learner (sketched after this list).
  • The job script starts a Python file run_learner.py in which the learner is run.
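
Purely as an illustration of that bookkeeping (the field names below are assumptions, not the actual on-disk schema), the database conceptually holds records like:

```python
# Hypothetical sketch of the job_id <--> learner mapping;
# the real schema is an implementation detail of adaptive-scheduler.
db = [
    {"fname": "data/{'pow': 0, 'a': 0.1}", "job_id": "77845", "is_done": False},
    {"fname": "data/{'pow': 1, 'a': 0.1}", "job_id": None, "is_done": False},  # waiting for a job
    {"fname": "data/{'pow': 2, 'a': 0.1}", "job_id": None, "is_done": True},  # finished
]
```

When a job starts, it asks the database manager for a learner that no other job is working on, which is why a job script does not need to know in advance which learner it will run.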

In a Jupyter notebook, you can start the “job manager” and the “database manager”, and create the run_learner.py like:

```python
import adaptive_scheduler
from adaptive_scheduler import server_support

# create a scheduler
scheduler = adaptive_scheduler.scheduler.SLURM(cores=10)

# create a new database that keeps track of job <-> learner
db_fname = "running.json"
url = server_support.get_allowed_url()  # get a URL where we can run the database_manager
database_manager = server_support.DatabaseManager(
    url, scheduler, db_fname, learners, fnames
)
database_manager.start()

# create unique names for the jobs
n_jobs = len(learners)
job_names = [f"test-job-{i}" for i in range(n_jobs)]

job_manager = server_support.JobManager(
    job_names,
    database_manager,
    scheduler,
    save_interval=300,
    log_interval=30,
    goal=0.01,
)
job_manager.start()
```

Then, when the jobs have been running for a while, you can check server_support.parse_log_files(database_manager, scheduler).

And use scheduler.cancel(job_names) to cancel the jobs.
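
Put together in the notebook, that might look like the following (assuming parse_log_files returns a tabular summary of the parsed logs):

```python
# Inspect the progress reported in the job log files
df = server_support.parse_log_files(database_manager, scheduler)
print(df)  # npoints, cpu_usage, elapsed time, etc. per job

# Cancel all jobs once you are satisfied
scheduler.cancel(job_names)
```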

You don’t actually ever have to leave the Jupyter notebook; take a look at the example notebook.

:notebook: Jupyter Notebook Example

See example.ipynb.

:computer: Installation

Install the latest stable version from conda (recommended):

```bash
conda install adaptive-scheduler
```

or from PyPI:

```bash
pip install adaptive_scheduler
```

or install main with:

```bash
pip install -U https://github.com/basnijholt/adaptive-scheduler/archive/main.zip
```

or clone the repository and do a dev install (recommended for dev):

```bash
git clone git@github.com:basnijholt/adaptive-scheduler.git
cd adaptive-scheduler
pip install -e .
```

:hammer_and_wrench: Development

In order not to pollute the history with the output of the notebooks, please set up the git filter by executing:

```bash
python ipynb_filter.py
```

in the repository.

We also use pre-commit, so pip install pre-commit and run:

```bash
pre-commit install
```

in the repository.

:warning: Limitations

Currently, adaptive_scheduler only works with SLURM and PBS.
However, supporting another type of scheduler only requires implementing a class like the ones in adaptive_scheduler/scheduler.py.
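
As a rough sketch (BaseScheduler is assumed to be the base class in adaptive_scheduler/scheduler.py, and the overridden members below are illustrative rather than the exact abstract interface), a new scheduler adapter could look like:

```python
from adaptive_scheduler.scheduler import BaseScheduler  # assumed base class


class MyScheduler(BaseScheduler):
    """Illustrative adapter for a hypothetical queueing system."""

    def job_script(self) -> str:
        # Return the text of a job script that launches run_learner.py.
        ...

    def start_job(self, name: str) -> None:
        # Submit the job script to the queue (e.g. via subprocess).
        ...

    def queue(self, me_only: bool = True) -> dict:
        # Return a mapping of job_id -> job info for currently queued jobs.
        ...
```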