Source code for ucloud.core.transport._requests

import time
import typing
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from ucloud.core.transport import http
from ucloud.core.transport.http import Request, Response, SSLOption
from ucloud.core.utils.middleware import Middleware
from ucloud.core import exc


[docs]class RequestsTransport(http.Transport): """transport is the implementation of http client, use for send a request and return a http response :type max_retries: int :param max_retries: max retries is the max number of transport request when occur http error :type backoff_factor: float :param backoff_factor: backoff factor will calculate the backoff delay during retrying, the backoff delay = {backoff factor} * (2 ^ ({number of total retries} - 1)) :type status_forcelist: tuple :param status_forcelist: the status code list that could be retried """ def __init__( self, max_retries: int = 3, backoff_factor: float = 0.3, status_forcelist: typing.Tuple[int] = (500, 502, 504), ): self.max_retries = max_retries self.backoff_factor = backoff_factor self.status_forcelist = status_forcelist self._adapter = self._load_adapter(max_retries) self._middleware = Middleware()
[docs] def send(self, req: Request, **options: typing.Any) -> http.Response: """send request and return the response :param req: the full http request descriptor :return: the response of http request """ for handler in self.middleware.request_handlers: req = handler(req) try: resp = self._send(req, **options) except Exception as e: for handler in self.middleware.exception_handlers: handler(e) raise e for handler in self.middleware.response_handlers: resp = handler(resp) return resp
@property def middleware(self) -> Middleware: """the middleware object, see :mod: :return: the transport middleware """ return self._middleware def _send(self, req: Request, **options: typing.Any) -> requests.Response: with requests.Session() as session: adapter = self._load_adapter(options.get("max_retries")) session.mount("http://", adapter=adapter) session.mount("https://", adapter=adapter) ssl_option = options.get("ssl_option") kwargs = self._build_ssl_option(ssl_option) if ssl_option else {} req.request_time = time.time() session_resp = session.request( method=req.method.upper(), url=req.url, json=req.json, data=req.data, params=req.params, headers=req.headers, timeout=options.get("timeout"), **kwargs ) resp = self.convert_response(session_resp) resp.request = req resp.response_time = time.time() if resp.status_code >= 400: raise exc.HTTPStatusException( resp.status_code, resp.request_uuid ) return resp @staticmethod def _build_ssl_option(ssl_option): kwargs = {"verify": ssl_option.ssl_verify and ssl_option.ssl_cacert} if not ssl_option.ssl_cert: return kwargs if ssl_option.ssl_key: kwargs["cert"] = (ssl_option.ssl_cert, ssl_option.ssl_key) else: kwargs["cert"] = ssl_option.ssl_cert return kwargs def _load_adapter( self, max_retries: typing.Optional[int] = None ) -> HTTPAdapter: if max_retries is None and self._adapter is not None: return self._adapter max_retries = max_retries or 0 adapter = HTTPAdapter() adapter.max_retries = Retry( total=max_retries, read=max_retries, connect=max_retries, backoff_factor=self.backoff_factor, status_forcelist=self.status_forcelist, ) return adapter @staticmethod def convert_response(r: requests.Response) -> Response: return Response( url=r.url, method=r.request.method, status_code=r.status_code, reason=r.reason, headers=r.headers, content=r.content, encoding=r.encoding or r.apparent_encoding, )