285:29eb30fb2a3d
Anton Shestakov <av6@dwimlabs.net>, Thu, 14 Sep 2017 17:23:42 +0800
requirements: pytest-cov 2.5.1

next change 290:a08915486c39
previous change 269:f7bed0a8edef

candolint/handlers.py

Permissions: -rw-r--r--

Other formats: Feeds:
from __future__ import absolute_import, division
import traceback
from math import ceil
from peewee import DoesNotExist
from tornado.escape import json_decode
from tornado.web import HTTPError, RequestHandler
from tornado.web import ErrorHandler as BaseErrorHandler
from candolint.models import database, Project, Change, Check
def get_or_404(query, *args, **kwargs):
    """Return the single object produced by ``query.get()`` or abort with 404.

    Any extra positional and keyword arguments are forwarded to
    ``query.get()`` untouched.

    Raises:
        HTTPError: with status 404 when the lookup raises ``DoesNotExist``.
    """
    try:
        found = query.get(*args, **kwargs)
    except DoesNotExist:
        # Translate the ORM-level "no such row" into an HTTP-level response.
        raise HTTPError(404)
    return found
def get_project_or_404(domain, user, name):
    """Look up the project identified by domain, user and name, or abort with 404.

    A ``user`` value of ``'-'`` is a placeholder: it is matched against
    projects whose ``user`` column is ``None``.

    Raises:
        HTTPError: with status 404 (via ``get_or_404``) when nothing matches.
    """
    owner = None if user == '-' else user
    matching = Project.select().where(
        Project.domain == domain,
        Project.user == owner,
        Project.name == name)
    return get_or_404(matching)
def get_check_or_404(project, check_num):
    """Fetch one check of ``project`` (joined with its change) or abort with 404.

    ``check_num`` is either the literal string ``'latest'`` or an ordinal to
    match against ``Check.ordinal``.  For ``'latest'`` no ordinal filter is
    added, so whichever row the query yields first is returned.

    NOTE(review): no explicit ordering is applied here; presumably the Check
    model defines a default ordering that puts the newest check first --
    confirm against the model definition.

    Raises:
        HTTPError: with status 404 (via ``get_or_404``) when nothing matches.
    """
    query = Check.select(Check, Change).join(Change)
    query = query.where(Check.project == project)
    wants_specific_check = check_num != 'latest'
    if wants_specific_check:
        query = query.where(Check.ordinal == check_num)
    return get_or_404(query)
class Paginator(object):
    """Pair a query with a page number and a page size.

    Attributes set by the constructor:
        query: the query object being paginated.
        page: the requested page number, stored as given.
        paginate_by: number of rows per page (30 unless overridden).
        pages: total number of pages, i.e. ``query.count()`` divided by
            ``paginate_by`` and rounded up.
    """

    def __init__(self, query, page, paginate_by=30):
        total_rows = query.count()
        self.paginate_by = paginate_by
        self.page = page
        self.query = query
        # Round up so that a partially filled last page is still counted.
        self.pages = int(ceil(total_rows / paginate_by))

    def iterator(self):
        """Return an iterator over the rows of the current page only."""
        current_page = self.query.paginate(self.page, self.paginate_by)
        return current_page.iterator()
class BaseHandler(RequestHandler):
    """Common base for the request handlers in this module.

    Connects to the database before each request, closes it afterwards, and
    provides pagination and HTML error-page rendering helpers.
    """

    def prepare(self):
        """Connect to the database, then run the parent ``prepare``."""
        database.connect()
        super(BaseHandler, self).prepare()

    def on_finish(self):
        """Close the database connection if it is still open."""
        if not database.is_closed():
            database.close()

    def paginate(self, query):
        """Wrap ``query`` in a ``Paginator`` for the page given in ``?page=``.

        A missing, empty, non-numeric or otherwise unparsable ``page``
        argument falls back to page 1; values below 1 are clamped to 1.
        """
        raw_page = self.get_query_argument('page', '1')
        page = 1
        if raw_page and raw_page.isdigit():
            # str.isdigit() also accepts characters such as superscript or
            # circled digits that int() rejects; without this guard such a
            # query string would surface as an unhandled ValueError.
            try:
                page = max(1, int(raw_page))
            except ValueError:
                page = 1
        return Paginator(query, page)

    def write_error(self, status_code, **kwargs):
        """Render an HTML error page for ``status_code``.

        ``404.html`` is used for status 404 and ``500.html`` for every other
        status.  The template receives ``code``, ``message`` and
        ``debug_message``; the latter holds the formatted traceback only when
        ``exc_info`` was passed in and the ``serve_traceback`` application
        setting is enabled, and is an empty string otherwise.
        """
        data = {
            'code': status_code,
            'message': self._reason,
            'debug_message': ''
        }
        if 'exc_info' in kwargs and self.settings.get('serve_traceback'):
            fexc = traceback.format_exception(*kwargs['exc_info'])
            # Each entry already ends with a newline, so join without a
            # separator to avoid double-spaced tracebacks.
            data['debug_message'] = ''.join(fexc)
        template = '404.html' if status_code == 404 else '500.html'
        self.render(template, **data)
class IndexHandler(BaseHandler):
    """Front page: lists checks grouped by project."""

    def get(self):
        """Render ``index.html`` with checks joined to project and change.

        The query groups by ``Check.project``, so it yields one row per
        project.  NOTE(review): which check represents each group is left to
        the database -- confirm this is the intended one.
        """
        columns = [Project, Change]
        columns.extend(Check.get_light_fields())
        query = Check.select(*columns)
        query = query.join(Project)
        # Return the join context to Check before joining the second table.
        query = query.switch(Check)
        query = query.join(Change)
        query = query.group_by(Check.project)
        self.render('index.html', checks=query)
class ProjectHandler(BaseHandler):
    """Project page: lists the checks that belong to one project."""

    def get(self, domain, user, name):
        """Render ``project.html`` for the project named by the URL parts.

        Responds with 404 (via ``get_project_or_404``) when no such project
        exists.
        """
        project = get_project_or_404(domain, user, name)
        light_fields = Check.get_light_fields()
        query = Check.select(Change, *light_fields).join(Change)
        query = query.where(Check.project == project)
        self.render('project.html', project=project, checks=query)
class AtomHandler(BaseHandler):
    """Atom feed of the checks that belong to one project."""

    def get(self, domain, user, name):
        """Render ``atom.xml`` for the project named by the URL parts.

        The template receives the project, its checks and a ``status``
        callable that turns a check into a short human-readable summary.
        Responds with 404 (via ``get_project_or_404``) when no such project
        exists.
        """
        project = get_project_or_404(domain, user, name)
        light_fields = Check.get_light_fields()
        query = Check.select(Change, *light_fields).join(Change)
        query = query.where(Check.project == project)

        def status(check):
            """Summarise ``check`` as text for a feed entry."""
            if not check.success:
                return 'unknown'
            if not (check.errors or check.warnings):
                return 'all clean'
            # Errors are listed before warnings; zero counts are left out.
            counters = (
                (check.errors, '{} error', '{} errors'),
                (check.warnings, '{} warning', '{} warnings'),
            )
            summary = []
            for count, singular, plural in counters:
                if count:
                    template = self.locale.translate(singular, plural, count)
                    summary.append(template.format(count))
            return ', '.join(summary)

        self.set_header('Content-Type', 'application/atom+xml; charset=utf-8')
        self.render('atom.xml', project=project, checks=query, status=status)
class CheckHandler(BaseHandler):
    """Shows the output lines of a single check, as HTML or as plain text."""

    def get(self, domain, user, name, check_num, format_='html'):
        """Load the check and dispatch on ``format_``.

        ``format_ == 'raw'`` produces plain text; any other value produces
        the HTML page.  Responds with 404 when the project or the check
        cannot be found.
        """
        project = get_project_or_404(domain, user, name)
        check = get_check_or_404(project, check_num)
        # check.lines is stored as a JSON document; decode it once here.
        lines = json_decode(check.lines)
        respond = self.get_raw if format_ == 'raw' else self.get_html
        respond(project, check, lines)

    def get_raw(self, project, check, lines):
        """Write the ``text`` of every line, each followed by a newline."""
        self.set_header('Content-Type', 'text/plain; charset=utf-8')
        for text in (entry['text'] for entry in lines):
            self.write(text)
            self.write('\n')

    def get_html(self, project, check, lines):
        """Render ``check.html``, linking lines that carry a filename.

        Every line that has a ``filename`` key gets a ``link`` key added,
        built by the project's adapter from the check's change and the line.
        """
        adapter = project.get_adapter()
        for entry in lines:
            if 'filename' not in entry:
                continue
            entry['link'] = adapter.get_line_url(check.change, entry)
        self.render(
            'check.html',
            project=project,
            check=check,
            lines=lines,
            adapter=adapter)
class StatusHandler(BaseHandler):
    """Serves an SVG status badge for a project."""

    def get(self, domain, user, name):
        """Render ``status.svg`` from the first check the query yields.

        An optional ``branch`` argument restricts the checks to changes on
        that branch.  The badge always starts with a ``lint`` part, followed
        by ``unknown`` (no check, or an unsuccessful one), error and/or
        warning counts, or ``all clean``.

        Each part is a ``(colour, width, number, text)`` tuple; the widths
        are summed into the total badge width.  NOTE(review): the third
        element is presumably the horizontal text position -- confirm
        against the status.svg template.
        """
        project = get_project_or_404(domain, user, name)
        query = Check.select(*Check.get_light_fields())
        query = query.where(Check.project == project)
        branch = self.get_argument('branch', None)
        if branch is not None:
            query = query.join(Change).where(Change.branch == branch)
        check = query.first()

        parts = [('#555', 30, 14.5, 'lint')]
        if check is None or not check.success:
            parts.append(('#777', 62, 30.5, 'unknown'))
        elif not (check.errors or check.warnings):
            parts.append(('#8cc14c', 60, 29.5, 'all clean'))
        else:
            if check.errors:
                template = self.locale.translate(
                    '{} error', '{} errors', check.errors)
                label = template.format(check.errors)
                # 6 units per character plus 7 units of padding on each side.
                part_width = 6 * len(label) + 14
                parts.append(
                    ('#da314b', part_width, part_width // 2 - 0.5, label))
            if check.warnings:
                template = self.locale.translate(
                    '{} warning', '{} warnings', check.warnings)
                label = template.format(check.warnings)
                # 6 units per character plus 7 and 9 units of padding.
                part_width = 6 * len(label) + 16
                parts.append(
                    ('#faa732', part_width, part_width // 2 + 0.5, label))

        total_width = sum(part[1] for part in parts)
        self.set_header('Content-Type', 'image/svg+xml; charset=utf-8')
        self.render('status.svg', width=total_width, parts=parts, height=20)
class ErrorHandler(BaseHandler, BaseErrorHandler):
    """Tornado's ``ErrorHandler`` combined with ``BaseHandler``.

    Adds no behaviour of its own; everything comes from the two base classes.
    """