"""
Interfaces for NEW jobcontrol objects.
**Data model**::

    Build               id SERIAL
    -----               job_id TEXT
                        start_time TIMESTAMP
                        end_time TIMESTAMP
                        started BOOLEAN
                        finished BOOLEAN
                        success BOOLEAN
                        skipped BOOLEAN
                        job_config TEXT (YAML)
                            Copy of the job configuration when the build
                            was started
                        build_config TEXT (YAML)
                            Extra configuration, such as dependency
                            build "pinning"
                        retval BINARY (Pickled return value)
                        exception BINARY
                            Pickled exception object (or None)
                        exception_tb BINARY
                            Pickled TracebackInfo object

    Build progress
    --------------
                        build_id INTEGER (references Build.id)
                        group_name VARCHAR(128)
                            Name of the "progress group" (separated by '::')
                        current INTEGER
                            Current progress value
                        total INTEGER
                            Total progress value
                        status_line TEXT
                            An optional line of text describing current state

                        UNIQUE constraint on (build_id, group_name)

    Log                 id SERIAL
    ---                 build_id INTEGER (references Build.id)
                        created TIMESTAMP
                        level INTEGER
                        record BINARY
                            Pickled LogRecord
                        exception_tb BINARY
                            Pickled TracebackInfo object

**Job configuration:**
The job configuration is stored as a YAML-serialized dict.
Recognised keys are:

- ``function``: the function to be called, in ``module:function`` format
- ``args``: a list of arguments to be passed to the function
- ``kwargs``: a dict of keyword arguments to be passed to the function
- ``title``: a descriptive title, to be shown on the interfaces
- ``notes``: notes, to be shown in interfaces (in reStructuredText)
- ``dependencies``: list of dependency job names

Additionally, args/kwargs may contain references to return value of dependency
builds, by using the ``!retval <name>`` syntax.
**Exception traceback serialization**
To be used both in build records and associated with log messages containing
an exception.
We want to include the following information:
- Details about the call stack, as in normal tracebacks: filename, line
number, function name, line of code (plus some context)
- Local variables: we are not guaranteed we can safely pickle / unpickle
arbitrary values; moreover this might result in huge fields, etc.
So our best option is to just store a dictionary mapping names
to repr()s of the values (trimmed to a -- large -- maximum length,
just to be on the safe side).
"""
from datetime import datetime
import abc
import pickle
import types
import warnings
# This needs to be imported here in order for it to work
# from celery.contrib import rdb
import jobcontrol.job_conf
from jobcontrol.exceptions import SerializationError
from jobcontrol.utils import ExceptionPlaceholder
class StorageBase(object):
    """
    Base class for jobcontrol storage backends.

    Concrete backends persist builds, build progress and log messages
    (see the data model in the module docstring) by implementing the
    abstract methods below; this base class supplies shared
    serialization and normalization helpers.
    """

    # NOTE(review): ``__metaclass__`` is only honoured by Python 2.
    # Python 3 ignores this attribute, so there the ``@abc.abstractmethod``
    # decorators below are NOT enforced at instantiation time.
    __metaclass__ = abc.ABCMeta

    def __init__(self):
        pass

    @classmethod
    def from_url(cls, url):
        """
        Alternate constructor: create a storage instance from a URL.

        Not implemented in the base class (always raises
        ``NotImplementedError``); meant to be overridden.
        """
        raise NotImplementedError('')

    # ------------------------------------------------------------
    # Installation methods.
    # For resource initialization, if needed.
    # ------------------------------------------------------------

    def install(self):
        """Initialize backend resources (no-op in the base class)."""
        pass

    def uninstall(self):
        """Tear down backend resources (no-op in the base class)."""
        pass

    # ------------------------------------------------------------
    # Build CRUD methods
    # ------------------------------------------------------------

    @abc.abstractmethod
    def get_job_builds(self, job_id, started=None, finished=None,
                       success=None, skipped=None, order='asc', limit=100):
        """
        Iterate over all the builds for a job, sorted by date, according
        to the order specified by ``order``.

        :param job_id:
            The job id
        :param started:
            If set to a boolean, filter on the "started" field
        :param finished:
            If set to a boolean, filter on the "finished" field
        :param success:
            If set to a boolean, filter on the "success" field
        :param skipped:
            If set to a boolean, filter on the "skipped" field
        :param order:
            'asc' (default) or 'desc'
        :param limit:
            only return the first ``limit`` builds

        :yield: Dictionaries representing build information
        """

    @abc.abstractmethod
    def create_build(self, job_id, job_config, build_config):
        """
        Create a build.

        :param job_id:
            The job for which a build should be started
        :param job_config:
            The job configuration ``(function, args, kwargs, ..)``
            to be copied inside the object (we will use this from now on).
        :param build_config:
            Build configuration, containing things like dependency build
            pinning, etc.

            - ``dependency_builds``: dict mapping job ids to build ids,
              or ``None`` to indicate "create a new build" for this job.

        :return: the build id
        """

    @abc.abstractmethod
    def get_build(self, build_id):
        """
        Get information about a build.

        :return: the build information, as a dict
        """

    @abc.abstractmethod
    def delete_build(self, build_id):
        """
        Delete a build, by id.
        """

    @abc.abstractmethod
    def start_build(self, build_id):
        """
        Register a build execution start.
        """

    @abc.abstractmethod
    def finish_build(self, build_id, success=None, skipped=None, retval=None,
                     exception=None, exception_tb=None):
        """
        Register a build execution end.
        """

    def finish_build_with_exception(self, build_id):
        """
        Mark a build as failed with the exception currently being handled.

        Not implemented yet: always raises ``NotImplementedError``.
        """
        # TODO: build a TracebackInfo object and call finish_build()
        #       with failure + exception trace.
        raise NotImplementedError

    def update_build_progress(self, build_id, current, total):
        """
        Deprecated alias for :meth:`report_build_progress`.

        Emits a :class:`DeprecationWarning`, then forwards ``build_id``,
        ``current`` and ``total`` to ``report_build_progress()`` and
        returns its result.
        """
        # stacklevel=2 attributes the warning to the caller of this shim.
        warnings.warn(DeprecationWarning(
            'The update_build_progress() method is deprecated. '
            'Use report_build_progress() instead.'), stacklevel=2)
        return self.report_build_progress(build_id, current, total)

    @abc.abstractmethod
    def report_build_progress(self, build_id, current, total, group_name='',
                              status_line=''):
        """
        Report progress for a build.

        :param build_id:
            The build id for which to report progress
        :param current:
            The current number of "steps" done
        :param total:
            The total amount of "steps"
        :param group_name:
            Optionally, a name used to nest multiple progress "levels".
            A tuple (or a string separated by ``'::'``) can be used to
            specify multiple "nesting" levels.
        :param status_line:
            Optionally, a line of text indicating the current build status.
        """

    @abc.abstractmethod
    def get_build_progress_info(self, build_id):
        """
        Return progress information for a build.

        :return: a list of tuples: ``(name, current, total, status_line)``
        """

    def get_latest_successful_build(self, job_id):
        """
        Helper method to retrieve the latest successful build for a given
        job. Calls ``get_job_builds()`` in the background.

        :return: information about the build, as a dict, or ``None``
            if the job has no successful (and not skipped) build.
        """
        builds = list(self.get_job_builds(
            job_id, started=True, finished=True, success=True, skipped=False,
            order='desc', limit=1))
        if not builds:
            return None  # No build!
        assert len(builds) == 1  # limit=1 was requested: backend is broken
        return builds[0]

    @abc.abstractmethod
    def log_message(self, build_id, record):
        """
        Store a log record associated with a build.
        """
        # TODO: provide "shortcut" methods to convert the traceback
        #       (from exc_info) to a serializable object, and to clean
        #       up the record object for decent serialization in the
        #       database.

    @abc.abstractmethod
    def prune_log_messages(self, job_id=None, build_id=None, max_age=None,
                           level=None):
        """
        Delete (old) log messages.

        :param job_id:
            If specified, only delete messages for this job
        :param build_id:
            If specified, only delete messages for this build
        :param max_age:
            If specified, only delete log messages with an age
            greater than this one (in seconds)
        :param level:
            If specified, only delete log messages with a level
            less than or equal to this one
        """

    @abc.abstractmethod
    def iter_log_messages(self, build_id=None, max_date=None,
                          min_date=None, min_level=None):
        """
        Iterate over log messages, applying some filters.

        :param build_id:
            If specified, only return messages for this build
        :param max_date:
            If specified, only return messages created up to this date
            (upper bound on the creation date)
        :param min_date:
            If specified, only return messages created from this date on
            (lower bound on the creation date)
        :param min_level:
            If specified, only return messages with a level at least
            equal to this one
        """

    # ------------------------------------------------------------
    # Helper methods for serialization
    # ------------------------------------------------------------

    def pack(self, obj, safe=False):
        """
        Serialize ``obj`` with :mod:`pickle`.

        :param obj: the object to serialize
        :param safe:
            Currently ignored; accepted for symmetry with :meth:`unpack`.
        :return: the pickled bytes
        :raises SerializationError: if pickling fails for any reason
        """
        try:
            return pickle.dumps(obj)
        except Exception as exc:
            raise SerializationError(
                'Object serialization failed: {0!r}'
                .format(exc))

    def pack_log_record(self, record):
        """
        Pack a log record.

        This special-cased function is meant to gracefully handle cases
        of log messages not being serializable, usually due to some
        "attr" or the attached exception not being serializable.

        The record is modified in place: ``record.message`` is set to the
        formatted message, the traceback (if any) is dropped from
        ``record.exc_info``, and an unpicklable exception is replaced by
        an ``ExceptionPlaceholder``.

        :return: the pickled record
        :raises SerializationError: if the record still cannot be pickled
        """
        # Store a hard copy of the formatted message.
        record.message = record.getMessage()

        if isinstance(record.exc_info, tuple):
            etype, exc = record.exc_info[0], record.exc_info[1]
            # Traceback objects can never be pickled: drop the traceback
            # up-front, otherwise *every* exception would be replaced by a
            # placeholder just because a traceback is attached.
            record.exc_info = (etype, exc, None)
            try:
                self.pack(record.exc_info)
            except Exception:
                # The exception object itself is not serializable: keep
                # only a placeholder built from it.
                record.exc_info = (etype, ExceptionPlaceholder(exc), None)

        return self.pack(record)

    def pack_exception(self, exception):
        """
        Pickle an exception, falling back to pickling an
        ``ExceptionPlaceholder`` wrapping it if the exception itself
        cannot be pickled.
        """
        try:
            return self.pack(exception)
        except Exception:
            return self.pack(ExceptionPlaceholder(exception))

    def unpack(self, obj, safe=False):
        """
        Deserialize a pickled object.

        :param obj: the pickled bytes
        :param safe:
            If true, return an error string instead of raising when
            unpickling fails.

        NOTE(review): ``pickle.loads`` can execute arbitrary code; only
        use this on data written by a trusted backend.
        """
        try:
            return pickle.loads(obj)
        except Exception as e:
            if not safe:
                raise
            return 'Error deserializing object: {0!r}'.format(e)

    def yaml_pack(self, obj):
        """Serialize ``obj`` using ``jobcontrol.job_conf.dump``."""
        return jobcontrol.job_conf.dump(obj)

    def yaml_unpack(self, obj):
        """Deserialize ``obj`` using ``jobcontrol.job_conf.load``."""
        return jobcontrol.job_conf.load(obj)

    # ------------------------------------------------------------
    # Generic helper methods
    # ------------------------------------------------------------

    def _normalize_job_config(self, job_conf):
        """
        Fill in defaults and validate a job configuration dict.

        The dict is modified in place (``None`` is replaced by a new dict)
        and returned. A list ``args`` is converted to a tuple.

        :raises TypeError: on wrongly-typed config or fields
        """
        if job_conf is None:
            job_conf = {}

        if not isinstance(job_conf, dict):
            raise TypeError('job_conf must be a dict, got {0} instead'
                            .format(type(job_conf).__name__))

        job_conf.setdefault('function', None)
        job_conf.setdefault('args', ())
        job_conf.setdefault('kwargs', {})

        if isinstance(job_conf['args'], list):
            job_conf['args'] = tuple(job_conf['args'])

        if not isinstance(job_conf['args'], tuple):
            raise TypeError('args must be a tuple')

        if not isinstance(job_conf['kwargs'], dict):
            raise TypeError('kwargs must be a dict')

        job_conf.setdefault('dependencies', [])
        if not isinstance(job_conf['dependencies'], (list, tuple)):
            raise TypeError('dependencies must be a list (or tuple)')

        return job_conf

    def _normalize_build_config(self, build_conf):
        """
        Fill in defaults for a build configuration dict (in place) and
        return it; ``None`` is replaced by a new dict.

        :raises TypeError: if ``build_conf`` is not a dict (or None)
        """
        if build_conf is None:
            build_conf = {}

        if not isinstance(build_conf, dict):
            raise TypeError('build_conf must be a dict, got {0} instead'
                            .format(type(build_conf).__name__))

        build_conf.setdefault('dependency_builds', {})
        return build_conf

    def _normalize_build_info(self, build_info):
        """
        Fill in defaults for every build-info key (in place), normalizing
        the nested job and build configurations, and return the dict.

        :raises TypeError: if ``build_info`` is not a dict
        """
        if not isinstance(build_info, dict):
            raise TypeError('build_info must be a dict')

        build_info.setdefault('job_id', None)

        for key in ('start_time', 'end_time'):
            build_info.setdefault(key, None)

        for key in ('started', 'finished', 'success', 'skipped'):
            build_info.setdefault(key, False)

        build_info.setdefault('job_config', {})
        build_info['job_config'] = \
            self._normalize_job_config(build_info['job_config'])

        build_info.setdefault('build_config', {})
        build_info['build_config'] = \
            self._normalize_build_config(build_info['build_config'])

        for key in ('retval', 'exception', 'exception_tb'):
            build_info.setdefault(key, None)

        for key in ('title', 'notes'):
            build_info.setdefault(key, None)

        return build_info

    def _serialize_log_record(self, record):
        """
        Prepare a log record for storage.

        :return: a dict with keys ``record`` (the record itself),
            ``created`` (naive UTC datetime from ``record.created``) and
            ``exception_tb`` (a ``TracebackInfo``, or ``None`` when the
            record carries no traceback).

        Side effect: the traceback is removed from ``record.exc_info``.

        :raises TypeError: if ``exc_info`` holds a non-traceback third item
        """
        row = {
            'record': record,
            'created': datetime.utcfromtimestamp(record.created),
            'exception_tb': None,
        }
        if record.exc_info:
            etype, exc, tb = record.exc_info
            # Traceback objects are not picklable: keep them out of the
            # record, storing a TracebackInfo separately instead.
            record.exc_info = (etype, exc, None)
            # ``exc_info`` can be ``(None, None, None)``, e.g. when
            # ``logger.exception()`` is called outside an ``except`` block.
            if tb is not None:
                if not isinstance(tb, types.TracebackType):
                    raise TypeError(
                        'exc_info traceback must be a traceback object, '
                        'got {0} instead'.format(type(tb).__name__))
                # Imported lazily, only when actually needed.
                from jobcontrol.utils import TracebackInfo
                row['exception_tb'] = TracebackInfo.from_tb(tb)
        return row