"""
Objects responsible for JobControl core functionality.
.. note::
Important objects from this module should be imported in
main __init___, in order to "abstract away" the namespace
and have them in a more nicely accessible place.
"""
from datetime import timedelta
import inspect
import logging
import warnings
from flask import escape
from jobcontrol.exceptions import MissingDependencies, SkipBuild, NotFound
from jobcontrol.globals import _execution_ctx_stack
from jobcontrol.job_conf import JobControlConfigMgr
from jobcontrol.utils import import_object, cached_property, TracebackInfo
from jobcontrol.utils.depgraph import resolve_deps
logger = logging.getLogger('jobcontrol')
_secs = lambda **kw: timedelta(**kw).total_seconds()
_year = 365.25 # days in a year
_month = _year / 12 # days in a month
DEFAULT_LOG_RETENTION_POLICY = {
logging.DEBUG: _secs(days=15),
logging.INFO: _secs(days=_month),
logging.WARNING: _secs(days=_month * 3),
logging.ERROR: _secs(days=_month * 6),
logging.CRITICAL: _secs(days=_month * 6),
None: _secs(days=_year), # Any level
}
[docs]class JobControl(object):
"""The main JobControl class"""
def __init__(self, storage, config):
self.storage = storage
if not isinstance(config, JobControlConfigMgr):
config = JobControlConfigMgr(config)
self.config = config
@classmethod
[docs] def from_config_file(cls, config_file):
"""
Initialize JobControl by loading configuration from a file.
Will also initialize storage taking values from the configuration.
:param config_file:
Path to configuration file or open file descriptor
"""
config = JobControlConfigMgr.from_file(config_file)
obj = cls(storage=config.get_storage(), config=config)
return obj
@classmethod
[docs] def from_config(cls, config):
"""
Initialize JobControl from some configuration.
:param config:
Either a :py:class:`jobcontrol.job_conf.JobControlConfigMgr`
instance, or a dict to be passed as argument to constructor.
:return:
a :py:class:`JobControl` instance
"""
if not isinstance(config, JobControlConfigMgr):
config = JobControlConfigMgr(config)
obj = cls(storage=config.get_storage(), config=config)
return obj
[docs] def get_job(self, job_id):
"""
Get a job, by id.
:param job_id: The job id
:return:
a :py:class:`JobInfo` class instance associated with the
requested job.
"""
job_cfg = self.config.get_job(job_id)
if job_cfg is None:
raise NotFound('No such job: {0!r}'.format(job_id))
return JobInfo(self, job_id, config=job_cfg)
[docs] def iter_jobs(self):
"""
Generator yielding all the jobs, one by one.
:yields:
for each job, a :py:class:`JobInfo` class instance associated
with the job.
"""
for job in self.config.iter_jobs():
yield JobInfo(self, job['id'], config=job)
[docs] def get_build(self, build_id):
"""
Get a build, by id.
:return: a :py:class:`BuildInfo` instance.
"""
build = BuildInfo(self, build_id)
build.refresh() # To get 404 early..
return build
[docs] def create_build(self, job_id):
"""
Create a build from a job configuration.
Currently, we require that all the dependencies have already
been built; in the future, it will be possible to build them
automatically.
Also, current implementation doesn't allow for customizations
to either the job configuration nor the build one (pinning,
dep/revdep building, ...).
:param job_id:
Id of the job for which to start a build
:return: a :py:class:`BuildInfo` instance.
"""
job = self.get_job(job_id)
build_config = {
'build_deps': False,
'build_revdeps': False,
'dependency_builds': {},
}
# Make sure all dependencies have a successful build.
# Otherwise, raise an exception to abort everything.
for dep in job.get_deps():
assert isinstance(dep, JobInfo)
dep_build = dep.get_latest_successful_build()
if not dep_build:
raise MissingDependencies(
'Dependency job {0!r} has no successful builds!'
.format(job_id))
build_config['dependency_builds'][dep.id] = dep_build.id
# Actually create a record for this build
build_id = self.storage.create_build(
job_id=job_id, job_config=job.config, build_config=build_config)
return self.get_build(build_id)
[docs] def build_job(self, job_id):
"""
Create and run a new build for the specified job
"""
build = self.create_build(job_id)
return self.run_build(build)
[docs] def run_build(self, build_id):
"""
Actually run a build.
- take the build configuration
- make sure all the dependencies are built
- take return values from the dependencies -> pass as arguments
- run the build
- build the reverse dependencies as well, if required to do so
:param build_id: either a :py:class:`BuildInfo` instance, or a build id
"""
if isinstance(build_id, BuildInfo):
build = build_id
build_id = build_id.id
else:
build = BuildInfo(app=self, build_id=build_id)
build.refresh() # Make sure we have up-to-date information
# Make sure the log handler is installed
self._install_log_handler()
# Actually run the build
self._run_build(build)
def _run_build(self, build):
from jobcontrol.job_conf import prepare_args
logger.info('Starting execution of job {0} / build {1}'
.format(build.job_id, build.id))
log_prefix = '[job: {0}, build: {1}] '.format(build.job_id, build.id)
# Mark the build as started
self.storage.start_build(build.id)
# Create and push the global context
ctx = JobExecutionContext(
app=self, job_id=build.job_id, build_id=build.id)
ctx.push()
# note: from now on, we must make sure the context is popped
# no matter what -> no risky code must be executed outside
# the "try" block below.
try:
function = self._get_runner_function(build.job_config['function'])
logger.debug(log_prefix + 'Function is {0!r}'.format(function))
args = prepare_args(build.job_config['args'], build)
kwargs = prepare_args(build.job_config['kwargs'], build)
# Run!
retval = function(*args, **kwargs)
# todo: what if the function is a generator? Should we iterate it
# or just leave it alone?
except SkipBuild:
logger.info(log_prefix + 'Build SKIPPED')
# Indicates no need to build this..
self.storage.finish_build(build.id, skipped=True)
except Exception as exc:
logger.exception(log_prefix + 'Build FAILED')
self.storage.finish_build(
build.id, success=False, exception=exc,
exception_tb=TracebackInfo.from_current_exc())
else:
logger.info(log_prefix + 'Build SUCCESSFUL')
try:
self.storage.finish_build(
build.id, success=True, skipped=False, retval=retval,
exception=None)
except Exception as exc:
logger.error(
log_prefix + 'Build was SUCCESSFUL, but there was '
'an error storing the results. Maybe the return value '
'is not serializable?')
self.storage.finish_build(
build.id, success=False, exception=exc,
exception_tb=TracebackInfo.from_current_exc())
finally:
# POP context from the stack
ctx.pop()
def _create_job_depgraph(self, job_id, complete=False):
processed = set()
DEPGRAPH = {}
def _explore_deps(jid):
if jid in processed:
# Already processed
return
# Early, to prevent infinite recursion
processed.add(jid)
DEPGRAPH[jid] = deps = list(self.config.get_job_deps(jid))
for dep in deps:
_explore_deps(dep)
if complete:
revdeps = list(self.config.get_job_revdeps(jid))
for rdid in revdeps:
if rdid not in DEPGRAPH:
DEPGRAPH[rdid] = []
if jid not in DEPGRAPH[rdid]:
DEPGRAPH[rdid].append(jid)
_explore_deps(rdid)
logger.debug('Building dependency graph for job {0}'.format(job_id))
_explore_deps(job_id)
return DEPGRAPH
def _create_full_depgraph(self):
DEPGRAPH = {}
for job in self.iter_jobs():
DEPGRAPH[job.id] = list(job['dependencies'])
return DEPGRAPH
def _resolve_deps(self, depgraph, job_id):
# Allow changing dependency resolution function
return resolve_deps(depgraph, job_id)
def _latest_successful_build_date(self, job_id):
builds = list(self.storage.get_job_builds(
job_id, started=True, finished=True, success=True, skipped=False,
order='desc', limit=1))
if len(builds) < 1:
return None # No build!
return builds[0]['end_time']
def _get_runner_function(self, name):
if not isinstance(name, basestring):
raise TypeError('Function name must be a string')
if not name:
raise ValueError('Cannot get function for empty name!')
return import_object(name)
[docs] def prune_logs(self, policy=None):
if policy is None:
policy = DEFAULT_LOG_RETENTION_POLICY
for level in sorted(policy):
max_age = policy[level]
self.storage.prune_log_messages(max_age=max_age, leve=level)
def _install_log_handler(self):
_root_logger = logging.getLogger('')
_root_logger.setLevel(logging.DEBUG)
if _log_handler not in _root_logger.handlers:
_root_logger.addHandler(_log_handler)
# ------------------------------------------------------------
# Reporting methods, which require an execution context
# to be active.
# ------------------------------------------------------------
[docs] def report_progress(self, group_name, current, total, status_line=''):
"""
Report progress for the currently running build.
:param group_name:
The report "group name": either a tuple representing
the "path", or None for the top-level.
:param current:
Current progress
:param total:
Total progress
:param status_line:
An optional line of text, describing the currently running
operation.
"""
from jobcontrol.globals import execution_context as ctx
self.storage.report_build_progress(
build_id=ctx.build_id,
group_name=group_name,
current=current,
total=total,
status_line=status_line)
[docs] def get_celery_app(self):
"""
Return the Celery application, configured with values
from the current configuration.
.. note:: this is a bit hackish, as we are just *updating*
configuration values in the global object with ones
from the jobcontrol configuration, not replacing
all the configuration at once.
"""
from jobcontrol.async.tasks import app as celery_app
celery_app.conf['JOBCONTROL'] = self
if 'celery' in self.config:
celery_app.conf.update(self.config['celery'])
return celery_app
[docs]class JobExecutionContext(object):
"""
Class to hold "global" context during job execution.
This class can also act as a context manager for temporary
context:
.. code-block:: python
with JobExecutionContext(app, job_id, build_id):
pass # do stuff in an execution context
:param app: The JobControl instance running jobs
:param job_id: Id of the currently running job
:param build_id: Id of the currently running build
"""
def __init__(self, app, job_id, build_id):
# Kwargs: app, job_id, build_id
self.app = app
self.job_id = job_id
self.build_id = build_id
[docs] def push(self):
"""Push this context in the global stack"""
_execution_ctx_stack.push(self)
[docs] def pop(self):
"""Pop this context from the global stack"""
rv = _execution_ctx_stack.pop()
assert rv is self, \
'Popped wrong context: {0!r} instead of {1!r}'.format(rv, self)
def __enter__(self):
self.push()
return self
def __exit__(self, exc_type, exc_value, tb):
self.pop()
@property
[docs] def current_app(self):
"""Returns the currently running app"""
return self.app
@cached_property
[docs] def current_job(self):
"""
Returns a :py:class:`JobInfo` instance associated with the
currently running job.
"""
return self.app.get_job(self.job_id)
@cached_property
[docs] def current_build(self):
"""
Returns a :py:class:`BuildInfo` instance associated with the
currently running build.
"""
return self.app.get_build(self.build_id)
[docs]class JobControlLogHandler(logging.Handler):
"""
Logging handler sending messages to the appropriate
JobControl instance that will dispatch them to storage.
"""
def __init__(self):
super(JobControlLogHandler, self).__init__()
[docs] def flush(self):
"""No-op, as we don't need to flush anything"""
pass # Nothing to flush!
[docs] def emit(self, record):
"""
"Emit" the log record (if there is an execution context, store
the log record appropriately; otherwise, just ignore it).
"""
from jobcontrol.globals import current_app, execution_context
try:
# If we have no build, do nothing.
# Note that execution_context.build_id should raise an exception
# itself, as there will not be any execution_context..
if execution_context.build_id is None:
raise RuntimeError()
except:
return
# Replace traceback with text representation, as traceback
# objects cannot be pickled
# if record.exc_info is not None:
# tb = traceback.format_exception(*record.exc_info)
# record.exc_info = record.exc_info[0], record.exc_info[1], tb
# NOTE: This will be done by the storage!
current_app.storage.log_message(
build_id=execution_context.build_id,
record=record)
[docs]class JobInfo(object):
"""High-level interface to jobs"""
def __init__(self, app, job_id, config):
self.app = app
self._job_id = job_id
self._config = {
'args': (),
'kwargs': {},
'function': None,
'dependencies': [],
}
self._config.update(config)
def __repr__(self):
return '<Job {0!r}>'.format(self.id)
@property
[docs] def id(self):
return self._job_id
@property
[docs] def config(self):
return self._config
def __getitem__(self, name):
return self._config[name]
[docs] def get_deps(self):
"""
Iterate over jobs this job depends upon.
:yields: :py:class:`JobInfo` instances
"""
for dep_id in self.app.config.get_job_deps(self.id):
dep = self.app.config.get_job(dep_id)
yield JobInfo(self.app, dep['id'], config=dep)
[docs] def get_status(self):
"""
Return a label describing the current status of the job.
:returns:
- ``'not_built'`` the job has no builds
- ``'running'`` the job has running builds
- ``'success'`` the job has at least a successful build
- ``'failed'`` the job only has failed builds
- ``'outdated'`` the job has at least a successful build,
but older than one dependency build
"""
if self.has_running_builds():
return 'running'
if not self.has_builds():
return 'not_built'
if self.is_outdated():
return 'outdated'
if self.has_successful_builds():
return 'success'
return 'failed'
[docs] def get_revdeps(self):
"""
Iterate over jobs depending on this one
:yields: :py:class:`JobInfo` instances
"""
for revdep_id in self.app.config.get_job_revdeps(self.id):
revdep = self.app.config.get_job(revdep_id)
yield JobInfo(self.app, revdep['id'], config=revdep)
[docs] def iter_builds(self, *a, **kw):
"""
Iterate over builds for this job.
Accepts the same arguments as
:py:meth:`jobcontrol.interfaces.StorageBase.get_job_builds`
:yields: :py:class:`BuildInfo` instances
"""
for build in self.app.storage.get_job_builds(self.id, *a, **kw):
yield BuildInfo(self.app, build['id'], info=build)
[docs] def get_builds(self, *a, **kw):
"""DEPRECATED alias for iter_builds()"""
warnings.warn(DeprecationWarning(
'The get_builds() method is deprecated. '
'Use iter_builds() instead.'))
return self.iter_builds(*a, **kw)
# def create_build(self):
# # Meant for future usage, when builds will support .run()
# build_id = self.app.storage.create_build(self.job_id)
# return BuildInfo(self.app, build_id)
[docs] def run(self):
"""
Trigger run for this job (will automatically create
a build, etc.)
"""
return self.app.build_job(self.id)
[docs] def create_build(self):
return self.app.create_build(self.id)
[docs] def get_latest_successful_build(self):
"""
Get latest successful build for this job, if any.
Otherwise, returns ``None``.
"""
build = self.app.storage.get_latest_successful_build(self.id)
if build is None:
return None
return BuildInfo(self.app, build['id'], info=build)
[docs] def get_docs(self):
"""
Get documentation for this job.
"""
return self._get_job_docs()
[docs] def get_conf_as_yaml(self):
"""
Return the job configuration as serialized YAML, mostly
for displaying on user interfaces.
"""
from jobcontrol.job_conf import dump
return dump(self.config)
[docs] def has_builds(self):
"""
Check whether this job has any build.
"""
builds = list(self.get_builds(
started=True, finished=True, order='desc', limit=1))
return len(builds) >= 1
[docs] def has_successful_builds(self):
"""
Check whether this job has any successful build.
"""
builds = list(self.get_builds(
started=True, finished=True, success=True, skipped=False,
order='desc', limit=1))
return len(builds) >= 1
[docs] def has_running_builds(self):
"""
Check whether this job has any running build.
"""
builds = list(self.get_builds(started=True, finished=False, limit=1))
return len(builds) >= 1
[docs] def is_outdated(self):
"""
Check whether any dependency has builds more recent than the newest
build for this job.
"""
latest_build = self.get_latest_successful_build()
if not latest_build:
return None # Unknown (no build)
for dep in self.get_deps():
_build = dep.get_latest_successful_build()
if _build is None:
return None # Unknown (no dep build) [error!]
if _build['end_time'] > latest_build['end_time']:
# dependency build is newer
return True
return False
[docs] def can_be_built(self):
"""
Checks whether a job can be built, i.e.: whether all the
dependencies have at least one successful build.
"""
for dep in self.get_deps():
_build = dep.get_latest_successful_build()
if _build is None:
return False
return True
# todo: move all the docs / ... utilities outside this class
# -> maybe move to some "job config" class?
# -> we need them for the job_config in the build as well!
def _get_job_docs(self):
call_code = self._get_call_code()
docs = {
'call_code': call_code,
'call_code_html': self._highlight_code_html(call_code),
}
try:
func = import_object(self['function'])
except Exception as e:
docs['function_doc'] = escape(u"Error: {0!r}".format(e))
else:
docs['function_doc'] = self._format_function_doc(func)
docs['function_argspec'] = self._get_function_argspec(func)
docs['function_argspec_human'] = \
self._make_human_argspec(docs['function_argspec'])
try:
docs['function_module'], docs['function_name'] = \
self['function'].split(':')
except:
docs['function_module'] = '???'
docs['function_name'] = self['function']
return docs
def _get_call_code(self):
try:
module, func = self['function'].split(':')
except:
return '# Invalid function: {0}'.format(self['function'])
call_args = []
for arg in self['args']:
call_args.append(repr(arg))
for key, val in sorted(self['kwargs'].iteritems()):
call_args.append("{0}={1!r}".format(key, val))
if len(call_args):
_args = "\n {1}".format(func, ",\n ".join(call_args))
else:
_args = ""
return "\n".join((
"from {0} import {1}".format(module, func),
"{0}({1})".format(func, _args)))
def _highlight_code_html(self, code):
from pygments import highlight
from pygments.lexers import PythonLexer
from pygments.formatters import HtmlFormatter
return highlight(code, PythonLexer(), HtmlFormatter())
def _format_function_doc(self, func):
import inspect
import docutils.core
doc = inspect.getdoc(func)
if doc is None:
return 'No docstring available.'
return docutils.core.publish_parts(doc, writer_name='html')['fragment']
def _get_function_argspec(self, func):
aspec = inspect.getargspec(func)
if aspec.defaults is not None:
optargs = zip(aspec.args[len(aspec.defaults):], aspec.defaults)
reqargs = aspec.args[:-len(aspec.defaults)]
else:
optargs = []
reqargs = aspec.args[:]
# ============================================================ #
# Note: #
# ============================================================ #
# #
# Terminology used by the AST is: #
# - args -> positional arguments #
# - keywords -> arguments with default values #
# - startargs -> name of *args #
# - kwargs -> name of **kwargs #
# #
# Terminology used by inspect is quite different; #
# - varargs -> *args #
# - keywords -> **kwargs #
# - args -> all the named arguments #
# - defaults -> default values, for keyword arguments #
# #
# Maybe we should use the AST terminology here, as it better #
# reflect the structure? (the bad part is the different #
# meaning of the "keywords" term here..) #
# #
# ============================================================ #
return {
'varargs': aspec.varargs,
'keywords': aspec.keywords,
'reqargs': reqargs,
'optargs': optargs,
}
def _make_human_argspec(self, argspec):
parts = []
for arg in argspec['reqargs']:
parts.append(arg)
for arg, default in argspec['optargs']:
parts.append('{0}={1!r}'.format(arg, default))
if argspec['varargs']:
parts.append('*' + argspec['varargs'])
if argspec['keywords']:
parts.append('**' + argspec['keywords'])
return ', '.join(parts)
[docs]class BuildInfo(object):
"""
High-level interface to builds.
:param app:
The JobControl instance this build was retrieved from
:param build_id:
The build id
:param info:
Optionally, this can be used to pre-populate the build
information (useful, eg. if we are retrieving a bunch
of builds from the database at once).
"""
def __init__(self, app, build_id, info=None):
self.app = app
self.build_id = build_id
self._info = None
if info is not None:
self._info = {}
self._info.update(info)
def __repr__(self):
return '<Build {0} (job={1}, status={2})>'.format(
self.build_id, self.job_id, self.descriptive_status)
@property
[docs] def id(self):
"""The build id"""
return self.build_id
@property
[docs] def job_id(self):
"""The job id"""
return self.info['job_id']
@property
[docs] def info(self):
"""
Property used to lazily access the build attributes.
Returns a dict with the following keys:
- ``'id'``
- ``'job_id'``
- ``'start_time'``
- ``'end_time'``
- ``'started'``
- ``'finished'``
- ``'success'``
- ``'skipped'``
- ``'job_config'``
- ``'build_config'``
- ``'retval'``
- ``'exception'``
- ``'exception_tb'``
"""
if getattr(self, '_info') is None:
self.refresh()
return self._info
@property
[docs] def job_config(self):
"""
Property to access the job configuration.
"""
return self.info['job_config']
@property
[docs] def build_config(self):
"""
Property to access the build configuration.
"""
return self.info['build_config']
@property
[docs] def descriptive_status(self):
"""
Return a label describing the current status of the build.
:returns:
- ``'CREATED'`` if the build was not started yet
- ``'RUNNING'`` if the build was started but did not finish
- ``'SUCCESSFUL'`` if the build run with success
- ``'SKIPPED'`` if the build was skipped
- ``'FAILED'`` if the build execution failed
"""
if not self['started']:
return 'CREATED'
if not self['finished']:
return 'RUNNING'
if self['success']:
if self['skipped']:
return 'SKIPPED'
return 'SUCCESSFUL'
return 'FAILED'
[docs] def refresh(self):
"""Refresh the build status information from database"""
self._info = self.app.storage.get_build(self.build_id)
def __getitem__(self, name):
# if name not in self.info:
# if name == 'progress_info':
# self._info['progress_info'] = self.get_progress_info()
# return self._info['progress_info']
return self.info[name]
[docs] def get_progress_info(self):
"""Get information about the build progress"""
from jobcontrol.utils import ProgressReport
data = self.app.storage.get_build_progress_info(self.build_id)
return ProgressReport.from_table(data)
[docs] def get_job(self):
"""Get a :py:class:`JobInfo` associated with this build's job"""
return JobInfo(self.app, self.job_id)
[docs] def delete(self):
"""Delete all information related to this build from database"""
self.app.storage.delete_build(self.build_id)
[docs] def run(self):
"""Calls run_build() on the main app for this build"""
return self.app.run_build(self)
[docs] def iter_log_messages(self, **kw):
"""
Iterate over log messages for this build.
Keywords are passed directly to the underlying ``iter_log_messages()``
method of the storage.
"""
return self.app.storage.iter_log_messages(build_id=self.build_id, **kw)
# We need just *one* handler -> create here
_log_handler = JobControlLogHandler()
_log_handler.setLevel(logging.DEBUG)