# Copyright (c) Facebook, Inc. and its affiliates.
# All rights reserved.
#
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
"""Scheduling and job monitoring utilities.
"""
from contextlib import contextmanager, ExitStack
from dataclasses import dataclass, field
import logging
from pathlib import Path
import pickle
import os
import subprocess as sp
import sys
import typing as tp
from submitit import SlurmJob
import submitit
from . import git_save
from .conf import SlurmConfig, SubmitRules
from .distrib import get_distrib_spec
from .main import DecoratedMain
from .utils import try_load
from .xp import XP, _get_sig
logger = logging.getLogger(__name__)
class _SubmitItTarget:
def __call__(self, main: DecoratedMain, argv: tp.Sequence[str]):
self.xp = main.get_xp(argv)
spec = get_distrib_spec()
# We export the RANK as it can be used to customize logging early on
# in the called program (e.g. using Hydra).
os.environ['RANK'] = str(spec.rank)
sys.argv[1:] = argv
main()
def checkpoint(self, *args, **kwargs):
if get_distrib_spec().rank == 0:
            # Clean up the rendezvous file on requeue, otherwise the requeued job will fail.
if self.xp.rendezvous_file.exists():
self.xp.rendezvous_file.unlink()
return submitit.helpers.DelayedSubmission(self, *args, **kwargs)
class Sheep:
"""
A Sheep is a specific run for a given XP. Sheeps are managed
by the Shepherd.
"""
def __init__(self, xp: XP):
self.xp = xp
self.job: tp.Optional[submitit.SlurmJob] = None
        # `_other_jobs` contains all the jobs of the array this job was part of,
        # if any (see `Sheep.state`).
self._other_jobs: tp.Optional[tp.List[submitit.SlurmJob]] = None
if self._job_file.exists():
content = try_load(self._job_file)
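            # The job file contains either a single job (presumably an older layout)
            # or a tuple (job, all jobs in the array); handle both.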
if isinstance(content, tuple):
self.job, self._other_jobs = content
else:
self.job = content
@property
def _job_file(self) -> Path:
return self.xp.folder / self.xp.dora.shep.job_file
def state(self, mode="standard"):
"""Return the current state of the `Sheep`.
"""
if self.job is None:
return None
state = self.job.watcher.get_state(self.job.job_id, mode)
if state == 'UNKNOWN' and self._other_jobs:
if any(job.state != 'UNKNOWN' for job in self._other_jobs):
# When cancelling single entries in a job array,
                # sacct will just completely forget about it instead of marking
# it as cancelled. So we use a specific 'MISSING' status to handle that.
state = 'MISSING'
if state.startswith('CANCELLED'):
return 'CANCELLED'
return state
def is_done(self, mode="standard"):
"""Return True if the job is no longer running on the cluster.
"""
if self.job is None:
return True
return self.job.watcher.is_done(self.job.job_id, mode)
@property
def log(self):
"""Return the path to the main log.
"""
if self.job is not None:
return self.xp.submitit / f"{self.job.job_id}_0_log.out"
return None
def __repr__(self):
out = f"Sheep({self.xp.sig}, state={self.state()}, "
if self.job is not None:
out += f"sid={self.job.job_id}, "
out += f"argv={self.xp.argv})"
return out
def no_log(x: str):
"""No logging logging function, passed to `Shepherd`.
"""
pass
@dataclass
class _JobArray:
slurm_config: SlurmConfig
sheeps: tp.List[Sheep] = field(default_factory=list)
class Shepherd:
"""
Takes care of the little jobs.
Args:
main (DecoratedMain): main function decorated by Dora.
log (callable): log function, if provided should take a single string
argument.
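
    A rough usage sketch (illustrative only; `main`, `slurm_config` and `rules`
    are assumed to come from the caller):

        shepherd = Shepherd(main)
        sheep = shepherd.get_sheep_from_argv(["optim.lr=0.1"])
        shepherd.maybe_submit_lazy(sheep, slurm_config, rules)
        shepherd.commit()  # jobs are only actually scheduled here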
"""
def __init__(self, main: DecoratedMain, log: tp.Callable[[str], None] = no_log):
self.main = main
self._by_id.mkdir(exist_ok=True, parents=True)
self._orphans.mkdir(exist_ok=True, parents=True)
self._arrays.mkdir(exist_ok=True, parents=True)
self.log = log
self._in_job_array: bool = False
self._existing_git_clone: tp.Optional[Path] = None
self._to_cancel: tp.List[submitit.SlurmJob] = []
self._to_submit: tp.List[_JobArray] = []
self._check_orphans()
def get_sheep_from_argv(self, argv: tp.Sequence[str]) -> Sheep:
"""
        Given a list of arguments, return the matching `Sheep`,
which will contain both information on the `dora.xp.XP`, and on
the latest job associated with that XP.
"""
assert not isinstance(argv, str)
xp = self.main.get_xp(argv)
return Sheep(xp)
def get_sheep_from_sig(self, sig: str) -> tp.Optional[Sheep]:
"""
        Returns a `Sheep` given the XP signature.
"""
xp = self.main.get_xp_from_sig(sig)
return Sheep(xp)
def get_sheep_from_job_id(self, job_id: str) -> tp.Optional[Sheep]:
"""
Returns the `Sheep` associated with the given `job_id`. If no sheep
is found, returns None.
"""
link = self._by_id / job_id
if link.is_symlink():
sig = link.resolve().name
xp = self.main.get_xp_from_sig(sig)
return Sheep(xp)
return None
def update(self):
"""
Force an update of all job states with submitit.
"""
SlurmJob.watcher.update()
@contextmanager
def job_array(self, slurm_config: SlurmConfig):
"""Context manager to launch XP in job array."""
assert not self._in_job_array
self._to_submit.append(_JobArray(slurm_config))
self._in_job_array = True
try:
yield
finally:
self._in_job_array = False
def maybe_submit_lazy(self, sheep: Sheep, slurm_config: SlurmConfig, rules: SubmitRules):
"""
Decides whether to schedule a new job for the given sheep, based on the rules
given in `rules`.
Jobs are actually only scheduled once the `commit()` method is called.
"""
if sheep.job is not None:
state = sheep.state()
if state == 'COMPLETED':
if rules.replace_done:
logger.debug(f"Ignoring previously completed job {sheep.job.job_id}")
sheep.job = None
elif state in ["FAILED", "CANCELLED", "OUT_OF_MEMORY", "TIMEOUT", "MISSING",
"NODE_FAIL"]:
logger.debug(f"Previous job {sheep.job.job_id} failed or was canceled")
if rules.retry:
sheep.job = None
else:
if rules.replace:
logger.debug(f"Cancelling previous job {sheep.job.job_id} with status {state}")
self.cancel_lazy(sheep.job)
sheep.job = None
if sheep.job is None:
if not self._in_job_array:
self._to_submit.append(_JobArray(slurm_config))
assert slurm_config == self._to_submit[-1].slurm_config
self._to_submit[-1].sheeps.append(sheep)
def cancel_lazy(self, job: submitit.SlurmJob):
"""
Cancel a job. The job is actually cancelled only when `commit()` is called.
"""
self._to_cancel.append(job)
def commit(self):
"""
        Commit all the changes registered so far with either `maybe_submit_lazy()`
        or `cancel_lazy()`.
"""
if self._to_cancel:
self._cancel(self._to_cancel)
self._to_cancel = []
self._existing_git_clone = None
while self._to_submit:
job_array = self._to_submit.pop(0)
self._submit(job_array)
@property
def _by_id(self) -> Path:
return self.main.dora.dir / self.main.dora.shep.by_id
@property
def _orphans(self) -> Path:
return self.main.dora.dir / self.main.dora.shep.orphans
@property
def _arrays(self) -> Path:
return self.main.dora.dir / self.main.dora.shep.arrays
def _cancel(self, jobs: tp.List[SlurmJob]):
cancel_cmd = ["scancel"] + [job.job_id for job in jobs]
logger.debug("Running %s", " ".join(cancel_cmd))
sp.run(cancel_cmd, check=True)
def _get_submitit_executor(self, name: str, folder: Path,
slurm_config: SlurmConfig) -> submitit.SlurmExecutor:
        os.environ['SLURM_KILL_BAD_EXIT'] = '1'  # Kill the whole job if any of its tasks fails.
kwargs = dict(slurm_config.__dict__)
executor = submitit.SlurmExecutor(
folder=folder, max_num_timeout=kwargs.pop('max_num_timeout'))
gpus = slurm_config.gpus
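        # Dora assumes nodes with 8 GPUs: requesting more than 8 GPUs
        # means a multi-node job with 8 GPUs per node.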
if gpus > 8:
if gpus % 8 != 0:
raise ValueError("Can only take <= 8 gpus, or multiple of 8 gpus")
kwargs['nodes'] = gpus // 8
gpus_per_node = 8
else:
gpus_per_node = gpus
kwargs['nodes'] = 1
mem_per_gpu = slurm_config.mem_per_gpu
if mem_per_gpu:
mem = slurm_config.mem_per_gpu * gpus_per_node
kwargs['mem'] = f"{mem}GB"
kwargs['gres'] = f'gpu:{gpus_per_node}'
if slurm_config.one_task_per_node:
kwargs['ntasks_per_node'] = 1
if slurm_config.cpus_per_task is None:
kwargs['cpus_per_task'] = gpus_per_node * slurm_config.cpus_per_gpu
else:
kwargs['ntasks_per_node'] = gpus_per_node
if slurm_config.cpus_per_task is None:
kwargs['cpus_per_task'] = slurm_config.cpus_per_gpu
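        # Those entries are Dora specific and were translated to Slurm parameters
        # above; they must not be forwarded to the submitit executor.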
del kwargs['gpus']
del kwargs['mem_per_gpu']
del kwargs['cpus_per_gpu']
del kwargs['one_task_per_node']
logger.debug("Slurm parameters %r", kwargs)
executor.update_parameters(
job_name=name,
stderr_to_stdout=True,
**kwargs)
return executor
def _check_orphans(self):
"""Check for orphaned jobs."""
for dirty in self._orphans.iterdir():
name = dirty.name
logger.warning(f"Found dirty tag {name}, meaning a job might have been scheduled "
"but Dora or Slurm crashed before the job id was saved.")
proc = sp.run(["squeue", "-u", os.getlogin(), "-n", name, "-o", "%i", "-h"],
capture_output=True, check=True)
ids = [line for line in proc.stdout.decode().strip().split("\n") if line]
if ids:
logger.warning(f"Found orphan job ids {ids}, will cancel")
sp.run(["scancel"] + ids, check=True)
dirty.unlink()
@contextmanager
def _enter_orphan(self, name: str):
"""Context manager to enter a potential orphan."""
token = self._orphans / name
token.touch()
try:
yield
finally:
token.unlink()
def _submit(self, job_array: _JobArray):
sheeps = job_array.sheeps
slurm_config = job_array.slurm_config
if not sheeps:
return
is_array = len(sheeps) > 1
first = sheeps[0]
self.main.init_xp(first.xp)
use_git_save = first.xp.dora.git_save
assert all(other.xp.dora.git_save == use_git_save for other in sheeps), \
"All jobs inside an array must have the same value for git_save."""
if is_array:
name_sig = _get_sig(sorted([sheep.xp.sig for sheep in sheeps]))
else:
name_sig = first.xp.sig
if is_array:
name = self.main.name + "_array_" + name_sig
else:
name = self.main.name + "_" + name_sig
if is_array:
submitit_folder = self._arrays / name
else:
submitit_folder = first.xp._xp_submitit
submitit_folder.mkdir(exist_ok=True)
for sheep in sheeps:
xp = sheep.xp
self.main.init_xp(xp)
if xp.rendezvous_file.exists():
xp.rendezvous_file.unlink()
executor = self._get_submitit_executor(name, submitit_folder, slurm_config)
jobs: tp.List[submitit.Job] = []
if use_git_save and self._existing_git_clone is None:
self._existing_git_clone = git_save.get_new_clone(self.main)
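        # Mark this submission with a dirty token: if we crash before the job ids
        # are saved below, `_check_orphans` will cancel the stray jobs on the next run.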
with self._enter_orphan(name):
with ExitStack() as stack:
if use_git_save:
assert self._existing_git_clone is not None
stack.enter_context(git_save.enter_clone(self._existing_git_clone))
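                # Within `executor.batch()`, all the `submit()` calls below are
                # grouped into a single Slurm job array.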
if is_array:
stack.enter_context(executor.batch())
for sheep in job_array.sheeps:
if use_git_save:
assert self._existing_git_clone is not None
git_save.assign_clone(sheep.xp, self._existing_git_clone)
jobs.append(executor.submit(_SubmitItTarget(), self.main, sheep.xp.argv))
            # Job ids only become available once the batch context (if any) has exited.
for sheep, job in zip(sheeps, jobs):
                # See comment in `Sheep.state` above on why we store all the jobs in the array.
pickle.dump((job, jobs), open(sheep._job_file, "wb"))
logger.debug("Created job with id %s", job.job_id)
sheep.job = job # type: ignore
sheep._other_jobs = jobs # type: ignore
                link = self._by_id / job.job_id
                link.symlink_to(sheep.xp.folder.resolve())
if is_array:
                    # We also link the array submitit folder from the XP folder,
                    # so that we keep a history of all the arrays the XP was part of.
submitit_link = (sheep.xp.folder / submitit_folder.name)
if submitit_link.exists():
assert submitit_link.resolve() == submitit_folder.resolve()
else:
submitit_link.symlink_to(submitit_folder)
latest = sheep.xp._latest_submitit
if latest.exists():
latest.unlink()
latest.symlink_to(submitit_folder)
name = self.main.get_name(sheep.xp)
self.log(f"Scheduled job {job.job_id} for sheep {sheep.xp.sig}/{name}")