--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/poll-hgweb-queue.py Fri Jul 22 20:13:28 2016 +0800
+from argparse import ArgumentParser, FileType, SUPPRESS
+from tornado.gen import coroutine
+from tornado.httpclient import AsyncHTTPClient
+from tornado.ioloop import IOLoop, PeriodicCallback
+from tornado.options import define, options
+from tornado.web import Application
+from candolint.utils import lookup_option, timestamp
+define('listen', metavar='IP', default='127.0.0.1')
+define('port', metavar='PORT', default=8035, type=int)
+define('xheaders', metavar='True|False', default=False, type=bool)
+class CandolintPoller(Application):
+ def __init__(self, rconn, targets, interval, debug):
+ super(CandolintPoller, self).__init__(handlers, debug=debug)
+ self.interval = interval
+ IOLoop.instance().add_callback(self.setup)
+ for target in self.targets:
+ yield self.poll(target)
+ def routine(target=target):
+ yield self.poll(target)
+ pc = PeriodicCallback(routine, self.interval * 1000)
+ def poll(self, target):
+ http = AsyncHTTPClient()
+ headers['If-None-Match'] = target['etag']
+ logging.debug('Fetching %s', url)
+ response = yield http.fetch(url, raise_error=False, headers=headers)
+ if response.code == 200:
+ data = json.loads(response.body)
+ hashes = set(branch['node'] for branch in data['branches'])
+ new = hashes - target['hashes']
+ 'repo': target['repo'],
+ 'timestamp': timestamp(),
+ 'source': target['source']
+ logging.info('Got %d new hash(es) from %s', len(new), url)
+ logging.info('Got %d current hash(es) from %s', len(hashes), url)
+ target['hashes'] = hashes
+ target['etag'] = response.headers.get('ETag', None)
+ elif response.code != 304:
+ logging.error('Error %d fetching %s', response.code, url)
+ logging.debug('Not modified: %s', url)
+ def push(self, base, changes):
+ item['change'] = change
+ data = json.dumps(item)
+ logging.debug('Pushing %s', data)
+ self.rconn.rpush('candolint:queue:changes', data)
+ logging.info('Pushed a change for %s', item['repo'])
+ def listen(self, port, address='', **kwargs):
+ name = self.__class__.__name__
+ logging.info('%s is serving on %s:%d', name, address, port)
+ super(CandolintPoller, self).listen(port, address, **kwargs)
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+ group = parser.add_argument_group(
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+ group.add_argument('--interval', type=int, help='(default: 8 hours)')
+ args, extra = parser.parse_known_args()
+ options.parse_command_line([None] + extra) # 1st argument is ignored
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+ interval = lookup_option('interval', args, config, default=8 * 60 * 60)
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+ if config and 'targets' in config:
+ targets = config['targets']
+ logging.warn('No targets defined.')
+ logging.debug('Connecting to Redis server')
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+ application = CandolintPoller(rconn, targets, interval, args.debug)
+ application.listen(options.port, options.listen, xheaders=options.xheaders)
+ IOLoop.instance().start()
+if __name__ == '__main__':