Source code for ucloud.services.stepflow.client

""" Code is generated by ucloud-model, DO NOT EDIT IT. """

import typing


from ucloud.core.client import Client
from ucloud.services.stepflow.schemas import apis


[docs]class StepFlowClient(Client): def __init__( self, config: dict, transport=None, middleware=None, logger=None ): super(StepFlowClient, self).__init__( config, transport, middleware, logger )
[docs] def create_sf_workflow_from_template( self, req: typing.Optional[dict] = None, **kwargs ) -> dict: """CreateSFWorkflowFromTemplate - 导入工作流定义 **Request** - **ProjectId** (str) - (Config) 项目ID。不填写为默认项目,子帐号必须填写。 请参考 `GetProjectList接口 <https://docs.ucloud.cn/api/summary/get_project_list.html>`_ - **Region** (str) - (Config) 地域。 参见 `地域和可用区列表 <https://docs.ucloud.cn/api/summary/regionlist.html>`_ - **Namespace** (str) - (Required) 需要创建的工作流namespace - **Workflow** (str) - (Required) 描述工作流定义的base64字符串 - **WorkflowName** (str) - (Required) 需要创建的工作流名称 **Response** - **Message** (str) - 返回消息 - **Version** (int) - 创建的工作流版本号 """ # build request d = {"ProjectId": self.config.project_id, "Region": self.config.region} req and d.update(req) d = apis.CreateSFWorkflowFromTemplateRequestSchema().dumps(d) # build options kwargs["max_retries"] = 0 # ignore retry when api is not idempotent resp = self.invoke("CreateSFWorkflowFromTemplate", d, **kwargs) return apis.CreateSFWorkflowFromTemplateResponseSchema().loads(resp)
[docs] def get_sf_workflow_template( self, req: typing.Optional[dict] = None, **kwargs ) -> dict: """GetSFWorkflowTemplate - 导出工作流定义 **Request** - **ProjectId** (str) - (Config) 项目ID。不填写为默认项目,子帐号必须填写。 请参考 `GetProjectList接口 <https://docs.ucloud.cn/api/summary/get_project_list.html>`_ - **Region** (str) - (Config) 地域。 参见 `地域和可用区列表 <https://docs.ucloud.cn/api/summary/regionlist.html>`_ - **WorkflowId** (str) - (Required) 被导出工作流的Id - **WorkflowVersion** (int) - 被导出工作流的版本号。取值范围:WorkflowVersion >= 1;默认会获取发布版本对应的workflow;超过最大版本会返回错误 **Response** - **Message** (str) - 返回消息 - **Version** (int) - 导出工作流的版本号 - **Workflow** (dict) - 见 **WorkflowTemplate** 模型定义 - **WorkflowId** (str) - 导出工作流的Id **Response Model** **Param** - **Name** (str) - 参数名称 - **Type** (str) - 参数类型 - **Value** (str) - 参数值 **ActivityTemplate** - **Input** (dict) - Activity的输入 - **Name** (str) - Activity的名字 - **Next** (str) - 下一个Activity的名字 - **Output** (list) - Activity的输出,详见Param - **RetryTimes** (str) - Activity的重试次数 - **Timeout** (str) - Activity的超时时间 - **Type** (str) - Activity的类型 **WorkflowTemplate** - **Activites** (list) - 见 **ActivityTemplate** 模型定义 - **Input** (list) - 见 **Param** 模型定义 - **Output** (list) - 见 **Param** 模型定义 """ # build request d = {"ProjectId": self.config.project_id, "Region": self.config.region} req and d.update(req) d = apis.GetSFWorkflowTemplateRequestSchema().dumps(d) resp = self.invoke("GetSFWorkflowTemplate", d, **kwargs) return apis.GetSFWorkflowTemplateResponseSchema().loads(resp)