176:545d03db7218
Anton Shestakov <av6@dwimlabs.net>, Fri, 22 Jul 2016 20:34:44 +0800
queue: chmod +x poll-hgweb-queue.py

next change 177:c62bd675b409
previous change 175:6d53a5a4d517

poll-hgweb-queue.py

Permissions: -rwxr-xr-x

Other formats: Feeds:
#!/usr/bin/env python
import json
import logging
from argparse import ArgumentParser, FileType, SUPPRESS
import redis
import yaml
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.options import define, options
from tornado.web import Application
from candolint.utils import lookup_option, timestamp
# Command-line options for the HTTP listener, registered with tornado.options
# so they can be passed as --listen, --port and --xheaders.
define('listen', default='127.0.0.1', metavar='IP')
define('port', type=int, default=8035, metavar='PORT')
define('xheaders', type=bool, default=False, metavar='True|False')

# With this set to None tornado does not configure logging itself when the
# command line is parsed; main() calls logging.basicConfig() instead.
options.logging = None
class CandolintPoller(Application):
    """Tornado application that polls remote targets for new branch heads.

    Each target is a dict that must provide the 'url', 'repo' and 'source'
    keys. While running, the poller keeps two extra keys on the same dict:
    'hashes' (the set of nodes seen at the last successful fetch) and 'etag'
    (the validator used for conditional requests). Nodes that were not in the
    previous set are pushed to the 'candolint:queue:changes' Redis list.
    """

    def __init__(self, rconn, targets, interval, debug):
        """Store the configuration and schedule setup() on the IOLoop.

        :param rconn: Redis client; only its rpush() method is used here.
        :param targets: iterable of target dicts (see the class docstring).
        :param interval: polling period in seconds.
        :param debug: passed through to tornado's Application as `debug`.
        """
        handlers = []
        super(CandolintPoller, self).__init__(handlers, debug=debug)
        self.rconn = rconn
        self.targets = targets
        self.interval = interval
        # Defer the first round of polling until the IOLoop is running.
        IOLoop.instance().add_callback(self.setup)

    @coroutine
    def setup(self):
        """Poll every target once, then start a periodic poll for each."""
        for target in self.targets:
            try:
                yield self.poll(target)
            except Exception:
                # A target that is broken at startup must not prevent this
                # and the remaining targets from being scheduled; the
                # periodic callback below will simply retry it later.
                logging.exception(
                    'Initial poll failed for target %r', target)

            # Bind the current target as a default argument so that each
            # callback keeps its own target instead of the loop's last one.
            @coroutine
            def routine(target=target):
                yield self.poll(target)

            # PeriodicCallback takes milliseconds; self.interval is seconds.
            pc = PeriodicCallback(routine, self.interval * 1000)
            pc.start()

    @coroutine
    def poll(self, target):
        """Fetch a target's URL and queue any nodes not seen before.

        The first successful fetch only records the current nodes; later
        fetches push the difference. A 304 response leaves the target
        untouched; any other non-200 response is logged and re-raised via
        response.rethrow().

        :param target: target dict; updated in place with 'hashes'/'etag'.
        """
        url = target['url']
        http = AsyncHTTPClient()
        headers = {}
        # Only send the validator when there really is one: a None or empty
        # value is not a usable header value.
        if target.get('etag'):
            headers['If-None-Match'] = target['etag']
        logging.debug('Fetching %s', url)
        # raise_error=False so that 304 can be handled as a normal response.
        response = yield http.fetch(url, raise_error=False, headers=headers)
        if response.code == 200:
            # Expects a JSON object with a 'branches' list whose items each
            # have a 'node' key -- presumably hgweb's JSON branches output.
            data = json.loads(response.body)
            hashes = set(branch['node'] for branch in data['branches'])
            if 'hashes' in target:
                new = hashes - target['hashes']
                base = {
                    'url': url,
                    'scm': 'hg',
                    'repo': target['repo'],
                    'timestamp': timestamp(),
                    'source': target['source']
                }
                self.push(base, new)
                logging.info('Got %d new hash(es) from %s', len(new), url)
            else:
                logging.info(
                    'Got %d current hash(es) from %s', len(hashes), url)
            target['hashes'] = hashes
            etag = response.headers.get('ETag')
            if etag:
                target['etag'] = etag
            else:
                # No validator in this response: forget the stale one so the
                # next request is unconditional.
                target.pop('etag', None)
        elif response.code != 304:
            logging.error('Error %d fetching %s', response.code, url)
            response.rethrow()
        else:
            logging.debug('Not modified: %s', url)

    def push(self, base, changes):
        """Append one JSON item per change to the Redis changes queue.

        :param base: dict of fields shared by every item; it is copied, not
            modified.
        :param changes: iterable of values stored under the 'change' key.
        """
        for change in changes:
            item = base.copy()
            item['change'] = change
            data = json.dumps(item)
            logging.debug('Pushing %s', data)
            self.rconn.rpush('candolint:queue:changes', data)
            logging.info('Pushed a change for %s', item['repo'])

    def listen(self, port, address='', **kwargs):
        """Log the listening address, then delegate to Application.listen."""
        name = self.__class__.__name__
        logging.info('%s is serving on %s:%d', name, address, port)
        super(CandolintPoller, self).listen(port, address, **kwargs)
def main():
    """Parse options, connect the pieces and run the poller until stopped.

    Options understood by argparse are taken first; everything else on the
    command line is handed to tornado.options (--listen, --port, --xheaders).
    Redis settings and the polling interval are resolved with lookup_option()
    from the command line and the YAML configuration file; the list of
    targets comes from the configuration file only.
    """
    # SUPPRESS keeps unspecified options out of the namespace entirely, which
    # is why the presence of 'config' is tested with hasattr() below.
    parser = ArgumentParser(argument_default=SUPPRESS)
    parser.add_argument(
        '-c', '--config', type=FileType('r'),
        help='configuration file (YAML)')
    parser.add_argument(
        '-d', '--debug', action='store_true', default=False,
        help='enable debugging output')

    group = parser.add_argument_group(
        'queue configuration',
        'these options that can also be specified in the configuration file')
    group.add_argument('--redis-host', help='(default: 127.0.0.1)')
    group.add_argument('--redis-port', type=int, help='(default: 6379)')
    group.add_argument('--redis-password')
    group.add_argument('--interval', type=int, help='(default: 8 hours)')

    args, extra = parser.parse_known_args()
    options.parse_command_line([None] + extra)  # 1st argument is ignored

    if hasattr(args, 'config'):
        # Close the file argparse opened once it has been read. An empty
        # YAML document loads as None, so fall back to an empty mapping.
        with args.config as config_file:
            config = yaml.safe_load(config_file) or {}
    else:
        config = {}

    rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
    rport = lookup_option('redis-port', args, config, default=6379)
    rpass = lookup_option('redis-password', args, config)
    # The interval is in seconds; the default is 8 hours.
    interval = lookup_option('interval', args, config, default=8 * 60 * 60)

    logging.basicConfig(
        level=logging.DEBUG if args.debug else logging.INFO,
        format='%(asctime)s %(levelname)-8s %(message)s')

    if config and 'targets' in config:
        targets = config['targets']
    else:
        # logging.warn is a deprecated alias; use logging.warning.
        logging.warning('No targets defined.')
        targets = []

    logging.debug('Connecting to Redis server')
    # NOTE(review): redis-py appears to open connections lazily, so the
    # message below may not prove the server is reachable -- confirm.
    rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
    logging.info('Connected to Redis server')

    application = CandolintPoller(rconn, targets, interval, args.debug)
    application.listen(
        options.port, options.listen, xheaders=options.xheaders)
    IOLoop.instance().start()
if __name__ == '__main__':
    # Start the poller only when run as a script, not when imported.
    main()