API

bottom is designed to be a small library, and structured so that you (hopefully!) feel comfortable jumping in and reading the source if you’re not sure how something works. See the Internals section below for some help navigating the codebase, or the Development section to set up the development environment for bottom.

Public API

class bottom.Client(host: str, port: int, *, encoding: str = 'utf-8', ssl: bool | SSLContext = True, serializer: CommandSerializer | None = None)[source]
__init__(host: str, port: int, *, encoding: str = 'utf-8', ssl: bool | SSLContext = True, serializer: CommandSerializer | None = None) None[source]

Create a new client that can interact with an IRC server.

The client does not automatically connect. Instead, use connect

Parameters:
  • host – the server’s host

  • port – the server’s port

  • encoding – the encoding to use when converting from bytes over the wire. This is almost always “utf-8”

  • sslTrue to create an ssl context, False to not use ssl, or provide your own ssl context.

  • serializer – A command serializer that processes the kwargs from Client.send into a string that is sent through the client’s protocol. Defaults to a global serializer that knows RFC2812 commands.

async connect() None

Connect to the server.

On successful connection, triggers a "client_connect" event.

Returns immediately if the client already has a non-closing connection. When multiple connect calls are in progress at once, only the first call to establish a connection will trigger a "client_connect" and the others will silently disconnect their parallel connections to the server.

Usage:

async def main():
    await client.connect()
    try:
        await client.wait("client_disconnect")
    except asyncio.CancelledError:
        await client.disconnect()
asyncio.run(main())

@client.on("client_disconnect")
async def reconnect(**kwargs):
    # don't reconnect immediately
    await asyncio.sleep(3)

    await client.connect()
    # Now that we're connected, let everyone know
    await client.send("privmsg", target=client.channel, message="I'm back.")

Or to schedule the connect without blocking:

from bottom.util import create_task

@client.on("client_disconnect")
async def reconnect(**kwargs):
    async def run():
        await asyncio.sleep(3)
        await client.connect()

    # note: you could use asyncio.create_task(), but it may get gc'd if you don't keep a ref.
    #   bottom.util.create_task handles this for you.
    #   see: https://docs.python.org/3/library/asyncio-task.html#asyncio.create_task
    create_task(run())
    print("Reconnect scheduled.")
async disconnect() None

Disconnect from the server.

On successful disconnection, triggers a "client_disconnect" event.

Returns immediately if the client is already disconnected or disconnecting. When multiple connect calls are in progress at once, only the first call to disconnect the client will trigger a "client_disconnect".

Usage:

@client.on("privmsg")
async def disconnect_bot(nick, message, **kwargs):
    if nick == "myNick" and message == "disconnect:hunter2":
        await client.disconnect()
        logger.log("disconnected client.")
on(event: str, fn: Callable[[P], R] | None = None) Decorator[P, R] | Callable[[P], R]

Decorate a function to handle an event.

See Events for a list of supported rfc2812 events.

When an event is triggered, either by the default rfc2812 handler or by your code, all functions registered to that event are triggered. Your handlers should always accept **kwargs in case unexpected kwargs are included when the event is triggered.

Event names ignore leading space, trailing space, and case; both when registering and triggering. So " bEgIn  " is registered and triggered as "BEGIN". Since you can pass any event name, it’s easy to extend a client with your own signals.

You don’t need to unpack all arguments, but you should always include **kwargs to collect the rest:

@client.on("privmsg")
async def standby(message, **kwargs):
    if message == codeword:
        await execute_heist()

While you should prefer async handlers, it’s not required. Synchronous functions will be wrapped in an async handler so that the event loop is available, which means you can use asyncio.create_task without checking for a running loop:

@client.on("privmsg")
def handle(message, **kwargs):
    print(message)

@client.on("privmsg")
async def handle(message, **kwargs):
    await async_logger.log(message)

Note

The original function is returned, so you can chain decorators without introducing an async wrapper to your code:

def original(message, **kwargs):
    asyncio.create_task(f"saw msg {message}")

wrapped = client.on("privmsg")(original)
assert wrapped is original

Finally, you can trigger and catch your your own events. For example, to forward SIGINT:

import signal
signal.signal(
    signal.SIGINT,
    lambda *a: client.trigger("my.plugin.sigint")
)


@client.on("my.plugin.sigint")
async def handle(**kwargs):
    print("saw SIGINT")
    await send_farewells(client, db.get_friends_list())
    await client.disconnect()
async send(command: Literal['pass'], *, password: str, **kwargs: Any) None[source]
async send(command: Literal['nick'], *, nick: str, **kwargs: Any) None
async send(command: Literal['user'], *, nick: str, mode: int = 0, realname: str, **kwargs: Any) None
async send(command: Literal['oper'], *, nick: str, password: str, **kwargs: Any) None
async send(command: Literal['usermode'], *, nick: str | None = None, modes: str | None = None, **kwargs: Any) None
async send(command: Literal['service'], *, nick: str, distribution: str, type: str, info: str, **kwargs: Any) None
async send(command: Literal['quit'], *, message: str | None = None, **kwargs: Any) None
async send(command: Literal['squit'], *, server: str, message: str | None = None, **kwargs: Any) None
async send(command: Literal['join'], *, channel: str | Iterable[str], key: str | Iterable[str] | None = None, **kwargs: Any) None
async send(command: Literal['part'], *, channel: str | Iterable[str], message: str | None = None, **kwargs: Any) None
async send(command: Literal['channelmode'], *, channel: str, params: str | Iterable[str] | None = None, **kwargs: Any) None
async send(command: Literal['topic'], *, channel: str, message: str | None = None, **kwargs: Any) None
async send(command: Literal['names'], *, channel: str | Iterable[str] | None = None, target: str | None = None, **kwargs: Any) None
async send(command: Literal['list'], *, channel: str | Iterable[str] | None = None, target: str | None = None, **kwargs: Any) None
async send(command: Literal['invite'], *, nick: str, channel: str, **kwargs: Any) None
async send(command: Literal['kick'], *, nick: str | Iterable[str], channel: str | Iterable[str], message: str | None = None, **kwargs: Any) None
async send(command: Literal['privmsg'], *, target: str, message: str, **kwargs: Any) None
async send(command: Literal['notice'], *, target: str, message: str, **kwargs: Any) None
async send(command: Literal['motd'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['lusers'], *, mask: str | None = None, target: str | None = None, **kwargs: Any) None
async send(command: Literal['version'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['stats'], *, query: str | None = None, target: str | None = None, **kwargs: Any) None
async send(command: Literal['links'], *, mask: str | None = None, remote: str | None = None, **kwargs: Any) None
async send(command: Literal['time'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['connect'], *, target: str, port: int, remote: str | None = None, **kwargs: Any) None
async send(command: Literal['trace'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['admin'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['info'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['servlist'], *, mask: str | None = None, type: str | None = None, **kwargs: Any) None
async send(command: Literal['squery'], *, target: str, message: str, **kwargs: Any) None
async send(command: Literal['who'], *, mask: str | None = None, o: bool | None = None, **kwargs: Any) None
async send(command: Literal['whois'], *, mask: str | Iterable[str], target: str | None = None, **kwargs: Any) None
async send(command: Literal['whowas'], *, nick: str | Iterable[str], count: int | None = None, target: str | None = None, **kwargs: Any) None
async send(command: Literal['kill'], *, nick: str, message: str, **kwargs: Any) None
async send(command: Literal['ping'], *, message: str, target: str | None = None, **kwargs: Any) None
async send(command: Literal['pong'], *, message: str | None = None, **kwargs: Any) None
async send(command: Literal['away'], *, message: str | None = None, **kwargs: Any) None
async send(command: Literal['rehash'], **kwargs: Any) None
async send(command: Literal['die'], **kwargs: Any) None
async send(command: Literal['restart'], **kwargs: Any) None
async send(command: Literal['summon'], *, nick: str, target: str | None = None, channel: str | None = None, **kwargs: Any) None
async send(command: Literal['users'], *, target: str | None = None, **kwargs: Any) None
async send(command: Literal['wallops'], *, message: str | None = None, **kwargs: Any) None
async send(command: Literal['userhost'], *, nick: str | Iterable[str], **kwargs: Any) None
async send(command: Literal['ison'], *, nick: str | Iterable[str], **kwargs: Any) None
async send(command: str, **kwargs: Any) None

Send a message to the server.

await client.send("privmsg", target="n/0", message="it works!")
await client.send("privmsg", target="#mychan", message="hello, world")

await client.send("join", target="#mychan")
await client.send("part", target="#mychan")

See Commands for the list of commands supported by default.

To add your own commands to the global default serializer, use:

from bottom import register_pattern
register_pattern("MYCOMMAND", "MYCOMMAND {some} {args} :{here}")

To add commands to this client’s serializer, use:

client._serializer.register("MYCOMMAND", "MYCOMMAND {some} {args} :{here}")

See also: CommandSerializer

async send_message(message: str) None

Send a complete IRC line without modification.

To easily send an rfc 2812 message, consider Client.send

import base64

async def send_encoded(image: bytes):
    encoded_str = base64.b64encode(image).decode()
    await client.send_message(f"IMG :{encoded_str}")
trigger(event: str, **kwargs: Any) Task

Manually trigger an event, either a supported rfc2812 command or a custom event name.

Trigger returns a task which you can await to block until all registered handlers for the event have completed. You do not have to wait for this task or keep a reference to it.

For example, if you migrate to a third-party extension that expects “!help” but your original command was “!commands” then you can use the following handler to inform users and forward to the new handler:

@client.on("privmsg")
async def help_compat(nick, target, message, **kwargs):
    if message != "!commands": return

    # notify
    await client.send(
        "privmsg", target=nick,
        message="note: !commands was renamed to !help in 1.2")

    # forward
    client.trigger(
        "privmsg", nick=nick,
        target=target, message="!help")

Because the @on decorator returns the original function, you can register a handler for multiple events. It’s especially important to use **kwargs to handle different keywords for each event:

@client.on("privmsg")
@client.on("join")
@client.on("part")
async def handle(target, message=None, channel=None, **kwargs):
    if channel:
        client.trigger("my.plugin.events.channel", channel=channel, **kwargs)
    elif message:
        client.trigger("my.plugin.events.messages", message=message, **kwargs)

If you want to trigger an event and then wait until all handlers for that event have run, you can await the returned task:

complete = []

@client.on("my.event")
async def fast_processor(data: str, **kwargs):
    await asyncio.sleep(1)
    complete.append(f"fast {data}")

@client.on("my.event")
async def slow_processor(data: str, **kwargs):
    await asyncio.sleep(2)
    complete.append(f"slow {data}")

@client.on("part")
async def handle_part(channel: str, **kwargs):
    complete.clear()
    await client.trigger("my.event", data=channel)
    assert complete == [f"slow {channel}", f"fast {channel}"]
async wait(event: str) dict

Wait for an event to be triggered. Returns a dict including the kwargs the event was triggered with, as well as the name of the event in the "__event__" key.

See Events for a list of supported rfc2812 events.

Useful for blocking an async task until an event occurs, like join or disconnect:

async def reconnect():
    while True:
        print("waiting for disconnect event")
        await client.wait("client_disconnect")

        print("reconnecting...")
        await client.connect()
        await client.send("nick", nick="mybot")
        await client.send("pass", password="hunter2")

        print("reconnected!")

You can inspect the kwargs that the event was triggered with, and get the event name from "__event__":

async def wait_join():
    client.join(channel="#chan")
    kwargs = await client.wait("join")

    assert kwargs["__event__"] == "join"
    print(f"complete join event: {kwargs}")

You can use asyncio.wait_for() or asyncio.timeout() to add a timeout to your wait:

async def connect()
    try:
        with asyncio.timeout(5):
            await client.connect()
    except TimeoutError:
        print("failed to connect within 5 seconds, is the server available?")

Use wait_for to wait for multiple events, either the first to complete or all.

message_handlers: list[ClientMessageHandler]

List of message handlers that runs on each incoming IRC line from the server.

The first handler is passed the next_handler in the chain as well as the client and message. The handler can choose to process the message, and/or invoke the next handler, or do nothing.

The basic structure of a handler is:

from bottom import Client, NextHandler

async def handle_message(next_handler: NextHandler[Client], client: Client, message: bytes) -> None:
    print("before")
    await next_handler(client, message)
    print("after")

By default, every client is configured with a rfc2812_handler which unpacks supported rfc2812 commands into events and triggers them, connecting them to handlers you’ve registered with Client.on

You can disable the default functionality by removing that handler:

from bottom import Client
client = Client(host="localhost", port=443)
client.message_handlers.clear()

You can add your own handlers before or after this one, or replace it:

from bottom import Client, NextMessageHandler

async def print_everything(next_handler: NextMessageHandler[Client], client: Client, message: bytes) -> None:
    print(f"incoming message: {message.decode()}")
    await next_handler(client, message)

client = Client(host="localhost", port=443)
# run after other handlers
client.message_handlers.append(print_everything)

Message handlers don’t have to call the next handler, and don’t have to pass the same message to the next handler:

from bottom import Client, NextMessageHandler

async def uno_handler(next_handler: NextMessageHandler[Client], client: Client, message: bytes) -> None:
    if message.startswith(b"reverse:"):
        message = message[len(b"reverse:"):]
        print(f"reversing {message.decode()}")
        await next_handler(client, message[::-1])
    elif message.startswith(b"skip:"):
        message = message[len(b"skip:"):]
        print(f"skipping: {message.decode()}")
    else:
        print("passing message through unchanged")
        await next_handler(client, message)

# run before other handlers
client.message_handlers.insert(0, uno_handler)

Modifying message_handlers is the primary way to extend or customize the client. For examples of writing your own router, or replacing these handlers, see the extensions section of the user guide.

Note

Each incoming message is handled using a copy of the handlers when the message arrived. Changes to the list do not affect handling of that message:

async def remove_other_handlers(next_handler, client, message):
    client.message_handlers.clear()
    await next_handler(client, message)  # still uses original handlers
class bottom.CommandSerializer(formatters: dict[str, Callable[[str, Any], Any]] | None = None)[source]

A mapping of command names to a list of templates (dict[str, list[SerializerTemplate]]) that can apply custom formatting to each parameter.

When a command registers more than one template, then during serialization they are tried in order from most to least parameters, until the provided params are sufficient. When two templates for the same command have the same number of arguments, they are tried in the order that they were registered.

For example, the LIST command is implemented as follows:

serializer.register("LIST", "LIST {channel:comma} {target}")
serializer.register("LIST", "LIST {channel:comma}")
serializer.register("LIST", "LIST")

See: serialize

__init__(formatters: dict[str, Callable[[str, Any], Any]] | None = None) None[source]
Parameters:

formatters – dict of functions that can be referenced in templates. See formatters

register(command: str, template: str | SerializerTemplate) SerializerTemplate[source]

Register a template to a command. Each command may have more than one template. When serializing a command, the templates for that command will be tried in order from most -> least parameters until one matches. If mutliple templates are registered with the same number of arguments, they will be tried in the order they were registered:

serializer = CommandSerializer()
serializer.register("foo", "{one} -> {two}")
serializer.register("foo", "{two} <- {one}")

print(serializer.serializer("foo", {"one": "A", "two": "B"}))
# prints A -> B

Returns the prepared template.

serialize(command: str, params: dict[str, Any]) str[source]

Render a dict into a command. This is like string.format():

"{greet}, {name}!".format({"greet": "hello", "name": "world"})

serializer.serialize("greeting", {"greet": "hello", "name": "world"})

Unlike string.format, each command may have more than one template. When serializing a command, the templates for that command will be tried in order from most -> least parameters until one matches. If mutliple templates are registered with the same number of arguments, they will be tried in the order they were registered:

serializer = CommandSerializer()
serializer.register("foo", "{one} -> {two}")
serializer.register("foo", "{two} <- {one}")

print(serializer.serializer("foo", {"one": "A", "two": "B"}))
# prints A -> B
formatters: dict[str, Callable[[str, Any], Any]]

A dict of functions that can be referenced in templates. Each function should take the name of the param being replaced, and the value for that param. Its return value will be formatted into the template, or passed to the next function if there is a chain:

def upper(id: str, value: str) -> str:
    return value.upper()

def reverse(id: str, value: str) -> str:
    return value[::-1]

serializer = CommandSerializer(formatters={"up": upper, "rev": reverse})
template = "hello, {name:up|rev}!"
serializer.register("greet", template)

print(serializer.serialize("greet", {"name": "world"}))
# prints: hello, DLROW!

You may raise an error from a formatter as a way to guard values:

def not_admin(id: str, user: User) -> str:
    if user.is_admin:
        raise ValueError("can't format an admin")
    return user.info.as_irc_line()

template = "found user: {user|noadmin}"
serializer = CommandSerializer(formatters={"noadmin": not_admin})
templates: dict[str, list[SerializerTemplate]]

a mapping of command -> list[template] that this serializer knows.

you can use this to make a new command serializer from an existing one:

from bottom.irc.serialize import GLOBAL_SERIALIZER
from copy import deepcopy

my_serializer = deepcopy(GLOBAL_SERIALIZER)
del my_serializer["AWAY"]
my_serializer.register("AWAY", "AWAY {my} {args}")
class bottom.SerializerTemplate(_components: tuple[Component, ...], original: str, params: tuple[str, ...])[source]

Note

Do not instantiate SerializerTemplate directly. Use SerializerTemplate.parse instead.

This is an optimized version of string.format() that can apply custom formatting functions:

def upper(id: str, value: str) -> str:
    return value.upper()

def reverse(id: str, value: str) -> str:
    return value[::-1]

template = "hello, {name:up|rev}!"
tpl = SerializedTemplate.parse(template, formatters={"up": upper, "rev": reverse})

print(tpl.format("greet", {"name": "world"}))
# prints: hello, DLROW!
classmethod parse(template: str, formatters: dict[str, Callable[[str, Any], Any]] | None = None) SerializerTemplate[source]

Parses a provided string into a SerializedTemplate, an optimized representation of the template for future rendering. If provided, the formatters dict is used to look up any custom formatters referenced from the template string:

def upper(id: str, value: str) -> str:
    return value.upper()

def reverse(id: str, value: str) -> str:
    return value[::-1]

template = "hello, {name:up|rev}!"
tpl = SerializedTemplate.parse(template, formatters={"up": upper, "rev": reverse})

print(tpl.format("greet", {"name": "world"}))
# prints: hello, DLROW!
format(params: dict[str, Any], is_filtered: bool = False, wrap_exc: bool = False) str[source]

Similar to string.format():

template = SerializerTemplate.parse("{one} + {two} = {three}")

params = {"one": "A", "two": "B", "three": "C"}
print(template.format(params))
# prints: A + B = C
Parameters:
  • params – the values to render into the template

  • filtered – when False, removes any params whose value is None. pass True if you have already done this, or if you want to pass explicit None values into the dict.

  • wrap_exc – when True, any exceptions from formatter functions is wrapped in a FormattingError, whose cause attribute is the underlying error.

original: str

The original str this template was parsed from:

src = "{foo} {bar}"
assert SerializerTemplate.parse(src).original == src
params: tuple[str, ...]

unique names of the placeholders used in the string.

ordered by first appearance left to right:

src = "{foo},{bar},{foo}"
tpl = SerializerTemplate.parse(src)
assert tpl.params == ("foo", "bar")
score: int

The number of unique arguments, not the number of replacements that occur:

first = SerializerTemplate.parse("{foo}")
second = SerializerTemplate.parse("{foo}{foo}{foo}")
assert first.score == second.score
async bottom.wait_for(client: Client, events: list[str], *, mode: Literal['first', 'all'] = 'first') list[dict][source]

Wait for one or all of the events to happen, depending on mode.

The results are the dicts that each event was triggered with, and the event name stored in the key "__event__"

When waiting for the first event, note that more than one may trigger:

from bottom import wait_for

async def on_first():
    completed = await wait_for(
        client,
        ["RPL_ENDOFMOTD", "ERR_NOMOTD"],
        mode="first"
    )
    names = [o["__event__"] for o in completed]
    print(f"first task(s) done: {names}")

When waiting for all, the return order is the same as the input order, not necessarily the completion order:

async def all_events():
    completed = await wait_for(
        client,
        ["RPL_MOTDSTART", "RPL_MOTD", "RPL_ENDOFMOTD"],
        mode="all"
    )
    print("collected whole MOTD")
    names = [o["__event__"] for o in completed]
    assert names == ["RPL_MOTDSTART", "RPL_MOTD", "RPL_ENDOFMOTD"]
bottom.register_pattern(command: str, template: str | SerializerTemplate, serializer: CommandSerializer | None = None) SerializerTemplate[source]

register a template for the given command into the provided serializer (default: global). this is a thin wrapper around CommandSerializer.register

see also: Custom Serialization.

bottom.NextMessageHandler

Type hint for an async function that takes a message to process.

This is the type of the first argument in a message handler:

from bottom import Client, ClientMessageHandler, NextMessageHandler

class MyClient(Client):
    pass

async def handle_message(next_handler: NextMessageHandler[MyClient], client: MyClient, message: bytes):
    print(f"I saw a message: {message.decode()}")
    await next_handler(client, message)

see message_handlers for details, or Extensions for examples of customizing a Client’s functionality.

bottom.ClientMessageHandler

Type hint for an async function that processes a message, and may call the next handler in the chain.

This is the type of the entire message handler:

from bottom import Client, ClientMessageHandler, NextMessageHandler

class MyClient(Client):
    pass

async def handle_message(next_handler: NextMessageHandler[MyClient], client: MyClient, message: bytes):
    print(f"I saw a message: {message.decode()}")
    await next_handler(client, message)

handler: ClientMessageHandler[MyClient] = handle_message

see message_handlers for details, or Extensions for examples of customizing a Client’s functionality.

Internal

If you want to understand how something works internally, you’re welcome to open an issue to discuss, or you can review the source code. Here are some general pointers to help with the latter:

Outgoing Messages

Client.send(**kwargs) -> Protocol.write outgoing messages are packed from **kwargs to a single IRC line.

  1. Start at bottom.Client.send()

  2. Check out src/bottom/pack.py::pack_command

  3. For each command, kwargs are usually looked up in one of the helpers f or b

  4. The packed line is sent through src/bottom/core.py::Protocol.write

Incoming Messages

Protocol.data_received -> Client.on(...) incoming messages are unpacked from an IRC line into a dict.

  1. Each incoming line is passed through the Client.message_handlers list

  2. This is connected to the Protocol in src/bottom/core.py::make_protocol_factory

  3. The chaining and implementation of next_handler is in src/bottom/util.py::stack_process which passes its own next_processor function into the handlers in order

  4. The public Client has a default handler at src/bottom/irc/__init__.py::rfc2812_handler which calls unpack_command to unpack a dict, then calls Client.trigger to schedule a task to invoke any handlers annotated with Client.on

  5. In src/bottom/unpack.py::unpack_command the broad structure of an IRC line is split with a regex, then aliases are resolved (see: synonym) and then kwargs is built up according to canonical command name.

Connection State

Protocol manages the connection state.

  1. A protocol is created in Client.connect which makes a new protocol_factory in src/bottom/core.py::make_protocol_factory. Note that the Protocol needs to know how to surface two things, and those are both defined in the factory function: (1) what to call when the connection is lost, and (2) what to call when a full inbound IRC line is ready. Neither of these is defined inside the Client – the coupling is done inside the factory function.

  2. Incoming data is chunked in src/bottom/core.py::Protocol.data_received and outgoing data passes through Protocol.write

  3. Because there are a number of ways to close a connection (remote closes, we close, connection dropped) it’s possible for one close call at one level to pass down through the layers and then propagate back up again. To avoid emitting double "client_disconnect" events, the closing process needs to maintain two properties: (1) Any close call must clean up underlying resources, if they exist and (2) Any close call must not re-trigger handlers above in a higher abstraction. This means that a lower level handler must cleanly handle the case that it receives a close() call while it is already closed or closing.