Argo Workflows 是一个开源的工作流管理系统,专为 Kubernetes 设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。
使用 Argo Workflows 的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD 等。
Argo Workflows 默认使用 YAML 格式进行编排,对于初次接触或者不熟悉 YAML 格式及 Argo Workflows 的人来说,使用 YAML 来编排复杂的工作流可能会显得有些挑战性。YAML 虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。
Hera 是一个用于构建和提交 Argo 工作流程的 Python SDK 框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用 Python 能更好的兼容平时的使用习惯,克服 YAML 的阻碍。使用 Hera PythonSDK 具有以下优势:
1) 简洁性:
编写代码简短易懂,大大提高编写效率。
2) 支持复杂工作流:
在编写复杂工作流时,如果用 YAML 进行编辑的话,容易出现语法问题。
3) Python 生态集成:
每个 Function 就是一个 Template,非常容易和 Python 生态的框架进行集成。
4) 可测试性:
能够直接使用 Python 测试框架来提升代码质量。
ACK One Serverless Argo 工作流集群托管了 Argo Workflow,本文将介绍使用如何使用 Hera 和 ACK One Serveless Argo 集群进行交互,其架构如下所示:
开通 Argo 工作流
集群
并获取访问认证 Token
参考链接:
https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster?spm=a2c4g.11186623.0.i2
https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-argo-server-for-a-workflow-cluster?spm=a2c4g.11186623.0.0.3548463fxRw5sf
3)开通 Argo Server 公网访问(专线用户可选):
https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-public-access-to-the-argo-server?spm=a2c4g.11186623.0.0.467217ea6qBU6z
4)创建并获取集群 Token:
kubectl create token default -n default
安装 Hera 非常简便,只需一条命令:
pip install hera-workflows
在 Argo Workflows 中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。
下面是一个具体的示例,展示如何使用 Hera 定义一个具有"Diamond"结构的工作流,即两个任务 taskA 和 taskB 并行运行,它们的输出都作为输入给到 taskC:
a. Simple DAG diamond
from hera.workflows import DAG, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"
global_config.verify_ssl = ""
@script()
def echo(message: str):
print(message)
with Workflow(
generate_name="dag-diamond-",
entrypoint="diamond",
) as w:
with DAG(name="diamond"):
A = echo(name="A", arguments={"message": "A"})
B = echo(name="B", arguments={"message": "B"})
C = echo(name="C", arguments={"message": "C"})
D = echo(name="D", arguments={"message": "D"})
A >> [B, C] >> D
w.create()
在控制台查看工作流运行状态,可以看到任务运行成功:
b. Map-Reduce
在 Argo Workflows 中实现 MapReduce 风格的数据处理,关键在于如何有效利用其 DAG(有向无环图)模板来组织和协调多个任务,从而模拟 Map 和 Reduce 阶段。
以下是一个更加详细的示例,展示了如何使用 Hera 构建一个简单的 MapReduce 工作流,用于处理文本文件的单词计数任务,其中每一步都是一个 Python 函数,可以非常容易和 Python 生态进行集成。
from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, script
from hera.shared import global_config
import urllib3
urllib3.disable_warnings()
global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"
global_config.token = "abcdefgxxxxxx"
global_config.verify_ssl = ""
@script(
image="python:alpine3.6",
inputs=Parameter(name="num_parts"),
outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),
)
def split(num_parts: int) -> None:
import json
import os
import sys
os.mkdir("/mnt/out")
part_ids = list(map(lambda x: str(x), range(num_parts)))
for i, part_id in enumerate(part_ids, start=1):
with open("/mnt/out/" + part_id + ".json", "w") as f:
json.dump({"foo": i}, f)
json.dump(part_ids, sys.stdout)
@script(
image="python:alpine3.6",
inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),],
outputs=OSSArtifact(
name="part",
path="/mnt/out/part.json",
archive=NoneArchiveStrategy(),
key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json",
),
)
def map_() -> None:
import json
import os
os.mkdir("/mnt/out")
with open("/mnt/in/part.json") as f:
part = json.load(f)
with open("/mnt/out/part.json", "w") as f:
json.dump({"bar": part["foo"] * 2}, f)
@script(
image="python:alpine3.6",
inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"),
outputs=OSSArtifact(
name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json"
),
)
def reduce() -> None:
import json
import os
os.mkdir("/mnt/out")
total = 0