252:faef9902f1d1
Anton Shestakov <av6@dwimlabs.net>, Thu, 17 Nov 2016 00:41:33 +0800
docs: add sample queue config

next change 340:2d97d5a9d437
previous change 251:5f154c0a2457

poller-queue.py

Permissions: -rwxr-xr-x

Other formats: Feeds:
#!/usr/bin/env python
import json
import logging
from argparse import ArgumentParser, FileType, SUPPRESS
from urlparse import urlparse
import redis
import yaml
from tornado.gen import coroutine
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import Application
from candolint.utils import lookup_option, timestamp
def fill_blanks(target):
parsed = urlparse(target['url'])
path = parsed.path.rstrip('/')
if 'source' not in target:
target['source'] = parsed.hostname
if 'repo' not in target:
target['repo'] = path.rpartition('/')[-1]
if 'scm' not in target:
if target['type'] == 'hgweb':
target['scm'] = 'hg'
elif target['type'] == 'github':
target['scm'] = 'git'
class CandolintPoller(Application):
def __init__(self, rconn, targets, interval, debug):
handlers = []
super(CandolintPoller, self).__init__(handlers, debug=debug)
self.rconn = rconn
self.targets = targets
self.interval = interval
IOLoop.instance().add_callback(self.setup)
@coroutine
def setup(self):
for target in self.targets:
fill_blanks(target)
yield self.poll(target)
@coroutine
def routine(target=target):
yield self.poll(target)
pc = PeriodicCallback(routine, self.interval * 1000)
pc.start()
@coroutine
def poll(self, target):
http = AsyncHTTPClient()
headers = {
'User-Agent': 'Candolint poller (https://bitbucket.org/av6/candolint)'
}
if 'etag' in target:
headers['If-None-Match'] = target['etag']
logging.debug('Fetching %s', target['poll'])
response = yield http.fetch(target['poll'], raise_error=False, headers=headers)
if response.code == 200:
data = json.loads(response.body)
if target['type'] == 'hgweb':
self.parse_hgweb(target, data)
elif target['type'] == 'github':
self.parse_github(target, data)
target['etag'] = response.headers.get('ETag', None)
elif response.code == 304:
logging.debug('Not modified: %s', target['poll'])
else:
logging.error('Error %d fetching %s', response.code, target['poll'])
response.rethrow()
def parse_hgweb(self, target, data):
hashes = set(branch['node'] for branch in data['branches'])
if 'hashes' in target:
new = hashes - target['hashes']
if new:
base = {
'url': target['url'],
'scm': target['scm'],
'repo': target['repo'],
'source': target['source'],
'timestamp': timestamp()
}
self.push(base, new)
logging.info('Got %d new hash(es) from %s', len(new), target['poll'])
else:
logging.info('Got %d current hash(es) from %s', len(hashes), target['poll'])
target['hashes'] = hashes
def parse_github(self, target, data):
hashes = {branch['name']: branch['commit']['sha'] for branch in data}
if 'hashes' in target:
new = []
for name, sha in hashes.items():
if target['hashes'].get(name) != sha:
new.append(name)
if new:
base = {
'url': target['url'],
'scm': target['scm'],
'repo': target['repo'],
'source': target['source'],
'timestamp': timestamp()
}
self.push(base, new)
logging.info('Got %d new hash(es) from %s', len(new), target['poll'])
else:
logging.info('Got %d current hash(es) from %s', len(hashes), target['poll'])
target['hashes'] = hashes
def push(self, base, changes):
for change in changes:
item = base.copy()
item['change'] = change
data = json.dumps(item)
logging.debug('Pushing %s', data)
self.rconn.rpush('candolint:queue:changes', data)
logging.info('Pushed a change for %s', item['repo'])
def main():
parser = ArgumentParser(argument_default=SUPPRESS)
parser.add_argument(
'-c', '--config', type=FileType('r'),
help='configuration file (YAML)')
parser.add_argument(
'-d', '--debug', action='store_true', default=False,
help='enable debugging output')
group = parser.add_argument_group(
'queue configuration',
'these options that can also be specified in the configuration file')
group.add_argument('--redis-host', help='(default: 127.0.0.1)')
group.add_argument('--redis-port', type=int, help='(default: 6379)')
group.add_argument('--redis-password')
group.add_argument('--poll-interval', type=int, help='(default: 1 hour)')
args = parser.parse_args()
config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
rport = lookup_option('redis-port', args, config, default=6379)
rpass = lookup_option('redis-password', args, config)
interval = lookup_option('poll-interval', args, config, default=60 * 60)
logging.basicConfig(
level=logging.DEBUG if args.debug else logging.INFO,
format='%(asctime)s %(levelname)-8s %(message)s')
if config and 'targets' in config:
targets = config['targets']
else:
logging.warn('No targets defined.')
targets = []
logging.debug('Connecting to Redis server')
rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
logging.info('Connected to Redis server')
CandolintPoller(rconn, targets, interval, args.debug)
IOLoop.instance().start()
if __name__ == '__main__':
main()