Download:
child 176:545d03db7218
parent 174:9ef694997649
175:6d53a5a4d517
Anton Shestakov <av6@dwimlabs.net>, Fri, 22 Jul 2016 20:13:28 +0800
queue: poller to replace hooks in hgweb for now (pretty hacky!)

1 файлов изменено, 140 вставок(+), 0 удалений(-) [+]
poll-hgweb-queue.py file | annotate | diff | comparison | revisions
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/poll-hgweb-queue.py Fri Jul 22 20:13:28 2016 +0800
@@ -0,0 +1,140 @@
+#!/usr/bin/env python
+import json
+import logging
+from argparse import ArgumentParser, FileType, SUPPRESS
+
+import redis
+import yaml
+from tornado.gen import coroutine
+from tornado.httpclient import AsyncHTTPClient
+from tornado.ioloop import IOLoop, PeriodicCallback
+from tornado.options import define, options
+from tornado.web import Application
+
+from candolint.utils import lookup_option, timestamp
+
+
+define('listen', metavar='IP', default='127.0.0.1')
+define('port', metavar='PORT', default=8035, type=int)
+define('xheaders', metavar='True|False', default=False, type=bool)
+options.logging = None
+
+
+class CandolintPoller(Application):
+ def __init__(self, rconn, targets, interval, debug):
+ handlers = []
+ super(CandolintPoller, self).__init__(handlers, debug=debug)
+ self.rconn = rconn
+ self.targets = targets
+ self.interval = interval
+ IOLoop.instance().add_callback(self.setup)
+
+ @coroutine
+ def setup(self):
+ for target in self.targets:
+ yield self.poll(target)
+
+ @coroutine
+ def routine(target=target):
+ yield self.poll(target)
+
+ pc = PeriodicCallback(routine, self.interval * 1000)
+ pc.start()
+
+ @coroutine
+ def poll(self, target):
+ url = target['url']
+ http = AsyncHTTPClient()
+ headers = {}
+ if 'etag' in target:
+ headers['If-None-Match'] = target['etag']
+ logging.debug('Fetching %s', url)
+ response = yield http.fetch(url, raise_error=False, headers=headers)
+
+ if response.code == 200:
+ data = json.loads(response.body)
+ hashes = set(branch['node'] for branch in data['branches'])
+ if 'hashes' in target:
+ new = hashes - target['hashes']
+ base = {
+ 'url': url,
+ 'scm': 'hg',
+ 'repo': target['repo'],
+ 'timestamp': timestamp(),
+ 'source': target['source']
+ }
+ self.push(base, new)
+ logging.info('Got %d new hash(es) from %s', len(new), url)
+ else:
+ logging.info('Got %d current hash(es) from %s', len(hashes), url)
+ target['hashes'] = hashes
+ target['etag'] = response.headers.get('ETag', None)
+ elif response.code != 304:
+ logging.error('Error %d fetching %s', response.code, url)
+ response.rethrow()
+ else:
+ logging.debug('Not modified: %s', url)
+
+ def push(self, base, changes):
+ for change in changes:
+ item = base.copy()
+ item['change'] = change
+ data = json.dumps(item)
+ logging.debug('Pushing %s', data)
+ self.rconn.rpush('candolint:queue:changes', data)
+ logging.info('Pushed a change for %s', item['repo'])
+
+ def listen(self, port, address='', **kwargs):
+ name = self.__class__.__name__
+ logging.info('%s is serving on %s:%d', name, address, port)
+ super(CandolintPoller, self).listen(port, address, **kwargs)
+
+
+def main():
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ parser.add_argument(
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ parser.add_argument(
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+
+ group = parser.add_argument_group(
+ 'queue configuration',
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+ group.add_argument('--interval', type=int, help='(default: 8 hours)')
+
+ args, extra = parser.parse_known_args()
+ options.parse_command_line([None] + extra) # 1st argument is ignored
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+ interval = lookup_option('interval', args, config, default=8 * 60 * 60)
+
+ logging.basicConfig(
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+
+ if config and 'targets' in config:
+ targets = config['targets']
+ else:
+ logging.warn('No targets defined.')
+ targets = []
+
+ logging.debug('Connecting to Redis server')
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+
+ application = CandolintPoller(rconn, targets, interval, args.debug)
+ application.listen(options.port, options.listen, xheaders=options.xheaders)
+
+ IOLoop.instance().start()
+
+
+if __name__ == '__main__':
+ main()