29:c04a4faec835
Anton Shestakov <av6@dwimlabs.net>, Wed, 20 Jul 2016 00:32:32 +0800
rollbot: no need to wait for MUCs. If a user sends a groupchat message to a room while the bot is still in the process of joining that room, the message will not be handled. And it shouldn't be, because the bot is not there yet! So there's no point in blocking the current thread on joining MUCs. Additionally, using wait=True triggers a warning: "Use of send mask waiters is deprecated." (from XMLStream.send)

next change 30:f21f5ee3e32c
previous change 28:7f7fdce01952

rollbot.py

Permissions: -rw-r--r--

Other formats: Feeds:
#!/usr/bin/env python
import cgi
import logging
import random
import re
import sys
from argparse import ArgumentParser, FileType, SUPPRESS
import yaml
import sleekxmpp
# Python 2 only: presumably switches the interpreter's default string
# encoding to UTF-8 via SleekXMPP's helper (inferred from the helper's name).
# On Python 3 this branch is skipped entirely.
if sys.version_info[0] < 3:
    from sleekxmpp.util.misc_ops import setdefaultencoding

    setdefaultencoding('utf8')
try:
    from html import escape as _escape_markup  # Python 3.2+
except ImportError:
    # Python 2 has no ``html`` module; ``cgi.escape`` is the equivalent there
    # (it was removed from the standard library in Python 3.8).
    from cgi import escape as _escape_markup

# Matches ``{code}text{/code}``; the closing tag must repeat the opening code.
_MARKUP_TAG_RE = re.compile(r'\{(\w+)\}(.*?)\{/\1\}')

# Inline CSS applied for each known markup code.
_MARKUP_STYLES = {
    'B': 'color: blue;',
    'G': 'color: green;',
    'i': 'font-style: italic;',
    't': 'font-family: monospace;',
}


def simple_markup(string):
    """Render ``{code}text{/code}`` markup as plain text and as HTML.

    :param string: text that may contain ``{B}``, ``{G}``, ``{i}`` or ``{t}``
        tag pairs.
    :returns: a ``(plain, html)`` tuple. In ``plain`` the tags are removed and
        only their content is kept. In ``html`` the input is escaped
        (``&``, ``<`` and ``>``; quotes are left alone) and every tag pair
        becomes a ``<span>`` with the matching inline style; codes that are
        not in the style table get an empty ``style`` attribute.
    """
    def get_replacement(match):
        style = _MARKUP_STYLES.get(match.group(1), '')
        return '<span style="{}">{}</span>'.format(style, match.group(2))

    plain = _MARKUP_TAG_RE.sub(r'\2', string)
    # Escape first so that the spans inserted afterwards stay real markup.
    escaped = _escape_markup(string, quote=False)
    html = _MARKUP_TAG_RE.sub(get_replacement, escaped)
    return plain, html
class Command(object):
    """Base class for bot commands.

    Subclasses set ``name`` (the word that triggers the command) and
    normally override ``usage``.
    """

    # Word that selects this command; subclasses replace it.
    name = 'command'

    def __init__(self, rollbot):
        """Remember the bot object this command instance was created for."""
        self.rollbot = rollbot

    def usage(self):
        """Return the fallback help text used when a command has none."""
        fallback = '(no help available, try experimenting)'
        return fallback
class Roll(Command):
    """Command that rolls dice given in ``<X>D<Y>`` notation."""

    name = 'roll'
    # Raw string: ``\d`` must reach the regex engine verbatim rather than be
    # treated as an (invalid) string escape sequence.
    regex = re.compile(r'^(?P<times>\d+)?d(?P<sides>\d+)$', re.IGNORECASE)
    # Largest number of dice rolled per request. The count comes straight from
    # chat input, so without a bound a single message could make the bot build
    # an arbitrarily large list. Override in a subclass to tune.
    max_times = 100

    def usage(self):
        """Return the help text (in simple markup) for this command."""
        return (
            '{t}%s <X>D<Y>{/t} to roll {t}Y{/t}-sided die {t}X{/t} times,'
            ' if {t}X{/t} is 1 it can be omitted' % self.name)

    def respond(self, words, message):
        """Roll the dice described by ``words[1]`` and format the outcome.

        :param words: the whitespace-split message; ``words[0]`` is the
            command name.
        :param message: the unsplit message text (unused here).
        :returns: the formatted roll, an error line when more than
            ``max_times`` dice are requested, or the usage line when the
            request is malformed, asks for zero dice, or for a die with
            fewer than two sides.
        """
        usage = '%s usage: %s' % (self.name, self.usage())
        if len(words) != 2:
            return usage
        match = self.regex.match(words[1])
        if match is None:
            return usage
        try:
            # A missing count means a single die.
            times = int(match.group('times') or 1)
            sides = int(match.group('sides'))
        except ValueError:
            # Newer Python versions refuse to convert extremely long digit
            # strings; treat that as a malformed request.
            return usage
        if times > self.max_times:
            return ('too many dice, at most %d can be rolled at once'
                    % self.max_times)
        if times > 0 and sides > 1:
            rolls, total = self.roll(times, sides)
            return self.format(rolls, total)
        return usage

    def roll(self, times, sides):
        """Roll a ``sides``-sided die ``times`` times.

        :returns: ``(results, total)`` where ``results`` is the list of
            individual rolls and ``total`` is their sum.
        """
        results = [random.randint(1, sides) for _ in range(times)]
        return results, sum(results)

    def format(self, rolls, total):
        """Return the markup for a roll: just the total for a single die,
        otherwise the sum spelled out followed by the total."""
        if len(rolls) == 1:
            return '{G}%d{/G}' % total
        sequence = ' + '.join('{B}%d{/B}' % r for r in rolls)
        return '(%s) = {G}%d{/G}' % (sequence, total)
class Flip(Command):
    """Command that picks one of two options at random (a coin flip)."""

    name = 'flip'

    def usage(self):
        """Return the help text (in simple markup) for this command."""
        template = (
            '{t}%s{/t} {i}or{/i} {t}%s <1st-option>[ or] <2nd-option>{/t}')
        return template % (self.name, self.name)

    def respond(self, words, message):
        """Choose between heads/tails or between two user-supplied options.

        Accepted shapes of ``words`` (``words[0]`` is the command name):
        the command alone, the command plus two options, or the command plus
        two options separated by the word "or" (any letter case). Anything
        else yields the usage line.
        """
        count = len(words)
        if count == 1:
            choices = ('heads', 'tails')
        elif count == 3:
            choices = words[1:]
        elif count == 4 and words[2].lower() == 'or':
            choices = (words[1], words[3])
        else:
            return '%s usage: %s' % (self.name, self.usage())
        picked = self.flip(choices)
        return self.format(picked)

    def flip(self, options):
        """Return one randomly chosen element of ``options``."""
        return random.choice(options)

    def format(self, option):
        """Wrap the chosen option in the ``{G}`` markup tag."""
        return '{G}%s{/G}' % option
class Help(Command):
    """Command that lists the available commands or shows one's usage."""

    name = 'help'

    def usage(self):
        """Return the help text (in simple markup) for this command."""
        return '{t}%s{/t} {i}or{/i} {t}%s <command>{/t}' % (self.name, self.name)

    def respond(self, words, message):
        """Answer a help request.

        :param words: the whitespace-split message; ``words[0]`` is the
            command name, an optional ``words[1]`` names the command to
            describe.
        :param message: the unsplit message text (unused here).
        :returns: the sorted list of command names when called without an
            argument, the usage of the named command (or a "no such command"
            line) when called with one, and this command's own usage line
            for any other number of words.
        """
        commands = self.rollbot.commands
        if len(words) == 1:
            return 'available commands: %s' % ' '.join(sorted(commands))
        if len(words) == 2:
            cmdname = words[1]
            # The bot lowercases a command name before dispatching it, so the
            # lookup here is case-insensitive as well; the name is echoed
            # back as typed.
            command = commands.get(cmdname.lower())
            if command is None:
                return 'no such command: %s' % cmdname
            return '%s usage: %s' % (command.name, command.usage())
        return '%s usage: %s' % (self.name, self.usage())
class RollBot(sleekxmpp.ClientXMPP):
    """XMPP client that answers commands in direct chats and MUC rooms."""

    def __init__(self, jid, password, nick, prefix):
        """Set up the command table and register the event handlers.

        :param jid: JID to log in with.
        :param password: password for ``jid``.
        :param nick: nickname used when joining rooms; messages starting
            with it are treated as addressed to the bot.
        :param prefix: command prefix (e.g. ``!``).
        """
        sleekxmpp.ClientXMPP.__init__(self, jid, password)
        self.nick = nick
        self.prefix = prefix
        # Maps a command name to its Command instance.
        self.commands = {}
        self.add_command(Roll)
        self.add_command(Flip)
        self.add_command(Help)
        self.add_event_handler('session_start', self.start)
        self.add_event_handler('message', self.message)
        self.add_event_handler('groupchat_message', self.muc_message)
        self.add_event_handler('groupchat_invite', self.muc_invite)
        self.add_event_handler('groupchat_direct_invite', self.muc_direct_invite)

    @staticmethod
    def _escape(text):
        """Return ``text`` with ``&``, ``<`` and ``>`` escaped for HTML."""
        try:
            from html import escape  # Python 3.2+
        except ImportError:
            # Python 2; cgi.escape no longer exists on Python 3.8+.
            from cgi import escape
        return escape(text, quote=False)

    def add_command(self, commandcls):
        """Instantiate ``commandcls`` for this bot and file it under its name."""
        self.commands[commandcls.name] = commandcls(self)

    def start(self, event):
        """Handle ``session_start``: fetch the roster and send presence."""
        self.get_roster()
        self.send_presence()

    def message(self, msg):
        """Handle a direct ('chat' or 'normal') message.

        The command prefix is optional here: it is stripped when present,
        and the remaining text is interpreted as a command either way.
        """
        if msg['type'] not in ('chat', 'normal'):
            return
        mbody = msg['body'].lstrip()
        if mbody.startswith(self.prefix):
            mbody = mbody[len(self.prefix):]
        mbody = mbody.lstrip(' ')
        rbody, rxbody = self.respond(mbody)
        if rbody:
            msg.reply(rbody)
            msg['html']['body'] = rxbody
            msg.send()

    def muc_message(self, msg):
        """Handle a groupchat message.

        Only messages that start with the bot's nick or the command prefix
        are considered; messages carrying the bot's own nick are ignored.
        The reply is addressed to the sender by nick.
        """
        if msg['mucnick'] == self.nick:
            return
        mbody = msg['body'].lstrip()
        if mbody.startswith(self.nick):
            mbody = mbody[len(self.nick):]
        elif mbody.startswith(self.prefix):
            mbody = mbody[len(self.prefix):]
        else:
            return
        # Drop the separator typically typed after a nick ("nick: ", "nick, ").
        mbody = mbody.lstrip(':, ')
        rbody, rxbody = self.respond(mbody)
        if rbody:
            rbody = '%s: %s' % (msg['mucnick'], rbody)
            # The nick is user-controlled text going into the HTML body.
            rxbody = '%s: %s' % (self._escape(msg['mucnick']), rxbody)
            msg.reply(rbody)
            msg['html']['body'] = rxbody
            msg.send()

    def muc_invite(self, invite):
        """Handle a ``groupchat_invite`` event by joining the inviting room."""
        room = invite['from']
        # The password is read from the raw XML rather than via the stanza
        # interface; see https://github.com/fritzy/SleekXMPP/issues/409
        ns = 'http://jabber.org/protocol/muc#user'
        pel = invite.find('{%(ns)s}x/{%(ns)s}password' % {'ns': ns})
        password = pel.text if pel is not None else ''
        self.plugin['xep_0045'].joinMUC(room, self.nick, password=password)

    def muc_direct_invite(self, invite):
        """Handle a ``groupchat_direct_invite`` event by joining the room."""
        room = invite['groupchat_invite']['jid']
        password = invite['groupchat_invite']['password']
        self.plugin['xep_0045'].joinMUC(room, self.nick, password=password)

    def respond(self, message, prefix=None):
        """Run the command named by the first word of ``message``.

        :param message: message text with any prefix/nick already removed.
        :param prefix: unused; kept so existing callers keep working.
        :returns: ``(plain, html)`` renderings of the command's response, or
            two empty strings when the message is empty or its first word
            (compared in lower case) is not a known command.
        """
        plain = ''
        html = ''
        words = message.split()
        if words:
            command = self.commands.get(words[0].lower())
            if command is not None:
                response = command.respond(words, message)
                plain, html = simple_markup(response)
        return plain, html
def lookup(key, args, config, default=None):
    """Return the setting ``key``, preferring ``args`` over ``config``.

    :param key: name of the setting.
    :param args: object (e.g. an argparse namespace) whose attribute ``key``
        wins whenever it exists, whatever its value.
    :param config: mapping consulted when ``args`` has no such attribute;
        may be ``None`` or empty (an empty YAML file loads as ``None``).
    :param default: value returned when neither source has the setting.
    """
    if hasattr(args, key):
        return getattr(args, key)
    if not config:
        return default
    return config.get(key, default)
def main():
    """Parse the command line and configuration, then run the bot.

    Settings given on the command line take precedence over the YAML
    configuration file; ``nick`` and ``prefix`` fall back to built-in
    defaults.
    """
    # argument_default=SUPPRESS keeps options that were not given out of the
    # namespace, so that lookup() can fall through to the configuration file.
    parser = ArgumentParser(argument_default=SUPPRESS)
    parser.add_argument(
        '-c', '--config', type=FileType('r'),
        help='configuration file (YAML)')
    group = parser.add_mutually_exclusive_group()
    extra = dict(action='store_const', dest='loglevel', default=logging.INFO)
    group.add_argument(
        '-q', '--quiet', const=logging.ERROR,
        help='set logging to ERROR', **extra)
    group.add_argument(
        '-d', '--debug', const=logging.DEBUG,
        help='set logging to DEBUG', **extra)
    group = parser.add_argument_group(
        'main configuration',
        'options that can also be specified in the configuration file')
    group.add_argument('-j', '--jid', help='JID to use')
    group.add_argument('-p', '--password', help='password to use')
    group.add_argument('-n', '--nick', help='MUC nickname (default: rollbot)')
    group.add_argument('--prefix', help='command prefix (default: !)')
    args = parser.parse_args()

    config = {}
    if hasattr(args, 'config'):
        try:
            # An empty YAML document loads as None; treat it as "no settings".
            config = yaml.safe_load(args.config) or {}
        finally:
            # FileType opened the file for us; "-" maps to stdin, which is
            # left open.
            if args.config is not sys.stdin:
                args.config.close()

    jid = lookup('jid', args, config)
    password = lookup('password', args, config)
    nick = lookup('nick', args, config, default='rollbot')
    prefix = lookup('prefix', args, config, default='!')

    logging.basicConfig(
        level=args.loglevel,
        format='%(asctime)s %(levelname)-8s %(message)s')

    rollbot = RollBot(jid, password, nick, prefix)
    rollbot.auto_authorize = True
    rollbot.auto_subscribe = True
    rollbot.register_plugin('xep_0030')  # Service Discovery
    rollbot.register_plugin('xep_0045')  # Multi-User Chat
    rollbot.register_plugin('xep_0092')  # Software Version
    rollbot.register_plugin('xep_0199')  # XMPP Ping
    rollbot.register_plugin('xep_0249')  # Direct MUC Invitations

    if rollbot.connect():
        logging.info('RollBot connected')
        rollbot.process(block=True)
        logging.info('RollBot disconnected')
    else:
        logging.fatal("RollBot couldn't connect")
if __name__ == '__main__':
main()