Source code for jobcontrol.cli

import ast

import click
from flask.config import Config
import logging
from nicelog.formatters import ColorLineFormatter
from prettytable import PrettyTable
import sys

from jobcontrol.core import JobControl
from jobcontrol.utils import (
    get_storage_from_url, get_storage_from_config, short_repr,
    json_dumps)


logger = logging.getLogger('')
logger.setLevel(logging.DEBUG)

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(ColorLineFormatter(
    show_date=True, show_function=True, show_filename=True,
    message_inline=False))
handler.setLevel(logging.DEBUG)

logger.addHandler(handler)


[docs]def cli_main(jc_app): pass
config = None jc = None output_fmt = None DATE_FMT = '%Y-%m-%d %H:%M' def _fmt_date(dt): if dt is None: return '' return dt.strftime(DATE_FMT) def _fmt_bool(val, inv=False): col = '\x1b[32m' if bool(val) ^ bool(inv) else '\x1b[31m' return '{0}{1}\x1b[0m'.format(col, val) def _fmt_progress(cur, tot): if tot == 0: return 'N/A' return '{0}/{1} ({2:.1f}%)'.format(cur, tot, cur * 100.0 / tot) @click.group() @click.option('--config-file', metavar='FILE', help='Path to YAML configuration file') # @click.option('--storage', metavar='FILE', help='Storage URL') @click.option('--format', 'outfmt', default='human', help='Output format. "human" or "json" (default: "human").', type=click.Choice(('json', 'human'))) def cli_main_grp(config_file, outfmt): # todo: use pass_context for passing context instead of global objects? global jc, output_fmt output_fmt = outfmt if config_file is None: raise ValueError('Configuration file missing') jc = JobControl.from_config_file(config_file) @cli_main_grp.command() def install(): jc.storage.install() @cli_main_grp.command() def uninstall(): jc.storage.uninstall() @cli_main_grp.command() @click.option('--function', help="Function to be called", required=True) @click.option('--args', help="Arguments, as a Python tuple") @click.option('--kwargs', help="Keyword arguments, as a Python dict") @click.option('--dependencies', help="Comma-separated list of job ids") def create_job(function, args, kwargs, dependencies): args = ast.literal_eval(args) if args else () kwargs = ast.literal_eval(kwargs) if kwargs else {} if dependencies: dependencies = [int(x) for x in dependencies.split(',')] else: dependencies = [] retval = jc.storage.create_job(function, args=args, kwargs=kwargs, dependencies=dependencies) if output_fmt == 'human': click.echo('Job id: {0}'.format(retval)) elif output_fmt == 'json': click.echo(json_dumps({'id': retval})) else: raise AssertionError('Invalid output format') @cli_main_grp.command() @click.argument('job_id', type=click.INT) @click.option('--function', help="Function to be called") @click.option('--args', help="Arguments, as a Python tuple") @click.option('--kwargs', help="Keyword arguments, as a Python dict") @click.option('--dependencies', help="Comma-separated list of job ids") def update_job(job_id, function, args, kwargs, dependencies): _kwargs = {} if function is not None: _kwargs['function'] = function if args is not None: _kwargs['args'] = ast.literal_eval(args) if kwargs is not None: _kwargs['kwargs'] = ast.literal_eval(kwargs) if dependencies is not None: _kwargs['dependencies'] = [int(x) for x in dependencies.split(',')] jc.storage.update_job(job_id, **_kwargs) @cli_main_grp.command() @click.argument('job_id') def show_job(job_id): job = jc.get_job(job_id) # job['reverse_dependencies'] = [ # x['id'] for x in jc.storage.get_job_revdeps(job['id'])] if output_fmt == 'human': click.echo('Job id: {0}'.format(job.id)) click.echo('Title: {0}'.format(job['title'])) click.echo('Function: {0}'.format(job['function'])) click.echo('Args:\n {0!r}'.format(job['args'])) click.echo('Kwargs:\n {0!r}'.format(job['kwargs'])) click.echo('Dependencies:') for dep in job.get_deps(): click.echo(' - {0} - {1!r}'.format(dep.id, dep['title'])) click.echo('Reverse dependencies:') for dep in job.get_revdeps(): click.echo(' - {0} - {1!r}'.format(dep.id, dep['title'])) click.echo('') # Blank line click.echo('Status: {0}'.format(job.get_status())) lsb = job.get_latest_successful_build() click.echo('Latest successful build:') if lsb: click.echo(' Build id: {0}'.format(lsb.id)) click.echo(' Started: {0}'.format(lsb['start_time'])) click.echo(' Finished: {0}'.format(lsb['end_time'])) else: click.echo(' No successful builds') # table = PrettyTable(['Key', 'Value']) # table.align.update({'Key': 'r', 'Value': 'l'}) # table.add_row(("Job id:", job['id'])) # table.add_row(("Created:", _fmt_date(job['ctime']))) # table.add_row(("Updated:", _fmt_date(job['mtime']))) # table.add_row(("Function:", job['function'])) # table.add_row(("args:", job['args'])) # table.add_row(("kwargs:", job['kwargs'])) # table.add_row(("Deps:", job['dependencies'])) # table.add_row(("Rev. deps:", job['reverse_dependencies'])) # click.echo(table) elif output_fmt == 'json': raise NotImplementedError # click.echo(json_dumps(job)) else: raise AssertionError('Invalid output format') @cli_main_grp.command() def list_jobs(): jobs = list(jc.iter_jobs()) if output_fmt == 'human': table = PrettyTable( ['Id', 'Title', 'Function']) for item in jobs: table.add_row([ item['id'], item['title'], item['function'], ]) click.echo(table) elif output_fmt == 'json': # todo: serialize datetimes click.echo(json_dumps(jobs)) else: raise AssertionError('Invalid output format') @cli_main_grp.command() @click.argument('job_id', type=click.INT) @click.option('--started', type=click.BOOL) @click.option('--finished', type=click.BOOL) @click.option('--success', type=click.BOOL) @click.option('--skipped', type=click.BOOL) @click.option('--order', type=click.Choice(('asc', 'desc')), default='asc') @click.option('--limit', type=click.INT, default=100) def list_builds(job_id, started, finished, success, skipped, order, limit): builds = jc.storage.get_job_builds( job_id, started=started, finished=finished, success=success, skipped=skipped, order=order, limit=limit) if output_fmt == 'human': table = PrettyTable( ['Job id', 'Build id', 'Start time', 'End time', 'Started', 'Finished', 'Success', 'Skipped', 'Progress', 'Return value', 'Exception']) for item in builds: table.add_row([ item['job_id'], item['id'], item['start_time'], item['end_time'], _fmt_bool(item['started']), _fmt_bool(item['finished']), _fmt_bool(item['success']), _fmt_bool(item['skipped'], inv=True), _fmt_progress(item['progress_current'], item['progress_total']), item['retval'], item['exception'], ]) click.echo(table) elif output_fmt == 'json': click.echo(json_dumps(list(builds))) else: raise AssertionError('Invalid output format') @cli_main_grp.command() @click.argument('build_id', type=click.INT) def get_build(build_id): pass @cli_main_grp.command() @click.argument('build_id', type=click.INT) def delete_build(build_id): pass @cli_main_grp.command() @click.argument('job_id', type=click.INT) def build_job(job_id): """Run a new build for a job""" build_id = jc.build_job(job_id) click.echo('Build id: {0}'.format(build_id)) @cli_main_grp.command() @click.option('--host', help='Server host', default='127.0.0.1') @click.option('--port', type=click.INT, help='Server port', default=5000) @click.option('--debug/--no-debug', help='Whether to enable debug mode (reloader, etc.)', default=False) def web(host, port, debug): """Run the web API service""" from jobcontrol.web.app import app if 'webapp' in jc.config: app.config.update(jc.config['webapp']) server_port = port or app.config.get('PORT') or 5000 # todo: figure out a better way to pass context.. app.config['JOBCONTROL'] = jc app.run(port=server_port, debug=debug, host=host) @cli_main_grp.command() @click.option('--broker', metavar='URL', help='Broker URL') def worker(broker): """Run the Celery worker""" # from jobcontrol.async.tasks import app as celery_app import jobcontrol.core # should set up logging.. # noqa # nope... :( celery_app = jc.get_celery_app() # celery_app.conf.JOBCONTROL = jc if broker: celery_app.conf.BROKER_URL = broker # todo: allow passing arguments celery_app.worker_main(argv=['jobcontrol-cli']) @cli_main_grp.command() @click.argument('job_id', type=click.INT) @click.option('--broker', metavar='URL', help='Broker URL', default='redis://localhost:6379') def async_build_job(job_id, broker): """Run a new build for a job, via Celery""" from jobcontrol.async.tasks import app as celery_app, build_job celery_app.conf.JOBCONTROL = jc celery_app.conf.BROKER_URL = broker build_job.delay(job_id)
[docs]def main(): cli_main_grp(obj={})