139:8155ebf7d128
Anton Shestakov <av6@dwimlabs.net>, Wed, 13 Jul 2016 18:25:43 +0800
incoming: support --help and --debug, and read from stdin by default

next change 142:41d0a4c7376a
previous change 126:76da93a3fe44

checker.py

Permissions: -rwxr-xr-x

Other formats: Feeds:
#!/usr/bin/env python -u
from __future__ import print_function
import os
from argparse import ArgumentParser, FileType
from datetime import datetime
from fnmatch import fnmatchcase
from shutil import rmtree
from subprocess import check_call, check_output, CalledProcessError
from tempfile import mkdtemp
import yaml
rel = lambda *x: os.path.abspath(os.path.join(os.path.dirname(__file__), *x))
def run_ignore_codes(fn, args, codes):
try:
return fn(args)
except CalledProcessError as e:
if e.returncode not in codes:
raise
return e.output
def run(args, silent=False, get_output=False, ignore_codes=None):
if get_output:
fn = check_output
else:
fn = check_call
try:
if not silent:
print('$ ' + ' '.join(args))
if ignore_codes:
result = run_ignore_codes(fn, args, ignore_codes)
else:
result = fn(args)
if get_output:
return result
else:
return True
except Exception as e:
print('# C&O error: {}'.format(e))
print('# C&O job failed')
return False
def now():
return datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S+00:00')
def read_linter_config(name, path=rel('linters')):
try:
fd = open(os.path.join(path, '{}.yml'.format(name)))
except Exception as e:
print("# C&O couldn't load linter config: {}".format(e))
return None
with fd:
try:
return yaml.safe_load(fd)
except Exception as e:
print("# C&O couldn't parse linter config: {}".format(e))
return None
def prevalidate(config):
ok = True
if 'url' not in config:
print("# C&O config doesn't have 'url' defined.")
ok = False
if 'scm' not in config:
print("# C&O config doesn't have 'scm' defined.")
ok = False
elif config['scm'] not in ('git', 'hg'):
print("# C&O checker doesn't support {} yet.".format(config['scm']))
ok = False
return ok
def git_clone(url, dest):
return run(['git', 'clone', '--depth', '1', url, dest])
def hg_clone(url, dest):
return run(['hg', 'clone', url, dest])
def git_status():
return run(['git', 'status'])
def hg_summary():
return run(['hg', 'sum'])
def git_show():
template = (r'# C&O commit: %H%n'
r'# C&O commit ref names: %D%n'
r'# C&O commit date: %ai%n'
r'# C&O commit author: %aN%n'
r'# C&O commit message: %s%n')
return run(['git', 'show', '-q', '--format=format:' + template], silent=True)
def hg_log():
template = (r'# C&O commit: {rev}:{node}\n'
r'# C&O commit branch: {branch}\n'
r'# C&O commit date: {date|isodatesec}\n'
r'# C&O commit author: {author|person}\n'
r'# C&O commit message: {desc|firstline}\n')
return run(['hg', 'log', '-r', '.', '-T', template], silent=True)
def git_files(include, exclude):
cmd = ['git', 'ls-files']
cmd.extend(include)
files = run(cmd, silent=True, get_output=True)
if files is not False:
files = files.splitlines()
for e in exclude:
files = [f for f in files if not fnmatchcase(f, e)]
return files
def hg_files(include, exclude):
cmd = ['hg', 'files']
for pat in include:
cmd.extend(('-I', pat))
for pat in exclude:
cmd.extend(('-X', pat))
files = run(cmd, silent=True, get_output=True, ignore_codes=(1,))
if files is not False:
files = files.splitlines()
return files
def execute(tmp, config):
if not prevalidate(config):
return False
os.chdir(tmp)
source = './source'
print('# C&O task: clone')
print('# C&O project URL: {}'.format(config['url']))
if config['scm'] == 'hg':
os.environ['HGPLAIN'] = '1'
if config['scm'] == 'git':
if not git_clone(config['url'], source):
return False
elif config['scm'] == 'hg':
if not hg_clone(config['url'], source):
return False
print('$ cd {}'.format(source))
os.chdir(source)
if config['scm'] == 'git':
if not git_status():
return False
elif config['scm'] == 'hg':
if not hg_summary():
return False
if config['scm'] == 'git':
if not git_show():
return False
elif config['scm'] == 'hg':
if not hg_log():
return False
print('# C&O task: setup')
to_install = [l['name'] for l in config.get('linters', []) if 'name' in l]
print('# C&O linters to install: {}'.format(' '.join(to_install)))
linter_config = {}
for linter in config.get('linters', []):
if 'name' not in linter:
continue
name = linter['name']
if name in linter_config:
continue
lc = read_linter_config(name)
if lc is None or 'exec' not in lc:
continue
linter_config[name] = lc
for item in lc.get('setup', []):
if not run(item):
return False
if 'version' in lc and not run(lc['exec'] + lc['version']):
return False
print('# C&O linters installed: {}'.format(' '.join(linter_config.keys())))
print('# C&O task: checks')
for linter in config.get('linters', []):
if 'name' not in linter or 'include' not in linter:
continue
if linter['name'] not in linter_config:
continue
if config['scm'] == 'git':
files = git_files(linter['include'], linter.get('exclude', []))
elif config['scm'] == 'hg':
files = hg_files(linter['include'], linter.get('exclude', []))
if files is False:
return False
lc = linter_config[linter['name']]
for f in files:
cmd = lc['exec']
flags = lc.get('flags', []) + linter.get('flags', [])
pf = lc.get('post_flags', []) + linter.get('post_flags', [])
codes = lc.get('codes', (1,))
if not run(cmd + flags + [f] + pf, ignore_codes=codes):
return False
return True
def wrapper(config):
print('# C&O job started: {}'.format(now()))
tmp = mkdtemp(prefix='candolint.')
if not execute(tmp, config):
print('# C&O job failed')
print('# C&O task: cleanup')
rmtree(tmp)
print('# C&O job finished: {}'.format(now()))
def bad_exit():
print('# C&O job failed (exception not caught)')
print('# C&O job finished: {}'.format(now()))
def main():
parser = ArgumentParser()
helptext = 'project configuration file (YAML)'
parser.add_argument('config', type=FileType('r'), help=helptext)
args = parser.parse_args()
config = yaml.safe_load(args.config)
try:
wrapper(config)
except:
bad_exit()
raise
if __name__ == '__main__':
main()