Welcome to JobControl’s documentation!¶
User guide¶
Just click those buttons ;)
System administrator guide¶
Configuration¶
The main configuration file is written in YAML and pre-processed through Jinja, to allow things like defining variables, macros, etc.
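For instance (a minimal sketch; the variable name and URL are purely illustrative), a Jinja variable can be defined once and reused across the configuration:

```yaml
{% set pg_server = "postgresql://user:pass@localhost:5432" %}

storage: "{{ pg_server }}/jobcontrol"
```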
Storage¶
Define a URL pointing to the storage used for build status.
storage: "postgresql://jobcontrol_test:test@localhost:5432/jc-harvester-141125"
Webapp¶
Configuration for the web application.
Uppercase names will be merged with standard Flask configuration.
webapp:
PORT: 5050
DEBUG: False
Celery¶
Configuration for Celery (the asynchronous task running library).
See all the possible configuration options here: http://docs.celeryproject.org/en/latest/configuration.html
celery:
BROKER_URL: "redis://localhost:6379"
Jobs¶
Job definition is a list of objects like this:
id: some_job_id
title: "Some job title here"
function: mypackage.mymodule:myfunction
args:
    - spam
    - eggs
    - bacon
kwargs:
    foobar: 'Something completely different'
    blah: !retval 'some_other_job'
dependencies: ['some_other_job']
…which tells JobControl to run something roughly equivalent to:
from mypackage.mymodule import myfunction
myfunction('spam', 'eggs', 'bacon',
           foobar='Something completely different',
           blah=get_return_value('some_other_job'))
Here, the (imaginary) get_return_value() function returns the return value from the latest successful build of the specified job (which must be among the job's dependencies).
Planned job configuration keys¶
- protect: a boolean indicating whether this job must be “protected”, i.e. guarded against accidental mistakes; for example, it would be handy to prevent accidental builds of jobs that import things into production websites. If this flag is set, the “quick build” feature will be disabled, and the build form's submit button will need “arming” (by clicking another button) before it becomes usable.
- cleanup: indicates a function to be called on build deletion, to clean up any data stored externally. That function requires access to the build status, e.g. in order to get a pointer to the storage containing the data.
Example macros¶
For example, let’s say we want to “crawl” and “process” a bunch of websites.
We could use a macro like this to keep repetitions at minimum:
{% macro process_website(name, url) %}
- id: crawl_{{ name }}
  title: "Crawl {{ url }}"
  function: mycrawler:crawl
  kwargs:
      storage: postgresql://.../crawled_data_{{ name }}

- id: process_{{ name }}
  title: "Process {{ url }}"
  function: mycrawler:process
  kwargs:
      input_storage: !retval crawl_{{ name }}
      storage: postgresql://.../processed_data_{{ name }}
{% endmacro %}
jobs:
{{ process_website('example_com', 'http://www.example.com') }}
{{ process_website('example_org', 'http://www.example.org') }}
{{ process_website('example_net', 'http://www.example.net') }}
This will be expanded to:
jobs:

- id: crawl_example_com
  title: "Crawl http://www.example.com"
  function: mycrawler:crawl
  kwargs:
      storage: postgresql://.../crawled_data_example_com

- id: process_example_com
  title: "Process http://www.example.com"
  function: mycrawler:process
  kwargs:
      input_storage: !retval crawl_example_com
      storage: postgresql://.../processed_data_example_com

- id: crawl_example_org
  title: "Crawl http://www.example.org"
  function: mycrawler:crawl
  kwargs:
      storage: postgresql://.../crawled_data_example_org

- id: process_example_org
  title: "Process http://www.example.org"
  function: mycrawler:process
  kwargs:
      input_storage: !retval crawl_example_org
      storage: postgresql://.../processed_data_example_org

- id: crawl_example_net
  title: "Crawl http://www.example.net"
  function: mycrawler:crawl
  kwargs:
      storage: postgresql://.../crawled_data_example_net

- id: process_example_net
  title: "Process http://www.example.net"
  function: mycrawler:process
  kwargs:
      input_storage: !retval crawl_example_net
      storage: postgresql://.../processed_data_example_net
Warning
Mind the indentation! The best way is to use the desired final indentation in the macro definition, then call the macro at “zero” indentation level.
Command-line interface¶
All the operations can be run through the “jobcontrol-cli” command.
It is self-documented: running jobcontrol-cli --help will give information on available commands; jobcontrol-cli <command> --help will give usage information on a specific command.
Installing database schema¶
jobcontrol-cli --config-file myconfig.yaml install
Uninstalling database schema¶
Warning
This will drop all tables without any further warning!
jobcontrol-cli --config-file myconfig.yaml uninstall
Running the web app¶
Note
For production mode, the application should be run via a proper WSGI container, such as gunicorn or uWSGI.
jobcontrol-cli --config-file myconfig.yaml web --port 5050 --debug
Deployment instructions¶
Requisites:
- Python 2.7 (2.6 should work but it’s untested)
- PostgreSQL 9.1+ (tested on 9.4 but older 9.x versions should do)
- Redis (any recent version should do; tested on 2.8.17)
Steps:
Create a PostgreSQL database for jobcontrol
Install jobcontrol in a virtualenv:
virtualenv jobcontrol
pip install jobcontrol
Create database tables:
jobcontrol-cli --config-file path/to/conf.yaml install
Launch the webapp:
jobcontrol-cli --config-file path/to/conf.yaml web --port 5050
Start redis server:
redis-server
Launch the celery worker:
jobcontrol-cli --config-file path/to/conf.yaml worker
Visit http://127.0.0.1:5050
Enjoy!
todo¶
- Give some better details for production deployment, eg.
Developer guide¶
Writing a job function¶
First rule: keep it simple¶
That is, for basic usage, you don’t have to do anything “fancy”.
Just create a Python function, drop it inside a module somewhere on the path of the interpreter running jobcontrol, and list it in the configuration file. That's it.
An example project can be found here: https://github.com/rshk/ckan_crawl_demo
Note
Although not strictly necessary, it is a good practice to create a setup.py in order to make your project properly installable, then install it in your virtualenv using pip install ... or python setup.py install.
Logging messages¶
Just use the standard Python logging facilities:
import logging
logger = logging.getLogger(__name__)
logger.info('Hello, world')
logger.warning('Aw, snap!')
logger.error('Dammit!!')
Reporting progress¶
Unfortunately, Python doesn't provide any facility for “reporting progress”, so we had to implement our own. But have no fear, as it is as simple as:
from jobcontrol.globals import current_app
current_app.report_progress(None, 20, 100) # 20%
Ok, let's explain the arguments in a bit more detail:
The first one, group_name, is used for building “trees” of progress reports. It can be either None, indicating the top level, or a tuple of name “parts”, used to build the tree.
For example, let's suppose we need to perform two different “kinds” of steps in our function: first we want to download a bunch of web pages, then we want to extract links from all of them and import them somewhere.
The first iteration will report progress like this:
current_app.report_progress(('Download webpages',), current, total)
The second one:
current_app.report_progress(('Extracting links',), current, total)
This will create three progress bars on the UIs, pretty much like this:
[0/20] Total
 |-- [0/10] Downloading webpages
 '-- [0/10] Extracting links
Multiple name parts can be used like this:
current_app.report_progress(('http://example.com/foo.zip', 'downloading'), ...)
current_app.report_progress(('http://example.com/foo.zip', 'extracting'), ...)
current_app.report_progress(('http://example.com/bar.zip', 'downloading'), ...)
current_app.report_progress(('http://example.com/bar.zip', 'extracting'), ...)
This will generate the following progress bars:
[../400] Total
 |-- [../200] http://www.example.com/foo.zip
 |    |-- [../100] downloading
 |    '-- [../100] extracting
 '-- [../200] http://www.example.com/bar.zip
      |-- [../100] downloading
      '-- [../100] extracting
(And, of course, intermediate “branches” can be overridden by specifying them manually)
The second and third arguments, current and total, must be integers indicating, respectively, the current number of completed items and the total number of items.
A fourth optional argument, status_line, may be used to report a (brief) description of what's currently going on (e.g. "Downloading http://www.example.com").
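As a sketch of the overall call pattern inside a job function (the FakeApp stand-in below is purely illustrative; real jobs import current_app from jobcontrol.globals):

```python
# Illustrative stand-in for jobcontrol's current_app, used here only to
# demonstrate the report_progress() call pattern; a real JobControl app
# would dispatch these reports to storage / the web UI.
class FakeApp:
    def __init__(self):
        self.reports = []

    def report_progress(self, group_name, current, total, status_line=''):
        self.reports.append((group_name, current, total, status_line))

current_app = FakeApp()  # real jobs: from jobcontrol.globals import current_app

def download_pages(urls):
    total = len(urls)
    for i, url in enumerate(urls, 1):
        # ... fetch the page here ...
        current_app.report_progress(
            ('Download webpages',), i, total,
            status_line='Downloading {0}'.format(url))

download_pages(['http://example.com/a', 'http://example.com/b'])
```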
Generator functions¶
Warning
Generator functions are not supported yet: they will be executed and the resulting generator object will be stored (though it may not even be picklable), but it will not be iterated, meaning the execution will have no effect whatsoever.
If you really need to run a generator function, just wrap it in something like list(myfunction()).
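For example (function names here are illustrative):

```python
def crawl_pages(urls):
    # A generator function: calling it only creates a generator object,
    # so under the current behavior nothing would actually run.
    for url in urls:
        yield 'crawled: {0}'.format(url)

def crawl_pages_job(urls):
    # Wrapping the call in list() forces the iteration, so the work
    # (and any side effects) actually happens.
    return list(crawl_pages(urls))

results = crawl_pages_job(['http://example.com', 'http://example.org'])
```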
Note
There are future plans to change this, probably by using generator functions to return “multiple” values that can then be used for “parametrized” builds.
Internals documentation¶
jobcontrol.config¶
Objects to manage the configuration.
The configuration object (stored as YAML in the configuration file) must be a dict. Supported keys for the “main” dict are:
- storage: URL to a supported “state” storage
- webapp: Configuration for the webapp, passed to Flask
- celery: Configuration for celery
- jobs: List of job configuration blocks
- secret: Dictionary of “secrets”, which can be referenced by the configuration but are never shown on administration pages, ...
- class jobcontrol.config.JobControlConfig(initial=None)[source]¶
- classmethod from_file(filename)[source]¶
Initialize configuration from a file, or a file-like object providing a read() method.
- classmethod from_string(s)[source]¶
Initialize configuration from a string.
The string will first be pre-processed through jinja, then passed to the from_object() constructor.
- get_job(job_id)¶
jobcontrol.core¶
Objects responsible for JobControl core functionality.
Note
Important objects from this module should be imported in the main __init__, in order to “abstract away” the namespace and have them in a more nicely accessible place.
- class jobcontrol.core.JobControl(storage, config)[source]¶
The main JobControl class.
Parameters: - storage – A valid storage for the builds state. Must be an instance of a jobcontrol.interfaces.StorageBase subclass (or a compatible one).
- config – A jobcontrol.config.JobControlConfig instance, or a dict which will be passed to that class constructor.
- classmethod from_config_file(config_file)[source]¶
Initialize JobControl by loading configuration from a file. Will also initialize storage taking values from the configuration.
Parameters: config_file – Path to configuration file, or an open file descriptor (or file-like object). Returns: a JobControl instance
- classmethod from_config(config)[source]¶
Initialize JobControl from some configuration.
Parameters: config – Either a jobcontrol.config.JobControlConfig instance, or a dict to be passed as argument to that class constructor. Returns: a JobControl instance
- get_job(job_id)[source]¶
Get a job, by id.
Parameters: job_id – The job id Returns: a JobInfo class instance associated with the requested job. Raises: jobcontrol.exceptions.NotFound if a job with that id was not found in the configuration.
- iter_jobs()[source]¶
Generator yielding all the jobs, one by one.
Yields: for each job, a JobInfo class instance associated with the job.
- get_build(build_id)[source]¶
Get a build, by id.
Parameters: build_id – The build id Returns: a BuildInfo instance associated with the build. Raises: jobcontrol.exceptions.NotFound if a build with that id was not found in the configuration.
- create_build(job_id)[source]¶
Create a build, from a job configuration.
Note
Currently, we require that all the dependencies have already been built; in the future, it will be possible to build them automatically.
Note
Also, the current implementation doesn't allow customization of either the job configuration or the build one (pinning, dep/revdep building, ...).
Parameters: job_id – Id of the job for which to start a build
Returns: a BuildInfo instance associated with the newly created build.
Raises: - jobcontrol.exceptions.NotFound if the specified job was not found.
- jobcontrol.exceptions.MissingDependencies if any required dependency has no successful build.
- build_job(job_id)[source]¶
Create and run a new build for the specified job.
This is simply a shortcut that runs create_build() followed by run_build() (mostly kept for compatibility reasons).
Returns: a BuildInfo instance associated with the newly created build.
- run_build(build_id)[source]¶
Actually run a build.
- take the build configuration
- make sure all the dependencies are built
- take return values from the dependencies -> pass as arguments
- run the build
- build the reverse dependencies as well, if required to do so
Parameters: build_id – either a BuildInfo instance, or a build id
- report_progress(group_name, current, total, status_line='')[source]¶
Report progress for the currently running build.
Parameters: - group_name – The report “group name”: either a tuple representing the “path”, or None for the top-level.
- current – Current progress
- total – Total progress
- status_line – An optional line of text, describing the currently running operation.
- class jobcontrol.core.JobExecutionContext(app, job_id, build_id)[source]¶
Class to hold “global” context during job execution.
This class can also act as a context manager for temporary context:
with JobExecutionContext(app, job_id, build_id):
    pass  # do stuff in an execution context
Parameters: - app – The JobControl instance running jobs
- job_id – Id of the currently running job
- build_id – Id of the currently running build
- class jobcontrol.core.JobControlLogHandler[source]¶
Logging handler sending messages to the appropriate JobControl instance that will dispatch them to storage.
- class jobcontrol.core.JobInfo(app, job_id, config)[source]¶
High-level interface to jobs
- get_status()[source]¶
Return a label describing the current status of the job.
Returns: - 'not_built' the job has no builds
- 'running' the job has running builds
- 'success' the job has at least one successful build
- 'failed' the job only has failed builds
- 'outdated' the job has at least one successful build, but it is older than a build of one of its dependencies
- iter_builds(*a, **kw)[source]¶
Iterate over builds for this job.
Accepts the same arguments as jobcontrol.interfaces.StorageBase.get_job_builds()
Yields: BuildInfo instances
- get_latest_successful_build()[source]¶
Get latest successful build for this job, if any. Otherwise, returns None.
- get_conf_as_yaml()[source]¶
Return the job configuration as serialized YAML, mostly for displaying on user interfaces.
- class jobcontrol.core.BuildInfo(app, build_id, info=None)[source]¶
High-level interface to builds.
Parameters: - app – The JobControl instance this build was retrieved from
- build_id – The build id
- info – Optionally, this can be used to pre-populate the build information (useful, eg. if we are retrieving a bunch of builds from the database at once).
- app¶
- build_id¶
- info[source]¶
Property used to lazily access the build attributes.
Returns a dict with the following keys:
- 'id'
- 'job_id'
- 'start_time'
- 'end_time'
- 'started'
- 'finished'
- 'success'
- 'skipped'
- 'config'
- 'retval'
- 'exception'
- 'exception_tb'
- descriptive_status[source]¶
Return a label describing the current status of the build.
Returns: - 'CREATED' if the build was not started yet
- 'RUNNING' if the build was started but did not finish
- 'SUCCESSFUL' if the build run with success
- 'SKIPPED' if the build was skipped
- 'FAILED' if the build execution failed
jobcontrol.exceptions¶
This module contains the exceptions used by JobControl.
- exception jobcontrol.exceptions.NotFound[source]¶
Exception used to indicate something was not found. Pretty generic, but useful for returning 404s.
- exception jobcontrol.exceptions.MissingDependencies[source]¶
Exception used to indicate a build dependency was not met (i.e. job has no successful builds).
jobcontrol.globals¶
jobcontrol.interfaces¶
Interfaces for NEW jobcontrol objects.
Data model:
Build
-----
id            SERIAL
job_id        TEXT
start_time    TIMESTAMP
end_time      TIMESTAMP
started       BOOLEAN
finished      BOOLEAN
success       BOOLEAN
skipped       BOOLEAN
config        BINARY (pickled)
              Copy of the job configuration when the build was started,
              along with build-specific configuration (such as pinning)
retval        BINARY (pickled)
exception     BINARY (pickled)
              Pickled exception object (or None)
exception_tb  BINARY (pickled)
              Pickled TracebackInfo object
Build progress
--------------
build_id      INTEGER (references Build.id)
group_name    VARCHAR(128)
              Name of the "progress group" (separated by '::')
current       INTEGER
              Current progress value
total         INTEGER
              Total progress value
status_line   TEXT
              An optional line of text describing the current state
UNIQUE constraint on (build_id, group_name)
Log
---
id            SERIAL
build_id      INTEGER (references Build.id)
created       TIMESTAMP
level         INTEGER
record        BINARY (pickled)
              Pickled "custom" LogRecord object
exception_tb  BINARY
              Pickled TracebackInfo object
Job configuration:
The job configuration is stored as a YAML-serialized dict.
Recognised keys are:
- function: the function to be called, in module:function format
- args: a list of arguments to be passed to the function
- kwargs: a dict of keyword arguments to be passed to the function
- title: a descriptive title, to be shown on the interfaces
- notes: notes, to be shown in interfaces (in reStructuredText)
- dependencies: a list of dependency job names
Additionally, args/kwargs may contain references to return value of dependency builds, by using the !retval <name> syntax.
Exception traceback serialization
To be used both in build records and associated with log messages containing an exception.
We want to include the following information:
- Details about the call stack, as in normal tracebacks: filename, line number, function name, line of code (plus some context)
- Local variables: we are not guaranteed we can safely pickle / unpickle arbitrary values; moreover this might result in huge fields, etc. So our better chance is to just store a dictionary mapping names to repr()s of the values (trimmed to a – large – maximum length, just to be on the safe side).
- class jobcontrol.interfaces.StorageBase[source]¶
- get_job_builds(job_id, started=None, finished=None, success=None, skipped=None, order='asc', limit=100)[source]¶
Iterate over all the builds for a job, sorted by date, according to the order specified by order.
Parameters: - job_id – The job id
- started – If set to a boolean, filter on the “started” field
- finished – If set to a boolean, filter on the “finished” field
- success – If set to a boolean, filter on the “success” field
- skipped – If set to a boolean, filter on the “skipped” field
- order – ‘asc’ (default) or ‘desc’
- limit – only return the first limit builds
Yield: Dictionaries representing build information
- create_build(job_id, config=None)[source]¶
Create a build.
Parameters: - job_id – The job for which a build should be started
- job_config – The job configuration (function, args, kwargs, ..) to be copied inside the object (we will use this from now on).
- build_config –
Build configuration, containing things like dependency build pinning, etc.
- dependency_builds: dict mapping job ids to build ids, or None to indicate “create a new build” for this job.
Returns: the build id
- get_build(build_id)[source]¶
Get information about a build.
Returns: the build information, as a dict
- finish_build(build_id, success=None, skipped=None, retval=None, exception=None, exception_tb=None)[source]¶
Register a build execution end.
- report_build_progress(build_id, current, total, group_name='', status_line='')[source]¶
Report progress for a build.
Parameters: - build_id – The build id for which to report progress
- current – The current number of “steps” done
- total – The total amount of “steps”
- group_name – Optionally, a name used to nest multiple progress “levels”. A tuple (or a string with levels separated by '::') can be used to specify multiple “nesting” levels.
- status_line – Optionally, a line of text indicating the current build status.
- get_build_progress_info(build_id)[source]¶
Return progress information for a build.
Returns: a list of tuples: (name, current, total, status_line)
- get_latest_successful_build(job_id)[source]¶
Helper method to retrieve the latest successful build for a given job. Calls get_job_builds() in the background.
Returns: information about the build, as a dict
- prune_log_messages(job_id=None, build_id=None, max_age=None, level=None)[source]¶
Delete (old) log messages.
Parameters: - job_id – If specified, only delete messages for this job
- build_id – If specified, only delete messages for this build
- max_age – If specified, only delete log messages with an age greater than this one (in seconds)
- level – If specified, only delete log messages with a level equal to or lower than this one
- iter_log_messages(build_id=None, max_date=None, min_date=None, min_level=None)[source]¶
Iterate over log messages, applying some filters.
Parameters: - build_id – If specified, only return messages for this build
- max_date – If specified, only return messages older than this date
- min_date – If specified, only return messages newer than this date
- min_level – If specified, only return messages with a level at least equal to this one
jobcontrol.utils¶
jobcontrol.utils.depgraph¶
Dependency graph exploration / resolution functions.
The dependency graph is represented as a dictionary of {<vertex>: [<dependencies>]}.
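A minimal sketch of how such a graph can be resolved into a build order (this is an illustration of the idea, not necessarily the module's actual algorithm):

```python
def resolve_order(depgraph):
    # Depth-first resolution over {vertex: [dependencies]}: returns the
    # vertices ordered so that every dependency precedes its dependants.
    resolved = []
    seen = set()

    def visit(vertex, chain):
        if vertex in chain:
            raise ValueError('dependency cycle at {0!r}'.format(vertex))
        if vertex in seen:
            return
        for dep in depgraph.get(vertex, []):
            visit(dep, chain | {vertex})
        seen.add(vertex)
        resolved.append(vertex)

    for vertex in depgraph:
        visit(vertex, set())
    return resolved

order = resolve_order({'process': ['crawl'], 'crawl': []})
```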
jobcontrol.utils.local¶
werkzeug.local¶
This module implements context-local objects.
license: BSD, see LICENSE for more details.
- jobcontrol.utils.local.release_local(local)[source]¶
Releases the contents of the local for the current context. This makes it possible to use locals without a manager.
Example:
>>> loc = Local()
>>> loc.foo = 42
>>> release_local(loc)
>>> hasattr(loc, 'foo')
False
With this function one can release Local objects as well as LocalStack objects. However it is not possible to release data held by proxies that way, one always has to retain a reference to the underlying local object in order to be able to release it.
New in version 0.6.1.
- class jobcontrol.utils.local.LocalStack[source]¶
This class works similar to a Local but keeps a stack of objects instead. This is best explained with an example:
>>> ls = LocalStack()
>>> ls.push(42)
>>> ls.top
42
>>> ls.push(23)
>>> ls.top
23
>>> ls.pop()
23
>>> ls.top
42
They can be force-released by using a LocalManager or with the release_local() function, but the correct way is to pop the item from the stack after use. When the stack is empty it will no longer be bound to the current context (and as such released).
By calling the stack without arguments it returns a proxy that resolves to the topmost item on the stack.
New in version 0.6.1.
- class jobcontrol.utils.local.LocalProxy(local, name=None)[source]¶
Acts as a proxy for a werkzeug local. Forwards all operations to a proxied object. The only operations not supported for forwarding are right handed operands and any kind of assignment.
Example usage:
from werkzeug.local import Local
l = Local()

# these are proxies
request = l('request')
user = l('user')

from werkzeug.local import LocalStack
_response_local = LocalStack()

# this is a proxy
response = _response_local()
Whenever something is bound to l.user / l.request the proxy objects will forward all operations. If no object is bound a RuntimeError will be raised.
To create proxies to Local or LocalStack objects, call the object as shown above. If you want to have a proxy to an object looked up by a function, you can (as of Werkzeug 0.6.1) pass a function to the LocalProxy constructor:
session = LocalProxy(lambda: get_current_request().session)
Changed in version 0.6.1: The class can be instantiated with a callable as well now.
jobcontrol.utils.testing¶
- jobcontrol.utils.testing.testing_job(progress_steps=None, retval=None, fail=False, skip=False, log_messages=None, step_duration=0)[source]¶
Job used for testing purposes.
Parameters: - progress_steps –
A list of tuples: (<group_name>, <steps>), where “group_name” is a tuple of name “levels” and “steps” is an integer representing how many steps that level should have.
Progress reports will be sent in randomized order.
- retval – The return value for the job.
- fail – Whether this job should fail.
- skip – Whether this job should be skipped.
- log_messages – A list of tuples: (level, message)
- step_duration – The time to sleep between steps, in milliseconds.
- jobcontrol.utils.testing.job_failing_once()[source]¶
This job will fail exactly once; a retry will be successful.
- jobcontrol.utils.testing.job_echo_config(*args, **kwargs)[source]¶
Simple job, “echoing” back the current configuration.
- class jobcontrol.utils.cached_property(func, name=None, doc=None)[source]¶
A decorator that converts a function into a lazy property. The function wrapped is called the first time to retrieve the result and then that calculated result is used the next time you access the value:
class Foo(object):

    @cached_property
    def foo(self):
        # calculate something important here
        return 42
The class has to have a __dict__ in order for this property to work.
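A rough sketch of how such a descriptor works (the actual implementation may differ in details):

```python
class cached_property(object):
    # Non-data descriptor: compute the value once, then cache it in the
    # instance __dict__, so later attribute lookups bypass the descriptor.
    def __init__(self, func, name=None, doc=None):
        self.func = func
        self.__name__ = name or func.__name__
        self.__doc__ = doc or func.__doc__

    def __get__(self, obj, type=None):
        if obj is None:
            return self
        value = obj.__dict__[self.__name__] = self.func(obj)
        return value

class Foo(object):
    calls = 0

    @cached_property
    def foo(self):
        Foo.calls += 1  # count how often the wrapped function runs
        return 42

f = Foo()
```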
- jobcontrol.utils.import_object(name)[source]¶
Import an object from a module, by name.
Parameters: name – The object name, in the package.module:name format. Returns: The imported object
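A rough sketch of what such a lookup does (the real implementation may handle errors differently):

```python
import importlib

def import_object(name):
    # Split "package.module:name" into its module and attribute parts,
    # import the module, then fetch the attribute from it.
    module_name, _, obj_name = name.partition(':')
    module = importlib.import_module(module_name)
    return getattr(module, obj_name)

digits = import_object('string:digits')
```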
- jobcontrol.utils.get_storage_from_url(url)[source]¶
Get a storage from URL.
Storages URLs are in the format:
- <scheme>://
- <class>+<scheme>:// – load <class>, passing it the URL with the <class>+ prefix removed
- jobcontrol.utils.short_repr(obj, maxlen=50)[source]¶
Returns a “shortened representation” of an object; that is, the return value of repr(obj) limited to a certain length, with a trailing ellipsis '...' if the text was truncated.
This function is mainly used to provide a nice representation of local variables in TracebackInfo objects.
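The behavior can be sketched roughly like this (the exact trimming rule is an assumption):

```python
def short_repr(obj, maxlen=50):
    # repr() the object, then append an ellipsis if the text had to be cut.
    text = repr(obj)
    if len(text) <= maxlen:
        return text
    return text[:maxlen] + '...'

short = short_repr(list(range(100)), maxlen=20)
```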
- jobcontrol.utils.trim_string(s, maxlen=1024, ellps='...')[source]¶
Trim a string to a maximum length, adding an “ellipsis” indicator if the string was trimmed.
- class jobcontrol.utils.TracebackInfo[source]¶
Class used to hold information about an error traceback.
This is meant to be serialized & stored in the database, instead of a full traceback object, which is not serializable.
It holds information about:
- the exception that caused the thing to fail
- the stack frames (with file / line number, function and exact code around the point in which the exception occurred)
- a representation of the local variables for each frame.
A textual representation of the traceback information may be retrieved by using str() or unicode() on the object instance.
- class jobcontrol.utils.ProgressReport(name, current=None, total=None, status_line=None, children=None)[source]¶
Class used to represent progress reports.
It supports progress reporting on a multi-level “tree” structure: each level can have its own progress status, or derive it automatically by summing up values from its children.
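The aggregation idea can be sketched like this (an illustration of the summing behavior, not the class's actual code):

```python
class ProgressReport(object):
    # A node either carries its own current/total values, or derives
    # them automatically by summing over its children.
    def __init__(self, name, current=None, total=None,
                 status_line=None, children=None):
        self.name = name
        self._current = current
        self._total = total
        self.status_line = status_line
        self.children = children or []

    @property
    def current(self):
        if self._current is not None:
            return self._current
        return sum(c.current for c in self.children)

    @property
    def total(self):
        if self._total is not None:
            return self._total
        return sum(c.total for c in self.children)

root = ProgressReport('Total', children=[
    ProgressReport('Downloading webpages', current=5, total=10),
    ProgressReport('Extracting links', current=0, total=10),
])
```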