354:23e5b4d4f272 default tip
Anton Shestakov <av6@dwimlabs.net>, Mon, 13 Apr 2020 19:01:24 +0800
tests: update a link

previous change 330:ca5eed415a48

candolint/handlers.py

Permissions: -rw-r--r--

Other formats: Feeds:
from __future__ import absolute_import, division
import traceback
from collections import OrderedDict
from difflib import unified_diff
from math import ceil
from peewee import fn, DoesNotExist
from tornado.escape import json_decode
from tornado.web import HTTPError, RequestHandler
from tornado.web import ErrorHandler as BaseErrorHandler
from candolint.models import database, Project, Change, Check
def get_or_404(query, *args, **kwargs):
    """Return ``query.get(*args, **kwargs)`` or answer with HTTP 404.

    :param query: object exposing a peewee-style ``get()`` method.
    :raises HTTPError: with status 404 when ``get()`` raises DoesNotExist.
    """
    try:
        found = query.get(*args, **kwargs)
    except DoesNotExist:
        # A missing row is reported to the client as "not found".
        raise HTTPError(404)
    else:
        return found
def get_project_or_404(domain, user, name):
    """Look up a project by domain, user and name, or answer with HTTP 404.

    A ``user`` of ``'-'`` is matched against a project whose user is None.
    """
    owner = None if user == '-' else user
    matching = Project.select().where(
        Project.domain == domain,
        Project.user == owner,
        Project.name == name)
    return get_or_404(matching)
def get_check_or_404(project, check_num):
    """Fetch one check of ``project`` (with its change), or answer with 404.

    :param check_num: a check ordinal, or ``'latest'`` to skip the ordinal
        filter and take the first row of the descending-ordinal ordering.
    """
    candidates = (
        Check
        .select(Check, Change)
        .join(Change)
        .where(Check.project == project)
        .order_by(Check.ordinal.desc())
    )
    if check_num == 'latest':
        return get_or_404(candidates)
    return get_or_404(candidates.where(Check.ordinal == check_num))
class Paginator(object):
    """Splits a query into fixed-size pages.

    Attributes set by the constructor:
      query       -- the query being paginated
      page        -- the page number handed to ``query.paginate()``
      paginate_by -- number of rows per page
      pages       -- total page count, rounded up
    """

    def __init__(self, query, page, paginate_by=30):
        self.query, self.page, self.paginate_by = query, page, paginate_by
        total_rows = query.count()
        # Round up so a partially filled last page is still counted.
        self.pages = int(ceil(total_rows / paginate_by))

    def iterator(self):
        """Return an iterator over the rows of the current page."""
        current_page = self.query.paginate(self.page, self.paginate_by)
        return current_page.iterator()
class BaseHandler(RequestHandler):
    """Shared base for the request handlers in this module.

    Opens the database connection before a request is handled, closes it
    when the request finishes, and provides pagination and templated error
    pages.
    """

    def prepare(self):
        """Connect to the database, then run the parent class's prepare()."""
        database.connect()
        super(BaseHandler, self).prepare()

    def on_finish(self):
        """Close the database connection unless it is already closed."""
        if not database.is_closed():
            database.close()

    def paginate(self, query):
        """Wrap ``query`` in a Paginator for the requested ``page`` argument.

        The page number comes from the ``page`` query argument; anything
        missing, non-numeric or below 1 falls back to page 1.
        """
        raw_page = self.get_query_argument('page', '1')
        page = 1
        if raw_page and raw_page.isdigit():
            try:
                page = max(1, int(raw_page))
            except ValueError:
                # isdigit() also accepts characters (e.g. superscript
                # digits) that int() cannot parse; treat those as page 1
                # instead of failing the request.
                page = 1
        return Paginator(query, page)

    def write_error(self, status_code, **kwargs):
        """Render the error page for ``status_code``.

        ``404.html`` is used for status 404 and ``500.html`` for every other
        status. The template receives ``code``, ``message`` and
        ``debug_message``; the latter holds the formatted traceback only
        when ``exc_info`` was passed and the ``serve_traceback`` setting is
        enabled, and is empty otherwise.
        """
        data = {
            'code': status_code,
            'message': self._reason,
            'debug_message': ''
        }
        if 'exc_info' in kwargs and self.settings.get('serve_traceback'):
            fexc = traceback.format_exception(*kwargs['exc_info'])
            # Each element already ends with a newline, so join without a
            # separator to avoid double-spacing the traceback.
            data['debug_message'] = ''.join(fexc)
        template = '404.html' if status_code == 404 else '500.html'
        self.render(template, **data)
class IndexHandler(BaseHandler):
    """Front page listing the highest-ordinal check of each project."""

    def get(self):
        """Render ``index.html`` with one check per project."""
        # Subquery: the highest check ordinal recorded for each project.
        newest_per_project = (
            Check
            .select(Check.project_id, fn.MAX(Check.ordinal).alias('maxord'))
            .group_by(Check.project_id)
            .alias('latest')
        )
        # Ids of the checks whose (project, ordinal) pair matches that maximum.
        newest_ids = (
            Check
            .select(Check.id)
            .from_(Check, newest_per_project)
            .where(Check.project_id == newest_per_project.c.project_id)
            .where(Check.ordinal == newest_per_project.c.maxord)
        )
        # Load those checks together with their project and change.
        newest_checks = (
            Check
            .select(Project, Change, *Check.get_light_fields())
            .join(Project)
            .switch(Check)
            .join(Change)
            .where(Check.id.in_(newest_ids))
            .order_by(Check.finished.desc(), Check.ordinal.desc())
        )
        self.render('index.html', checks=newest_checks)
class ProjectHandler(BaseHandler):
    """Project page listing the project's checks, highest ordinal first."""

    def get(self, domain, user, name):
        """Render ``project.html`` for the project, or answer with 404."""
        project = get_project_or_404(domain, user, name)
        project_checks = (
            Check
            .select(Change, *Check.get_light_fields())
            .join(Change)
            .where(Check.project == project)
            .order_by(Check.ordinal.desc())
        )
        self.render('project.html', project=project, checks=project_checks)
class AtomHandler(BaseHandler):
    """Serves a project's checks through the ``atom.xml`` template."""

    def get_check_status(self, check):
        """Return a short textual status for ``check``.

        ``'unknown'`` for an unsuccessful check, ``'all clean'`` when there
        are neither errors nor warnings, otherwise a comma-separated,
        pluralised summary such as ``'2 errors, 1 warning'``.
        """
        if not check.success:
            return 'unknown'
        if not (check.errors or check.warnings):
            return 'all clean'
        summary = []
        counters = (
            (check.errors, '{} error', '{} errors'),
            (check.warnings, '{} warning', '{} warnings'),
        )
        for count, singular, plural in counters:
            if count:
                template = self.locale.translate(singular, plural, count)
                summary.append(template.format(count))
        return ', '.join(summary)

    def get(self, domain, user, name):
        """Render the feed for the project, or answer with 404."""
        project = get_project_or_404(domain, user, name)
        feed_checks = (
            Check
            .select(Change, *Check.get_light_fields())
            .join(Change)
            .where(Check.project == project)
            .order_by(Check.ordinal.desc())
        )
        self.set_header('Content-Type', 'application/atom+xml; charset=utf-8')
        self.render(
            'atom.xml',
            project=project,
            checks=feed_checks,
            status=self.get_check_status,
        )
class CheckHandler(BaseHandler):
    """Shows a single check of a project, as an HTML page or as raw text."""

    def get(self, domain, user, name, check_num, format_='html'):
        """Load the check and hand it to the raw or the HTML renderer.

        ``format_`` equal to ``'raw'`` selects plain text; any other value
        selects the HTML page.
        """
        project = get_project_or_404(domain, user, name)
        check = get_check_or_404(project, check_num)
        lines = json_decode(check.lines)
        respond = self.get_raw if format_ == 'raw' else self.get_html
        respond(project, check, lines)

    def get_raw(self, project, check, lines):
        """Write the ``text`` of every line as plain text, one per row."""
        self.set_header('Content-Type', 'text/plain; charset=utf-8')
        for entry in lines:
            self.write(entry['text'])
            self.write('\n')

    def get_reference_check(self, check):
        """Return the check to compare ``check`` against, or None.

        The reference is the nearest earlier successful check of the same
        project and branch that belongs to a different change. Unsuccessful
        checks have no reference.
        """
        if not check.success:
            return None
        earlier = (
            Check
            .select(*Check.get_light_fields())
            .join(Change)
            .where(Check.project == check.project)
            .where(Check.success)
            .where(Check.change != check.change)
            .where(Change.branch == check.change.branch)
            .where(Check.ordinal < check.ordinal)
            .order_by(Check.ordinal.desc())
        )
        return earlier.first()

    def get_chart_data(self, check):
        """Build the chart payload for ``check``.

        Returns an empty dict when fewer than three successful checks of the
        same project and branch fall into the window, otherwise a dict with
        ``points`` (ascending by ordinal), ``current`` and ``projectURL``.
        """
        window = 50
        same_branch = (
            Check
            .select(*Check.get_light_fields())
            .join(Change)
            .where(Check.project == check.project)
            .where(Check.success)
            .where(Change.branch == check.change.branch)
            .order_by(Check.ordinal.desc())
        )
        # Number of successful checks that are not older than this one.
        not_older = same_branch.where(Check.ordinal >= check.ordinal).count()
        # Skip the newest rows only as far as needed to keep this check's
        # ordinal inside the window.
        skipped = max(0, not_older - window)
        newest_first = same_branch.limit(window).offset(skipped)
        history = list(reversed(newest_first))
        if len(history) < 3:
            return {}
        points = []
        for item in history:
            elapsed = item.finished - item.started
            points.append({
                'ordinal': item.ordinal,
                'errors': item.errors if item.success else None,
                'warnings': item.warnings if item.success else None,
                'duration': int(elapsed.total_seconds())
            })
        return {
            'points': points,
            'current': check.ordinal,
            'projectURL': check.project.get_url()
        }

    def get_files_and_codes(self, check, lines):
        """Summarise error and warning lines per file and per code.

        Returns a ``(files, codes)`` pair. Both are empty dicts when the
        check did not succeed or has no errors and no warnings. Otherwise
        each maps ``'errors'`` / ``'warnings'`` to a list of dicts holding
        the filename (or code), its occurrence count and the 1-based number
        of the line where it first appeared. Files keep first-seen order;
        codes are sorted by descending count, then by code.
        """
        if not (check.success and (check.errors or check.warnings)):
            return {}, {}
        by_file = {'errors': OrderedDict(), 'warnings': OrderedDict()}
        by_code = {'errors': {}, 'warnings': {}}
        trackers = (('filename', by_file), ('code', by_code))
        for position, entry in enumerate(lines, 1):
            severity = entry.get('cls')
            if severity not in ('error', 'warning'):
                continue
            bucket = severity + 's'
            for field, index in trackers:
                if field not in entry:
                    continue
                # Remember where the value was first seen, then count it.
                stats = index[bucket].setdefault(
                    entry[field], {'count': 0, 'line': position})
                stats['count'] += 1

        def by_count_then_code(item):
            return (-item[1]['count'], item[0])

        files = {}
        for bucket in by_file:
            files[bucket] = [
                {'filename': fname, 'count': st['count'], 'line': st['line']}
                for fname, st in by_file[bucket].items()
            ]
        codes = {}
        for bucket in by_code:
            ranked = sorted(by_code[bucket].items(), key=by_count_then_code)
            codes[bucket] = [
                {'code': code, 'count': st['count'], 'line': st['line']}
                for code, st in ranked
            ]
        return files, codes

    def get_html(self, project, check, lines):
        """Render ``check.html`` with links, reference, chart and summaries."""
        adapter = project.get_adapter()
        # Lines that name a file get a link built by the project's adapter.
        for entry in lines:
            if 'filename' in entry:
                entry['link'] = adapter.get_line_url(check.change, entry)
        reference = self.get_reference_check(check)
        chart = self.get_chart_data(check)
        files, codes = self.get_files_and_codes(check, lines)
        self.render(
            'check.html',
            project=project,
            check=check,
            lines=lines,
            adapter=adapter,
            reference=reference,
            chart=chart,
            files=files,
            codes=codes,
        )
class CompareHandler(BaseHandler):
    """Shows a diff between the output of two checks of one project."""

    def get(self, domain, user, name, check_num, reference_num):
        """Render ``compare.html`` with the diff from reference to check.

        Only lines whose ``task`` is ``'checks'`` take part in the diff.
        """
        project = get_project_or_404(domain, user, name)
        adapter = project.get_adapter()
        check = get_check_or_404(project, check_num)
        reference = get_check_or_404(project, reference_num)
        check_lines = json_decode(check.lines)
        reference_lines = json_decode(reference.lines)
        before = [
            entry['text'] for entry in reference_lines
            if entry.get('task') == 'checks'
        ]
        after = [
            entry['text'] for entry in check_lines
            if entry.get('task') == 'checks'
        ]
        # Context as large as the longer input keeps everything in one hunk.
        context = max(len(before), len(after))
        diff = unified_diff(before, after, n=context)
        css_by_marker = {'+': 'added', '-': 'removed'}
        difflines = []
        # unified_diff() starts with the '---', '+++' and '@@' header lines.
        # NOTE(review): four entries are skipped here, one more than those
        # three headers - confirm the first diffed line is meant to be hidden.
        for text in list(diff)[4:]:
            row = {'text': text}
            css = css_by_marker.get(text[:1])
            if css is not None:
                row['cls'] = css
            difflines.append(row)
        if not difflines:
            difflines.append({'text': 'Diff is empty', 'cls': 'meta'})
        self.render(
            'compare.html',
            project=project,
            check=check,
            reference=reference,
            lines=difflines,
            adapter=adapter,
        )
class StatusHandler(BaseHandler):
    """Serves an SVG status badge for a project's latest check."""

    def get(self, domain, user, name):
        """Render ``status.svg`` for the latest check of the project.

        An optional ``branch`` argument restricts the lookup to checks of
        changes on that branch. Each badge part is a tuple of
        ``(colour, width, x, text)``; ``x`` is presumably the horizontal
        text position used by the template.
        """
        project = get_project_or_404(domain, user, name)
        candidates = (
            Check
            .select(*Check.get_light_fields())
            .where(Check.project == project)
            .order_by(Check.ordinal.desc())
        )
        branch = self.get_argument('branch', None)
        if branch is not None:
            candidates = candidates.join(Change).where(Change.branch == branch)
        check = candidates.first()

        parts = [('#555', 30, 14.5, 'lint')]
        if check is None or not check.success:
            parts.append(('#777', 62, 30.5, 'unknown'))
        elif check.errors or check.warnings:
            # (count, singular, plural, colour, right padding, x adjustment)
            segments = (
                (check.errors, '{} error', '{} errors', '#da314b', 7, -0.5),
                (check.warnings, '{} warning', '{} warnings',
                 '#faa732', 9, 0.5),
            )
            for count, singular, plural, colour, padding, shift in segments:
                if not count:
                    continue
                template = self.locale.translate(singular, plural, count)
                text = template.format(count)
                # 6 units per character plus padding on both sides.
                seg_width = 7 + 6 * len(text) + padding
                parts.append((colour, seg_width, seg_width // 2 + shift, text))
        else:
            parts.append(('#8cc14c', 60, 29.5, 'all clean'))

        total_width = sum(part[1] for part in parts)
        self.set_header('Content-Type', 'image/svg+xml; charset=utf-8')
        self.render('status.svg', width=total_width, parts=parts, height=20)
class DotHandler(BaseHandler):
    """Serves a single-colour SVG dot reflecting a check's outcome."""

    def get(self, domain, user, name, check_num):
        """Render ``dot.svg`` coloured by the selected check's result.

        ``check_num`` is a check ordinal or ``'latest'``; an optional
        ``branch`` argument restricts the lookup to that branch. The colour
        is transparent when no check matches, grey for an unsuccessful
        check, red with errors, orange with only warnings, green otherwise.
        """
        project = get_project_or_404(domain, user, name)
        candidates = (
            Check
            .select(*Check.get_light_fields())
            .where(Check.project == project)
            .order_by(Check.ordinal.desc())
        )
        if check_num != 'latest':
            candidates = candidates.where(Check.ordinal == check_num)
        branch = self.get_argument('branch', None)
        if branch is not None:
            candidates = candidates.join(Change).where(Change.branch == branch)
        check = candidates.first()

        if check is None:
            color = 'transparent'
        elif check.success:
            if check.errors:
                color = '#da314b'
            elif check.warnings:
                color = '#faa732'
            else:
                color = '#8cc14c'
        else:
            color = '#777'
        self.set_header('Content-Type', 'image/svg+xml; charset=utf-8')
        self.render('dot.svg', color=color)
class ErrorHandler(BaseHandler, BaseErrorHandler):
    """Tornado's stock ErrorHandler combined with BaseHandler.

    BaseHandler is listed first, so its methods take precedence over the
    tornado ErrorHandler's in the method resolution order.
    """