########################################################################
# File name: service.py
# This file is part of: aioxmpp
#
# LICENSE
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this program. If not, see
# <http://www.gnu.org/licenses/>.
#
########################################################################
import asyncio
import random
from datetime import timedelta
import aioxmpp
import aioxmpp.callbacks
import aioxmpp.errors as errors
import aioxmpp.service as service
import aioxmpp.utils as utils
from . import xso as ibb_xso
MAX_BLOCK_SIZE = (1 << 16) - 1
[docs]class IBBTransport(asyncio.Transport):
"""
The transport for IBB sessions.
.. note:: Never instantiate this class directly, all instances of
this class are created the methods
:meth:`~aioxmpp.ibb.IBBService.open_session` and
:meth:`~aioxmpp.ibb.IBBService.expect_session` of
:class:`~aioxmpp.ibb.IBBService`.
The following keys are supported for
:meth:`~asyncio.BaseTransport.get_extra_info`:
`block_size`
The maximal block size of data in a IBB stanza.
`peer_jid`
The JID of the peer.
`sid`
The session id of the unerlying IBB session.
`stanza_type`
The used stanza type.
"""
def __init__(self, service, peer_jid, sid, stanza_type, block_size):
self._protocol = None
self._service = service
self._stanza_type = stanza_type
self._sid = sid
self._peer_jid = peer_jid
self._block_size = block_size
self._incoming_seq = 0
self._outgoing_seq = 0
self._closed = False
self._closing = False
self.set_write_buffer_limits()
self._write_buffer = b""
self._can_write = asyncio.Event()
self._reading_paused = False
self._input_buffer = []
self._write_task = asyncio.ensure_future(self._write_task_main())
self._write_task.add_done_callback(
self._handle_close
)
self._wait_time = self._service.initial_wait_time.total_seconds()
self._retries = 0
def set_write_buffer_limits(self, high=None, low=None):
if low is None:
low = 4 * self._block_size
if high is None:
high = 8 * self._block_size
if low < 0:
raise ValueError("the limits must be positive")
if high < 0:
raise ValueError("the limits must be positive")
if low > high:
low = high
self._output_buffer_limit_low = low
self._output_buffer_limit_high = high
def get_write_buffer_limits(self):
return self._output_buffer_limit_low, self._output_buffer_limit_high
def get_write_buffer_size(self):
return len(self._write_buffer)
def set_protocol(self, proto):
self._protocol = proto
proto.connection_made(self)
def get_protocol(self):
return self._protocol
def pause_reading(self):
self._reading_paused = True
def resume_reading(self):
self._reading_paused = False
self._protocol.data_received(b"".join(self._input_buffer))
self._input_buffer.clear()
def is_closing(self):
return self._closing or self._closed
def get_extra_info(self, key, default=None):
return {
"block_size": self._block_size,
"peer_jid": self._peer_jid,
"stanza_type": self._stanza_type,
"sid": self._sid,
}.get(key, default)
async def _write_task_main(self):
e = None
while True:
await self._can_write.wait()
if self._write_buffer:
data = self._write_buffer[:self._block_size]
if self._stanza_type == ibb_xso.IBBStanzaType.IQ:
stanza = aioxmpp.IQ(
aioxmpp.IQType.SET,
to=self._peer_jid,
payload=ibb_xso.Data(
self._sid,
self._outgoing_seq,
data
)
)
elif self._stanza_type == ibb_xso.IBBStanzaType.MESSAGE:
# TODO: use some form of tracking for messages
stanza = aioxmpp.Message(
aioxmpp.MessageType.NORMAL,
to=self._peer_jid,
)
stanza.xep0047_data = ibb_xso.Data(
self._sid,
self._outgoing_seq,
data
)
try:
await self._service.client.send(
stanza
)
except errors.XMPPWaitError:
# wait and try again unless max retries have been reached
if self._retries < self._service.max_retries:
await asyncio.sleep(self._wait_time)
self._wait_time *= self._service.wait_backoff_factor
self._retries += 1
continue
else:
e = asyncio.TimeoutError()
break
except errors.StanzaError as _e:
# break the loop to close the connection
e = _e
break
# update the internal state after the successful
# write: remove the written data from the buffer and
# increment the sequence number
self._write_buffer = self._write_buffer[len(data):]
self._outgoing_seq += 1
self._outgoing_seq &= 0xffff
# reset the wait time
self._wait_time = \
self._service.initial_wait_time.total_seconds()
self._retries = 0
if len(self._write_buffer) < self._output_buffer_limit_low:
self._protocol.resume_writing()
if not self._write_buffer:
if self._closing:
e = None
break
self._can_write.clear()
close = ibb_xso.Close()
close.sid = self._sid
stanza = aioxmpp.IQ(
aioxmpp.IQType.SET,
to=self._peer_jid,
payload=close,
)
try:
await self._service.client.send(stanza)
except errors.StanzaError as _e:
if e is None:
e = _e
finally:
if e is not None:
raise e
def write(self, data):
"""
Send `data` over the IBB. If `data` is larger than the block size
is is chunked and sent in chunks.
Chunks from one call of :meth:`write` will always be sent in
series.
"""
if self.is_closing():
return
self._write_buffer += data
if len(self._write_buffer) >= self._output_buffer_limit_high:
self._protocol.pause_writing()
if self._write_buffer:
self._can_write.set()
def _connection_closed(self):
self._write_task.cancel()
def _handle_close(self, fut):
e = None
self._service._remove_session(self._peer_jid, self._sid)
try:
e = fut.exception()
except asyncio.CancelledError:
pass
self._protocol.connection_lost(e)
self._closed = True
def close(self):
"""
Close the session.
"""
if self.is_closing():
return
self._closing = True
# make sure the writer wakes up
self._can_write.set()
def abort(self):
"""
Abort the session.
"""
if self.is_closing():
return
self._connection_closed()
def _data_received(self, data):
if self._closed:
return
if self._reading_paused:
self._input_buffer.append(data)
else:
self._protocol.data_received(data)
def _process_iq(self, payload):
if payload.seq != self._incoming_seq:
raise errors.XMPPCancelError(
condition=errors.ErrorCondition.UNEXPECTED_REQUEST
)
self._incoming_seq += 1
self._incoming_seq &= 0xffff
self._data_received(payload.content)
def _process_msg(self, payload):
if payload.seq != self._incoming_seq:
return
self._incoming_seq += 1
self._incoming_seq &= 0xffff
self._data_received(payload.content)
[docs]class IBBService(service.Service):
"""
A service implementing in-band bytestreams.
Methods for establishing sessions:
.. automethod:: expect_session
.. automethod:: open_session
The following attributes control the establishment of sessions due
to a received request, that was not announced to the service by
:meth:`expect_session`:
.. attribute:: session_limit
:annotation: = 0
The maximal number of sessions to be accepted. If there are
that many or more active sessions, no new sessions are
accepted, unless they are whitelisted by
:meth:`expect_session`. (This means, that by default only
expected sessions are accepted!).
.. attribute:: default_protocol_factory
The protocol factory to be used when an unexpected connection
is established. This *must* be set when changing
:attr:`session_limit` to a non-zero value.
.. signal:: on_session_accepted(transport, protocol)
Fires when a session is established due to a received open
request that was not expected (compare
:meth:`expect_session`). This can only happen when
:attr:`session_limit` is set to another value than its default
value.
The following attributes control how the IBB sessions react to
errors of type wait:
.. attribute:: max_retries
:annotation: = 5
The number of times it is tried to resend a data stanza, when a
:class:`~aioxmpp.errors.XMPPWaitError` is received. When
:attr:`max_retries` have been tried, the session is closed.
`connection_lost` of the protocol receives an
:class:`asyncio.TimeoutError`.
.. attribute:: initial_wait_time
:annotation: = timedelta(seconds=1)
The time to wait when receiving a
:class:`~aioxmpp.errors.XMPPWaitError` for the first time.
.. attribute:: wait_backoff_factor
:annotation: = 1.2
The factor by which the wait time is prolonged on each
successive wait error.
"""
on_session_accepted = aioxmpp.callbacks.Signal()
def __init__(self, client, **kwargs):
super().__init__(client, **kwargs)
self._sessions = {}
self.session_limit = 0
self._expected_sessions = {}
self.default_protocol_factory = None
self.client.on_stream_destroyed.connect(
self._on_stream_destroyed
)
self.max_retries = 5
self.initial_wait_time = timedelta(seconds=1)
self.wait_backoff_factor = 1.2
def _on_stream_destroyed(self):
self._expected_sessions = {}
# tear down the remaining open sessions
for session in list(self._sessions.values()):
session.abort()
[docs] def expect_session(self, protocol_factory, peer_jid, sid):
"""
Whitelist the session with `peer_jid` and the session id `sid` and
return it when it is established. This is meant to be used
with signalling protocols like Jingle and is the counterpart
to :meth:`open_session`.
:returns: an awaitable object, whose result is the tuple
`(transport, protocol)`
"""
def on_done(fut):
del self._expected_sessions[sid, peer_jid]
_, fut = self._expected_sessions[sid, peer_jid] = (
protocol_factory, asyncio.Future()
)
fut.add_done_callback(on_done)
return fut
[docs] async def open_session(self, protocol_factory, peer_jid, *,
stanza_type=ibb_xso.IBBStanzaType.IQ,
block_size=4096, sid=None):
"""
Establish an in-band bytestream session with `peer_jid` and
return the transport and protocol.
:param protocol_factory: the protocol factory
:type protocol_factory: a nullary callable returning an
:class:`asyncio.Protocol` instance
:param peer_jid: the JID with which to establish the byte-stream.
:type peer_jid: :class:`aioxmpp.JID`
:param stanza_type: the stanza type to use
:type stanza_type: class:`~aioxmpp.ibb.IBBStanzaType`
:param block_size: the maximal size of blocks to transfer
:type block_size: :class:`int`
:param sid: the session id to use
:type sid: :class:`str` (must be a valid NMTOKEN)
:returns: the transport and protocol
:rtype: a tuple of :class:`aioxmpp.ibb.service.IBBTransport`
and :class:`asyncio.Protocol`
"""
if block_size > MAX_BLOCK_SIZE:
raise ValueError("block_size too large")
if sid is None:
sid = utils.to_nmtoken(random.getrandbits(8*8))
open_ = ibb_xso.Open()
open_.stanza = stanza_type
open_.sid = sid
open_.block_size = block_size
# XXX: retry on XMPPModifyError with RESOURCE_CONSTRAINT
await self.client.send(
aioxmpp.IQ(
aioxmpp.IQType.SET,
to=peer_jid,
payload=open_,
)
)
handle = self._sessions[sid, peer_jid] = IBBTransport(
self,
peer_jid,
sid,
stanza_type,
block_size,
)
protocol = protocol_factory()
handle.set_protocol(protocol)
return handle, protocol
@service.iq_handler(
aioxmpp.IQType.SET,
ibb_xso.Open)
async def _handle_open_request(self, iq):
peer_jid = iq.from_
sid = iq.payload.sid
block_size = iq.payload.block_size
stanza_type = iq.payload.stanza
if block_size > MAX_BLOCK_SIZE:
raise errors.XMPPModifyError(
condition=errors.ErrorCondition.RESOURCE_CONSTRAINT
)
try:
protocol_factory, expected_future = \
self._expected_sessions[sid, peer_jid]
except KeyError:
if len(self._sessions) >= self.session_limit:
raise errors.XMPPCancelError(
condition=errors.ErrorCondition.NOT_ACCEPTABLE
)
expected_future = None
protocol_factory = self.default_protocol_factory
if (sid, peer_jid) in self._sessions:
# disallow opening a session twice
if expected_future is not None:
# is this correct?
expected_future.cancel()
raise errors.XMPPCancelError(
condition=errors.ErrorCondition.NOT_ACCEPTABLE
)
handle = self._sessions[sid, peer_jid] = IBBTransport(
self,
peer_jid,
sid,
stanza_type,
block_size
)
protocol = protocol_factory()
handle.set_protocol(protocol)
if expected_future is None:
self.on_session_accepted((handle, protocol))
else:
expected_future.set_result((handle, protocol))
@service.iq_handler(
aioxmpp.IQType.SET,
ibb_xso.Close)
async def _handle_close_request(self, iq):
peer_jid = iq.from_
sid = iq.payload.sid
try:
session_handle = self._sessions[sid, peer_jid]
except KeyError:
raise errors.XMPPCancelError(
condition=errors.ErrorCondition.ITEM_NOT_FOUND
)
session_handle._connection_closed()
@service.iq_handler(
aioxmpp.IQType.SET,
ibb_xso.Data)
async def _handle_data(self, iq):
peer_jid = iq.from_
sid = iq.payload.sid
try:
session_handle = self._sessions[sid, peer_jid]
except KeyError:
raise errors.XMPPCancelError(
condition=errors.ErrorCondition.ITEM_NOT_FOUND
)
session_handle._process_iq(iq.payload)
@service.inbound_message_filter
def _handle_message(self, msg):
if msg.xep0047_data is None:
return msg
payload = msg.xep0047_data
peer_jid = msg.from_
sid = payload.sid
try:
session = self._sessions[sid, peer_jid]
except KeyError:
return None
session._process_msg(payload)
return None
def _remove_session(self, peer_jid, sid):
del self._sessions[sid, peer_jid]