Download:
child 154:e9b7e35f97bb
parent 152:48220f17a776
153:c50882dabe4f
Anton Shestakov <av6@dwimlabs.net>, Sun, 17 Jul 2016 23:25:29 +0800
queue: add incoming queue handler

2 файлов изменено, 94 вставок(+), 0 удалений(-) [+]
candolint/queue.py file | annotate | diff | comparison | revisions
incoming-queue.py file | annotate | diff | comparison | revisions
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/candolint/queue.py Sun Jul 17 23:25:29 2016 +0800
@@ -0,0 +1,24 @@
+from __future__ import absolute_import
+
+import logging
+
+
+def loop(rconn, keys, handler):
+ failures = {}
+ while True:
+ key, value = rconn.blpop(keys)
+ try:
+ handler(rconn, value)
+ except KeyboardInterrupt:
+ raise
+ except Exception as e:
+ logging.error('%r (while handling %r)', e, value)
+ fkey = hash(value)
+ failures.setdefault(fkey, 0)
+ failures[fkey] += 1
+ if failures[fkey] > 3:
+ logging.error('Dropping this value (too many errors)')
+ failures.pop(fkey)
+ else:
+ logging.info('Saving this value for later')
+ rconn.rpush(key, value)
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/incoming-queue.py Sun Jul 17 23:25:29 2016 +0800
@@ -0,0 +1,70 @@
+#!/usr/bin/env python
+import json
+import logging
+import sys
+from argparse import ArgumentParser, FileType, SUPPRESS
+from os import fdopen, remove
+from subprocess import check_call
+from tempfile import mkstemp
+from time import sleep
+
+import redis
+import yaml
+
+from candolint.queue import loop
+from candolint.utils import lookup_option, rel
+
+
+def handle(rconn, incoming):
+ payload = json.loads(incoming)
+ outfd, outfn = mkstemp('.txt', '{repo}.{timestamp}.'.format(**payload))
+ logging.debug('Writing output to %s', outfn)
+ with fdopen(outfd, 'wb') as outf:
+ outf.write(payload['output'].encode('utf-8'))
+ logging.debug('Running %s incoming.py %r', sys.executable, outfn)
+ check_call([sys.executable, rel('incoming.py'), outfn])
+ logging.info('Processed file %s', outfn)
+ remove(outfn)
+
+
+def main():
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ parser.add_argument(
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ parser.add_argument(
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+
+ group = parser.add_argument_group(
+ 'queue configuration',
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+
+ args = parser.parse_args()
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+
+ logging.basicConfig(
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+
+ while True:
+ logging.debug('Connecting to Redis server %s:%d', rhost, rport)
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+
+ try:
+ loop(rconn, 'candolint:queue:incoming', handle)
+ except redis.exceptions.ConnectionError:
+ logging.warn('Connection to Redis lost, reconnecting in 1 minute')
+ sleep(60)
+
+
+if __name__ == '__main__':
+ main()