hikari.api.event_manager#
Core interface for components that manage events in the library.
Module Contents#
- class hikari.api.event_manager.EventManager[source]#
Bases:
abc.ABC
Base interface for event manager implementations.
This is a listener of a
hikari.events.base_events.Event
object and consumer of raw event payloads, and is expected to invoke one or more corresponding event listeners where appropriate.- abstract consume_raw_event(event_name, shard, payload)[source]#
Consume a raw event.
- Parameters:
- event_name
str
The case-insensitive name of the event being triggered.
- shard
hikari.api.shard.GatewayShard
Object of the shard that received this event.
- payload
hikari.internal.data_binding.JSONObject
Payload of the event being triggered.
- event_name
- Raises:
LookupError
If there is no consumer for the event.
- abstract dispatch(event)[source]#
Dispatch an event.
- Parameters:
- event
hikari.events.base_events.Event
The event to dispatch.
- event
- Returns:
asyncio.Future
[typing.Any
]A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later.
See also
Examples
We can dispatch custom events by first defining a class that derives from
hikari.events.base_events.Event
.import attrs from hikari.traits import RESTAware from hikari.events.base_events import Event from hikari.users import User from hikari.snowflakes import Snowflake @attrs.define() class EveryoneMentionedEvent(Event): app: RESTAware = attrs.field() author: User = attrs.field() '''The user who mentioned everyone.''' content: str = attrs.field() '''The message that was sent.''' message_id: Snowflake = attrs.field() '''The message ID.''' channel_id: Snowflake = attrs.field() '''The channel ID.'''
We can then dispatch our event as we see fit.
from hikari.events.messages import MessageCreateEvent @bot.listen(MessageCreateEvent) async def on_message(event): if "@everyone" in event.content or "@here" in event.content: event = EveryoneMentionedEvent( author=event.author, content=event.content, message_id=event.id, channel_id=event.channel_id, ) bot.dispatch(event)
This event can be listened to elsewhere by subscribing to it with
EventManager.subscribe
.@bot.listen(EveryoneMentionedEvent) async def on_everyone_mentioned(event): print(event.user, "just pinged everyone in", event.channel_id)
- abstract get_listeners(event_type, /, *, polymorphic=True)[source]#
Get the listeners for a given event type, if there are any.
- Parameters:
- event_type
typing.Type
[T
] The event type to look for.
T
must be a subclass ofhikari.events.base_events.Event
.- polymorphicbool
If
True
, this will also return the listeners for all the event typesevent_type
will dispatch. IfFalse
, then only listeners for this class specifically are returned. The default isTrue
.
- event_type
- Returns:
typing.Collection
[typing.Callable
[[T
],typing.Coroutine
[typing.Any
,typing.Any
,None
]]]A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered.
- abstract listen(*event_types)[source]#
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
- Parameters:
- *event_types
typing.Optional
[typing.Type
[T
]] The event types to subscribe to. The implementation may allow this to be undefined. If this is the case, the event type will be inferred instead from the type hints on the function signature.
T
must be a subclass ofhikari.events.base_events.Event
.
- *event_types
- Returns:
typing.Callable
[[T
],T
]A decorator for a coroutine function that passes it to
EventManager.subscribe
before returning the function reference.
See also
- abstract stream(event_type, /, timeout, limit=None)[source]#
Return a stream iterator for the given event and sub-events.
Warning
If you use
await stream.open()
to start the stream then you must also close it withawait stream.close()
otherwise it may queue events in memory indefinitely.- Parameters:
- event_type
typing.Type
[hikari.events.base_events.Event
] The event type to listen for. This will listen for subclasses of this type additionally.
- timeout
typing.Optional
[int
,float
] How long this streamer should wait for the next event before ending the iteration. If
None
then this will continue until explicitly broken from.- limit
typing.Optional
[int
] The limit for how many events this should queue at one time before dropping extra incoming events, leave this as
None
for the cache size to be unlimited.
- event_type
- Returns:
EventStream
[hikari.events.base_events.Event
]The async iterator to handle streamed events. This must be started with
with stream:
orstream.open()
before asynchronously iterating over it.
See also
Examples
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream: async for user_id in stream.map("user_id").limit(50): ...
or using
open()
andclose()
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) stream.open() async for user_id in stream.map("user_id").limit(50) ... stream.close()
- abstract subscribe(event_type, callback)[source]#
Subscribe a given callback to a given event type.
- Parameters:
- event_type
typing.Type
[T
] The event type to listen for. This will also listen for any subclasses of the given type.
T
must be a subclass ofhikari.events.base_events.Event
.- callback
Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
- event_type
See also
Examples
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent async def on_message(event): ... bot.subscribe(MessageCreateEvent, on_message)
- abstract unsubscribe(event_type, callback)[source]#
Unsubscribe a given callback from a given event type, if present.
- Parameters:
- event_type
typing.Type
[T
] The event type to unsubscribe from. This must be the same exact type as was originally subscribed with to be removed correctly.
T
must derive fromhikari.events.base_events.Event
.- callback
The callback to unsubscribe.
- event_type
See also
Examples
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent async def on_message(event): ... bot.unsubscribe(MessageCreateEvent, on_message)
- abstract async wait_for(event_type, /, timeout, predicate=None)[source]#
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
- Parameters:
- event_type
typing.Type
[hikari.events.base_events.Event
] The event type to listen for. This will listen for subclasses of this type additionally.
- predicate
A function taking the event as the single parameter. This should return
True
if the event is one you want to return, orFalse
if the event should not be returned. If left asNone
(the default), then the first matching event type that the bot receives (or any subtype) will be the one returned.- timeout
typing.Union
[float
,int
,None
] The amount of time to wait before raising an
asyncio.TimeoutError
and giving up instead. This is measured in seconds. IfNone
, then no timeout will be waited for (no timeout can result in “leaking” of coroutines that never complete if called in an uncontrolled way, so is not recommended).
- event_type
- Returns:
hikari.events.base_events.Event
The event that was provided.
- Raises:
asyncio.TimeoutError
If the timeout is not
None
and is reached before an event is received that the predicate returnsTrue
for.
See also
- class hikari.api.event_manager.EventStream[source]#
Bases:
hikari.iterators.LazyIterator
[hikari.events.base_events.EventT
],abc.ABC
A base abstract class for all event streamers.
Unlike
hikari.iterators.LazyIterator
(which this extends), an event streamer must be started and closed.See also
LazyIterator
Examples
A streamer may either be started and closed using
with
syntax whereEventStream.open
andEventStream.close
are implicitly called based on context.with EventStream(app, EventType, timeout=50) as stream: async for entry in stream: ...
A streamer may also be directly started and closed using the
EventStream.close
andEventStream.open
. Note that if you don’t callEventStream.close
after opening a streamer when you’re finished with it then it may queue events events in memory indefinitely.stream = EventStream(app, EventType, timeout=50) await stream.open() async for event in stream: ... await stream.close()
- abstract close()[source]#
Mark this streamer as closed to stop it from queueing and receiving events.
If called on an already closed streamer then this will do nothing.
Note
with streamer
may be used as a short-cut for opening and closing a streamer.
- abstract filter(*predicates, **attrs)[source]#
Filter the items by one or more conditions.
Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.
All conditions must evaluate to
True
for the item to be returned. If this is not met, then the item is discarded and ignored, the next matching item will be returned instead, if there is one.- Parameters:
- *predicates
typing.Union
[typing.Callable
[[ValueT
], bool],typing.Tuple
[str
,typing.Any
]] Predicates to invoke. These are functions that take a value and return
True
if it is of interest, orFalse
otherwise. These may instead include 2-tuple
objects consisting of astr
attribute name (nested attributes are referred to using the.
operator), and values to compare for equality. This allows you to specify conditions such asmembers.filter(("user.bot", True))
.- **attrs
typing.Any
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
- *predicates
- Returns:
EventStream
[ValueT
]The current stream with the new filter applied.