Download:
child 251:5f154c0a2457
parent 249:bfd7ba5b3a30
changeset 250:c6c59c348e26
Anton Shestakov <av6@dwimlabs.net>, Thu, 17 Nov 2016 00:33:40 +0800
queue: make poller more generic, rename

3 files changed, 150 insertions(+), 144 deletions(-) [+]
README.rst file | annotate | diff | comparison | revisions
poll-hgweb-queue.py file | annotate | diff | comparison | revisions
poller-queue.py file | annotate | diff | comparison | revisions
--- a/README.rst Thu Nov 17 00:05:04 2016 +0800
+++ b/README.rst Thu Nov 17 00:33:40 2016 +0800
@@ -19,7 +19,7 @@
..
- hooks-queue.py - web service for handling events from code hosting sites
-- poll-hgweb-queue.py - long-lived HTTP client for polling changes in hgweb
+- poller-queue.py - long-lived HTTP client for polling changes
instances (because they don't have defined events API)
- incoming-queue.py - long-lived process for receiving and parsing check
results
--- a/poll-hgweb-queue.py Thu Nov 17 00:05:04 2016 +0800
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,143 +0,0 @@
-#!/usr/bin/env python
-import json
-import logging
-from argparse import ArgumentParser, FileType, SUPPRESS
-from urlparse import urlparse
-
-import redis
-import yaml
-from tornado.gen import coroutine
-from tornado.httpclient import AsyncHTTPClient
-from tornado.ioloop import IOLoop, PeriodicCallback
-from tornado.web import Application
-
-from candolint.utils import lookup_option, timestamp
-
-
-def fill_blanks(target):
- parsed = urlparse(target['url'])
- if 'source' not in target:
- target['source'] = parsed.hostname
- if 'repo' not in target:
- path = parsed.path.rstrip('/')
- if path.endswith('/json-branches'):
- path = path[:-14].rstrip('/')
- target['repo'] = path.rpartition('/')[-1]
-
-
-class CandolintPoller(Application):
- def __init__(self, rconn, targets, interval, debug):
- handlers = []
- super(CandolintPoller, self).__init__(handlers, debug=debug)
- self.rconn = rconn
- self.targets = targets
- self.interval = interval
- IOLoop.instance().add_callback(self.setup)
-
- @coroutine
- def setup(self):
- for target in self.targets:
- fill_blanks(target)
-
- yield self.poll(target)
-
- @coroutine
- def routine(target=target):
- yield self.poll(target)
-
- pc = PeriodicCallback(routine, self.interval * 1000)
- pc.start()
-
- @coroutine
- def poll(self, target):
- url = target['url']
- http = AsyncHTTPClient()
- headers = {
- 'User-Agent': 'hgweb poller (https://bitbucket.org/av6/candolint)'
- }
- if 'etag' in target:
- headers['If-None-Match'] = target['etag']
- logging.debug('Fetching %s', url)
- response = yield http.fetch(url, raise_error=False, headers=headers)
-
- if response.code == 200:
- data = json.loads(response.body)
- hashes = set(branch['node'] for branch in data['branches'])
- if 'hashes' in target:
- new = hashes - target['hashes']
- base = {
- 'url': url,
- 'scm': 'hg',
- 'repo': target['repo'],
- 'timestamp': timestamp(),
- 'source': target['source']
- }
- self.push(base, new)
- logging.info('Got %d new hash(es) from %s', len(new), url)
- else:
- logging.info(
- 'Got %d current hash(es) from %s', len(hashes), url)
- target['hashes'] = hashes
- target['etag'] = response.headers.get('ETag', None)
- elif response.code == 304:
- logging.debug('Not modified: %s', url)
- else:
- logging.error('Error %d fetching %s', response.code, url)
- response.rethrow()
-
- def push(self, base, changes):
- for change in changes:
- item = base.copy()
- item['change'] = change
- data = json.dumps(item)
- logging.debug('Pushing %s', data)
- self.rconn.rpush('candolint:queue:changes', data)
- logging.info('Pushed a change for %s', item['repo'])
-
-
-def main():
- parser = ArgumentParser(argument_default=SUPPRESS)
- parser.add_argument(
- '-c', '--config', type=FileType('r'),
- help='configuration file (YAML)')
- parser.add_argument(
- '-d', '--debug', action='store_true', default=False,
- help='enable debugging output')
-
- group = parser.add_argument_group(
- 'queue configuration',
- 'these options that can also be specified in the configuration file')
- group.add_argument('--redis-host', help='(default: 127.0.0.1)')
- group.add_argument('--redis-port', type=int, help='(default: 6379)')
- group.add_argument('--redis-password')
- group.add_argument('--poll-interval', type=int, help='(default: 1 hour)')
-
- args = parser.parse_args()
- config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
-
- rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
- rport = lookup_option('redis-port', args, config, default=6379)
- rpass = lookup_option('redis-password', args, config)
- interval = lookup_option('poll-interval', args, config, default=60 * 60)
-
- logging.basicConfig(
- level=logging.DEBUG if args.debug else logging.INFO,
- format='%(asctime)s %(levelname)-8s %(message)s')
-
- if config and 'targets' in config:
- targets = config['targets']
- else:
- logging.warn('No targets defined.')
- targets = []
-
- logging.debug('Connecting to Redis server')
- rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
- logging.info('Connected to Redis server')
-
- CandolintPoller(rconn, targets, interval, args.debug)
-
- IOLoop.instance().start()
-
-
-if __name__ == '__main__':
- main()
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/poller-queue.py Thu Nov 17 00:33:40 2016 +0800
@@ -0,0 +1,149 @@
+#!/usr/bin/env python
+import json
+import logging
+from argparse import ArgumentParser, FileType, SUPPRESS
+from urlparse import urlparse
+
+import redis
+import yaml
+from tornado.gen import coroutine
+from tornado.httpclient import AsyncHTTPClient
+from tornado.ioloop import IOLoop, PeriodicCallback
+from tornado.web import Application
+
+from candolint.utils import lookup_option, timestamp
+
+
+def fill_blanks(target):
+ parsed = urlparse(target['url'])
+ path = parsed.path.rstrip('/')
+ if 'source' not in target:
+ target['source'] = parsed.hostname
+ if 'repo' not in target:
+ target['repo'] = path.rpartition('/')[-1]
+ if 'scm' not in target:
+ if target['type'] == 'hgweb':
+ target['scm'] = 'hg'
+
+
+class CandolintPoller(Application):
+ def __init__(self, rconn, targets, interval, debug):
+ handlers = []
+ super(CandolintPoller, self).__init__(handlers, debug=debug)
+ self.rconn = rconn
+ self.targets = targets
+ self.interval = interval
+ IOLoop.instance().add_callback(self.setup)
+
+ @coroutine
+ def setup(self):
+ for target in self.targets:
+ fill_blanks(target)
+
+ yield self.poll(target)
+
+ @coroutine
+ def routine(target=target):
+ yield self.poll(target)
+
+ pc = PeriodicCallback(routine, self.interval * 1000)
+ pc.start()
+
+ @coroutine
+ def poll(self, target):
+ http = AsyncHTTPClient()
+ headers = {
+ 'User-Agent': 'Candolint poller (https://bitbucket.org/av6/candolint)'
+ }
+ if 'etag' in target:
+ headers['If-None-Match'] = target['etag']
+ logging.debug('Fetching %s', target['poll'])
+ response = yield http.fetch(target['poll'], raise_error=False, headers=headers)
+
+ if response.code == 200:
+ data = json.loads(response.body)
+
+ if target['type'] == 'hgweb':
+ self.parse_hgweb(target, data)
+
+ target['etag'] = response.headers.get('ETag', None)
+ elif response.code == 304:
+ logging.debug('Not modified: %s', target['poll'])
+ else:
+ logging.error('Error %d fetching %s', response.code, target['poll'])
+ response.rethrow()
+
+ def parse_hgweb(self, target, data):
+ hashes = set(branch['node'] for branch in data['branches'])
+ if 'hashes' in target:
+ new = hashes - target['hashes']
+ if new:
+ base = {
+ 'url': target['url'],
+ 'scm': target['scm'],
+ 'repo': target['repo'],
+ 'source': target['source'],
+ 'timestamp': timestamp()
+ }
+ self.push(base, new)
+ logging.info('Got %d new hash(es) from %s', len(new), target['poll'])
+ else:
+ logging.info('Got %d current hash(es) from %s', len(hashes), target['poll'])
+ target['hashes'] = hashes
+
+ def push(self, base, changes):
+ for change in changes:
+ item = base.copy()
+ item['change'] = change
+ data = json.dumps(item)
+ logging.debug('Pushing %s', data)
+ self.rconn.rpush('candolint:queue:changes', data)
+ logging.info('Pushed a change for %s', item['repo'])
+
+
+def main():
+ parser = ArgumentParser(argument_default=SUPPRESS)
+ parser.add_argument(
+ '-c', '--config', type=FileType('r'),
+ help='configuration file (YAML)')
+ parser.add_argument(
+ '-d', '--debug', action='store_true', default=False,
+ help='enable debugging output')
+
+ group = parser.add_argument_group(
+ 'queue configuration',
+ 'these options that can also be specified in the configuration file')
+ group.add_argument('--redis-host', help='(default: 127.0.0.1)')
+ group.add_argument('--redis-port', type=int, help='(default: 6379)')
+ group.add_argument('--redis-password')
+ group.add_argument('--poll-interval', type=int, help='(default: 1 hour)')
+
+ args = parser.parse_args()
+ config = yaml.safe_load(args.config) if hasattr(args, 'config') else {}
+
+ rhost = lookup_option('redis-host', args, config, default='127.0.0.1')
+ rport = lookup_option('redis-port', args, config, default=6379)
+ rpass = lookup_option('redis-password', args, config)
+ interval = lookup_option('poll-interval', args, config, default=60 * 60)
+
+ logging.basicConfig(
+ level=logging.DEBUG if args.debug else logging.INFO,
+ format='%(asctime)s %(levelname)-8s %(message)s')
+
+ if config and 'targets' in config:
+ targets = config['targets']
+ else:
+ logging.warn('No targets defined.')
+ targets = []
+
+ logging.debug('Connecting to Redis server')
+ rconn = redis.StrictRedis(host=rhost, port=rport, password=rpass, db=0)
+ logging.info('Connected to Redis server')
+
+ CandolintPoller(rconn, targets, interval, args.debug)
+
+ IOLoop.instance().start()
+
+
+if __name__ == '__main__':
+ main()