Source code for ucloud.core.transport._requests

# -*- coding: utf-8 -*-

import time
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from ucloud.core.transport import http
from ucloud.core.transport.http import Request, Response, SSLOption
from ucloud.core.utils.middleware import Middleware


[docs]class RequestsTransport(http.Transport): """ transport is the implementation of http client, use for send a request and return a http response :type max_retries: int :param max_retries: max retries is the max number of transport request when occur http error :type backoff_factor: float :param backoff_factor: backoff factor will calculate the backoff delay during retrying, the backoff delay = {backoff factor} * (2 ^ ({number of total retries} - 1)) :type status_forcelist: tuple :param status_forcelist: the status code list that could be retried """ def __init__( self, max_retries=3, backoff_factor=0.3, status_forcelist=(500, 502, 504), ): self.max_retries = max_retries self.backoff_factor = backoff_factor self.status_forcelist = status_forcelist self._adapter = self._load_adapter(max_retries) self._middleware = Middleware()
[docs] def send(self, req, **options): """ send request and return the response :param req: the full http request descriptor :return: the response of http request """ for handler in self.middleware.request_handlers: req = handler(req) try: resp = self._send(req, **options) except Exception as e: for handler in self.middleware.exception_handlers: handler(e) raise e for handler in self.middleware.response_handlers: resp = handler(resp) return resp
@property def middleware(self): """ the middleware object, see :mod: :return: the transport middleware """ return self._middleware def _send(self, req, **options): with requests.Session() as session: adapter = self._load_adapter(options.get("max_retries")) session.mount("http://", adapter=adapter) session.mount("https://", adapter=adapter) ssl_option = options.get("ssl_option") kwargs = self._build_ssl_option(ssl_option) if ssl_option else {} req.request_time = time.time() session_resp = session.request( method=req.method.upper(), url=req.url, json=req.json, data=req.data, params=req.params, headers=req.headers, **kwargs ) resp = self.convert_response(session_resp) resp.request = req resp.response_time = time.time() return resp @staticmethod def _build_ssl_option(ssl_option): kwargs = {"verify": ssl_option.ssl_verify and ssl_option.ssl_cacert} if not ssl_option.ssl_cert: return kwargs if ssl_option.ssl_key: kwargs["cert"] = ssl_option.ssl_cert, ssl_option.ssl_key else: kwargs["cert"] = ssl_option.ssl_cert return kwargs def _load_adapter(self, max_retries=None): if max_retries is None and self._adapter is not None: return self._adapter max_retries = max_retries or 0 adapter = HTTPAdapter() adapter.max_retries = Retry( total=max_retries, read=max_retries, connect=max_retries, backoff_factor=self.backoff_factor, status_forcelist=self.status_forcelist, ) return adapter @staticmethod def convert_response(r): return Response( url=r.url, method=r.request.method, status_code=r.status_code, reason=r.reason, headers=r.headers, content=r.content, encoding=r.encoding or r.apparent_encoding, )