Anton Shestakov <av6@dwimlabs.net>, Mon, 13 Apr 2020 19:01:24 +0800
tests: update a link
candolint/handlers.py
Permissions: -rw-r--r--
from __future__ import absolute_import, division from collections import OrderedDict from difflib import unified_diff from peewee import fn, DoesNotExist from tornado.escape import json_decode from tornado.web import HTTPError, RequestHandler from tornado.web import ErrorHandler as BaseErrorHandler from candolint.models import database, Project, Change, Check def get_or_404(query, *args, **kwargs): return query.get(*args, **kwargs) def get_project_or_404(domain, user, name): pq = Project.select().where( Project.domain == domain, Project.user == (user if user != '-' else None), def get_check_or_404(project, check_num): .where(Check.project == project) .order_by(Check.ordinal.desc())) if check_num != 'latest': cq = cq.where(Check.ordinal == check_num) def __init__(self, query, page, paginate_by=30): self.paginate_by = paginate_by self.pages = int(ceil(query.count() / paginate_by)) return self.query.paginate(self.page, self.paginate_by).iterator() class BaseHandler(RequestHandler): super(BaseHandler, self).prepare() if not database.is_closed(): def paginate(self, query): page = self.get_query_argument('page', '1') if page and page.isdigit(): return Paginator(query, page) def write_error(self, status_code, **kwargs): if self.settings.get('serve_traceback'): fexc = traceback.format_exception(*kwargs['exc_info']) data['debug_message'] = '\n'.join(fexc) self.render('404.html', **data) self.render('500.html', **data) class IndexHandler(BaseHandler): .select(Check.project_id, fn.MAX(Check.ordinal).alias('maxord')) .group_by(Check.project_id) .where(Check.project_id == latest.c.project_id) .where(Check.ordinal == latest.c.maxord)) .select(Project, Change, *Check.get_light_fields()) .where(Check.id.in_(check_ids)) .order_by(Check.finished.desc(), Check.ordinal.desc())) self.render('index.html', checks=checks) class ProjectHandler(BaseHandler): def get(self, domain, user, name): project = get_project_or_404(domain, user, name) .select(Change, *Check.get_light_fields()) .where(Check.project == project) .order_by(Check.ordinal.desc())) self.render('project.html', project=project, checks=checks) class AtomHandler(BaseHandler): def get_check_status(self, check): elif check.errors or check.warnings: msg = self.locale.translate( '{} error', '{} errors', check.errors) status.append(msg.format(check.errors)) msg = self.locale.translate( '{} warning', '{} warnings', check.warnings) status.append(msg.format(check.warnings)) def get(self, domain, user, name): project = get_project_or_404(domain, user, name) .select(Change, *Check.get_light_fields()) .where(Check.project == project) .order_by(Check.ordinal.desc())) self.set_header('Content-Type', 'application/atom+xml; charset=utf-8') self.render('atom.xml', project=project, checks=checks, status=self.get_check_status) class CheckHandler(BaseHandler): def get(self, domain, user, name, check_num, format_='html'): project = get_project_or_404(domain, user, name) check = get_check_or_404(project, check_num) lines = json_decode(check.lines) self.get_raw(project, check, lines) self.get_html(project, check, lines) def get_raw(self, project, check, lines): self.set_header('Content-Type', 'text/plain; charset=utf-8') def get_reference_check(self, check): .select(*Check.get_light_fields()) .where(Check.project == check.project) .where(Check.change != check.change) .where(Change.branch == check.change.branch) .where(Check.ordinal < check.ordinal) .order_by(Check.ordinal.desc()) def get_chart_data(self, check): .select(*Check.get_light_fields()) .where(Check.project == check.project) .where(Change.branch == check.change.branch) .order_by(Check.ordinal.desc())) more = checks.where(Check.ordinal >= check.ordinal).count() history = list(reversed(checks .offset(max(0, more - limit)))) 'errors': c.errors if c.success else None, 'warnings': c.warnings if c.success else None, 'duration': int((c.finished - c.started).total_seconds()) 'current': check.ordinal, 'projectURL': check.project.get_url() def get_files_and_codes(self, check, lines): if not check.success or not (check.errors or check.warnings): files = {'errors': OrderedDict(), 'warnings': OrderedDict()} codes = {'errors': {}, 'warnings': {}} for number, line in enumerate(lines, 1): if line.get('cls') not in ('error', 'warning'): for key, data in (('filename', files), ('code', codes)): if value not in data[kind]: data[kind][value] = {'count': 0, 'line': number} data[kind][value]['count'] += 1 {'filename': k, 'count': v['count'], 'line': v['line']} for k, v in files[kind].items() {'code': k, 'count': v['count'], 'line': v['line']} for k, v in sorted(codes[kind].items(), key=lambda i: (-i[1]['count'], i[0])) def get_html(self, project, check, lines): adapter = project.get_adapter() line['link'] = adapter.get_line_url(check.change, line) reference = self.get_reference_check(check) chart = self.get_chart_data(check) files, codes = self.get_files_and_codes(check, lines) self.render('check.html', project=project, check=check, lines=lines, adapter=adapter, reference=reference, chart=chart, files=files, codes=codes) class CompareHandler(BaseHandler): def get(self, domain, user, name, check_num, reference_num): project = get_project_or_404(domain, user, name) adapter = project.get_adapter() check = get_check_or_404(project, check_num) reference = get_check_or_404(project, reference_num) lines = json_decode(check.lines) reflines = json_decode(reference.lines) rcl = [l['text'] for l in reflines if l.get('task') == 'checks'] cl = [l['text'] for l in lines if l.get('task') == 'checks'] diff = unified_diff(rcl, cl, n=max(len(rcl), len(cl))) for text in list(diff)[4:]: elif text.startswith('-'): difflines.append({'text': 'Diff is empty', 'cls': 'meta'}) self.render('compare.html', project=project, check=check, reference=reference, lines=difflines, adapter=adapter) class StatusHandler(BaseHandler): def get(self, domain, user, name): project = get_project_or_404(domain, user, name) .select(*Check.get_light_fields()) .where(Check.project == project) .order_by(Check.ordinal.desc())) branch = self.get_argument('branch', None) checks = checks.join(Change).where(Change.branch == branch) parts = [('#555', 30, 14.5, 'lint')] if check is None or not check.success: parts.append(('#777', 62, 30.5, 'unknown')) elif check.errors or check.warnings: msg = self.locale.translate( '{} error', '{} errors', check.errors) text = msg.format(check.errors) width = 7 + 6 * len(text) + 7 parts.append(('#da314b', width, width // 2 - 0.5, text)) msg = self.locale.translate( '{} warning', '{} warnings', check.warnings) text = msg.format(check.warnings) width = 7 + 6 * len(text) + 9 parts.append(('#faa732', width, width // 2 + 0.5, text)) parts.append(('#8cc14c', 60, 29.5, 'all clean')) width = sum(p[1] for p in parts) self.set_header('Content-Type', 'image/svg+xml; charset=utf-8') self.render('status.svg', width=width, parts=parts, height=20) class DotHandler(BaseHandler): def get(self, domain, user, name, check_num): project = get_project_or_404(domain, user, name) .select(*Check.get_light_fields()) .where(Check.project == project) .order_by(Check.ordinal.desc())) if check_num != 'latest': checks = checks.where(Check.ordinal == check_num) branch = self.get_argument('branch', None) checks = checks.join(Change).where(Change.branch == branch) self.set_header('Content-Type', 'image/svg+xml; charset=utf-8') self.render('dot.svg', color=color) class ErrorHandler(BaseHandler, BaseErrorHandler):