--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/candolint/queue.py Sun Jul 17 23:25:29 2016 +0800
+from __future__ import absolute_import
+def loop(rconn, keys, handler):
+ key, value = rconn.blpop(keys)
+ except KeyboardInterrupt:
+ logging.error('%r (while handling %r)', e, value)
+ failures.setdefault(fkey, 0)
+ logging.error('Dropping this value (too many errors)')
+ logging.info('Saving this value for later')
+ rconn.rpush(key, value)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/incoming-queue.py Sun Jul 17 23:25:29 2016 +0800
+from argparse import ArgumentParser, FileType, SUPPRESS
+from os import fdopen, remove
+from subprocess import check_call
+from tempfile import mkstemp
+from candolint.queue import loop
+from candolint.utils import lookup_option, rel
+def handle(rconn, incoming):
+ payload = json.loads(incoming)
+ outfd, outfn = mkstemp('.txt', '{repo}.{timestamp}.'.format(**payload))
+ logging.debug('Writing output to %s', outfn)
+ with fdopen(outfd, 'wb') as outf:
+ outf.write(payload['output'].encode('utf-8'))
+ logging.debug('Running %s incoming.py %r', sys.executable, outfn)
+ check_call([sys.executable, rel('incoming.py'), outfn])
+ logging.info('Processed file %s', outfn)
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+ group = parser.add_argument_group(
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+ args = parser.parse_args()
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+ logging.debug('Connecting to Redis server %s:%d', rhost, rport)
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+ loop(rconn, 'candolint:queue:incoming', handle)
+ except redis.exceptions.ConnectionError:
+ logging.warn('Connection to Redis lost, reconnecting in 1 minute')
+if __name__ == '__main__':