Source code for ucloud.services.stepflow.client
""" Code is generated by ucloud-model, DO NOT EDIT IT. """
import typing
from ucloud.core.client import Client
from ucloud.services.stepflow.schemas import apis
[docs]class StepFlowClient(Client):
def __init__(
self, config: dict, transport=None, middleware=None, logger=None
):
super(StepFlowClient, self).__init__(
config, transport, middleware, logger
)
[docs] def create_sf_workflow_from_template(
self, req: typing.Optional[dict] = None, **kwargs
) -> dict:
"""CreateSFWorkflowFromTemplate - 导入工作流定义
**Request**
- **ProjectId** (str) - (Config) 项目ID。不填写为默认项目,子帐号必须填写。 请参考 `GetProjectList接口 <https://docs.ucloud.cn/api/summary/get_project_list.html>`_
- **Region** (str) - (Config) 地域。 参见 `地域和可用区列表 <https://docs.ucloud.cn/api/summary/regionlist.html>`_
- **Namespace** (str) - (Required) 需要创建的工作流namespace
- **Workflow** (str) - (Required) 描述工作流定义的base64字符串
- **WorkflowName** (str) - (Required) 需要创建的工作流名称
**Response**
- **Message** (str) - 返回消息
- **Version** (int) - 创建的工作流版本号
"""
# build request
d = {"ProjectId": self.config.project_id, "Region": self.config.region}
req and d.update(req)
d = apis.CreateSFWorkflowFromTemplateRequestSchema().dumps(d)
# build options
kwargs["max_retries"] = 0 # ignore retry when api is not idempotent
resp = self.invoke("CreateSFWorkflowFromTemplate", d, **kwargs)
return apis.CreateSFWorkflowFromTemplateResponseSchema().loads(resp)
[docs] def get_sf_workflow_template(
self, req: typing.Optional[dict] = None, **kwargs
) -> dict:
"""GetSFWorkflowTemplate - 导出工作流定义
**Request**
- **ProjectId** (str) - (Config) 项目ID。不填写为默认项目,子帐号必须填写。 请参考 `GetProjectList接口 <https://docs.ucloud.cn/api/summary/get_project_list.html>`_
- **Region** (str) - (Config) 地域。 参见 `地域和可用区列表 <https://docs.ucloud.cn/api/summary/regionlist.html>`_
- **WorkflowId** (str) - (Required) 被导出工作流的Id
- **WorkflowVersion** (int) - 被导出工作流的版本号。取值范围:WorkflowVersion >= 1;默认会获取发布版本对应的workflow;超过最大版本会返回错误
**Response**
- **Message** (str) - 返回消息
- **Version** (int) - 导出工作流的版本号
- **Workflow** (dict) - 见 **WorkflowTemplate** 模型定义
- **WorkflowId** (str) - 导出工作流的Id
**Response Model**
**Param**
- **Name** (str) - 参数名称
- **Type** (str) - 参数类型
- **Value** (str) - 参数值
**ActivityTemplate**
- **Input** (dict) - Activity的输入
- **Name** (str) - Activity的名字
- **Next** (str) - 下一个Activity的名字
- **Output** (list) - Activity的输出,详见Param
- **RetryTimes** (str) - Activity的重试次数
- **Timeout** (str) - Activity的超时时间
- **Type** (str) - Activity的类型
**WorkflowTemplate**
- **Activites** (list) - 见 **ActivityTemplate** 模型定义
- **Input** (list) - 见 **Param** 模型定义
- **Output** (list) - 见 **Param** 模型定义
"""
# build request
d = {"ProjectId": self.config.project_id, "Region": self.config.region}
req and d.update(req)
d = apis.GetSFWorkflowTemplateRequestSchema().dumps(d)
resp = self.invoke("GetSFWorkflowTemplate", d, **kwargs)
return apis.GetSFWorkflowTemplateResponseSchema().loads(resp)