import logging
import random
import time
from jobcontrol.exceptions import SkipBuild
[docs]def job_simple_echo(*args, **kwargs):
return (args, kwargs)
_cached_words = None
def _get_words():
global _cached_words
if _cached_words is not None:
return _cached_words
try:
with open('/usr/share/dict/words') as fp:
_cached_words = [x.strip() for x in fp]
except:
_cached_words = []
return _cached_words
def _capfirst(s):
return s[0].upper() + s[1:]
def _random_paragraph(size=10):
return _capfirst(' '.join(random.sample(_get_words(), size)))
def _log_random(logger):
classes = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
logging.CRITICAL)
for num in xrange(random.randint(0, 30)):
logger.log(random.choice(classes),
_random_paragraph(random.randint(10, 20)))
[docs]def testing_job(progress_steps=None, retval=None, fail=False, skip=False,
log_messages=None, step_duration=0):
"""
Job used for testing purposes.
:param progress_steps:
A list of tuples: ``(<group_name>, <steps>)``, where "group_name"
is a tuple of name "levels", "steps" an integer representing how
many steps should that level have.
Progress reports will be sent in randomized order.
:param retval:
The return value for the job.
:param fail:
Whether this job should fail.
:param skip:
Whether this job should be skipped.
:param log_messages:
A list of tuples: ``(level, message)``
:param step_duration:
The time to sleep between steps, in milliseconds.
"""
from jobcontrol.globals import execution_context
logger = logging.getLogger('jobcontrol.utils.testing_job')
log_messages = list(log_messages or [])
if progress_steps is None:
progress_steps = [(None, 10)]
totals = {}
counters = {}
progress_report_items = []
for name, steps in progress_steps:
if isinstance(name, list):
# Safe YAML doesn't have tuples, but names must be tuples
name = tuple(name)
if not (name is None or isinstance(name, tuple)):
raise TypeError("Name must be a tuple or None")
for i in xrange(steps):
progress_report_items.append(name)
totals[name] = steps
counters[name] = 0
random.shuffle(progress_report_items)
sleep_time = step_duration * 1.0 / 1000
def report_progress(name, cur, tot, status=None):
app = execution_context.current_app
app.report_progress(
group_name=name, current=cur, total=tot,
status_line=status)
def _should_fail():
return random.randint(0, len(progress_report_items)) == 0
for item in progress_report_items:
counters[item] += 1
report_progress(item, counters[item], totals[item],
'Doing action {0} [{1}/{2}]'
.format(item, counters[item], totals[item]))
if len(log_messages):
lev, msg = log_messages.pop(0)
logger.log(lev, msg)
if fail and _should_fail():
raise RuntimeError(
'This is a simulated exception in the middle of the loop')
if skip and _should_fail():
raise SkipBuild(
'This is a simulated skip in the middle of the loop')
if sleep_time:
time.sleep(sleep_time)
if skip:
# Make sure the job gets skipped
raise SkipBuild('This build should be skipped!')
if fail:
# Make sure the job fails
raise RuntimeError('This is a simulated exception')
return retval
[docs]def job_with_logging():
logger = logging.getLogger('jobcontrol.utils.testing.job_with_logging')
logger.setLevel(logging.DEBUG)
logger.debug('This is a debug message')
logger.info('This is an info message')
logger.warning('This is a warning message')
logger.error('This is an error message')
logger.critical('This is a critical message')
try:
raise ValueError('Foobar')
except:
logger.exception('This is an exception message')
[docs]def job_with_tracer_log():
from jobcontrol.globals import execution_context
logger = logging.getLogger(__name__)
logger.info('Message from job={0}, build={1}'
.format(execution_context.job_id,
execution_context.build_id))
pass
[docs]def job_failing_once():
"""
This job will fail exactly once; retry will be successful
"""
from jobcontrol.globals import current_job
exec_count = len(list(current_job.iter_runs()))
if exec_count <= 1:
# This is the first run
raise RuntimeError("Simulating failure")
return exec_count
[docs]def job_echo_config(*args, **kwargs):
"""
Simple job, "echoing" back the current configuration.
"""
from jobcontrol.globals import current_job, current_build
return {
'args': args,
'kwargs': kwargs,
'build_id': current_build.id,
'job_id': current_job.id,
'dependencies': current_build.config['dependencies'],
'config': current_build.config,
}
[docs]class RecordingLogHandler(logging.Handler):
"""Log handler that records messages"""
def __init__(self):
super(RecordingLogHandler, self).__init__()
self._messages = []
[docs] def flush(self):
pass # Nothing to flush!
[docs] def emit(self, record):
self._messages.append(record)
[docs] def print_messages(self):
from nicelog.formatters import ColorLineFormatter
formatter = ColorLineFormatter(
show_date=False, show_function=False, show_filename=False,
message_inline=True)
for msg in self._messages:
print(formatter.format(msg))
[docs] def clear_messages(self):
self._messages = []
[docs]class NonSerializableObject(object):
__slots__ = ['foo', 'bar']
def __init__(self):
self.foo = 'foo'
self.bar = 'bar'
[docs]class NonSerializableException(Exception):
def __init__(self):
super(NonSerializableException, self).__init__()
self.nso = NonSerializableObject()
[docs]def job_returning_nonserializable():
return NonSerializableObject()
[docs]def job_raising_nonserializable():
raise NonSerializableException()