# encoding=utf-8
# File name: Context.py
# This file is part of: pyxwf
#
# LICENSE
#
# The contents of this file are subject to the Mozilla Public License
# Version 1.1 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS"
# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
# the License for the specific language governing rights and limitations
# under the License.
#
# Alternatively, the contents of this file may be used under the terms
# of the GNU General Public license (the "GPL License"), in which case
# the provisions of GPL License are applicable instead of those above.
#
# FEEDBACK & QUESTIONS
#
# For feedback and questions about pyxwf please e-mail one of the
# authors named in the AUTHORS file.
########################################################################
from __future__ import unicode_literals, print_function
import operator
import abc
import collections
import functools
import logging
import itertools
import base64
import urllib
from PyXWF.utils import _F
import PyXWF.Types as Types
import PyXWF.Errors as Errors
import PyXWF.TimeUtils as TimeUtils
import PyXWF.ContentTypes as ContentTypes
import PyXWF.HTTPUtils as HTTPUtils
import PyXWF.AcceptHeaders as AcceptHeaders
import PyXWF.Message as Message
logging = logging.getLogger(__name__)
map = itertools.imap
class Cookie(object):
@staticmethod
def decode_value(value):
missing = 4 - len(value) % 4
if 0 < missing < 4:
value += b"=" * missing
return base64.urlsafe_b64decode(value).decode(b"utf-8")
@staticmethod
def encode_value(value):
if isinstance(value, unicode):
value = value.encode(b"utf-8")
encoded = base64.urlsafe_b64encode(value)
return encoded.rstrip(b"=")
@classmethod
def from_cookie_header(cls, cookie_pair):
"""
Return the :class:`~Cookie` instance by parsing *cookie_pair* as per
RFC 6265, Section 4.2.1.
If the decoding of *cookie_pair* fails for some reason, None is returned
and an error is logged.
"""
if not isinstance(cookie_pair, str):
raise TypeError("cookie_pair must be str, not {0}".format(type(cookie_pair).__name__))
try:
if b";" in cookie_pair:
raise ValueError("cookie_pair contains \";\"")
name, _, value = cookie_pair.partition(b"=")
if not value:
raise ValueError("cookie_pair does not conform to RFC 6265 (no value)")
instance = cls(name, cls.decode_value(value))
except (ValueError, TypeError) as err:
logging.error(_F(
"Could not decode cookie-av {0!r}: {1}",
cookie_pair,
err
))
return None
instance.from_client = True
return instance
def __init__(self, name, value,
expires=None, domain=None, path=None, secure=False, httponly=False,
maxage=None):
if expires is not None and maxage is not None:
logging.warning("Cookie has both maxage and expires, maxage will \
take precedence")
self.name = str(name)
self.value = value
self.expires = expires
self.domain = str(domain) if domain is not None else None
self.path = str(path) if path is not None else None
self.secure = bool(secure)
self.httponly = bool(httponly)
self.maxage = int(maxage) if maxage is not None else None
self.from_client = False
def to_cookie_string(self):
cookie_pair = b"{0}={1}".format(self.name, self.encode_value(self.value))
cookie_avs = []
if self.domain:
cookie_avs.append(b"Domain={0}".format(self.domain))
if self.path is not None:
cookie_avs.append(b"Path={0}".format(self.path))
if self.expires is not None:
cookie_avs.append(b"Expires={0}".format(
HTTPUtils.format_http_date(self.expires)
))
if self.maxage is not None:
cookie_avs.append(b"Max-Age={0:d}".format(self.maxage))
if self.secure:
cookie_avs.append(b"Secure")
if self.httponly:
cookie_avs.append(b"HttpOnly")
if cookie_avs:
cookie_avs.insert(0, b"")
return cookie_pair + b"; ".join(cookie_avs)
def __eq__(self, other):
try:
return (
self.name == other.name and
self.value == other.value and
self.expires == other.expires and
self.domain == other.domain and
self.path == other.path and
self.secure == other.secure and
self.httponly == other.httponly and
self.maxage == other.maxage
)
except AttributeError:
return NotImplemented
def __ne__(self, other):
result = self == other
if result is not NotImplemented:
return not result
return result
def __repr__(self):
return "<Cookie {0}={1!r} domain={2!r} path={3!r} expires={4} maxage={5}{6}>".format(
self.name,
self.value,
self.domain,
self.path,
self.expires,
self.maxage,
" ".join(attr for attr in [
"secure" if self.secure else None,
"httponly" if self.httponly else None
] if attr is not None)
)
[docs]class Context(object):
"""
The context of a request. It is passed around when retrieving the Document
from the Node tree and can be used to store custom data. All the properties
of the framework are named with a capital first letter or prefixed with an
underscore, so you're safe to use all other names.
.. note::
Do not instanciate this class directly. The web backend will do it for
you.
If you are going to write a web backend, we suggest you have a look at
the :mod:`PyXWF.WebBackends.WSGI` module as a reference. There is a
whole bunch of internal attributes which need to be set up to get the
Context work properly.
"""
__metaclass__ = abc.ABCMeta
html_preferences = [
# prefer delivery of XHTML (no conversion required)
AcceptHeaders.AcceptPreference("application/xhtml+xml", 1.0),
AcceptHeaders.AcceptPreference("text/html", 0.9)
]
charset_preferences = [
# prefer UTF-8, then go through the other unicode encodings in
# ascending order of size. prefer little-endian over big-endian
# encodings
AcceptHeaders.CharsetPreference("utf-8", 1.0),
AcceptHeaders.CharsetPreference("utf-16le", 0.95),
AcceptHeaders.CharsetPreference("utf-16be", 0.9),
AcceptHeaders.CharsetPreference("ucs-2le", 0.85),
AcceptHeaders.CharsetPreference("ucs-2be", 0.8),
AcceptHeaders.CharsetPreference("utf-32le", 0.75),
AcceptHeaders.CharsetPreference("utf-32be", 0.7),
]
useragent_html5_support = {
"ie": 9.0,
"firefox": 4.0,
"chrome": 6.0,
"safari": 5.0,
"opera": 11.1
}
useragent_prefixed_xhtml_support = {
# see <https://bugzilla.mozilla.org/show_bug.cgi?id=816012>
"firefox": None,
"opera": 12.0,
"chrome": 20.0,
"safari": None,
"ie": None
}
def __init__(self):
# Method of the HTTP request ("GET", "POST", ...)
self._method = None
# path relative to the application's root
self._path = None
# query data, as a python dict (can be initialized lazily in
# :meth:`_require_query` )
self._query_data = None
# query data, as a python dict (can be initialized lazily in
# :meth:`_require_post` )
self._post_data = None
# cookie information, as a python dict (can be initialized lazily in
# :meth:`_require_cookies` )
self._cookies = None
# datetime object representing the value of the incoming
# If-Modified-Since header, if any. Otherwise None
self._if_modified_since = None
# :class:`AcceptPreferenceList` instance
self._accept = None
# :class:`CharsetPreferenceList` instance
self._accept_charset = None
# :class:`LanguagePreferenceList` instance
self._accept_language = None
# will be set to True if POST data was requested (can be avoided by not
# calling super()._require_post when overriding _require_post)
self._force_no_cache = False
# these are backing values for the properties below. See their
# docstrings for further information
self._cachable = True
self._pagenode = None
self._used_resources = set()
self._last_modified = None
self._can_use_xhtml = False
self._cache_control = set()
self._html5_support = False
self._prefixed_xhtml_support = False
self._useragent_name = None
self._useragent_version = None
self._is_mobile_client = False
self._response_headers = {}
self._vary = set(["host"])
self._response_cookies = []
@abc.abstractmethod
[docs] def _require_query(self):
"""
Extract query string from the web frameworks transaction and make it
accessible. Also make a note that the query string was used in this
request and is thus needs to be appended to the cache path.
.. note::
This must be overridden when implementing a Context for a specific
web backend.
"""
@abc.abstractmethod
[docs] def _require_post(self):
"""
Extract the post data from the request and make it available at
:attr:`PostData`. This disables caching of the response altogether.
.. note::
This must be overridden when implementing a Context for a specific
web backend.
"""
self._force_no_cache = True
@abc.abstractmethod
[docs] def _require_cookies(self):
"""
Extract the cookie information from the request and make it available at
:attr:`Cookies`.
.. note::
This must be overriden when implementing a Context for a specific
web backend. It is expected that this method sets the
:attr:`_cookies` attribute to a dict mapping cookie names to
:class:`~Cookie` instances.
"""
[docs] def _set_cache_status(self, no_last_modified=False):
"""
Use the values of :attr:`Cachable` and :attr:`LastModified` to set up
the response headers which relate to caching. This may change the value
of the ``Last-Modified`` header and will add a cache control token.
If *no_last_modified* is True (default is False), the Last-Modified
header will not be set. This is required per RFC 2616 for 304 Not
Modified responses.
"""
self._cache_control = set()
if self.Cachable:
last_modified = self.LastModified
if last_modified is not None:
self.add_cache_control("must-revalidate")
# let's see whether that finally forces firefox to fix our
# reload issues.
self.add_cache_control("max-age=0")
if not no_last_modified:
self.set_response_header("Last-Modified",
HTTPUtils.format_http_date(last_modified))
else:
self.add_cache_control("no-cache")
[docs] def _determine_html_content_type(self):
"""
Use the :class:`~PyXWF.AcceptHeaders.AcceptPreferenceList` instance to
figure out whether the client properly supports XHTML.
Will set :attr:`~.CanUseXHTML` to True if the best match for all HTML
content types is the XHTML content type.
"""
logging.debug(_F("Finding out HTML content type to use. User agent: {0}/{1:.2f}",
self._useragent_name, self._useragent_version))
if self._useragent_name == "ie" and self._useragent_version < 9:
# thank you, microsoft, for your really verbose accept headers -
# which do _not_ include an explicit mention of text/html, instead,
# you just assume you can q=1.0 everything.
logging.debug("Forcing XHTML support to false: MSIE < 9 detected!")
html_content_type = ContentTypes.html
elif self._useragent_name == "chrome" and self._useragent_version < 7:
# but open browsers are not neccessarily better -- chromium with
# version <= 6.0 sends:
# application/xml;q=1.00, application/xhtml+xml;q=1.00, \
# text/html;q=0.90, text/plain;q=0.80, image/png;q=1.00, */*;q=0.50
# but is in fact unable to parse valid XHTML.
logging.debug("Forcing XHTML support to false: Chrome < 7 detected!")
html_content_type = ContentTypes.html
elif self._useragent_name == "firefox" and self._useragent_version == 6:
# this is google+ user agent. g+ seems to be unable to correctly
# parse XHTML schema.org information (or metadata in general), which
# screws up snippets.
# see: http://stackoverflow.com/q/12426591/1248008
# see: https://code.google.com/p/google-plus-platform/issues/detail?id=370
logging.warning("EVIL HACK: g+ client detected, disabling XHTML")
logging.debug("Accept: {0}".format(", ".join(map(str, self._accept))))
html_content_type = ContentTypes.html
else:
logging.debug("Accept: {0}".format(", ".join(map(str, self._accept))))
html_content_type = self._accept.best_match(
self.html_preferences,
match_wildcard=False
)
self._can_use_xhtml = html_content_type == ContentTypes.xhtml
logging.debug("CanUseXHTML: {0}".format(self._can_use_xhtml))
return html_content_type
[docs] def parse_accept(self, header_value):
"""
Parse *header_value* as value of an HTTP ``Accept`` header and return the
resulting :class:`~PyXWF.AcceptHeaders.AcceptPreferenceList` instance.
"""
prefs = AcceptHeaders.AcceptPreferenceList()
prefs.append_header(header_value)
return prefs
[docs] def parse_accept_charset(self, header_value):
"""
Parse *header_value* as value of an HTTP ``Accept-Charset`` header and
return the resulting :class:`~PyXWF.AcceptHeaders.CharsetPreferenceList`
instance.
"""
prefs = AcceptHeaders.CharsetPreferenceList()
prefs.append_header(header_value)
prefs.inject_rfc_values()
return prefs
[docs] def get_encoded_body(self, message):
"""
Try to get the best encoded version of the
:class:`~PyXWF.Message.Message` instance *message*.
Use the contents of :attr:`_accept_charset` (the parsed
``Accept-Charset`` HTTP header) to figure out which charsets the client
prefers. Then mix in what charsets _we_ like to deliver and get the
best match, giving priority to the clients wishes.
If no matching encoding can be found,
:class:`~PyXWF.Errors.NotAcceptable` is raised.
"""
candidates = self._accept_charset.get_candidates(
self.charset_preferences,
match_wildcard=True,
include_non_matching=True,
take_everything_on_empty=True)
# to prevent denial of service, we only test the first five encodings
for q, encoding in itertools.islice(reversed(candidates), 0, 5):
try:
message.Encoding = encoding
return message.get_encoded_body()
except UnicodeEncodeError:
pass
else:
# we try to serve the client UTF-8 and log a warning
logging.warning("No charset the client presented us worked to encode the message, returning 406 Not Acceptable")
logging.debug("Accept-Charset: {0}".format(", ".join(map(str, self._accept_charset))))
raise Errors.NotAcceptable()
[docs] def useragent_support(self, useragent, version):
"""
Set the attributes backing :prop:`HTML5Support` and
:prop:`PrefixedXHTMLSupport` based on the *useragent* and its *version*.
"""
self._html5_support = self.useragent_supports_html5(useragent, version)
self._prefixed_xhtml_support = self.useragent_supports_prefixed_xhtml(
useragent,
version
)
@classmethod
[docs] def useragent_supports_html5(cls, useragent, version):
"""
Guess whether the user agent *useragent* with version *version* (as
obtained for example from :func:`PyXWF.utils.guess_useragent`) can deal
with HTML5. This is only more or less accurate for the popular browsers.
Everyone else will just be served HTML5.
"""
try:
minversion = cls.useragent_html5_support[useragent]
return version >= minversion
except KeyError:
return True # we assume the best ... let them burn
@classmethod
[docs] def useragent_supports_prefixed_xhtml(cls, useragent, version):
"""
Make a negative guess on the prefixed XHTML support. Some user agents
(mainly firefox, even with version 16.0) are unable to deal with XHTML
if it has XML namespace prefixes. Running javascript will fail then.
Compare <http://stackoverflow.com/q/13591707/1248008> for more info.
Return whether the current user agent is *known* to support prefixed
XHTML. If the UA is unknown, False will be returned for maximum
compatibility. Please note that the transform removing the prefixes has
to be enabled explicitly in the config (
``<py:compatibility remove-xhtml-prefixes="true" />``).
"""
try:
minversion = cls.useragent_prefixed_xhtml_support[useragent]
if minversion is None:
return False
return version >= minversion
except KeyError:
return False
@classmethod
def _parse_cookie_header(cls, value):
cookie_strings = value.split(b";")
parse_cookie_gen = (Cookie.from_cookie_header(cookie_string.lstrip())
for cookie_string in cookie_strings
if cookie_string)
cookies = (cookie for cookie in parse_cookie_gen if cookie is not None)
return dict((cookie.name, cookie) for cookie in cookies)
@property
[docs] def Method(self):
"""
The request method (i.e. GET, POST
, HEAD, ...)
"""
return self._method
@property
[docs] def RequestPath(self):
"""
The URL path for the request, relative to the applications (not the
servers) root.
"""
return self._path
@property
[docs] def FullURI(self):
"""
The URL path for the request. This is everything behind the host name
and the port number.
"""
return self._fulluri
@property
[docs] def HostName(self):
"""
Host name the request was sent to.
"""
return self._hostname
@property
[docs] def URLScheme(self):
"""
URL scheme used for the request (this is either ``http`` or ``https``).
"""
return self._scheme
Path = RequestPath
@property
[docs] def QueryData(self):
"""
Access to the GET query data of the request as a dict. Accessing this
property will add the query string to the cache path.
"""
self._require_query()
return self._query_data
@property
[docs] def PostData(self):
"""
Access to the POST data of the request. Accessing this property will
disable caching of the response.
"""
self._require_post()
return self._post_data
@property
def RemainingPath(self):
"""
Path suffix which was not interpreted during resolution of the path
inside the node tree. This may be useful for redirects.
"""
return self._rempath
@RemainingPath.setter
[docs] def RemainingPath(self, value):
self._rempath = Typecasts.Types.unicode(value)
@property
def Cachable(self):
"""
Set whether the response is cachable. This may be force-set to False
by the Context if POST data was accessed. Otherwise it is writable by
the application to define whether the response may be cached by the
client.
"""
return self._cachable and not self._force_no_cache
@Cachable.setter
[docs] def Cachable(self, value):
self._cachable = Types.Typecasts.bool(value)
@property
[docs] def IfModifiedSince(self):
"""
Access to the requests If-Modified-Since value. Can be None if not
supplied or malformed.
"""
return self._if_modified_since
@property
def PageNode(self):
"""
The PyXWF node responsible for serving the page.
"""
return self._pagenode
@PageNode.setter
[docs] def PageNode(self, value):
self._pagenode = value
@property
[docs] def LastModified(self):
"""
Current Last-Modified value based on the used resources (see
:meth:`use_resource`). This will return None if no resources are used
or one of them exposes a None value as LastModified (which implies
uncachability).
"""
return self._last_modified
@property
def CanUseXHTML(self):
"""
Whether XHTML can be interpreted by the client. This is by default False
and shall be identified by the request headers sent by the requesting
entity.
If this is False, the application handling the request represented by
this Context, must not send XHTML responses.
"""
self.add_vary("Accept")
return self._can_use_xhtml
@property
[docs] def Accept(self):
"""
Return the contents of the HTTP ``Accept`` header as
:class:`~PyXWF.Accept.AcceptPreferenceList`.
"""
self.add_vary("Accept")
return self._accept
@CanUseXHTML.setter
[docs] def CanUseXHTML(self, value):
self._can_use_xhtml = bool(value)
@property
def IsMobileClient(self):
"""
Return whether the user agent is a mobile phone or similar. This can be
overriden to disable mobile detection and force it to a static value.
This is for example useful to decide about mobile-suited responses
depending on the host name used in the request.
"""
self.add_vary("User-Agent")
return self._is_mobile_client
@IsMobileClient.setter
[docs] def IsMobileClient(self, value):
self._is_mobile_client = Types.Typecasts.bool(value)
@property
[docs] def HTML5Support(self):
"""
Return whether the User-Agent is supposed to support HTML5. This is
used by the site to determine whether to apply a to-html4 backtransform.
"""
self.add_vary("User-Agent")
return self._html5_support
@property
[docs] def PrefixedXHTMLSupport(self):
"""
Return whether the User-Agent is positively known to support XHTML with
namespace prefixes.
"""
self.add_vary("User-Agent")
return self._prefixed_xhtml_support
@property
[docs] def CacheControl(self):
"""
The current contents of the Cache-Control header values.
"""
return frozenset(self._cache_control)
@property
[docs] def Cookies(self):
"""
Return a dictionary mapping cookie names to :class:`~Cookie` instances.
"""
if self._cookies is None:
self.add_vary("Cookie")
self._require_cookies()
return self._cookies
@abc.abstractmethod
[docs] def send_response(self, message):
"""
Send the :class:`~PyXWF.Message.Message` object referred to by *message*
as response. This will render the filelike behind :attr:`Out` invalid
for use for writing. This must be implemented by a derived class.
"""
[docs] def get_reconstructed_uri(self, urlroot, update_query={}):
"""
Return the full URI to reconstruct the request as stored in the
Context currently. This takes into account any changes to the
QueryData for example.
"""
querydict = dict(self.QueryData)
querydict.update(update_query)
if not querydict:
query = ""
else:
query = "?" + urllib.urlencode(querydict)
path = self._path
if path and urlroot and path[0] == '/' and urlroot[-1] == '/':
urlroot = urlroot[:-1]
return "{protocol}://{host}{base}{path}{query}".format(
protocol=self.URLScheme,
host=self.HostName,
base=urlroot,
path=self._path,
query=query
)
[docs] def use_resource(self, resource):
"""
Mark the use of a given resource (which is expected to be a
:class:`~PyXWF.Resource.Resource` instance) to build the response.
This will later be regarded when calculating the Last-Modified value
of the response, and thus whether the full response needs to be
created.
The resource is also asked to recheck its Last-Modified value and reload
if neccessary, so this is a possible costy operation. However, a
resource will never be asked twice during the same request.
"""
if resource in self._used_resources:
return
self._used_resources.add(resource)
resource.threadsafe_update()
last_modified = resource.LastModified
if last_modified is not None:
if self._last_modified is not None:
self._last_modified = max(self._last_modified, last_modified)
else:
self._last_modified = last_modified
[docs] def use_resources(self, resources):
"""
Marks multiple resources for use. This requires *resources* to be an
iterable of Resources.
"""
collections.deque(map(self.use_resource, resources), 0)
[docs] def iter_resources(self):
"""
Returns an iterator over the resources used to build the response.
"""
return iter(self._used_resources)
[docs] def check_not_modified(self):
"""
Check whether the current Last-Modified value (based on the used
resources, see :meth:`use_resource`) is older or equal to the
If-Modified-Since value.
If so, and if caching is not disabled, a
:class:`~PyXWF.Errors.NotModified` is thrown.
"""
if not self.Cachable:
return
last_modified = self.LastModified
if last_modified is None:
return
if self.IfModifiedSince is None:
return
self.add_vary("If-Modified-Since")
if self.LastModified <= self.IfModifiedSince:
raise Errors.NotModified()
[docs] def check_acceptable(self, content_type):
"""
Check whether the given *content_type* (which must be either a
:class:`basestring` or a :class:`~PyXWF.AcceptHeaders.Preference`
instance) is acceptable by the client.
Raise :class:`~PyXWF.Errors.NotAcceptable` if not.
"""
if self._accept is None:
return
if len(self._accept) == 0:
return
if isinstance(content_type, basestring):
content_type = AcceptHeaders.AcceptPreference.from_header_section(content_type)
if self._accept.get_quality(content_type) <= 0.:
raise Errors.NotAcceptable()
[docs] def add_cache_control(self, token):
"""
Add *token* to the set of Cache-Control HTTP tokens. Token must be a
valid Cache-Control response value according to HTTP/1.1 (this is not
enforced though (yet)).
"""
self._cache_control.add(token.lower())
[docs] def add_vary(self, field_name):
"""
Add a HTTP header field name to the Vary HTTP response. *field_name* must
be a valid HTTP/1.1 header name and will be lower-cased.
"""
self._vary.add(field_name.lower())
[docs] def set_response_content_type(self, mimetype, charset):
"""
Set the content type of the response according to *mimetype* and
*charset*. Charset may be :data:`None` or the empty string if it should
be omitted.
"""
if charset:
self.set_response_header(b"Content-Type", b"{0}; charset={1}".format(mimetype, charset))
else:
self.set_response_header(b"Content-Type", str(mimetype))
[docs] def send_empty_response(self, status):
"""
Send an empty response with the HTTP status *status*. *status* may be
either a :class:`~PyXWF.Errors.HTTPStatusBase` descendant class or
instance.
"""
return self.send_response(Message.EmptyMessage(status=status))
[docs] def set_cookie(self, cookie):
"""
Add the given :class:`~Cookie` *cookie* to the list of cookies to be
sent with the response.
"""
self._response_cookies.append(cookie)