Extensions

bottom doesn’t have any clever import hooks to identify plugins based on name, shape, or other significant denomination. Instead, we can create extensions by using client.on on a Client instance.

Keepalive

Instead of writing the same PING handler everywhere, we can package it as a reusable plugin:

# my_plugin.py
def keepalive(client):
    @client.on("ping")
    def handle(message=None, **kwargs):
        message = message or ""
        client.send("pong", message=message)

That’s it! And to use it:

import bottom
from my_plugin import keepalive

client = bottom.Client(...)
keepalive(client)
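
A plugin like this can be exercised without a network connection. The following is a minimal sketch, assuming a hypothetical FakeClient that mimics only the on/send surface keepalive relies on; it is not part of bottom and runs handlers synchronously for simplicity:

```python
class FakeClient:
    """Hypothetical stand-in: records handlers and sent commands."""

    def __init__(self):
        self.handlers = {}
        self.sent = []

    def on(self, event):
        # Decorator form only, mirroring @client.on("ping")
        def decorator(func):
            self.handlers.setdefault(event.lower(), []).append(func)
            return func
        return decorator

    def send(self, command, **kwargs):
        self.sent.append((command, kwargs))

    def trigger(self, event, **kwargs):
        # Call every registered handler right away
        for func in self.handlers.get(event.lower(), []):
            func(**kwargs)


def keepalive(client):
    @client.on("ping")
    def handle(message=None, **kwargs):
        message = message or ""
        client.send("pong", message=message)


fake = FakeClient()
keepalive(fake)
fake.trigger("ping", message="irc.example.net")
```

After the trigger, fake.sent holds a single "pong" entry carrying the same message, which is what the plugin should produce for any PING.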

Returning new objects

Aside from subclassing bottom.Client, we can use a class to expose additional behavior around a client. This can be useful when we’re worried about other plugins assigning different meaning to the same attributes:

# irc_logging.py
class Logger:
    def __init__(self, client, local_logger):
        self.client = client
        self.local = local_logger
        client.on("client_disconnect", self._on_disconnect)

    def log(self, level, message):
        try:
            self.client.send("{}: {}".format(level.upper(), message))
        except RuntimeError:
            self.local.warning("Failed to log to remote")

            # Get the local logger's method by name
            # ex. `self.local.info`
            method = getattr(self.local, level.lower())
            method(message)

    def _on_disconnect(self, **kwargs):
        self.local.warning("Remote logging client disconnected!")


    def debug(self, message):
        self.log("debug", message)

    # Same for info, warning, ...
    ...

And its usage:

import bottom
import logging
from irc_logging import Logger

local_logger = logging.getLogger(__name__)

client = bottom.Client(...)
remote_logger = Logger(client, local_logger)


@client.on("client_connect")
def log_connect(**kwargs):
    remote_logger.info("Connected!")

# Connect and send "INFO: Connected!"
client.loop.run_until_complete(client.connect())

Notice that the logging functionality is part of a different object, not the client. This keeps the namespace clean, and reduces the attribute contention that can occur when multiple plugins store their information directly on the client instance.

This line hooks the logger’s disconnect handler to the client:

def __init__(self, client, ...):
    ...
    client.on("client_disconnect", self._on_disconnect)

Pattern matching

We can write a simple wrapper class whose decorator registers functions to handle any PRIVMSG that matches a regex. To keep the interface simple, we can follow bottom’s decorator pattern and pass the regex to match.

In the following example, we’ll define a handler that echoes whatever a user asks for, if it’s in the correct format:

import bottom

client = bottom.Client(host=host, port=port, ssl=ssl)
router = Router(client)


@router.route(r"^bot, say (\w+)\.$")
def echo(nick, target, message, match, **kwargs):
    # NOTE: assumes router.nick holds the bot's own nick;
    # the Router class below does not set it.
    if target == router.nick:
        # respond in a direct message
        target = nick
    client.send("privmsg", target=target, message=match.group(1))

Now, the Router class needs to manage the regex -> handler mapping and connect an event handler to PRIVMSG on its client:

import asyncio
import functools
import re


class Router(object):
    def __init__(self, client):
        self.client = client
        self.routes = {}
        client.on("PRIVMSG")(self._handle)

    def _handle(self, nick, target, message, **kwargs):
        """ client callback entrance """
        for regex, (func, pattern) in self.routes.items():
            match = regex.match(message)
            if match:
                self.client.loop.create_task(func(nick, target, message, match, **kwargs))

    def route(self, pattern, func=None, **kwargs):
        if func is None:
            return functools.partial(self.route, pattern)

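        # Wrap plain functions so _handle can always schedule a coroutine.
        # (asyncio.coroutine was removed in Python 3.11.)
        wrapped = func
        if not asyncio.iscoroutinefunction(func):
            @functools.wraps(func)
            async def wrapped(*args, **kwargs):
                return func(*args, **kwargs)

        compiled = re.compile(pattern)
        self.routes[compiled] = (wrapped, pattern)
        # Decorator should always return the original function
        return func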
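
To see which messages the example route accepts, the pattern can be tried on its own. This is a small sketch using only the standard library; the extract helper is a hypothetical name used for illustration:

```python
import re

# Same pattern as the echo route above
pattern = re.compile(r"^bot, say (\w+)\.$")


def extract(message):
    """Return the captured word, or None when the route would not fire."""
    match = pattern.match(message)
    return match.group(1) if match else None


single_word = extract("bot, say hello.")
two_words = extract("bot, say hello world.")
no_period = extract("bot, say hello")
```

Because \w+ does not match spaces and the pattern is anchored at both ends, only a single word followed by a period is captured; the other two messages do not match.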

Wait for any events

Use Client.wait() to pause until one or all signals have fired. For example, after sending NICK/USER during CLIENT_CONNECT, some servers will ignore subsequent commands until they have finished sending RPL_ENDOFMOTD. This can be used to wait for any signal that the MOTD has been sent (e.g. ERR_NOMOTD may be sent instead of RPL_ENDOFMOTD).

import asyncio


def waiter(client):
    async def wait_for(*events, return_when=asyncio.FIRST_COMPLETED):
        if not events:
            return
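        # asyncio.wait needs tasks (not bare coroutines) on Python 3.11+,
        # and its loop argument was removed in Python 3.10.
        tasks = [client.loop.create_task(client.wait(event)) for event in events]
        done, pending = await asyncio.wait(tasks, return_when=return_when)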

        # Cancel any events that didn't come in.
        for future in pending:
            future.cancel()
    return wait_for

To use in the CLIENT_CONNECT process:

import bottom
client = bottom.Client(...)
wait_for = waiter(client)


@client.on("CLIENT_CONNECT")
async def on_connect(**kwargs):
    client.send('nick', ...)
    client.send('user', ...)

    await wait_for('RPL_ENDOFMOTD', 'ERR_NOMOTD')

    client.send('join', ...)
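
The first-event-wins behavior can be tried without a server. The sketch below assumes a hypothetical FakeClient exposing only wait() and trigger(), built on asyncio.Event; it illustrates the waiting pattern and is not bottom’s implementation:

```python
import asyncio


class FakeClient:
    """Hypothetical stand-in exposing only wait() and trigger()."""

    def __init__(self):
        self._events = {}

    def _event(self, name):
        return self._events.setdefault(name.upper(), asyncio.Event())

    async def wait(self, name):
        await self._event(name).wait()
        return name

    def trigger(self, name):
        self._event(name).set()


async def wait_for_any(client, *events):
    # Wrap each wait in a task so asyncio.wait accepts it
    tasks = [asyncio.ensure_future(client.wait(e)) for e in events]
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the waits that never fired
    for task in pending:
        task.cancel()
    return {task.result() for task in done}


async def main():
    client = FakeClient()
    # Simulate a server without a MOTD, shortly after we start waiting
    asyncio.get_running_loop().call_later(
        0.01, client.trigger, "ERR_NOMOTD")
    return await wait_for_any(client, "RPL_ENDOFMOTD", "ERR_NOMOTD")


fired = asyncio.run(main())
```

Only ERR_NOMOTD fires here, so the RPL_ENDOFMOTD wait is cancelled and the caller can continue with the next step of its connect sequence.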

Send and trigger raw messages

New in version 2.1.0.

Extensions do not need to strictly conform to RFC 2812. You can send or trigger custom messages with Client.send_raw and Client.handle_raw. For example, the following can be used to request Twitch.tv’s Membership capability using IRCv3’s capability negotiation:

client = MyTwitchClient(...)
client.send_raw("CAP REQ :twitch.tv/membership")

Just as Client.trigger can be used to manually invoke handlers for a specific event, Client.handle_raw can be called to manually invoke raw handlers for a given message. For the above example, you can ensure you handle the response from Twitch.tv with the following:

response = ":tmi.twitch.tv CAP * ACK :twitch.tv/membership"
client = MyTwitchClient(...)
client.handle_raw(response)

Raw handlers

New in version 2.1.0.

Clients can extend or replace the default message handler by modifying the Client.raw_handlers list. This is a list of async functions that each take two arguments, (next_handler, message). To allow the next handler to process a message, await next_handler(message) within your handler. You may also send a different message to the subsequent handler, or not invoke it at all.

The following listens for responses from twitch.tv about capabilities and logs them. Otherwise, it passes the message on to the next handler.

import re
CAPABILITY_RESPONSE_PATTERN = re.compile(
    r"^:tmi\.twitch\.tv CAP \* ACK :twitch\.tv/\w+$")


async def capability_handler(next_handler, message):
    if CAPABILITY_RESPONSE_PATTERN.match(message):
        print("Capability granted: " + message)
    else:
        await next_handler(message)

And to ensure it runs before the default handler:

client = Client(...)
client.raw_handlers.insert(0, capability_handler)

Unlike Client.on, raw handlers must be async functions.
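
The calling convention for a chain of raw handlers can be pictured with a stand-alone sketch. The run_chain helper and the three handlers below are hypothetical and only illustrate how next_handler links one handler to the next; this is not how bottom itself is implemented:

```python
import asyncio


async def run_chain(handlers, message):
    """Hypothetical helper: start at handlers[0] and give each handler
    a next_handler that invokes the one after it."""
    async def call(index, msg):
        if index >= len(handlers):
            return  # end of chain: nothing left to call

        async def next_handler(next_msg):
            await call(index + 1, next_msg)

        await handlers[index](next_handler, msg)

    await call(0, message)


seen = []


async def drop_noise(next_handler, message):
    # Not calling next_handler stops the chain for this message
    if message != "NOISE":
        await next_handler(message)


async def shout(next_handler, message):
    # Pass a modified message to the rest of the chain
    await next_handler(message.upper())


async def record(next_handler, message):
    seen.append(message)
    await next_handler(message)


async def main():
    handlers = [drop_noise, shout, record]
    await run_chain(handlers, "hello")
    await run_chain(handlers, "NOISE")


asyncio.run(main())
```

The first message reaches record in its modified form, while the second is dropped by the first handler and never seen by the rest. Order in the list therefore decides which handlers get a chance to filter or rewrite a message.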

Handlers may send a different message than they receive. The following can be used to forward messages from one chat room to another:

from bottom.pack import pack_command
from bottom.unpack import unpack_command


def forward(old_room, new_room):
    async def handle(next_handler, message):
        try:
            event, kwargs = unpack_command(message)
        except ValueError:
            # pass message unchanged
            pass
        else:
            if event.lower() == "privmsg":
                if kwargs["target"].lower() == old_room.lower():
                    kwargs["target"] = new_room
                    message = pack_command("privmsg", **kwargs)
        await next_handler(message)
    return handle

And its usage:

client = Client(...)

forwarding = forward("bottom-legacy", "bottom-dev")
client.raw_handlers.insert(0, forwarding)

Full message encryption

This is a more complex example of a raw handler where messages are encrypted and then base64 encoded. On the wire, their only similarity to the IRC protocol is the terminating newline. This is enough to build an extension that transparently encrypts data.

Assume you have implemented a class with the following interface:

class EncryptionContext:
    def encrypt(self, data: bytes) -> bytes:
        ...

    def decrypt(self, data: bytes) -> bytes:
        ...

The following extension can then be written:

import base64

def encryption_handler(context: EncryptionContext):
    async def handle_decrypt(next_handler, message):
        message = context.decrypt(
            base64.b64decode(
                message.encode("utf-8")
            )
        ).decode("utf-8")
        await next_handler(message)
    return handle_decrypt

To encrypt messages as they are sent, the class can override Client.send_raw. Adding in the encryption handler above:

class EncryptedClient(Client):
    def __init__(self, encryption_context, **kwargs):
        super().__init__(**kwargs)
        # Decrypt before the default handler tries to parse the message
        self.raw_handlers.insert(
            0, encryption_handler(encryption_context))
        self.context = encryption_context

    def send_raw(self, message: str) -> None:
        message = base64.b64encode(
            self.context.encrypt(
                message.encode("utf-8")
            )
        ).decode("utf-8")
        super().send_raw(message)
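
To check that the encode and decode steps mirror each other, a toy context can be round-tripped without a client. The XorContext class and the to_wire/from_wire helpers below are hypothetical names for illustration; XOR with a repeating key is not secure and only stands in for a real EncryptionContext:

```python
import base64


class XorContext:
    """Toy EncryptionContext for illustration only; NOT secure."""

    def __init__(self, key: bytes):
        if not key:
            raise ValueError("key must not be empty")
        self.key = key

    def _xor(self, data: bytes) -> bytes:
        return bytes(
            b ^ self.key[i % len(self.key)] for i, b in enumerate(data))

    def encrypt(self, data: bytes) -> bytes:
        return self._xor(data)

    def decrypt(self, data: bytes) -> bytes:
        return self._xor(data)


def to_wire(context, message: str) -> str:
    # Same steps as send_raw above: encrypt, then base64 encode
    return base64.b64encode(
        context.encrypt(message.encode("utf-8"))).decode("utf-8")


def from_wire(context, line: str) -> str:
    # Same steps as handle_decrypt above: base64 decode, then decrypt
    return context.decrypt(
        base64.b64decode(line.encode("utf-8"))).decode("utf-8")


context = XorContext(b"secret")
wire = to_wire(context, "PRIVMSG #chan :hello")
plain = from_wire(context, wire)
```

The base64 step matters because encrypted bytes may contain newlines or invalid UTF-8, either of which would break a line-oriented text protocol.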