--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test_hooks_queue.py Thu Aug 25 11:22:09 2016 +0800
+from tornado.escape import json_encode
+from tornado.testing import AsyncHTTPTestCase
+hooksmod = __import__('hooks-queue')
+class HookTestCase(AsyncHTTPTestCase):
+ return hooksmod.CandolintHooks(rconn, debug=True)
+ def post_json(self, url, data, extra_headers):
+ body = json_encode(data)
+ headers = {'Content-Type': 'application/json'}
+ headers.update(extra_headers)
+ return self.fetch(url, method='POST', body=body, headers=headers)
+class BitbucketHookTestCase(HookTestCase):
+ def test_invalid_payload(self):
+ response = self.fetch(self.url, method='POST', body='invalid')
+ assert response.code == 400
+ def test_push_hg(self):
+ 'href': 'https://bitbucket.org/example/test'
+ 'target': {'hash': 'bar'}
+ response = self.post_json(self.url, data, {'X-Event-Key': 'repo:push'})
+ assert response.code == 200
+ assert '"queued": 3' in response.body
+ def test_push_git(self):
+ 'href': 'https://bitbucket.org/example/test'
+ 'new': {'type': 'branch', 'name': 'master'}
+ response = self.post_json(self.url, data, {'X-Event-Key': 'repo:push'})
+ assert response.code == 200
+ assert '"queued": 1' in response.body
+class GithubHookTestCase(HookTestCase):
+ def test_no_event(self):
+ response = self.post_json(self.url, {}, {})
+ assert response.code == 200
+ assert response.body == '{"result": "unknown event"}'
+ response = self.post_json(self.url, {}, {'X-GitHub-Event': 'ping'})
+ assert response.code == 200
+ assert response.body == '{"result": "pong"}'
+ 'html_url': 'https://github.com/example/test'
+ 'ref': 'refs/heads/master'
+ response = self.post_json(self.url, data, {'X-GitHub-Event': 'push'})
+ assert response.code == 200
+ assert '"queued": 1' in response.body