Download:
child 159:c233397bcb7e
parent 157:1770e08a2431
158:e5f7af8bbcbe
Anton Shestakov <av6@dwimlabs.net>, Mon, 18 Jul 2016 14:04:50 +0800
queue: worker queue for running checks

1 файлов изменено, 90 вставок(+), 0 удалений(-) [+]
worker-queue.py file | annotate | diff | comparison | revisions
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/worker-queue.py Mon Jul 18 14:04:50 2016 +0800
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+import json
+import logging
+import sys
+from argparse import ArgumentParser, FileType, SUPPRESS
+from os.path import exists
+from subprocess import check_output, STDOUT
+from time import sleep
+
+import redis
+import yaml
+
+from candolint.queue import loop
+from candolint.utils import lookup_option, rel, timestamp
+
+
+def handle(isolation, rconn, change):
+ payload = json.loads(change)
+
+ config = rel('projects/{source}/{repo}.yml'.format(**payload))
+
+ if not exists(config):
+ logging.warn("Skipping %s (file doesn't exist: %r)", payload['repo'], config)
+ return
+
+ logging.info('Checking %s (payload: %r)', config, payload)
+ if isolation == 'none':
+ output = check_output([
+ sys.executable, '-u', rel('checker.py'), config, payload['change']
+ ], stderr=STDOUT)
+ elif isolation == 'docker':
+ output = check_output([
+ rel('check-in-docker.sh'), config, payload['change']
+ ], stderr=STDOUT)
+ logging.info('Checked %s', config)
+
+ payload.update({
+ 'output': output,
+ 'timestamp': timestamp()
+ })
+ rconn.rpush('candolint:queue:incoming', json.dumps(payload))
+
+
+def main():
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ parser.add_argument(
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ parser.add_argument(
+ '-i', '--isolation', choices=('none', 'docker'), default='docker',
+ help='select isolation level for the checker')
+ parser.add_argument(
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+
+ group = parser.add_argument_group(
+ 'queue configuration',
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+
+ args = parser.parse_args()
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+
+ logging.basicConfig(
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+
+ def wrapper(rconn, change):
+ return handle(args.isolation, rconn, change)
+
+ while True:
+ logging.debug('Connecting to Redis server %s:%d', rhost, rport)
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+
+ try:
+ loop(rconn, 'candolint:queue:changes', wrapper)
+ except redis.exceptions.ConnectionError:
+ logging.warn('Connection to Redis lost, reconnecting in 1 minute')
+ sleep(60)
+
+
+if __name__ == '__main__':
+ main()