lightweight asyncio IRC client¶
With a very small API, bottom lets you wrap an IRC connection and handle events how you want. There are no assumptions about reconnecting, rate limiting, or even when to respond to PINGs.
Explicit is better than implicit: no magic importing or naming to remember for
plugins. Extend the client with the same @on
decorator.
Create an instance:
import asyncio
import bottom
host = 'chat.freenode.net'
port = 6697
ssl = True
NICK = "bottom-bot"
CHANNEL = "#bottom-dev"
bot = bottom.Client(host=host, port=port, ssl=ssl)
Send nick/user/join when connection is established:
@bot.on('CLIENT_CONNECT')
async def connect(**kwargs):
bot.send('NICK', nick=NICK)
bot.send('USER', user=NICK,
realname='https://github.com/numberoverzero/bottom')
# Don't try to join channels until the server has
# sent the MOTD, or signaled that there's no MOTD.
done, pending = await asyncio.wait(
[bot.wait("RPL_ENDOFMOTD"),
bot.wait("ERR_NOMOTD")],
loop=bot.loop,
return_when=asyncio.FIRST_COMPLETED
)
# Cancel whichever waiter's event didn't come in.
for future in pending:
future.cancel()
bot.send('JOIN', channel=CHANNEL)
Respond to ping:
@bot.on('PING')
def keepalive(message, **kwargs):
bot.send('PONG', message=message)
Echo messages (channel and direct messages):
@bot.on('PRIVMSG')
def message(nick, target, message, **kwargs):
""" Echo all messages """
# Don't echo ourselves
if nick == NICK:
return
# Respond directly to direct messages
if target == NICK:
bot.send("PRIVMSG", target=nick, message=message)
# Channel message
else:
bot.send("PRIVMSG", target=target, message=message)
Connect and run the bot forever:
bot.loop.create_task(bot.connect())
bot.loop.run_forever()
Installation¶
Bottom supports Python 3.10+ and has no external dependencies.
Standard Installation¶
The easiest way is with pip:
pip install bottom
Alternative Installation¶
Sometimes the code on GitHub is ahead of the latest PyPI release. There are two ways to install from GitHub. First, by cloning the repo:
git clone git://github.com/numberoverzero/bottom.git
pip install ./bottom
Since pip supports installing from a git repo, you can also use:
pip install -e git://github.com/numberoverzero/bottom.git#egg=bottom
Using Async¶
Bottom accepts bot synchronous and async functions as callbacks.
Both of these are valid handlers for the privmsg
event:
@client.on('privmsg')
def synchronous_handler(**kwargs):
print("Synchronous call")
@client.on('privmsg')
async def async_handler(**kwargs):
await asyncio.sleep(1)
print("Async call")
Connect/Disconnect¶
Client connect and disconnect are coroutines so that we can easily wait for their completion before performing more actions in a handler. However, we don’t always want to wait for the action to complete. How can we do both?
Let’s say that on disconnect we want to reconnect, then notify the room that
we’re back. We need to await
for the connection before sending anything:
@client.on('client_disconnect')
async def reconnect(**kwargs):
# Wait a second so we don't flood
await asyncio.sleep(2)
# Wait until we've reconnected
await client.connect()
# Notify the room
client.send('privmsg', target='#bottom-dev',
message="I'm baaack!")
What about a handler that doesn’t need an established connection to finish? Instead of notifying the room, let’s log the reconnect time and return:
import arrow
import asyncio
import logging
logger = logging.getLogger(__name__)
@client.on('client_disconnect')
async def reconnect(**kwargs):
# Wait a second so we don't flood
await asyncio.sleep(2)
# Schedule a connection when the loop's next available
asyncio.create_task(client.connect())
# Record the time of the disconnect event
now = arrow.now()
logger.info("Reconnect started at " + now.isoformat())
We can also wait for the client_connect
event to trigger, which is slightly
different than waiting for client.connect to complete:
@client.on('client_disconnect')
async def reconnect(**kwargs):
# Wait a second so we don't flood
await asyncio.sleep(2)
# Schedule a connection when the loop's next available
asyncio.create_task(client.connect())
# Wait until client_connect has triggered
await client.wait("client_connect")
# Notify the room
client.send('privmsg', target='#bottom-dev',
message="I'm baaack!")
Debugging¶
You can get more asyncio debugging info by running python with the -X dev
flag:
python -X dev my_bot.py
For more information, see: Python Development Mode.
API¶
Client.on
¶
client.on(event)(func)
This decorator is the main way you’ll interact with a Client
. For a given
event name, it registers the decorated function to be invoked when that event
occurs. Your decorated functions should always accept **kwargs
, in case
unexpected kwargs are included when the event is triggered.
The usual IRC commands sent from a server are triggered automatically, or can
be manually invoked with trigger
. You may register handlers for any string,
making it easy to extend bottom with your own signals.
Not all available arguments need to be used. Both of the following are valid:
@bot.on('PRIVMSG')
def event(nick, message, target, **kwargs):
""" Doesn't use user, host. argument order is different """
# message sent to bot - echo message
if target == bot.nick:
bot.send('PRIVMSG', target, message=message)
# Some channel we're watching
elif target == bot.monitored_channel:
logger.info("{} -> {}: {}".format(nick, target, message))
@bot.on('PRIVMSG')
def func(message, target, **kwargs):
""" Just waiting for the signal """
if message == codeword && target == secret_channel:
execute_heist()
Handlers do not need to be async functions - non async will be wrapped prior to the bot running. For example, both of these are valid:
@bot.on('PRIVMSG')
def handle(message, **kwargs):
print(message)
@bot.on('PRIVMSG')
async def handle(message, **kwargs):
await async_logger.log(message)
Finally, you can create your own events to trigger and handle. For example, let’s catch SIGINT and gracefully shut down the event loop:
import signal
def handle_sigint(signum, frame):
print("SIGINT handler")
bot.trigger("my.sigint.event")
signal.signal(signal.SIGINT, handle_sigint)
@bot.on("my.sigint.event")
async def handle(**kwargs):
print("SIGINT trigger")
await bot.disconnect()
# Signal a stop before disconnecting so that any reconnect
# coros aren't run by the last run_forever sweep.
bot.loop.stop()
bot.loop.create_task(bot.connect())
bot.loop.run_forever() # Ctrl + C here
Client.trigger
¶
client.trigger(event, **kwargs)
Manually inject a command or reply as if it came from the server. This is
useful for invoking other handlers. Because trigger
doesn’t block, registered
callbacks for the event won’t run until the event loop yields to them.
Events don’t need to be valid irc commands; any string is available.
# Manually trigger `PRIVMSG` handlers:
bot.trigger('privmsg', nick="always_says_no", message="yes")
# Rename !commands to !help
@bot.on('privmsg')
def parse(nick, target, message, **kwargs):
if message == '!commands':
bot.send('privmsg', target=nick,
message="!commands was renamed to !help in 1.2")
# Don't make them retype it, trigger the correct command
bot.trigger('privmsg', nick=nick,
target=target, message="!help")
Because the @on
decorator returns the original function, you can register
a handler for multiple events. It’s especially important to use **kwargs
correctly here, to handle different keywords for each event.
# Simple recursive-style countdown
@bot.on('privmsg')
@bot.on('countdown')
async def handle(target, message, remaining=None, **kwargs):
# Entry point, verify command and parse from message
if remaining is None:
if not message.startswith("!countdown"):
return
# !countdown 10
remaining = int(message.split(" ")[-1])
if remaining == 0:
message = "Countdown complete!"
else:
message = "{}...".format(remaining)
# Assume for now that target is always a channel
bot.send("privmsg", target=target, message=message)
if remaining:
# After a second trigger another countdown event
await asyncio.sleep(1, loop=bot.loop)
bot.trigger('countdown', target=target,
message=message, remaining=remaining - 1)
Client.wait
¶
await client.wait(event)
Wait for an event to trigger:
@bot.on("client_disconnect")
async def reconnect(**kwargs):
# Trigger an event that may cascade to a client_connect.
# Don't continue until a client_connect occurs,
# which may be never.
bot.trigger("some.plugin.connection.lost")
await client.wait("client_connect")
# If we get here, one of the plugins handled connection lost by
# reconnecting, and we're back. Send some messages, etc.
client.send("privmsg", target=bot.CHANNEL,
message="Happy Birthday!")
Client.connect
¶
await client.connect()
Connect to the client’s host, port.
@bot.on('client_disconnect')
async def reconnect(**kwargs):
# Wait a few seconds
await asyncio.sleep(3, loop=bot.loop)
await bot.connect()
# Now that we're connected, let everyone know
bot.send('privmsg', target=bot.channel, message="I'm back.")
You can schedule a non-blocking connect with the client’s event loop:
@bot.on('client_disconnect')
def reconnect(**kwargs):
# Wait a few seconds
# Note that we're not in a coroutine, so we don't have access
# to await and asyncio.sleep
time.sleep(3)
# After this line we won't necessarily be connected.
# We've simply scheduled the connect to happen in the future
bot.loop.create_task(bot.connect())
print("Reconnect scheduled.")
Client.disconnect
¶
await client.disconnect()
Immediately disconnect from the server.
@bot.on('privmsg')
async def disconnect_bot(nick, message, **kwargs):
if nick == "myNick" and message == "disconnect:hunter2":
await bot.disconnect()
logger.log("disconnected bot.")
Like connect
, use the bot’s event loop to schedule a disconnect:
bot.loop.create_task(bot.disconnect())
Client.send
¶
client.send(command, **kwargs)
Send a command to the server. See Commands.
Client.handle_raw
¶
New in version 2.1.0.
client.handle_raw(message)
Manually inject a raw command. The client’s raw_handlers
will process
the message. By default, every Client
is configured with a rfc2812_handler
which unpacks a conforming rfc 2812 message into an event and calls client.trigger
.
You can disable this functionality by removing the handler:
client = Client(host="localhost", port=443)
client.raw_handlers.clear()
Client.send_raw
¶
New in version 2.1.0.
client.send_raw(message)
Send a complete IRC line without the Client reconstructing or modifying the message.
To easily send an rfc 2812 message, you should instead consider Client.send
.
Events¶
In bottom, an event is simply a string and set of **kwargs
to be passed to
any handlers listening for that event:
@client.on('any string is fine')
def handle(**kwargs):
if 'potato' in kwargs:
print("Found a potato!")
IRC Events¶
While connected, a client will trigger events for valid IRC commands that it
receives, with kwargs according to that command’s structure. For example, the
“part” event will always include the nickmask (nick
, user
, host
),
message
, and channel
kwargs, even if the message was empty:
@client.on("part")
def handle(nick, user, host, message, channel, **kwargs):
out = "User {}!{}@{} left {} with '{}'"
print(out.format(nick, user, host, channel, message))
Because kwargs
contains those fields, we could also use:
@client.on("part")
def handle(**kwargs):
out = ("User {nick}!{user}@{host} left"
" {channel} with '{message}'")
print(out.format(**kwargs))
Triggering¶
The same mechanism that the client uses to dispatch events can be invoked manually, either for custom events or to simulate receiving an irc command:
@client.on("privmsg")
def handle(**kwargs):
print("Someone sent a message!")
client.trigger("privmsg")
Running the above won’t print anything, however. Triggering an event only schedules the registered handlers (like the function we defined) to run in the future. Until we run the event loop, the triggered handlers won’t be invoked. Let’s see that print statement:
asyncio.get_event_loop().run_forever()
We can pass arbitrary kwargs to handlers through trigger
:
client.trigger("event")
client.trigger("event", **some_dict)
client.trigger("event", nick="bot", message="hello, world")
Waiting¶
Sometimes we need to wait for another event to occur before continuing. For example, consider a reconnect handler that wants to trigger the “reconnect” event for some plugins, but only after the connection has actually been established. The following will incorrectly signal that the reconnect has completed, while in reality the client has only scheduled a connection for the future:
@client.on("client_disconnect")
def reconnect(**kwargs):
client.connect()
client.trigger("reconnect", reconnect_msg="May not be connected!")
@client.on("reconnect")
def handle_reconnect(reconnect_msg="", **kwargs):
if reconnect_msg:
client.send("privmsg", target=CHANNEL, message=reconnect_msg)
Because both client.send
and client.connect
schedule coroutines, the
event loop may reorder (or process out of order). In reconnect
what we
really want to do is wait until the client_connect event is emitted, and then
trigger the reconnect event:
@client.on("client_disconnect")
async def reconnect(**kwargs):
client.connect()
await client.wait("client_connect")
client.trigger("reconnect", reconnect_msg="May not be connected!")
Whenever an event triggers, an asyncio.Event
is set and cleared, which
allows any code that is waiting on that event to continue. Be careful using
client.wait
- because we can call trigger with any string, wait
will
allow us to wait (forever) for events that may never trigger.
Supported Events¶
# Local only events
client.trigger('CLIENT_CONNECT')
client.trigger('CLIENT_DISCONNECT')
PING
JOIN
PART
PRIVMSG
NOTICE
USERMODE (renamed from MODE)
CHANNELMODE (renamed from MODE)
RPL_WELCOME (001)
RPL_YOURHOST (002)
RPL_CREATED (003)
RPL_MYINFO (004)
RPL_BOUNCE (005)
RPL_MOTDSTART (375)
RPL_MOTD (372)
RPL_ENDOFMOTD (376)
RPL_LUSERCLIENT (251)
RPL_LUSERME (255)
RPL_LUSEROP (252)
RPL_LUSERUNKNOWN (253)
RPL_LUSERCHANNELS (254)
ERR_NOMOTD (422)
Commands¶
client.send('PASS', password='hunter2')
client.send('NICK', nick='WiZ')
# mode is optional, default is 0
client.send('USER', user='WiZ-user', realname='Ronnie')
client.send('USER', user='WiZ-user', mode='8', realname='Ronnie')
client.send('OPER', user='WiZ', password='hunter2')
# Renamed from MODE
client.send('USERMODE', nick='WiZ')
client.send('USERMODE', nick='WiZ', modes='+io')
client.send('SERVICE', nick='CHANSERV', distribution='*.en',
type='0', info='manages channels')
client.send('QUIT')
client.send('QUIT', message='Gone to Lunch')
client.send('SQUIT', server='tolsun.oulu.fi')
client.send('SQUIT', server='tolsun.oulu.fi', message='Bad Link')
client.send('JOIN', channel='#foo-chan')
client.send('JOIN', channel='#foo-chan', key='foo-key')
client.send('JOIN', channel=['#foo-chan', '#other'],
key='foo-key') # other has no key
client.send('JOIN', channel=['#foo-chan', '#other'],
key=['foo-key', 'other-key'])
# this will cause you to LEAVE all currently joined channels
client.send('JOIN', channel='0')
client.send('PART', channel='#foo-chan')
client.send('PART', channel=['#foo-chan', '#other'])
client.send('PART', channel='#foo-chan', message='I lost')
# Renamed from MODE
client.send('CHANNELMODE', channel='#foo-chan', modes='+b')
client.send('CHANNELMODE', channel='#foo-chan', modes='+l',
params='10')
client.send('TOPIC', channel='#foo-chan')
client.send('TOPIC', channel='#foo-chan', # Clear channel message
message='')
client.send('TOPIC', channel='#foo-chan',
message='Yes, this is dog')
# target requires channel
client.send('NAMES')
client.send('NAMES', channel='#foo-chan')
client.send('NAMES', channel=['#foo-chan', '#other'])
client.send('NAMES', channel=['#foo-chan', '#other'],
target='remote.*.edu')
# target requires channel
client.send('LIST')
client.send('LIST', channel='#foo-chan')
client.send('LIST', channel=['#foo-chan', '#other'])
client.send('LIST', channel=['#foo-chan', '#other'],
target='remote.*.edu')
client.send('INVITE', nick='WiZ-friend', channel='#bar-chan')
# nick and channel must have the same number of elements
client.send('KICK', channel='#foo-chan', nick='WiZ')
client.send('KICK', channel='#foo-chan', nick='WiZ',
message='Spamming')
client.send('KICK', channel='#foo-chan', nick=['WiZ', 'WiZ-friend'])
client.send('KICK', channel=['#foo', '#bar'],
nick=['WiZ', 'WiZ-friend'])
client.send('PRIVMSG', target='WiZ-friend', message='Hello, friend!')
client.send('NOTICE', target='#foo-chan',
message='Maintenance in 5 mins')
client.send('MOTD')
client.send('MOTD', target='remote.*.edu')
client.send('LUSERS')
client.send('LUSERS', mask='*.edu')
client.send('LUSERS', mask='*.edu', target='remote.*.edu')
client.send('VERSION')
# target requires query
client.send('STATS')
client.send('STATS', query='m')
client.send('STATS', query='m', target='remote.*.edu')
# remote requires mask
client.send('LINKS')
client.send('LINKS', mask='*.bu.edu')
client.send('LINKS', mask='*.bu.edu', remote='*.edu')
client.send('TIME')
client.send('TIME', target='remote.*.edu')
client.send('CONNECT', target='tolsun.oulu.fi', port=6667)
client.send('CONNECT', target='tolsun.oulu.fi', port=6667,
remote='*.edu')
client.send('TRACE')
client.send('TRACE', target='remote.*.edu')
client.send('ADMIN')
client.send('ADMIN', target='remote.*.edu')
client.send('INFO')
client.send('INFO', target='remote.*.edu')
# type requires mask
client.send('SERVLIST', mask='*SERV')
client.send('SERVLIST', mask='*SERV', type=3)
client.send('SQUERY', target='irchelp', message='HELP privmsg')
client.send('WHO')
client.send('WHO', mask='*.fi')
client.send('WHO', mask='*.fi', o=True)
client.send('WHOIS', mask='*.fi')
client.send('WHOIS', mask=['*.fi', '*.edu'], target='remote.*.edu')
# target requires count
client.send('WHOWAS', nick='WiZ')
client.send('WHOWAS', nick='WiZ', count=10)
client.send('WHOWAS', nick=['WiZ', 'WiZ-friend'], count=10)
client.send('WHOWAS', nick='WiZ', count=10, target='remote.*.edu')
client.send('KILL', nick='WiZ', message='Spamming Joins')
# PING the server you are connected to
client.send('PING')
client.send('PING', message='Test..')
# when replying to a PING, the message should be the same
client.send('PONG')
client.send('PONG', message='Test..')
client.send('AWAY')
client.send('AWAY', message='Gone to Lunch')
client.send('REHASH')
client.send('DIE')
client.send('RESTART')
# target requires channel
client.send('SUMMON', nick='WiZ')
client.send('SUMMON', nick='WiZ', target='remote.*.edu')
client.send('SUMMON', nick='WiZ', target='remote.*.edu',
channel='#foo-chan')
client.send('USERS')
client.send('USERS', target='remote.*.edu')
client.send('WALLOPS', message='Maintenance in 5 minutes')
client.send('USERHOST', nick='WiZ')
client.send('USERHOST', nick=['WiZ', 'WiZ-friend'])
client.send('ISON', nick='WiZ')
client.send('ISON', nick=['WiZ', 'WiZ-friend'])
Extensions¶
bottom doesn’t have any clever import hooks to identify plugins based on name,
shape, or other significant denomination. Instead, we can create extensions
by using client.on
on a Client
instance.
Keepalive¶
Instead of writing the same PING
handler everywhere, a reusable plugin:
# my_plugin.py
def keepalive(client):
@client.on("ping")
def handle(message=None, **kwargs):
message = message or ""
client.send("pong", message=message)
That’s it! And to use it:
import bottom
from my_plugin import keepalive
client = bottom.Client(...)
keepalive(client)
Returning new objects¶
Aside from subclassing bottom.Client
, we can use a class to expose
additional behavior around a client. This can be useful when we’re worried
about other plugins assigning different meaning to the same attributes:
# irc_logging.py
class Logger:
def __init__(self, client, local_logger):
self.client = client
self.local = local_logger
client.on("client_disconnect", self._on_disconnect)
def log(self, level, message):
try:
self.client.send("{}: {}".format(level.upper(), message))
catch RuntimeError:
self.local.warning("Failed to log to remote")
# Get the local logger's method by name
# ex. `self.local.info`
method = getattr(self.local, level.lower())
method(message)
def _on_disconnect(self):
self.local.warning("Remote logging client disconnected!")
def debug(self, message):
self.log("debug", message)
# Same for info, warning, ...
...
And its usage:
import asyncio
import bottom
import logging
from irc_logging import Logger
local_logger = logging.getLogger(__name__)
client = bottom.Client(...)
remote_logger = Logger(client, local_logger)
@client.on("client_connect")
def log_connect(**kwargs):
remote_logger.info("Connected!")
# Connect and send "INFO: Connected!"
asyncio.create_task(client.connect())
asyncio.get_event_loop().run_forever()
Notice that the logging functionality is part of a different object, not the client. This keeps the namespace clean, and reduces the attribute contention that can occur when multiple plugins store their information directly on the client instance.
This line hooked the logger’s disconnect handler to the client:
def __init__(self, client, ...):
...
client.on("client_disconnect", self._on_disconnect)
Pattern matching¶
We can write a simple wrapper class to annotate functions to handle PRIVMSG matching a regex. To keep the interface simple, we can use bottom’s annotation pattern and pass the regex to match.
In the following example, we’ll define a handler that echos whatever a user asks for, if it’s in the correct format:
import bottom
client = bottom.Client(host=host, port=port, ssl=ssl)
router = Router(client)
@router.route("^bot, say (\w+)\.$")
def echo(self, nick, target, message, match, **kwargs):
if target == router.nick:
# respond in a direct message
target = nick
client.send("privmsg", target=target, message=match.group(1))
Now, the Router class needs to manage the regex -> handler mapping and connect an event handler to PRIVMSG on its client:
import asyncio
import functools
import re
class Router(object):
def __init__(self, client):
self.client = client
self.routes = {}
client.on("PRIVMSG")(self._handle)
def _handle(self, nick, target, message, **kwargs):
""" client callback entrance """
for regex, (func, pattern) in self.routes.items():
match = regex.match(message)
if match:
self.client.loop.create_task(func(nick, target, message, match, **kwargs))
def route(self, pattern, func=None, **kwargs):
if func is None:
return functools.partial(self.route, pattern)
# Decorator should always return the original function
wrapped = func
if not asyncio.iscoroutinefunction(wrapped):
wrapped = asyncio.coroutine(wrapped)
compiled = re.compile(pattern)
self.routes[compiled] = (wrapped, pattern)
return func
Wait for any events¶
Use Client.wait()
to pause until one or all signals have fired. For example, after sending NICK/USER during
CLIENT_CONNECT, some servers will ignore subsequent commands until they have finished sending RPL_ENDOFMOTD. This
can be used to wait for any signal that the MOTD has been sent (eg. ERR_NOMOTD may be sent instead of RPL_ENDOFMOTD).
import asyncio
def waiter(client):
async def wait_for(*events, return_when=asyncio.FIRST_COMPLETED):
if not events:
return
done, pending = await asyncio.wait(
[bot.wait(event) for event in events],
return_when=return_when)
# Cancel any events that didn't come in.
for future in pending:
future.cancel()
return wait_for
To use in the CLIENT_CONNECT process:
import bottom
client = bottom.Client(...)
wait_for = waiter(client)
@client.on("CLIENT_CONNECT")
async def on_connect(**kwargs):
client.send('nick', ...)
client.send('user', ...)
await wait_for('RPL_ENDOFMOTD', 'ERR_NOMOTD')
client.send('join', ...)
Send and trigger raw messages¶
New in version 2.1.0.
Extensions do not need to strictly conform to rfc 2812.
You can send or trigger custom messages with Client.send_raw
and
Client.handle_raw
. For example, the following can be used to request
Twitch.tv’s Membership capability using IRC v3’s capabilities registration:
client = MyTwitchClient(...)
client.send_raw("CAP REQ :twitch.tv/membership")
Just as Client.trigger
can be used to manually invoke handlers for a specific
event, Client.handle_raw
can be called to manually invoke raw handlers for a
given message. For the above example, you can ensure you handle the response from
Twitch.tv with the following:
response = ":tmi.twitch.tv CAP * ACK :twitch.tv/membership"
client = MyTwitchClient(...)
client.handle_raw(response)
Raw handlers¶
New in version 2.1.0.
Clients can extend or replace the default message handler by
modifying the Client.raw_handlers
list. This is a list of async
functions that take a (next_handler, message)
tuple. To allow
the next handler to process a message, call next_handler(message)
within your handler. You may also send a different message to the subsequent
handler, or not invoke it at all.
The following listens for responses from twitch.tv about capabilities and logs them. Otherwise, it passes the message on to the next handler.
import re
CAPABILITY_RESPONSE_PATTERN = re.compile(
"^:tmi\.twitch\.tv CAP \* ACK :twitch\.tv/\w+$")
async def capability_handler(next_handler, message):
if CAPABILITY_RESPONSE_PATTERN.match(message):
print("Capability granted: " + message)
else:
await next_handler(message)
And to ensure it runs before the default handler:
client = Client(...)
client.raw_handlers.insert(0, capability_handler)
Unlike Client.on
, raw handlers must be async functions.
Handlers may send a different message than they receive. The following can be used to forward messages from one chat room to another:
from bottom.pack import pack_command
from bottom.unpack import unpack_command
def forward(old_room, new_room):
async def handle(next_handler, message):
try:
event, kwargs = unpack_command(message)
except ValueError:
# pass message unchanged
pass
else:
if event.lower() == "privmsg":
if kwargs["target"].lower() == old_room.lower():
kwargs["target"] = new_room
message = pack_command("privmsg", **kwargs)
await next_handler(message)
return handle
And its usage:
client = Client(...)
forwarding = forward("bottom-legacy", "bottom-dev")
client.raw_handlers.insert(0, forwarding)
Full message encryption¶
This is a more complex example of a raw handler where messages are encrypted and then base64 encoded. On the wire their only similarity with the IRC protocol is a newline terminating character. This is enough to build an extension to transparently encrypt data.
Assume you have implemented a class with the following interface:
class EncryptionContext:
def encrypt(self, data: bytes) -> bytes:
...
def decrypt(self, data: bytes) -> bytes:
...
the following extension can be written:
import base64
def encryption_handler(context: EncryptionContext):
async def handle_decrypt(next_handler, message):
message = context.decrypt(
base64.b64decode(
message.encode("utf-8")
)
).decode("utf-8")
await next_handler(message)
return handle_decrypt
to encrypt messages as they are sent, the class can override
Client.send_raw
. Adding in the encryption handler above:
class EncryptedClient(Client):
def __init__(self, encryption_context, **kwargs):
super().__init__(**kwargs)
self.raw_handlers.append(
encryption_handler(encryption_context))
self.context = encryption_context
def send_raw(self, message: str) -> None:
message = base64.b64encode(
self.context.encrypt(
message.encode("utf-8")
)
).decode("utf-8")
super().send_raw(message)
Development¶
Versioning and RFC2812¶
Bottom follows semver for its public API.
Currently,
Client
is the only public member of bottom.IRC replies/codes which are not yet implemented may be added at any time, and will correspond to a patch - the function contract of
@on
method does not change.You should not rely on the internal api staying the same between minor versions.
Over time, private apis may be raised to become public. The reverse will never occur.
Contributing¶
Contributions welcome! Please make sure tox
passes (including flake8 and
docs build) before submitting a PR.
Pull requests that decrease coverage will not be merged.
Development¶
bottom uses tox
, pytest
, coverage
, and flake8
. To get
everything set up in a new virtualenv:
git clone https://github.com/numberoverzero/bottom.git
cd bottom
python3.10 -m venv --copies .venv
source .venv/bin/activate
pip install -r requirements.txt
pip install -e .
tox
Documentation¶
Documentation improvements are especially appreciated. For small changes, open a pull request! If there’s an area you feel is lacking and will require more than a small change, open an issue to discuss the problem - others are probably also confused, and may have suggestions to improve the same area.
TODO¶
Better
Client
docstringsAdd missing replies/errors to
unpack.py:unpack_command
Add reply/error parameters to
unpack.py:parameters
Document events, client.send