Source code for sans.response
from __future__ import annotations
from email.message import Message
from itertools import chain
from typing import Any, AsyncIterator, Iterator
from xml.etree.ElementTree import Element
import httpx
from .decoder import GZipDecoder, XMLChunker, XMLDecoder
from .errors import narrow
try:
import xmltodict
HAS_XMLTODICT = True
except ImportError:
HAS_XMLTODICT = False
try:
from lxml.etree import _Element
from lxml.objectify import ObjectifiedElement
HAS_LXML = True
except ImportError:
HAS_LXML = False
else:
from .decoder import LXMLDecoder, ObjectifyDecoder
# Public names exported by this module.
__all__ = ["Response"]
class Response(httpx.Response):
    """An ``httpx.Response`` with content-type, gzip and XML helpers.

    The ``json`` override is only defined when ``xmltodict`` could be
    imported, and the ``lxml`` / ``objectified`` properties only when
    ``lxml`` could be imported (see the ``HAS_XMLTODICT`` / ``HAS_LXML``
    flags set at module import time).
    """

    @property
    def content_type(self) -> str:
        """Return the media type of the response, without parameters.

        The ``Content-Type`` header is parsed with
        :class:`email.message.Message`. When the header is missing or
        empty, the value is whatever ``Message.get_content_type()``
        reports for a message with no such header. The result is cached
        on the instance as ``_content_type``.
        """
        if not hasattr(self, "_content_type"):
            message = Message()
            content_type = self.headers.get("Content-Type")
            if content_type:
                # Reuse the value already fetched instead of a second lookup.
                message["Content-Type"] = content_type
            self._content_type = message.get_content_type()
        return self._content_type

    def iter_gzip(self) -> Iterator[bytes]:
        """Yield the body, passing each byte chunk through ``GZipDecoder``.

        Each chunk from ``iter_bytes()`` is fed to ``decoder.decode``;
        once the stream is exhausted, ``decoder.flush()`` is yielded last.
        """
        decoder = GZipDecoder()
        yield from map(decoder.decode, self.iter_bytes())
        yield decoder.flush()

    async def aiter_gzip(self) -> AsyncIterator[bytes]:
        """Asynchronous counterpart of :meth:`iter_gzip`.

        Chunks come from ``aiter_bytes()``; the decoder is flushed after
        the last chunk.
        """
        decoder = GZipDecoder()
        async for chunk in self.aiter_bytes():
            yield decoder.decode(chunk)
        yield decoder.flush()

    if HAS_XMLTODICT:

        def json(self, **kwargs: Any) -> dict[str, Any]:
            """Return the body as a dict, parsing XML when applicable.

            If the content type ends with ``/xml`` the body is parsed with
            ``xmltodict.parse`` using the response encoding, and ``kwargs``
            are forwarded to it. Otherwise this defers to
            ``httpx.Response.json`` with the same ``kwargs``.
            """
            if self.content_type.endswith("/xml"):
                return xmltodict.parse(
                    self.content, encoding=self.encoding, **kwargs
                )
            return super().json(**kwargs)

    @property
    def xml(self) -> Element:
        """Return the whole body decoded by ``XMLDecoder``.

        The full ``content`` is fed to the decoder in one call and the
        flushed result is cached on the instance as ``_xml``.
        """
        if not hasattr(self, "_xml"):
            content = self.content
            decoder = XMLDecoder(self.encoding)
            decoder.decode(content)
            self._xml = decoder.flush()
        return self._xml

    def iter_xml(self) -> Iterator[Element]:
        """Yield elements produced by ``XMLChunker`` as the body streams in.

        When the content type ends with ``/x-gzip`` or ``/gzip`` the bytes
        are taken from :meth:`iter_gzip`; otherwise from ``iter_bytes()``.
        Elements returned by each ``decode`` call are yielded in order,
        followed by whatever ``flush()`` returns.
        """
        decoder = XMLChunker(encoding=self.encoding)
        chunker = (
            self.iter_gzip()
            if self.content_type.endswith(("/x-gzip", "/gzip"))
            else self.iter_bytes()
        )
        yield from chain.from_iterable(map(decoder.decode, chunker))
        yield from decoder.flush()

    async def aiter_xml(self) -> AsyncIterator[Element]:
        """Asynchronous counterpart of :meth:`iter_xml`.

        Bytes come from :meth:`aiter_gzip` for ``/x-gzip`` or ``/gzip``
        content types and from ``aiter_bytes()`` otherwise.
        """
        decoder = XMLChunker(encoding=self.encoding)
        chunker = (
            self.aiter_gzip()
            if self.content_type.endswith(("/x-gzip", "/gzip"))
            else self.aiter_bytes()
        )
        async for chunk in chunker:
            for element in decoder.decode(chunk):
                yield element
        for element in decoder.flush():
            yield element

    if HAS_LXML:

        @property
        def lxml(self) -> _Element:
            """Return the whole body decoded by ``LXMLDecoder``.

            The result is cached on the instance as ``_lxml``.
            """
            if not hasattr(self, "_lxml"):
                content = self.content
                decoder = LXMLDecoder(self.encoding)
                decoder.decode(content)
                self._lxml = decoder.flush()
            return self._lxml

        @property
        def objectified(self) -> ObjectifiedElement:
            """Return the whole body decoded by ``ObjectifyDecoder``.

            The result is cached on the instance as ``_objectified``.
            """
            if not hasattr(self, "_objectified"):
                content = self.content
                decoder = ObjectifyDecoder(self.encoding)
                decoder.decode(content)
                self._objectified = decoder.flush()
            return self._objectified

    def raise_for_status(self) -> None:
        """Raise for an error status, converting the error via ``narrow``.

        Delegates to ``httpx.Response.raise_for_status`` and passes its
        return value through unchanged. An ``httpx.HTTPStatusError`` is
        replaced by the exception returned from ``narrow(exc)``
        (presumably a more specific error type -- see ``.errors``).

        Raises:
            The exception produced by ``narrow`` for the caught
            ``httpx.HTTPStatusError``.
        """
        try:
            return super().raise_for_status()
        except httpx.HTTPStatusError as exc:
            # Keep the original traceback on the replacement exception, but
            # drop the implicit exception context with ``from None``.
            raise narrow(exc).with_traceback(exc.__traceback__) from None