hikari.impl.event_manager_base
#
A base implementation for an event manager.
EventManagerBase
#
EventManagerBase(
event_factory: EventFactory,
intents: Intents,
*,
cache_components: CacheComponents = config.CacheComponents.NONE
)
Bases: EventManager
Provides functionality to consume and dispatch events.
Specific event handlers should be in functions named on_xxx, where xxx is the raw event name being dispatched in lower-case.
consume_raw_event
#
consume_raw_event(
event_name: str, shard: GatewayShard, payload: JSONObject
) -> None
Consume a raw event.
PARAMETER | DESCRIPTION |
---|---|
event_name |
The case-insensitive name of the event being triggered.
TYPE: str
|
shard |
Object of the shard that received this event.
TYPE: GatewayShard
|
payload |
Payload of the event being triggered.
TYPE: JSONObject
|
RAISES | DESCRIPTION |
---|---|
LookupError
|
If there is no consumer for the event. |
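The naming convention and the LookupError behaviour described above can be illustrated with a small, self-contained sketch. MiniEventManager and its handler are hypothetical stand-ins (the shard argument is omitted for brevity); this is not hikari's actual implementation.

```python
from typing import Any, Callable, Dict, List, Optional


class MiniEventManager:
    """Hypothetical stand-in that resolves consumers by method name."""

    def __init__(self) -> None:
        self.seen: List[str] = []

    def on_message_create(self, payload: Dict[str, Any]) -> None:
        # Consumer for the raw "MESSAGE_CREATE" event.
        self.seen.append(payload["content"])

    def consume_raw_event(self, event_name: str, payload: Dict[str, Any]) -> None:
        # Lower-casing the name first makes the lookup case-insensitive.
        consumer: Optional[Callable[[Dict[str, Any]], None]] = getattr(
            self, "on_" + event_name.lower(), None
        )
        if consumer is None:
            raise LookupError(f"no consumer for event {event_name!r}")
        consumer(payload)


manager = MiniEventManager()
manager.consume_raw_event("MESSAGE_CREATE", {"content": "hello"})
```

Calling consume_raw_event with a name that has no matching on_xxx method raises LookupError in this sketch.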
dispatch
#
Dispatch an event.
PARAMETER | DESCRIPTION |
---|---|
event |
The event to dispatch.
TYPE:
|
Examples:
We can dispatch custom events by first defining a class that derives from hikari.events.base_events.Event.
import attrs

from hikari.traits import RESTAware
from hikari.events.base_events import Event
from hikari.users import User
from hikari.snowflakes import Snowflake


@attrs.define()
class EveryoneMentionedEvent(Event):
    app: RESTAware = attrs.field()

    author: User = attrs.field()
    '''The user who mentioned everyone.'''

    content: str = attrs.field()
    '''The message that was sent.'''

    message_id: Snowflake = attrs.field()
    '''The message ID.'''

    channel_id: Snowflake = attrs.field()
    '''The channel ID.'''
We can then dispatch our event as we see fit.
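from hikari.events.messages import MessageCreateEvent


@bot.listen(MessageCreateEvent)
async def on_message(event):
    # Message content may be None, so fall back to an empty string.
    content = event.content or ""
    if "@everyone" in content or "@here" in content:
        mention_event = EveryoneMentionedEvent(
            app=event.app,  # the custom event declares a required app field
            author=event.author,
            content=content,
            message_id=event.message_id,
            channel_id=event.channel_id,
        )
        bot.dispatch(mention_event)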
This event can be listened to elsewhere by subscribing to it with hikari.api.event_manager.EventManager.subscribe.
@bot.listen(EveryoneMentionedEvent)
async def on_everyone_mentioned(event):
    print(event.author, "just pinged everyone in", event.channel_id)
RETURNS | DESCRIPTION |
---|---|
Future[Any]
|
A future that can be optionally awaited. If awaited, the future will complete once all corresponding event listeners have been invoked. If not awaited, this will schedule the dispatch of the events in the background for later. |
See Also
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
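To make the "optionally awaited" behaviour of the returned future concrete, here is a minimal sketch built only on asyncio. MiniDispatcher and Ping are hypothetical names; hikari's real dispatcher does considerably more (for example, dispatching to listeners of parent event types).

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

Listener = Callable[[Any], Awaitable[None]]


class MiniDispatcher:
    """Hypothetical dispatcher; not hikari's real implementation."""

    def __init__(self) -> None:
        self._listeners: Dict[type, List[Listener]] = {}

    def subscribe(self, event_type: type, callback: Listener) -> None:
        self._listeners.setdefault(event_type, []).append(callback)

    def dispatch(self, event: Any) -> "asyncio.Future[Any]":
        # Every listener is scheduled immediately; the returned future
        # completes once all of them have finished.
        tasks = [
            asyncio.ensure_future(callback(event))
            for callback in self._listeners.get(type(event), [])
        ]
        return asyncio.gather(*tasks)


class Ping:
    """Hypothetical custom event."""


async def main() -> List[str]:
    calls: List[str] = []

    async def on_ping(event: Ping) -> None:
        calls.append("handled")

    dispatcher = MiniDispatcher()
    dispatcher.subscribe(Ping, on_ping)

    await dispatcher.dispatch(Ping())  # awaited: returns after the listener ran
    dispatcher.dispatch(Ping())        # not awaited: runs in the background
    await asyncio.sleep(0.01)          # give the background task time to run
    return calls


calls = asyncio.run(main())
```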
get_listeners
#
get_listeners(
event_type: Type[EventT], /, *, polymorphic: bool = True
) -> Collection[CallbackT[EventT]]
Get the listeners for a given event type, if there are any.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to look for.
TYPE:
|
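polymorphic |
If True, this will also return the listeners registered for the parent event types of event_type. If False, only listeners registered for this exact type are returned. Defaults to True.
TYPE: bool
|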
RETURNS | DESCRIPTION |
---|---|
Collection[Callable[[T], Coroutine[Any, Any, None]]]
|
A copy of the collection of listeners for the event. Will return an empty collection if nothing is registered. |
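The sketch below shows one plausible reading of the polymorphic flag, assuming it means "also include listeners registered for parent event types". The event classes, the registry dictionary, and the listener functions are all hypothetical.

```python
from typing import Any, Awaitable, Callable, Dict, List


class Event:
    """Hypothetical base event."""


class MessageEvent(Event):
    """Hypothetical intermediate event."""


class MessageCreateEvent(MessageEvent):
    """Hypothetical concrete event."""


async def log_everything(event: Event) -> None:
    ...


async def on_create(event: MessageCreateEvent) -> None:
    ...


# Hypothetical registry mapping an event type to its listeners.
registry: Dict[type, List[Callable[[Any], Awaitable[None]]]] = {
    Event: [log_everything],
    MessageCreateEvent: [on_create],
}


def get_listeners(event_type: type, *, polymorphic: bool = True) -> List[Callable[[Any], Awaitable[None]]]:
    if not polymorphic:
        # Exact type only; a copy is returned so callers cannot mutate the registry.
        return list(registry.get(event_type, []))
    found: List[Callable[[Any], Awaitable[None]]] = []
    # Walk the method resolution order so parent-type listeners are included.
    for cls in event_type.__mro__:
        found.extend(registry.get(cls, []))
    return found
```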
listen
#
Generate a decorator to subscribe a callback to an event type.
This is a second-order decorator.
PARAMETER | DESCRIPTION |
---|---|
*event_types |
The event types to subscribe to. The implementation may allow this
to be undefined. If this is the case, the event type will be inferred
instead from the type hints on the function signature.
|
RETURNS | DESCRIPTION |
---|---|
Callable[[T], T]
|
A decorator for a coroutine function that passes it to hikari.api.event_manager.EventManager.subscribe before returning the function. |
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
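A second-order decorator is a function that returns the actual decorator. The sketch below illustrates that shape, including inference of the event type from type hints when no types are given. The subscriptions dictionary, the subscribe helper, and the event classes are hypothetical, and the inference rule (use the first annotated parameter) is an assumption.

```python
import typing
from typing import Any, Callable, Dict, List


class Event:
    """Hypothetical base event."""


class MessageCreateEvent(Event):
    """Hypothetical concrete event."""


subscriptions: Dict[type, List[Callable[..., Any]]] = {}


def subscribe(event_type: type, callback: Callable[..., Any]) -> None:
    subscriptions.setdefault(event_type, []).append(callback)


def listen(*event_types: type) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(callback: Callable[..., Any]) -> Callable[..., Any]:
        types = event_types
        if not types:
            # No explicit types: infer from the first annotated parameter.
            hints = typing.get_type_hints(callback)
            hints.pop("return", None)
            types = (next(iter(hints.values())),)
        for event_type in types:
            subscribe(event_type, callback)
        # The original function is returned unchanged.
        return callback

    return decorator


@listen()
async def on_message(event: MessageCreateEvent) -> None:
    ...


@listen(Event)
async def on_any(event) -> None:
    ...
```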
stream
#
stream(
event_type: Type[EventT],
/,
timeout: Union[float, int, None],
limit: Optional[int] = None,
) -> EventStream[EventT]
Return a stream iterator for the given event and sub-events.
Warning
If you use stream.open() to start the stream, you must also close it with stream.close(); otherwise it may queue events in memory indefinitely.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to listen for. This will also listen for subclasses of this type. |
timeout |
How long this streamer should wait for the next event before
ending the iteration. If None, the streamer will wait indefinitely. |
limit |
The limit for how many events this should queue at one time before
dropping extra incoming events. Leave this as None for no limit. |
RETURNS | DESCRIPTION |
---|---|
EventStream[Event]
|
The async iterator to handle streamed events. This must be started
with stream.open() or a with block before iterating over it. |
Examples:
with bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id)) as stream:
    async for user_id in stream.map("user_id").limit(50):
        ...
or using open() and close():
stream = bot.stream(events.ReactionAddEvent, timeout=30).filter(("message_id", message.id))
stream.open()
async for user_id in stream.map("user_id").limit(50):
    ...
stream.close()
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
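The interaction between timeout and limit can be pictured with a bounded asyncio queue. This is only a sketch under that assumption: MiniStream and its push method are hypothetical and do not reflect how hikari's EventStream is implemented internally.

```python
import asyncio
from typing import Any, AsyncIterator, List, Optional, Tuple


class MiniStream:
    """Hypothetical bounded event buffer; not hikari's EventStream."""

    def __init__(self, *, timeout: Optional[float], limit: Optional[int] = None) -> None:
        self._timeout = timeout
        # asyncio.Queue treats maxsize=0 as "unbounded".
        self._queue: "asyncio.Queue[Any]" = asyncio.Queue(maxsize=limit or 0)
        self.dropped = 0

    def push(self, event: Any) -> None:
        try:
            self._queue.put_nowait(event)
        except asyncio.QueueFull:
            # The buffer is full, so the extra incoming event is discarded.
            self.dropped += 1

    async def __aiter__(self) -> AsyncIterator[Any]:
        while True:
            try:
                yield await asyncio.wait_for(self._queue.get(), timeout=self._timeout)
            except asyncio.TimeoutError:
                # No event arrived in time, so the iteration ends.
                return


async def main() -> Tuple[List[int], int]:
    stream = MiniStream(timeout=0.05, limit=2)
    for number in range(3):
        stream.push(number)  # the third push exceeds the limit
    received = [event async for event in stream]
    return received, stream.dropped


received, dropped = asyncio.run(main())
```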
subscribe
#
Subscribe a given callback to a given event type.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to listen for. This will also listen for any
subclasses of the given type.
TYPE:
|
callback |
Must be a coroutine function to invoke. This should consume an instance of the given event, or an instance of a valid subclass if one exists. Any result is discarded.
TYPE:
|
Examples:
The following demonstrates subscribing a callback to message creation events.
from hikari.events.messages import MessageCreateEvent


async def on_message(event):
    ...


bot.subscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
unsubscribe
#
Unsubscribe a given callback from a given event type, if present.
PARAMETER | DESCRIPTION |
---|---|
event_type |
The event type to unsubscribe from. This must be the same exact
type as was originally subscribed with to be removed correctly.
TYPE:
|
callback |
The callback to unsubscribe.
TYPE:
|
Examples:
The following demonstrates unsubscribing a callback from a message creation event.
from hikari.events.messages import MessageCreateEvent


async def on_message(event):
    ...


bot.unsubscribe(MessageCreateEvent, on_message)
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Wait_for: hikari.api.event_manager.EventManager.wait_for.
wait_for
async
#
wait_for(
event_type: Type[EventT],
/,
timeout: Union[float, int, None],
predicate: Optional[PredicateT[EventT]] = None,
) -> EventT
Wait for a given event to occur once, then return the event.
Warning
Async predicates are not supported.
PARAMETER | DESCRIPTION |
---|---|
event_type |
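The event type to listen for. This will also listen for subclasses of this type. |
predicate |
A function taking the event as the single parameter.
This should return True if the event is the one you want, or False if it should be ignored.
TYPE: Optional[PredicateT[EventT]]
|
timeout |
The amount of time to wait before raising an asyncio.TimeoutError. If None, this will wait indefinitely. |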
RETURNS | DESCRIPTION |
---|---|
Event
|
The event that was provided. |
RAISES | DESCRIPTION |
---|---|
TimeoutError
|
If the timeout is reached before a matching event is received. |
See Also
Dispatch: hikari.api.event_manager.EventManager.dispatch.
Listen: hikari.api.event_manager.EventManager.listen.
Stream: hikari.api.event_manager.EventManager.stream.
Subscribe: hikari.api.event_manager.EventManager.subscribe.
Unsubscribe: hikari.api.event_manager.EventManager.unsubscribe.
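The combination of a synchronous predicate and a timeout can be sketched with a plain asyncio future. MiniWaiter is a hypothetical class, the event type argument is left out for brevity, and integers stand in for events.

```python
import asyncio
from typing import Any, Callable, List, Optional, Tuple

Predicate = Callable[[Any], bool]


class MiniWaiter:
    """Hypothetical one-shot waiter; not hikari's real implementation."""

    def __init__(self) -> None:
        self._waiters: List[Tuple[Optional[Predicate], "asyncio.Future[Any]"]] = []

    def dispatch(self, event: Any) -> None:
        for pair in list(self._waiters):
            predicate, future = pair
            # The predicate is called synchronously; async predicates would not work here.
            if not future.done() and (predicate is None or predicate(event)):
                future.set_result(event)
                self._waiters.remove(pair)

    async def wait_for(self, *, timeout: Optional[float], predicate: Optional[Predicate] = None) -> Any:
        future: "asyncio.Future[Any]" = asyncio.get_running_loop().create_future()
        pair = (predicate, future)
        self._waiters.append(pair)
        try:
            return await asyncio.wait_for(future, timeout=timeout)
        finally:
            # Clean up if the wait timed out before any event matched.
            if pair in self._waiters:
                self._waiters.remove(pair)


async def main() -> Tuple[Any, bool]:
    waiter = MiniWaiter()
    loop = asyncio.get_running_loop()
    loop.call_later(0.01, waiter.dispatch, 3)   # rejected by the predicate
    loop.call_later(0.02, waiter.dispatch, 42)  # accepted by the predicate
    matched = await waiter.wait_for(timeout=1.0, predicate=lambda n: n > 10)
    try:
        await waiter.wait_for(timeout=0.05)     # nothing is dispatched in time
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True
    return matched, timed_out


matched, timed_out = asyncio.run(main())
```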
EventStream
#
EventStream(
event_manager: EventManager,
event_type: Type[EventT],
*,
timeout: Union[float, int, None],
limit: Optional[int] = None
)
Bases: EventStream[EventT]
An implementation of the hikari.api.event_manager.EventStream class.
Note
While calling hikari.impl.event_manager_base.EventStream.filter
on an active "opened" event stream
will return a wrapping lazy iterator, calling it on an inactive "closed"
event stream will return the event stream and add the given predicates
to the streamer.
awaiting
#
awaiting(window_size: int = 10) -> LazyIterator[ValueT]
Await each item concurrently in a fixed size window.
Warning
Setting a large window size, or setting it to 0 to await everything,
is a dangerous thing to do if you are making API calls. Some
endpoints will get ratelimited and cause a backup of waiting
tasks, others may begin to spam global rate limits instead
(the fetch_user endpoint seems to be notorious for doing this).
Note
This call assumes that the iterator contains awaitable values as
input. MyPy cannot detect this nicely, so any cast is forced
internally.
If the item is not awaitable, you will receive a TypeError instead.
You have been warned. You cannot escape the ways of the duck type,
young grasshopper.
PARAMETER | DESCRIPTION |
---|---|
window_size |
The window size of how many tasks to await at once. You can set this
to 0 to await everything at once, but heed the warning above.
TYPE: int
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
The new lazy iterator to return. |
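As a rough illustration of awaiting in a fixed-size window, the sketch below gathers awaitables a few at a time. The helper await_in_windows and the double coroutine are hypothetical; the real method works lazily on an iterator rather than on a pre-built list.

```python
import asyncio
from typing import Awaitable, Iterable, List, TypeVar

T = TypeVar("T")


async def await_in_windows(awaitables: Iterable[Awaitable[T]], window_size: int = 10) -> List[T]:
    pending = list(awaitables)
    if window_size <= 0:
        # A window of 0 is treated as "await everything at once" in this sketch.
        window_size = len(pending) or 1
    results: List[T] = []
    for start in range(0, len(pending), window_size):
        window = pending[start:start + window_size]
        # Items inside one window run concurrently; windows run one after another.
        results.extend(await asyncio.gather(*window))
    return results


async def double(number: int) -> int:
    await asyncio.sleep(0)
    return number * 2


doubled = asyncio.run(await_in_windows([double(n) for n in range(5)], window_size=2))
```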
chunk
#
chunk(chunk_size: int) -> LazyIterator[Sequence[ValueT]]
Return results in chunks of up to chunk_size entries.
PARAMETER | DESCRIPTION |
---|---|
chunk_size |
The limit for how many results should be returned in each chunk.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[Sequence[ValueT]]
|
|
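The chunking behaviour, including the shorter final chunk, can be sketched with plain async generators. The numbers and chunk functions below are hypothetical helpers, not part of hikari.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def numbers(count: int) -> AsyncIterator[int]:
    # Hypothetical source of values.
    for number in range(count):
        yield number


async def chunk(source: AsyncIterator[T], chunk_size: int) -> AsyncIterator[List[T]]:
    batch: List[T] = []
    async for item in source:
        batch.append(item)
        if len(batch) == chunk_size:
            yield batch
            batch = []
    if batch:
        # The last chunk may hold fewer than chunk_size entries.
        yield batch


async def main() -> List[List[int]]:
    return [batch async for batch in chunk(numbers(7), 3)]


chunks = asyncio.run(main())
```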
close
#
Mark this streamer as closed to stop it from queueing and receiving events.
If called on an already closed streamer then this will do nothing.
Note
with streamer may be used as a short-cut for opening and closing a streamer.
collect
async
#
collect(
collector: Callable[[Sequence[ValueT]], Collection[ValueT]]
) -> Collection[ValueT]
Collect the results into a given type and return it.
PARAMETER | DESCRIPTION |
---|---|
collector |
A function that consumes a sequence of values and returns a collection.
TYPE:
|
count
async
#
count() -> int
Count the number of results.
RETURNS | DESCRIPTION |
---|---|
int
|
Number of results found. |
enumerate
#
enumerate(*, start: int = 0) -> LazyIterator[Tuple[int, ValueT]]
Enumerate the paginated results lazily.
This behaves as an asyncio-friendly version of enumerate which uses much less memory than collecting all the results first and calling enumerate across them.
PARAMETER | DESCRIPTION |
---|---|
start |
Optional int to start at. If omitted, this is 0.
TYPE: int
|
Examples:
>>> async for i, item in paginated_results.enumerate():
...     print(i, item)
0 foo
1 bar
2 baz
3 bork
4 qux
>>> async for i, item in paginated_results.enumerate(start=9):
...     print(i, item)
9 foo
10 bar
11 baz
12 bork
13 qux
>>> async for i, item in paginated_results.enumerate(start=9).limit(3):
...     print(i, item)
9 foo
10 bar
11 baz
RETURNS | DESCRIPTION |
---|---|
LazyIterator[Tuple[int, T]]
|
A paginated results view that asynchronously yields an increasing counter in a tuple with each result, lazily. |
filter
#
Filter the items by one or more conditions.
Each condition is treated as a predicate, being called with each item that this iterator would return when it is requested.
All conditions must evaluate to True for the item to be returned. If this is not met, the item is discarded and the next matching item is returned instead, if there is one.
PARAMETER | DESCRIPTION |
---|---|
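*predicates |
Predicates to invoke. These are functions that take a value and
return True if it is of interest, or False otherwise. A 2-tuple of an attribute name and the value to compare it against may be passed instead.
TYPE:
|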
**attrs |
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
EventStream[ValueT]
|
The current stream with the new filter applied. |
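The three ways of expressing a condition (callable predicate, 2-tuple, keyword attribute) can be sketched with a single matching helper. The matches function and the reaction objects are hypothetical, and the use of dotted names for nested attributes in 2-tuples is an assumption.

```python
from operator import attrgetter
from types import SimpleNamespace
from typing import Any, Callable, Tuple, Union

Condition = Union[Tuple[str, Any], Callable[[Any], bool]]


def matches(item: Any, *predicates: Condition, **attrs: Any) -> bool:
    for condition in predicates:
        if callable(condition):
            if not condition(item):
                return False
        else:
            # A 2-tuple of (attribute path, expected value); dotted paths reach nested attributes.
            name, expected = condition
            if attrgetter(name)(item) != expected:
                return False
    # Keyword conditions compare top-level attributes only.
    return all(getattr(item, name) == expected for name, expected in attrs.items())


reactions = [
    SimpleNamespace(message_id=1, user=SimpleNamespace(is_bot=False)),
    SimpleNamespace(message_id=1, user=SimpleNamespace(is_bot=True)),
    SimpleNamespace(message_id=2, user=SimpleNamespace(is_bot=False)),
]

# Keep only reactions on message 1 that were not added by a bot.
kept = [r for r in reactions if matches(r, ("user.is_bot", False), message_id=1)]
```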
flat_map
#
flat_map(
flattener: _FlattenerT[ValueT, AnotherValueT]
) -> LazyIterator[AnotherValueT]
Perform a flat mapping operation.
This will pass each item in the iterator to the given function
parameter, expecting a new typing.Iterable or typing.AsyncIterator
to be returned as the result. This means you can map to a new
hikari.iterators.LazyIterator, typing.AsyncIterator, typing.Iterable,
async generator, or generator.
Remember that typing.Iterator implicitly provides typing.Iterable
compatibility.
This is used to provide lazy conversions, and can be used to implement reactive-like pipelines if desired.
All results are combined into one large lazy iterator and yielded lazily.
PARAMETER | DESCRIPTION |
---|---|
flattener |
A function that returns either an async iterator or iterator of new values. Could be an attribute name instead.
TYPE:
|
Examples:
The following example generates a distinct collection of all mentioned users in the given channel from the past 500 messages.
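import re
import typing

import hikari
from hikari.snowflakes import Snowflake


def iter_mentioned_users(message: hikari.Message) -> typing.Iterable[Snowflake]:
    # Message content may be None, so fall back to an empty string.
    for match in re.findall(r"<@!?(\d+)>", message.content or ""):
        yield Snowflake(match)


# Collecting into a set keeps only distinct user IDs.
mentioned_users = await (
    channel
    .history()
    .limit(500)
    .flat_map(iter_mentioned_users)
    .collect(set)
)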
RETURNS | DESCRIPTION |
---|---|
LazyIterator[AnotherValueT]
|
The new lazy iterator to return. |
for_each
async
#
Forward each value to a given consumer immediately.
last
async
#
last() -> ValueT
Return the last element of this iterator only.
Note
This method will consume the whole iterator if run.
RETURNS | DESCRIPTION |
---|---|
ValueT
|
The last result. |
RAISES | DESCRIPTION |
---|---|
LookupError
|
If no result exists. |
limit
#
limit(limit: int) -> LazyIterator[ValueT]
Limit the number of items you receive from this async iterator.
PARAMETER | DESCRIPTION |
---|---|
limit |
The number of items to get. This must be greater than zero.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
A paginated results view that asynchronously yields a maximum of the given number of items before completing. |
map
#
map(
transformation: Union[Callable[[ValueT], AnotherValueT], str]
) -> LazyIterator[AnotherValueT]
Map the values to a different value.
PARAMETER | DESCRIPTION |
---|---|
transformation |
The function to use to map the attribute. This may alternatively
be a string attribute name to replace the input value with. You
can provide nested attributes using the . operator. |
RETURNS | DESCRIPTION |
---|---|
LazyIterator[AnotherValueT]
|
|
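The idea of accepting either a function or an attribute name can be sketched with operator.attrgetter, which already understands dotted paths. The as_callable helper and the message objects are hypothetical; how hikari resolves attribute strings internally is not shown here.

```python
from operator import attrgetter
from types import SimpleNamespace
from typing import Any, Callable, Union


def as_callable(transformation: Union[Callable[[Any], Any], str]) -> Callable[[Any], Any]:
    # A string is treated as a (possibly dotted) attribute path.
    if isinstance(transformation, str):
        return attrgetter(transformation)
    return transformation


messages = [
    SimpleNamespace(author=SimpleNamespace(id=n), content="hi " + str(n))
    for n in (1, 2)
]

author_ids = [as_callable("author.id")(m) for m in messages]
lengths = [as_callable(lambda m: len(m.content))(m) for m in messages]
```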
next
async
#
next() -> ValueT
Return the next element of this iterator only.
RETURNS | DESCRIPTION |
---|---|
ValueT
|
The next result. |
RAISES | DESCRIPTION |
---|---|
LookupError
|
If no more results exist. |
open
#
Mark this streamer as opened to let it start receiving and queueing events.
If called on an already started streamer then this will do nothing.
Note
with streamer may be used as a short-cut for opening and closing a stream.
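The idempotent open and close calls and the with short-cut can be pictured with a small context manager. MiniStreamer and its transitions list are hypothetical and exist only to make the state changes observable.

```python
from typing import List


class MiniStreamer:
    """Hypothetical streamer that only tracks its open/closed state."""

    def __init__(self) -> None:
        self.active = False
        self.transitions: List[str] = []

    def open(self) -> None:
        # Opening an already opened streamer does nothing.
        if not self.active:
            self.active = True
            self.transitions.append("opened")

    def close(self) -> None:
        # Closing an already closed streamer does nothing.
        if self.active:
            self.active = False
            self.transitions.append("closed")

    def __enter__(self) -> "MiniStreamer":
        self.open()
        return self

    def __exit__(self, exc_type, exc, traceback) -> None:
        self.close()


streamer = MiniStreamer()
with streamer:
    streamer.open()  # no effect: the with block already opened it
streamer.close()     # no effect: the with block already closed it
```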
reversed
#
reversed() -> LazyIterator[ValueT]
Return a lazy iterator of the remainder of this iterator's values reversed.
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
The lazy iterator of this iterator's remaining values reversed. |
skip
#
skip(number: int) -> LazyIterator[ValueT]
Drop the given number of items, then yield anything after.
PARAMETER | DESCRIPTION |
---|---|
number |
The max number of items to drop before any items are yielded.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
A paginated results view that asynchronously yields all items AFTER the given number of items are discarded first. |
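Skipping and limiting compose naturally. The sketch below models both with plain async generators; numbers, skip, and limit are hypothetical free functions rather than the LazyIterator methods themselves.

```python
import asyncio
from typing import AsyncIterator, List, TypeVar

T = TypeVar("T")


async def numbers(count: int) -> AsyncIterator[int]:
    # Hypothetical source of values.
    for number in range(count):
        yield number


async def skip(source: AsyncIterator[T], number: int) -> AsyncIterator[T]:
    async for item in source:
        if number > 0:
            number -= 1  # still dropping items
            continue
        yield item


async def limit(source: AsyncIterator[T], maximum: int) -> AsyncIterator[T]:
    seen = 0
    async for item in source:
        yield item
        seen += 1
        if seen >= maximum:
            break  # stop once the maximum number of items was yielded


async def main() -> List[int]:
    # Drop the first two items, then take at most three.
    return [n async for n in limit(skip(numbers(10), 2), 3)]


selected = asyncio.run(main())
```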
skip_until
#
skip_until(
*predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]
Discard items while all conditions are False.
Items after this will be yielded as normal.
PARAMETER | DESCRIPTION |
---|---|
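*predicates |
Predicates to invoke. These are functions that take a value and
return True if it is of interest, or False otherwise. A 2-tuple of an attribute name and the value to compare it against may be passed instead.
TYPE: Union[Tuple[str, Any], Callable[[ValueT], bool]]
|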
**attrs |
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
LazyIterator that only emits values once a condition has been met. All items before this are discarded. |
skip_while
#
skip_while(
*predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]
Discard items while all conditions are True.
Items after this will be yielded as normal.
PARAMETER | DESCRIPTION |
---|---|
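*predicates |
Predicates to invoke. These are functions that take a value and
return True if it is of interest, or False otherwise. A 2-tuple of an attribute name and the value to compare it against may be passed instead.
TYPE: Union[Tuple[str, Any], Callable[[ValueT], bool]]
|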
**attrs |
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
LazyIterator that only emits values once a condition has failed. All items before this are discarded. |
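For intuition, skip_while and skip_until behave like itertools.dropwhile with the predicate used directly or negated. The sketch below uses ordinary lists as a synchronous analogy; it does not use hikari's lazy iterators.

```python
from itertools import dropwhile

values = [3, 4, 1, 5, 2]

# skip_while(n > 2): discard items while the condition holds, then yield the rest.
after_skip_while = list(dropwhile(lambda n: n > 2, values))

# skip_until(n > 4): discard items while the condition is false, then yield the rest.
after_skip_until = list(dropwhile(lambda n: not n > 4, values))
```

Note that items after the first yielded one are no longer tested, which is why 5 and 2 still appear in the first result.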
sort
async
#
Collect all results, then sort the collection before returning it.
take_until
#
take_until(
*predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]
Return each item until any conditions pass or the end is reached.
PARAMETER | DESCRIPTION |
---|---|
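*predicates |
Predicates to invoke. These are functions that take a value and
return True if it is of interest, or False otherwise. A 2-tuple of an attribute name and the value to compare it against may be passed instead.
TYPE: Union[Tuple[str, Any], Callable[[ValueT], bool]]
|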
**attrs |
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
LazyIterator that only emits values until any conditions are matched. |
take_while
#
take_while(
*predicates: Union[Tuple[str, Any], Callable[[ValueT], bool]], **attrs: Any
) -> LazyIterator[ValueT]
Return each item until any conditions fail or the end is reached.
PARAMETER | DESCRIPTION |
---|---|
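*predicates |
Predicates to invoke. These are functions that take a value and
return True if it is of interest, or False otherwise. A 2-tuple of an attribute name and the value to compare it against may be passed instead.
TYPE: Union[Tuple[str, Any], Callable[[ValueT], bool]]
|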
**attrs |
Alternative to passing 2-tuples. Cannot specify nested attributes using this method.
TYPE:
|
RETURNS | DESCRIPTION |
---|---|
LazyIterator[ValueT]
|
LazyIterator that only emits values until any conditions are not matched. |
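As a synchronous analogy, take_while and take_until behave like itertools.takewhile with the predicate used directly or negated. The lists below are illustrative only and do not involve hikari's lazy iterators.

```python
from itertools import takewhile

values = [3, 4, 1, 5, 2]

# take_while(n > 2): yield items until the condition first fails.
taken_while = list(takewhile(lambda n: n > 2, values))

# take_until(n > 4): yield items until the condition first passes.
taken_until = list(takewhile(lambda n: not n > 4, values))
```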
filtered
#
filtered(
event_types: Union[Type[Event], Sequence[Type[Event]]],
cache_components: CacheComponents = config.CacheComponents.NONE,
) -> Callable[
[_UnboundMethodT[_EventManagerBaseT]], _UnboundMethodT[_EventManagerBaseT]
]
Add metadata to a consumer method to indicate when it should be unmarshalled and dispatched.
PARAMETER | DESCRIPTION |
---|---|
event_types |
Types of the events this raw consumer method may dispatch. This may either be a singular type or a sequence of types. |
cache_components |
Bitfield of the cache components this event may make altering calls to.
TYPE:
|
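A decorator that "adds metadata" typically just attaches attributes to the function and returns it unchanged. The sketch below shows that pattern; the CacheComponents enum here is a hypothetical stand-in with made-up members, and the attribute names _event_types and _cache_components are assumptions, not hikari's internals.

```python
import enum
from typing import Any, Callable, Sequence, Union


class CacheComponents(enum.Flag):
    """Hypothetical stand-in for the real cache components bitfield."""

    NONE = 0
    GUILDS = 1
    MESSAGES = 2


class MessageCreateEvent:
    """Hypothetical event type."""


def filtered(
    event_types: Union[type, Sequence[type]],
    cache_components: CacheComponents = CacheComponents.NONE,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    # Accept either a single type or a sequence of types.
    if isinstance(event_types, type):
        event_types = (event_types,)

    def decorator(method: Callable[..., Any]) -> Callable[..., Any]:
        # Hypothetical attribute names used to carry the metadata.
        method._event_types = tuple(event_types)
        method._cache_components = cache_components
        return method

    return decorator


class Manager:
    @filtered(MessageCreateEvent, CacheComponents.MESSAGES)
    def on_message_create(self, payload: dict) -> None:
        ...
```

A manager could then inspect these attributes to decide whether a raw payload needs to be unmarshalled at all, for example when no listener is registered and no relevant cache component is enabled.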