Anton Shestakov <av6@dwimlabs.net>, Wed, 20 Jul 2016 00:32:32 +0800
rollbot: no need to wait for MUCs
If a user sends a groupchat message to a room while the bot is still in the
process of joining that room, the message will not be handled. And it shouldn't
be, because the bot is not there yet! So there's no point in blocking the
current thread on joining MUCs.
Additionally, using wait=True triggers a warning: "Use of send mask waiters is
deprecated." (from XMLStream.send)
rollbot.py
Permissions: -rw-r--r--
from argparse import ArgumentParser, FileType, SUPPRESS if sys.version_info < (3, 0): from sleekxmpp.util.misc_ops import setdefaultencoding setdefaultencoding('utf8') def simple_markup(string): tagre = re.compile(r'\{(\w+)\}(.*?)\{/\1\}') plain = tagre.sub(r'\2', string) def get_replacement(match): 'i': 'font-style: italic;', 't': 'font-family: monospace;' style = codes.get(code, '') return '<span style="{}">{}</span>'.format(style, text) html = tagre.sub(get_replacement, cgi.escape(string)) def __init__(self, rollbot): return '(no help available, try experimenting)' regex = re.compile('^(?P<times>\d+)?d(?P<sides>\d+)$', re.IGNORECASE) '{t}%s <X>D<Y>{/t} to roll {t}Y{/t}-sided die {t}X{/t} times,' ' if {t}X{/t} is 1 it can be omitted' % self.name) def respond(self, words, message): match = self.regex.match(words[1]) times = match.group('times') sides = int(match.group('sides')) if times > 0 and sides > 1: rolls, total = self.roll(times, sides) return self.format(rolls, total) return '%s usage: %s' % (self.name, self.usage()) def roll(self, times, sides): results = [random.randint(1, sides) for i in range(times)] def format(self, rolls, total): return '{G}%d{/G}' % total sequence = ' + '.join('{B}%d{/B}' % r for r in rolls) return '(%s) = {G}%d{/G}' % (sequence, total) return '{t}%s{/t} {i}or{/i} {t}%s <1st-option>[ or] <2nd-option>{/t}' % (self.name, self.name) def respond(self, words, message): option = self.flip(('heads', 'tails')) option = self.flip(words[1:]) elif len(words) == 4 and words[2].lower() == 'or': option = self.flip((words[1], words[3])) return '%s usage: %s' % (self.name, self.usage()) return self.format(option) return random.choice(options) def format(self, option): return '{G}%s{/G}' % option return '{t}%s{/t} {i}or{/i} {t}%s <command>{/t}' % (self.name, self.name) def respond(self, words, message): if cmdname in self.rollbot.commands: command = self.rollbot.commands[cmdname] return '%s usage: %s' % (command.name, command.usage()) return 'no 
such command: %s' % cmdname cmds = ' '.join(sorted(self.rollbot.commands.keys())) return 'available commands: %s' % cmds return '%s usage: %s' % (self.name, self.usage()) class RollBot(sleekxmpp.ClientXMPP): def __init__(self, jid, password, nick, prefix): sleekxmpp.ClientXMPP.__init__(self, jid, password) self.add_event_handler('session_start', self.start) self.add_event_handler('message', self.message) self.add_event_handler('groupchat_message', self.muc_message) self.add_event_handler('groupchat_invite', self.muc_invite) self.add_event_handler('groupchat_direct_invite', self.muc_direct_invite) def add_command(self, commandcls): self.commands[commandcls.name] = commandcls(self) if msg['type'] not in ('chat', 'normal'): mbody = msg['body'].lstrip() if mbody.startswith(self.prefix): mbody = mbody[len(self.prefix):] mbody = mbody.lstrip(' ') rbody, rxbody = self.respond(mbody) msg['html']['body'] = rxbody def muc_message(self, msg): if msg['mucnick'] == self.nick: mbody = msg['body'].lstrip() if mbody.startswith(self.nick): mbody = mbody[len(self.nick):] elif mbody.startswith(self.prefix): mbody = mbody[len(self.prefix):] mbody = mbody.lstrip(':, ') rbody, rxbody = self.respond(mbody) rbody = '%s: %s' % (msg['mucnick'], rbody) rxbody = '%s: %s' % (cgi.escape(msg['mucnick']), rxbody) msg['html']['body'] = rxbody def muc_invite(self, invite): # https://github.com/fritzy/SleekXMPP/issues/409 ns = 'http://jabber.org/protocol/muc#user' pel = invite.find('{%(ns)s}x/{%(ns)s}password' % {'ns': ns}) password = pel.text if pel is not None else '' self.plugin['xep_0045'].joinMUC(room, self.nick, password=password) def muc_direct_invite(self, invite): room = invite['groupchat_invite']['jid'] password = invite['groupchat_invite']['password'] self.plugin['xep_0045'].joinMUC(room, self.nick, password=password) def respond(self, message, prefix=None): cmdname = words[0].lower() if cmdname in self.commands: command = self.commands[cmdname] response = command.respond(words, message) 
plain, html = simple_markup(response) def lookup(key, args, config, default=None): return getattr(args, key) return config.get(key, default) parser = ArgumentParser(argument_default=SUPPRESS) '-c', '--config', type=FileType('r'), help='configuration file (YAML)') group = parser.add_mutually_exclusive_group() extra = dict(action='store_const', dest='loglevel', default=logging.INFO) '-q', '--quiet', const=logging.ERROR, help='set logging to ERROR', **extra) '-d', '--debug', const=logging.DEBUG, help='set logging to DEBUG', **extra) group = parser.add_argument_group( 'options that can also be specified in the configuration file') group.add_argument('-j', '--jid', help='JID to use') group.add_argument('-p', '--password', help='password to use') group.add_argument('-n', '--nick', help='MUC nickname (default: rollbot)') group.add_argument('--prefix', help='command prefix (default: !)') args = parser.parse_args() config = yaml.safe_load(args.config) if hasattr(args, 'config') else {} jid = lookup('jid', args, config) password = lookup('password', args, config) nick = lookup('nick', args, config, default='rollbot') prefix = lookup('prefix', args, config, default='!') format='%(asctime)s %(levelname)-8s %(message)s') rollbot = RollBot(jid, password, nick, prefix) rollbot.auto_authorize = True rollbot.auto_subscribe = True rollbot.register_plugin('xep_0030') # Service Discovery rollbot.register_plugin('xep_0045') # Multi-User Chat rollbot.register_plugin('xep_0092') # Software Version rollbot.register_plugin('xep_0199') # XMPP Ping rollbot.register_plugin('xep_0249') # Direct MUC Invitations logging.info('RollBot connected') rollbot.process(block=True) logging.info('RollBot disconnected') logging.fatal("RollBot couldn't connect") if __name__ == '__main__':