stream — Stanza stream

The stanza stream is the layer of abstraction above the XML stream. It deals with sending and receiving stream-level elements, mainly stanzas. It also handles stream liveness and stream management.

It provides ways to track stanzas on their way to the remote, as far as that is possible.

class aioxmpp.stream.StanzaStream(local_jid=None, *, loop=None, base_logger=<logging.Logger object at 0x7f9ae21c76d8>)[source]

A stanza stream. This is the next layer of abstraction above the XMPP XML stream, which mostly deals with stanzas (but also with certain other stream-level elements, such as XEP-0198 Stream Management Request/Acks).

It is independent from a specific XMLStream instance. A StanzaStream can be started with one XML stream, stopped later and then resumed with another XML stream. The user of the StanzaStream has to make sure that the XML streams are compatible, identity-wise (use the same JID).

local_jid may be the bare sender JID associated with the stanza stream. This is required for compatibility with ejabberd. If it is omitted, communication with ejabberd instances may not work.

loop may be used to explicitly specify the asyncio.BaseEventLoop to use, otherwise the current event loop is used.

base_logger can be used to explicitly specify a logging.Logger instance to fork off the logger from. The StanzaStream will use a child logger of base_logger called StanzaStream.

Changed in version 0.4: The local_jid argument was added.

The stanza stream takes care of ensuring stream liveness. For that, pings are sent in a periodic interval. If stream management is enabled, stream management ack requests are used as pings, otherwise XEP-0199 pings are used.

To save computing power, pings are preferably sent only when other stanzas are about to be sent anyway. The time window for waiting for other stanzas is defined by ping_opportunistic_interval. The time which the StanzaStream waits between receiving the previous ping response and considering the next ping is controlled by ping_interval. See the attribute descriptions for details:

ping_interval = timedelta(seconds=15)

A datetime.timedelta instance which controls the time between a ping response and starting the next ping. When this time elapses, opportunistic mode is engaged for the time defined by ping_opportunistic_interval.

ping_opportunistic_interval = timedelta(seconds=15)

This is the time interval after ping_interval. During that interval, StanzaStream waits for other stanzas to be sent. If a stanza gets sent during that interval, the ping is fired. Otherwise, the ping is fired after the interval.

After a ping has been sent, the response must arrive in a time of ping_interval for the stream to be considered alive. If the response fails to arrive within that interval, the stream fails (see on_failure).
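
As a rough illustration of the timing rules above, the following sketch computes the relevant points in time for one ping cycle. It does not use aioxmpp; ping_schedule and the keys of its return value are hypothetical names, and only the two interval attributes are taken from the text. The last value assumes the worst case in which the ping is sent at the very end of the opportunistic window.

```python
from datetime import datetime, timedelta


def ping_schedule(last_response, ping_interval, ping_opportunistic_interval):
    """Return the key points in time of one ping cycle (illustrative model)."""
    # Opportunistic mode is engaged once ping_interval has elapsed.
    opportunistic_start = last_response + ping_interval
    # If no other stanza is sent in the window, the ping fires at its end.
    latest_ping = opportunistic_start + ping_opportunistic_interval
    # After the ping is sent, the response must arrive within ping_interval.
    latest_response = latest_ping + ping_interval
    return {
        "opportunistic_start": opportunistic_start,
        "latest_ping": latest_ping,
        "latest_response": latest_response,
    }


schedule = ping_schedule(
    datetime(2024, 1, 1, 12, 0, 0),
    ping_interval=timedelta(seconds=15),
    ping_opportunistic_interval=timedelta(seconds=15),
)
for name, when in schedule.items():
    print(name, when.time())
```

With both intervals at their default of 15 seconds, a dead stream is thus detected at most 45 seconds after the last ping response in this model.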

Starting/Stopping the stream:

start(xmlstream)[source]

Start or resume the stanza stream on the given aioxmpp.protocol.XMLStream xmlstream.

This starts the main broker task, registers stanza classes at the xmlstream and reconfigures the ping state.

stop()[source]

Send a signal to the main broker task to terminate. You have to check running and possibly wait for it to become False — the task takes at least one loop through the event loop to terminate.

It is guaranteed that the task will not attempt to send stanzas over the existing xmlstream after a call to stop() has been made.

It is legal to call stop() even if the task is already stopped. It is a no-op in that case.

coroutine wait_stop()[source]

Stop the stream and wait for it to stop.

See stop() for the general stopping conditions. You can assume that stop() is the first thing this coroutine calls.
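
The difference between stop() and wait_stop() can be sketched with plain asyncio. ToyBroker below is a hypothetical stand-in, not the StanzaStream implementation; it only mirrors the documented behaviour that stop() merely signals the task, that running stays True until the event loop has had a chance to run, and that stopping an already stopped task is a no-op.

```python
import asyncio


class ToyBroker:
    """Hypothetical stand-in for a stream with a background broker task."""

    def __init__(self):
        self._task = None
        self._stop_requested = asyncio.Event()

    @property
    def running(self):
        return self._task is not None and not self._task.done()

    def start(self):
        self._stop_requested.clear()
        self._task = asyncio.ensure_future(self._broker())

    async def _broker(self):
        # A real broker would send and receive stanzas here.
        await self._stop_requested.wait()

    def stop(self):
        # Only signals the task; calling this while stopped is a no-op.
        if self.running:
            self._stop_requested.set()

    async def wait_stop(self):
        # stop() is the first thing this coroutine calls.
        self.stop()
        if self._task is not None:
            await self._task


async def demo():
    broker = ToyBroker()
    broker.start()
    broker.stop()
    still_running = broker.running  # the task has not been scheduled yet
    await broker.wait_stop()
    return still_running, broker.running


print(asyncio.run(demo()))
```

Code that needs the stream to be fully stopped before continuing would therefore await wait_stop() rather than call stop() and proceed immediately.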

coroutine close()[source]

Close the stream and the underlying XML stream (if any is connected).

This calls wait_stop() and cleans up any Stream Management state, if no error occurs. If an error occurs while the stream stops, that error is re-raised and the stream management state is not cleared, unless resumption is disabled.

running[source]

True if the broker task is currently running, and False otherwise.

flush_incoming()[source]

Flush all incoming queues to the respective processing methods. The handlers are called as usual, thus it may require at least one iteration through the asyncio event loop before effects can be seen.

The incoming queues are empty after a call to this method.

It is legal (but pretty useless) to call this method while the stream is running.

Sending stanzas:

enqueue_stanza(stanza, **kwargs)[source]

Enqueue a stanza to be sent. Return a StanzaToken to track the stanza. The kwargs are passed to the StanzaToken constructor.

This method calls autoset_id() on the stanza automatically.

coroutine send_iq_and_wait_for_reply(iq, *, timeout=None)[source]

Send an IQ stanza iq and wait for the response. If timeout is not None, it must be the time in seconds for which to wait for a response.

If the response is a "result" IQ, the value of the payload attribute is returned. Otherwise, the exception generated from the error attribute is raised.

See also

register_iq_response_future() for other cases raising exceptions.

Receiving stanzas:

register_iq_request_coro(type_, payload_cls, coro)[source]

Register a coroutine coro to IQ requests of type type_ which have a payload of the given payload_cls class.

Whenever a matching IQ stanza is received, the coroutine is started with the stanza as its only argument. The coroutine must return a valid value for the stanza.IQ.payload attribute. The value will be set as the payload attribute value of an IQ response (with type "result") which is generated and sent by the stream.

If the coroutine raises an exception, it will be converted to an Error object. That error object is then used as payload for an IQ response (with type "error") which is generated and sent by the stream.

If the exception is a subclass of aioxmpp.errors.XMPPError, it is converted to an Error instance directly. Otherwise, it is wrapped in an aioxmpp.errors.XMPPCancelError with undefined-condition.

If there is already a coroutine registered for the given (type_, payload_cls) pair, ValueError is raised.

unregister_iq_request_coro(type_, payload_cls)[source]

Unregister a coroutine previously registered with register_iq_request_coro(). The match is solely made using the type_ and payload_cls arguments, which have the same meaning as in register_iq_request_coro().

This raises KeyError if no coroutine has previously been registered for the type_ and payload_cls.
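
The registration and error-mapping rules for IQ request coroutines can be modelled in a few lines. This is a sketch under stated assumptions, not aioxmpp code: ToyIQRequestRegistry, ToyXMPPError and the tuples returned by handle() are hypothetical, and the request object doubles as its own payload to keep the example short.

```python
import asyncio


class ToyXMPPError(Exception):
    """Hypothetical stand-in for an XMPP-level error with a condition."""

    def __init__(self, condition):
        super().__init__(condition)
        self.condition = condition


class ToyIQRequestRegistry:
    def __init__(self):
        self._coros = {}

    def register(self, type_, payload_cls, coro):
        key = (type_, payload_cls)
        if key in self._coros:
            raise ValueError("coroutine already registered for {!r}".format(key))
        self._coros[key] = coro

    def unregister(self, type_, payload_cls):
        # Raises KeyError if nothing is registered for the pair.
        del self._coros[(type_, payload_cls)]

    async def handle(self, type_, request):
        """Return a (response_type, payload_or_condition) pair."""
        coro = self._coros[(type_, type(request))]
        try:
            return "result", await coro(request)
        except ToyXMPPError as exc:
            # XMPP-level errors are reported with their own condition.
            return "error", exc.condition
        except Exception:
            # Anything else becomes a generic cancel error.
            return "error", "undefined-condition"


class Ping:
    pass


async def answer_ping(request):
    return None  # empty payload for the "result" response


async def broken(request):
    raise RuntimeError("bug in handler")


async def demo():
    registry = ToyIQRequestRegistry()
    registry.register("get", Ping, answer_ping)
    ok = await registry.handle("get", Ping())
    registry.unregister("get", Ping)
    registry.register("get", Ping, broken)
    failed = await registry.handle("get", Ping())
    return ok, failed


print(asyncio.run(demo()))
```

The point of the model is that a handler never needs to build the response itself: returning a value yields a "result", raising yields an "error".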

register_iq_response_future(from_, id_, fut)[source]

Register a future fut for an IQ stanza with type result or error from the JID from_ with the id id_.

If the type of the IQ stanza is result, the stanza is set as result to the future. If the type of the IQ stanza is error, the stanza's error field is converted to an exception and set as the exception of the future.

The future might also receive different exceptions:

  • errors.ErrorneousStanza, if the response stanza received could not be parsed.

    Note that this exception is not emitted if the from address of the stanza is unset, because the code cannot determine whether a sender deliberately used an erroneous address to make parsing fail or no sender address was used. In the former case, an attacker could use that to inject a stanza which would be taken as a stanza from the peer server. Thus, the future will never be fulfilled in these cases.

    Also note that this exception does not derive from errors.XMPPError, as it cannot provide the same attributes. Instead, it derives from errors.StanzaError, from which errors.XMPPError also derives; to catch all possible stanza errors, catching errors.StanzaError is sufficient and future-proof.

register_iq_response_callback(from_, id_, cb)[source]

Register a callback function cb to be called when an IQ stanza with type result or error is received from the JID from_ with the id id_.

The callback is called at most once.

Note

In contrast to register_iq_response_future(), errors which occur on a level below XMPP stanzas cannot be caught using a callback.

If you need notification about other errors and still want to use callbacks, use of a future with asyncio.Future.add_done_callback() is recommended.
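
The recommended pattern of combining a future with asyncio.Future.add_done_callback() might look like the sketch below. It uses only asyncio; make_response_future, on_result and on_error are hypothetical names. In real code the future would be handed to register_iq_response_future(), whereas here it is resolved by hand to show both paths.

```python
import asyncio


def make_response_future(loop, on_result, on_error):
    """Create a future whose outcome is forwarded to plain callbacks."""
    fut = loop.create_future()

    def done(f):
        if f.cancelled():
            return
        exc = f.exception()
        if exc is not None:
            # Covers stanza-level errors and lower-level failures alike.
            on_error(exc)
        else:
            on_result(f.result())

    fut.add_done_callback(done)
    return fut


async def demo():
    loop = asyncio.get_running_loop()
    seen = []

    def on_result(result):
        seen.append(("result", result))

    def on_error(exc):
        seen.append(("error", type(exc).__name__))

    ok = make_response_future(loop, on_result, on_error)
    failed = make_response_future(loop, on_result, on_error)

    # Stand-ins for what the stream would do on a reply or a failure.
    ok.set_result("pong")
    failed.set_exception(ConnectionError("stream died"))

    await asyncio.sleep(0)  # give the done callbacks a chance to run
    return seen


print(asyncio.run(demo()))
```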

unregister_iq_response(from_, id_)[source]

Unregister a registered callback or future for the IQ response identified by from_ and id_. See register_iq_response_future() or register_iq_response_callback() for details on the arguments' meanings and how to register futures and callbacks respectively.

Note

Futures will automatically be unregistered when they are cancelled.

register_message_callback(type_, from_, cb)[source]

Register a callback function cb to be called whenever a message stanza of the given type_ from the given JID from_ arrives.

Either of type_ and from_ can be None to indicate a wildcard match.

More specific callbacks win over less specific callbacks, and the match on the from_ address takes precedence over the match on the type_.

To be explicit, the order in which callbacks are searched for a given type and from_ of a stanza is:

  • type, from_
  • type, from_.bare()
  • None, from_
  • None, from_.bare()
  • type, None
  • None, None

unregister_message_callback(type_, from_)[source]

Unregister a callback previously registered with register_message_callback(). type_ and from_ have the same semantics as in register_message_callback().

Attempting to unregister a type_, from_ tuple for which no handler has been registered results in a KeyError.
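
The search order for message callbacks listed under register_message_callback() can be expressed as a short generator. This is an illustrative model only: ToyJID is a hypothetical minimal JID type that implements nothing but bare(), and find_callback is not part of aioxmpp.

```python
from collections import namedtuple


class ToyJID(namedtuple("ToyJID", ["localpart", "domain", "resource"])):
    """Hypothetical minimal JID; only bare() is modelled."""

    def bare(self):
        return self._replace(resource=None)


def lookup_keys(type_, from_):
    """Yield (type_, from_) keys from most to least specific."""
    yield (type_, from_)
    yield (type_, from_.bare())
    yield (None, from_)
    yield (None, from_.bare())
    yield (type_, None)
    yield (None, None)


def find_callback(callbacks, type_, from_):
    """Return the first registered callback in search order, or None."""
    for key in lookup_keys(type_, from_):
        if key in callbacks:
            return callbacks[key]
    return None


romeo = ToyJID("romeo", "montague.example", "orchard")
callbacks = {
    ("chat", None): "any chat message",
    (None, romeo.bare()): "anything from romeo",
}
print(find_callback(callbacks, "chat", romeo))
```

In this example the callback registered for the bare JID wins over the one registered for the message type, because a match on from_ takes precedence over a match on type_.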

register_presence_callback(type_, from_, cb)[source]

Register a callback function cb to be called whenever a presence stanza of the given type_ arrives from the given JID.

from_ may be None to indicate a wildcard. Like with register_message_callback(), more specific callbacks win over less specific callbacks.

Note

A type_ of None is a valid value for aioxmpp.stanza.Presence stanzas and is not a wildcard here.

unregister_presence_callback(type_, from_)[source]

Unregister a callback previously registered with register_presence_callback(). type_ and from_ have the same semantics as in register_presence_callback().

Attempting to unregister a type_, from_ tuple for which no handler has been registered results in a KeyError.

Inbound stanza filters allow you to hook into the stanza processing by replacing, modifying or otherwise processing stanza contents before the above callbacks are invoked. With inbound stanza filters, there are no restrictions as to what processing may take place on a stanza, as no one but the stream may have references to its contents. See below for a guideline on when to use stanza filters.

Warning

Raising an exception from within a stanza filter kills the stream.

Note that if a filter function drops an incoming stanza (by returning None), it must ensure that the client still behaves in an RFC-compliant manner.

app_inbound_presence_filter

This is an AppFilter-based filter chain on inbound presence stanzas. It can be used to attach application-specific filters.

service_inbound_presence_filter

This is another filter chain for inbound presence stanzas. It runs before the app_inbound_presence_filter chain and all functions registered there must have service.Service classes as order value (see Filter.register()).

This filter chain is intended to be used by library services, such as an XEP-0115 implementation which may start an XEP-0030 lookup at the target entity to resolve the capability hash or prime the XEP-0030 cache with the service information obtained by interpreting the XEP-0115 hash value.

app_inbound_message_filter

This is an AppFilter-based filter chain on inbound message stanzas. It can be used to attach application-specific filters.

service_inbound_message_filter

This is the analogue of service_inbound_presence_filter, but for inbound messages. It corresponds to app_inbound_message_filter.

Outbound stanza filters work similarly to inbound stanza filters, but due to their location in the processing chain and possible interactions with senders of stanzas, there are some things to consider:

  • By convention, an outbound stanza filter must not modify any child elements which are already present in the stanza when it receives the stanza.

    It may however add new child elements or remove existing child elements, as well as copying and then modifying existing child elements.

  • If the stanza filter replaces the stanza, it is responsible for making sure that the new stanza has appropriate from_, to and id values. There are no checks to enforce this, because error handling at this point is peculiar. The stanzas will be sent as-is.

  • Similar to inbound filters, it is the responsibility of the filters that if stanzas are dropped, the client still behaves RFC-compliant.
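
The first convention above (do not modify existing child elements, but copy and then modify if needed) could be followed as in this sketch. ToyStanza and ToyChild are hypothetical stand-ins for real stanza objects, and uppercase_filter is just an example transformation.

```python
import copy


class ToyChild:
    def __init__(self, text):
        self.text = text


class ToyStanza:
    def __init__(self, children):
        self.children = list(children)


def uppercase_filter(stanza):
    """Replace each child with a modified copy; never mutate the originals."""
    replaced = []
    for child in stanza.children:
        clone = copy.copy(child)          # the sender may still hold `child`
        clone.text = clone.text.upper()
        replaced.append(clone)
    # Removing the old children and adding new ones is allowed.
    stanza.children[:] = replaced
    return stanza


original_child = ToyChild("hello")
stanza = uppercase_filter(ToyStanza([original_child]))
print(original_child.text, stanza.children[0].text)
```

The sender's reference to the original child element stays untouched, which is the reason for the convention.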

Now that you have been warned, here are the attributes for accessing the outbound filter chains. These otherwise work exactly like their inbound counterparts, but service filters run after application filters on outbound processing.

app_outbound_presence_filter

This is an AppFilter-based filter chain on outbound presence stanzas. It can be used to attach application-specific filters.

Before using this attribute, make sure that you have read the notes above.

service_outbound_presence_filter

This is the analogue of service_inbound_presence_filter, but for outbound presence. It runs after app_outbound_presence_filter.

Before using this attribute, make sure that you have read the notes above.

app_outbound_message_filter

This is an AppFilter-based filter chain on outbound message stanzas. It can be used to attach application-specific filters.

Before using this attribute, make sure that you have read the notes above.

service_outbound_message_filter

This is the analogue of service_outbound_presence_filter, but for outbound messages.

Before using this attribute, make sure that you have read the notes above.

When to use stanza filters? In general, applications will rarely need them. However, services may make profitable use of them, and it is a convenient way for them to inspect incoming or outgoing stanzas without having to take up the registration slots (remember that register_message_callback() et al. only allow one callback per designator).

In general, whenever you do something which supplements the use of the stanza with respect to the RFC but does not fulfill the original intent of the stanza, it is advisable to use a filter instead of a callback on the actual stanza.

Vice versa, if you were to develop a service which manages presence subscriptions, it would be more correct to use register_presence_callback(); this prevents other services which try to do the same from conflicting with you. You would then provide callbacks to the application to let it learn about presence subscriptions.

Using stream management:

coroutine start_sm(request_resumption=True)[source]

Start stream management (version 3). This negotiates stream management with the server.

If the server rejects the attempt to enable stream management, an errors.StreamNegotiationFailure is raised. The stream is still running in that case.

Warning

This method cannot and does not check whether the server advertised support for stream management. Attempting to negotiate stream management without server support might lead to termination of the stream.

If an XML stream error occurs during the negotiation, the result depends on a few factors. In any case, the stream is not running afterwards. If the SMEnabled response was not received before the XML stream died, SM is also disabled and the exception which caused the stream to die is re-raised (this is due to the implementation of send_and_wait_for()). If the SMEnabled response was received and announced support for resumption, SM is enabled. Otherwise, it is disabled. No exception is raised if SMEnabled was received, as this method has no way to determine that the stream failed.

If negotiation succeeds, this coroutine initializes a new stream management session. The stream management state attributes become available and sm_enabled becomes True.
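
The outcomes of an XML stream error during start_sm(), as described above, can be condensed into a small decision function. This is only a restatement of the text as code; start_sm_outcome_on_stream_error and the layout of its return value are hypothetical.

```python
def start_sm_outcome_on_stream_error(enabled_received, resumption_announced):
    """Model the state after the XML stream died during start_sm().

    Returns a tuple (stream_running, sm_enabled, exception_reraised).
    """
    if not enabled_received:
        # No SMEnabled response: SM stays disabled, the error is re-raised.
        return (False, False, True)
    # SMEnabled was received: no exception, SM is enabled only if the
    # response announced support for resumption.
    return (False, bool(resumption_announced), False)


for received in (False, True):
    for resumable in (False, True):
        print(received, resumable,
              start_sm_outcome_on_stream_error(received, resumable))
```

Since no exception is raised once SMEnabled has been received, callers that need to know whether the stream survived the negotiation would have to check running afterwards.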

coroutine resume_sm(xmlstream)[source]

Resume an SM-enabled stream using the given xmlstream.

If the server rejects the attempt to resume stream management, an errors.StreamNegotiationFailure is raised. The stream is then in stopped state and stream management has been stopped.

Warning

This method cannot and does not check whether the server advertised support for stream management. Attempting to negotiate stream management without server support might lead to termination of the stream.

If the XML stream dies at any point during the negotiation, the SM state is left unchanged. If no response has been received yet, the exception which caused the stream to die is re-raised. The state of the stream depends on whether the main task already noticed the dead stream.

If negotiation succeeds, this coroutine resumes the stream management session and initiates the retransmission of any unacked stanzas. The stream is then in running state.

stop_sm()[source]

Disable stream management on the stream.

Attempting to call this method while the stream is running or without stream management enabled results in a RuntimeError.

Any sent stanzas which have not been acked by the remote yet are put into StanzaState.SENT_WITHOUT_SM state.

sm_enabled[source]

True if stream management is currently enabled on the stream, False otherwise.

Stream management state inspection:

sm_outbound_base[source]

The last value of the remote stanza counter.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

sm_inbound_ctr[source]

The current value of the inbound stanza counter.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

sm_unacked_list[source]

A copy of the list of stanza tokens which have not yet been acked by the remote party.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

Accessing this attribute is expensive, as the list is copied. In general, access to this attribute should not be necessary at all.

sm_id[source]

The value of the id attribute of the SMEnabled response from the server.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

sm_max[source]

The value of the max attribute of the SMEnabled response from the server.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

sm_location[source]

The value of the location attribute of the SMEnabled response from the server.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.

sm_resumable[source]

The value of the resume attribute of the SMEnabled response from the server.

Note

Accessing this attribute when sm_enabled is False raises RuntimeError.
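
Because all of the state attributes above raise RuntimeError while stream management is disabled, code that inspects them would normally check sm_enabled first. The sketch below shows that guard against a hypothetical ToySMState object which models only sm_enabled and sm_inbound_ctr; describe_sm is likewise an illustrative helper.

```python
class ToySMState:
    """Hypothetical object mimicking the documented attribute behaviour."""

    def __init__(self):
        self.sm_enabled = False
        self._inbound_ctr = 0

    @property
    def sm_inbound_ctr(self):
        if not self.sm_enabled:
            raise RuntimeError("stream management is not enabled")
        return self._inbound_ctr


def describe_sm(stream):
    """Read SM state defensively: check sm_enabled before sm_* attributes."""
    if not stream.sm_enabled:
        return "SM disabled"
    return "SM enabled, inbound counter = {}".format(stream.sm_inbound_ctr)


state = ToySMState()
print(describe_sm(state))
state.sm_enabled = True
print(describe_sm(state))
```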

Miscellaneous:

local_jid[source]

The local_jid argument to the constructor. This cannot be changed.

Signals:

signal on_failure(exc)

A signal which will fire when the stream has failed. A failure occurs whenever the main task of the StanzaStream (the one started by start()) terminates with an exception.

Examples are ConnectionError as raised upon a ping timeout and any exceptions which may be raised by the aioxmpp.protocol.XMLStream.send_xso() method.

The exception which occurred is given as exc.

signal on_stream_destroyed()

When a stream is destroyed so that all state shall be discarded (for example, pending futures), this signal is fired.

This happens if a non-SM stream is stopped or if SM is being disabled.

signal on_stream_established()

When a stream is newly established, this signal is fired. This happens whenever a non-SM stream is started and whenever a stream which previously had SM disabled is started with SM enabled.

Low-level stanza tracking

The following classes are used to track stanzas in the XML stream to the server. This is independent of things like XEP-0184 Message Delivery Receipts (for which services are provided at aioxmpp.tracking); it only provides tracking to the remote server and even that only if stream management is used. Otherwise, it only provides tracking in the aioxmpp internal queues.

class aioxmpp.stream.StanzaToken(stanza, *, on_state_change=None)[source]

A token to follow the processing of a stanza.

on_state_change may be a function which will be called with the token and the new StanzaState whenever the state of the token changes.

state[source]

The current StanzaState of the token. Tokens are created with StanzaState.ACTIVE.

abort()[source]

Abort the stanza. Attempting to call this when the stanza is in any non-ACTIVE, non-ABORTED state results in a RuntimeError.

When a stanza is aborted, it will remain in the active queue of the stream, but it will not be sent; instead, it is discarded silently.

class aioxmpp.stream.StanzaState[source]

The various states an outgoing stanza can have.

ACTIVE

The stanza has just been enqueued for sending and has not been taken care of by the StanzaStream yet.

SENT

The stanza has been sent over a stream with Stream Management enabled, but not acked by the remote yet.

ACKED

The stanza has been sent over a stream with Stream Management enabled and has been acked by the remote. This is a final state.

SENT_WITHOUT_SM

The stanza has been sent over a stream without Stream Management enabled or has been sent over a stream with Stream Management enabled, but for which resumption has failed before the stanza has been acked.

This is a final state.

ABORTED

The stanza has been retracted before it left the active queue.

This is a final state.

DROPPED

The stanza has been dropped by one of the filters configured in the StanzaStream.

This is a final state.
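
The token and state descriptions above can be summarised in a toy model. ToyState, ToyToken and FINAL_STATES are hypothetical names, and the numeric enum values are arbitrary. Only the state names, the set of final states, the abort() rule and the on_state_change callback are taken from the text.

```python
import enum


class ToyState(enum.Enum):
    ACTIVE = 0
    SENT = 1
    ACKED = 2
    SENT_WITHOUT_SM = 3
    ABORTED = 4
    DROPPED = 5


# States documented above as final.
FINAL_STATES = frozenset({
    ToyState.ACKED,
    ToyState.SENT_WITHOUT_SM,
    ToyState.ABORTED,
    ToyState.DROPPED,
})


class ToyToken:
    def __init__(self, on_state_change=None):
        self.state = ToyState.ACTIVE  # tokens start out ACTIVE
        self._on_state_change = on_state_change

    def _set_state(self, new_state):
        self.state = new_state
        if self._on_state_change is not None:
            self._on_state_change(self, new_state)

    def abort(self):
        if self.state is ToyState.ABORTED:
            return  # aborting twice is harmless
        if self.state is not ToyState.ACTIVE:
            raise RuntimeError("stanza has already left the active queue")
        self._set_state(ToyState.ABORTED)


changes = []
token = ToyToken(on_state_change=lambda tok, state: changes.append(state))
token.abort()
print(token.state, token.state in FINAL_STATES, changes)
```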

Filters

The filters used by the StanzaStream are implemented by the following classes:

class aioxmpp.stream.Filter[source]

A filter chain for stanzas. The idea is to process a stanza through a sequence of user- and service-definable functions.

Each function must either return the stanza it received as argument or None. If it returns None the filtering aborts and the caller of filter() also receives None.

Each function receives the result of the previous function for further processing.

register(func, order)[source]

Register a function func as filter in the chain. order must be a value which will be used to order the registered functions relative to each other.

Functions with the same order are sorted in the order of their addition, with the function which was added earliest first.

Remember that all values passed to order which are registered at the same time in the same Filter need to be at least partially orderable with respect to each other.

Return an opaque token which is needed to unregister a function.

filter(stanza_obj)[source]

Pass the given stanza_obj through the filter chain and return the result of the chain. See Filter for details on how the value is passed through the registered functions.

unregister(token_to_remove)[source]

Unregister a function from the filter chain using the token returned by register().

class aioxmpp.stream.AppFilter[source]

A specialized Filter version. The only difference is in the handling of the order argument to register():

register(func, order=0)[source]

This method works exactly like Filter.register(), but order has a default value of 0.
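
To make the chain semantics of Filter and AppFilter concrete, here is a minimal re-implementation of the behaviour described above. It is a sketch, not the aioxmpp source: ToyFilter and ToyAppFilter are hypothetical, the token layout is an implementation detail of this example, and plain lists stand in for stanzas.

```python
import itertools


class ToyFilter:
    """Illustrative model of the described filter chain semantics."""

    def __init__(self):
        self._serial = itertools.count()
        self._entries = []  # (order, serial, func), kept sorted

    def register(self, func, order):
        token = (order, next(self._serial), func)
        self._entries.append(token)
        # Sort by order; the serial keeps earlier registrations first on ties.
        self._entries.sort(key=lambda entry: entry[:2])
        return token

    def filter(self, stanza_obj):
        for _, _, func in self._entries:
            stanza_obj = func(stanza_obj)
            if stanza_obj is None:
                return None  # a filter dropped the stanza
        return stanza_obj

    def unregister(self, token_to_remove):
        self._entries.remove(token_to_remove)


class ToyAppFilter(ToyFilter):
    def register(self, func, order=0):
        return super().register(func, order)


def add_marker(marker):
    """Build a filter function which records that it has run."""
    def func(stanza):
        stanza.append(marker)
        return stanza
    return func


chain = ToyAppFilter()
chain.register(add_marker("late"), 10)
token = chain.register(add_marker("default"))
chain.register(add_marker("early"), -5)
print(chain.filter([]))

chain.unregister(token)
print(chain.filter([]))
```

The functions run in order of their order value regardless of registration order, and removing a function only requires the token returned by register().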