# Source code for ucloud.core.transport._requests
# -*- coding: utf-8 -*-
import time
import requests
from urllib3.util.retry import Retry
from requests.adapters import HTTPAdapter
from ucloud.core.transport import http
from ucloud.core.transport.http import Request, Response, SSLOption
from ucloud.core.utils.middleware import Middleware
class RequestsTransport(http.Transport):
    """ transport is the implementation of http client, used to send a request
    and return a http response

    :type max_retries: int
    :param max_retries: the max number of retries of a transport request when
        an http error occurs
    :type backoff_factor: float
    :param backoff_factor: backoff factor used to calculate the backoff delay during retrying,
        the backoff delay = {backoff factor} * (2 ^ ({number of total retries} - 1))
    :type status_forcelist: tuple
    :param status_forcelist: the status code list that could be retried
    """

    def __init__(
        self,
        max_retries=3,
        backoff_factor=0.3,
        status_forcelist=(500, 502, 504),
    ):
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.status_forcelist = status_forcelist
        # `_load_adapter` inspects `self._adapter` when `max_retries` is None,
        # so the attribute has to exist before the first call; otherwise
        # `RequestsTransport(max_retries=None)` fails with AttributeError.
        self._adapter = None
        self._adapter = self._load_adapter(max_retries)
        self._middleware = Middleware()

    def send(self, req, **options):
        """ send request and return the response

        The request handlers of the middleware are applied to the request
        first, then the request is sent, and finally the response handlers
        are applied to the response. If sending raises, every exception
        handler of the middleware is called with the exception before it
        is raised again.

        :param req: the full http request descriptor
        :param options: per-request options, ``max_retries`` and
            ``ssl_option`` are read by this transport
        :return: the response of http request
        """
        for handler in self.middleware.request_handlers:
            req = handler(req)

        try:
            resp = self._send(req, **options)
        except Exception as e:
            for handler in self.middleware.exception_handlers:
                handler(e)
            raise e

        for handler in self.middleware.response_handlers:
            resp = handler(resp)
        return resp

    @property
    def middleware(self):
        """ the middleware object, see :mod:`ucloud.core.utils.middleware`

        :return: the transport middleware
        """
        return self._middleware

    def _send(self, req, **options):
        """ perform the http request in a fresh ``requests`` session

        ``req.request_time`` is set right before the request is issued, and
        the returned response carries the originating ``req`` as ``request``
        and the completion timestamp as ``response_time``.

        :param req: the full http request descriptor
        :param options: ``max_retries`` overrides the retry count for this
            request only, ``ssl_option`` supplies the TLS settings
        :return: the converted http response
        """
        with requests.Session() as session:
            adapter = self._load_adapter(options.get("max_retries"))
            session.mount("http://", adapter=adapter)
            session.mount("https://", adapter=adapter)

            ssl_option = options.get("ssl_option")
            kwargs = self._build_ssl_option(ssl_option) if ssl_option else {}

            req.request_time = time.time()
            session_resp = session.request(
                method=req.method.upper(),
                url=req.url,
                json=req.json,
                data=req.data,
                params=req.params,
                headers=req.headers,
                **kwargs
            )
            resp = self.convert_response(session_resp)
            resp.request = req
            resp.response_time = time.time()
            return resp

    @staticmethod
    def _build_ssl_option(ssl_option):
        """ translate a ssl option into keyword arguments of ``requests``

        :param ssl_option: object exposing ``ssl_verify``, ``ssl_cacert``,
            ``ssl_cert`` and ``ssl_key``
        :return: dict with ``verify`` and, when a client certificate is
            configured, ``cert`` (a ``(cert, key)`` tuple if a key is given,
            otherwise the certificate alone)
        """
        verify = ssl_option.ssl_verify
        if verify:
            # Verification requested: use the custom CA bundle when one is
            # configured, otherwise keep verification on with the defaults.
            # A bare `ssl_verify and ssl_cacert` would yield "" for an empty
            # bundle path, which requests treats as "do not verify".
            verify = ssl_option.ssl_cacert or True
        kwargs = {"verify": verify}

        if not ssl_option.ssl_cert:
            return kwargs

        if ssl_option.ssl_key:
            kwargs["cert"] = ssl_option.ssl_cert, ssl_option.ssl_key
        else:
            kwargs["cert"] = ssl_option.ssl_cert
        return kwargs

    def _load_adapter(self, max_retries=None):
        """ return the http adapter that carries the retry policy

        :param max_retries: retry count for the adapter; when it is None the
            adapter built by the constructor is reused if there is one,
            otherwise a new adapter is built (None counts as 0 retries)
        :return: a ``HTTPAdapter`` whose ``max_retries`` is a ``Retry`` built
            from the given count and the transport's ``backoff_factor`` and
            ``status_forcelist``
        """
        if max_retries is None and self._adapter is not None:
            return self._adapter

        max_retries = max_retries or 0
        adapter = HTTPAdapter()
        adapter.max_retries = Retry(
            total=max_retries,
            read=max_retries,
            connect=max_retries,
            backoff_factor=self.backoff_factor,
            status_forcelist=self.status_forcelist,
        )
        return adapter

    @staticmethod
    def convert_response(r):
        """ convert a ``requests`` response into a transport ``Response``

        :param r: the response object returned by ``requests``
        :return: a ``Response`` holding url, method, status code, reason,
            headers, raw content and encoding; the encoding falls back to
            ``r.apparent_encoding`` when ``r.encoding`` is not set
        """
        return Response(
            url=r.url,
            method=r.request.method,
            status_code=r.status_code,
            reason=r.reason,
            headers=r.headers,
            content=r.content,
            encoding=r.encoding or r.apparent_encoding,
        )