专栏名称: 阿里云云原生
发布云原生技术最新资讯、汇集云原生技术最全内容,定期举办云原生活动、直播,阿里产品及用户最佳实践发布。与你并肩探索云原生技术点滴,分享你需要的云原生内容。
目录
相关文章推荐
曲线猎手  ·  2025年融资融券的最低利率【最低3.0%】 ·  昨天  
壹股经  ·  调整中寻机会! ·  昨天  
壹股经  ·  调整中寻机会! ·  昨天  
银行家杂志  ·  中国外贸信托卫濛濛:提升信托服务质效 ... ·  4 天前  
淘股吧  ·  DeepSeek,大消息! ·  3 天前  
淘股吧  ·  牛市二波!又要吹爆了...... ·  3 天前  
51好读  ›  专栏  ›  阿里云云原生

YAML 焦虑再见:PythonSDK 助力大规模 Argo Workflows 构建

阿里云云原生  · 公众号  ·  · 2024-06-30 09:00

正文

Argo Workflows 是一个开源的工作流管理系统,专为 Kubernetes 设计,旨在帮助用户创建和运行复杂的工作流程。它允许用户定义一系列的任务,这些任务可以按照特定的顺序执行,也可以设置任务间的依赖关系,从而实现自动化的工作流程编排。

使用 Argo Workflows 的场景非常广泛,包括定时任务、机器学习、仿真计算、科学计算、ETL数据处理、模型训练、CI/CD 等。

Argo Workflows 默认使用 YAML 格式进行编排,对于初次接触或者不熟悉 YAML 格式及 Argo Workflows 的人来说,使用 YAML 来编排复杂的工作流可能会显得有些挑战性。YAML 虽然简洁且易于阅读,但是编写大型或复杂的工作流配置时,确实可能因为其严格的缩进规则和较为繁琐的结构而显得有些棘手。

Hera 是一个用于构建和提交 Argo 工作流程的 Python SDK 框架,其主要目标是简化工作流程的构建和提交,尤其是对于数据科学家,使用 Python 能更好的兼容平时的使用习惯,克服 YAML 的阻碍。使用 Hera PythonSDK 具有以下优势:

1) 简洁性: 编写代码简短易懂,大大提高编写效率。
2) 支持复杂工作流: 在编写复杂工作流时,如果用 YAML 进行编辑的话,容易出现语法问题。
3) Python 生态集成: 每个 Function 就是一个 Template,非常容易和 Python 生态的框架进行集成。
4) 可测试性: 能够直接使用 Python 测试框架来提升代码质量。

ACK One Serverless Argo 工作流集群托管了 Argo Workflow,本文将介绍使用如何使用 Hera 和 ACK One Serveless Argo 集群进行交互,其架构如下所示:

01

开通 Argo 工作流

集群 并获取访问认证 Token

Cloud Native

参考链接:

1)创建工作流集群:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/create-a-workflow-cluster?spm=a2c4g.11186623.0.i2

2)开通 Argo Server:

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-argo-server-for-a-workflow-cluster?spm=a2c4g.11186623.0.0.3548463fxRw5sf

3)开通 Argo Server 公网访问(专线用户可选):

https://help.aliyun.com/zh/ack/distributed-cloud-container-platform-for-kubernetes/user-guide/enable-public-access-to-the-argo-server?spm=a2c4g.11186623.0.0.467217ea6qBU6z

4)创建并获取集群 Token:

kubectl create token default -n default
02

开启 Hera PythonSDK 之旅

Cloud Native


1) 安装 Hera

安装 Hera 非常简便,只需一条命令:

pip install hera-workflows

2) 编写并提交 Workflows

在 Argo Workflows 中,DAG(有向无环图)是一种常用的方式来定义复杂的任务依赖关系,其中"Diamond"结构是指一个常见的工作流模式,其中两个或多个任务并行执行后,它们的结果汇聚到一个共同的后续任务。这种结构在需要合并不同数据流或处理结果的场景中非常有用。

下面是一个具体的示例,展示如何使用 Hera 定义一个具有"Diamond"结构的工作流,即两个任务 taskA 和 taskB 并行运行,它们的输出都作为输入给到 taskC:

a. Simple DAG diamond

# 导入相关包from hera.workflows import DAG, Workflow, scriptfrom hera.shared import global_configimport urllib3
urllib3.disable_warnings()
# 配置访问地址和tokenglobal_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 装饰器函数script是 Hera 实现近乎原生的 Python 函数编排的关键功能。# 它允许您在 Hera 上下文管理器(例如Workflow或Steps上下文)下调用该函数,# 该函数在任何 Hera 上下文之外仍将正常运行,这意味着您可以在给定函数上编写单元测试。# 该示例是打印输入的信息。@script()def echo(message: str): print(message)
# 构建workflow,Workflow是 Argo 中的主要资源,# 也是 Hera 的关键类,负责保存模板、设置入口点和运行模板。with Workflow( generate_name="dag-diamond-", entrypoint="diamond",) as w: with DAG(name="diamond"): A = echo(name="A", arguments={"message": "A"}) # 构建template B = echo(name="B", arguments={"message": "B"}) C = echo(name="C", arguments={"message": "C"}) D = echo(name="D", arguments={"message": "D"}) A >> [B, C] >> D # 构建依赖关系,B、C任务依赖A,D依赖B和C# 创建workfloww.create()
提交工作流:
python simpleDAG.py

在控制台查看工作流运行状态,可以看到任务运行成功:

b. Map-Reduce

在 Argo Workflows 中实现 MapReduce 风格的数据处理,关键在于如何有效利用其 DAG(有向无环图)模板来组织和协调多个任务,从而模拟 Map 和 Reduce 阶段。

以下是一个更加详细的示例,展示了如何使用 Hera 构建一个简单的 MapReduce 工作流,用于处理文本文件的单词计数任务,其中每一步都是一个 Python 函数,可以非常容易和 Python 生态进行集成。

from hera.workflows import DAG, Artifact, NoneArchiveStrategy, Parameter, OSSArtifact, Workflow, scriptfrom hera.shared import global_configimport urllib3
urllib3.disable_warnings()# 设置访问地址global_config.host = "https://argo.{{clusterid}}.{{region-id}}.alicontainer.com:2746"global_config.token = "abcdefgxxxxxx" # 填入之前获取的tokenglobal_config.verify_ssl = ""
# 使用script装饰函数时,将script参数传递给script装饰器。这包括image、inputs、outputs、resources等。@script( image="python:alpine3.6", inputs=Parameter(name="num_parts"), outputs=OSSArtifact(name="parts", path="/mnt/out", archive=NoneArchiveStrategy(), key="{{workflow.name}}/parts"),)def split(num_parts: int) -> None: # 根据输入参数num_parts创建多个文件,文件中写入foo字符和parts编号 import json import os import sys
os.mkdir("/mnt/out")
part_ids = list(map(lambda x: str(x), range(num_parts))) for i, part_id in enumerate(part_ids, start=1): with open("/mnt/out/" + part_id + ".json", "w") as f: json.dump({"foo": i}, f) json.dump(part_ids, sys.stdout)
# script中定义image、inputs、outputs@script( image="python:alpine3.6", inputs=[Parameter(name="part_id", value="0"), Artifact(name="part", path="/mnt/in/part.json"),], outputs=OSSArtifact( name="part", path="/mnt/out/part.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/results/{{inputs.parameters.part_id}}.json", ),)def map_() -> None: # 根据文件中foo字符的个数,生成新文件,将foo内容parts编号乘以2,写入bar内容 import json import os
os.mkdir("/mnt/out") with open("/mnt/in/part.json") as f: part = json.load(f) with open("/mnt/out/part.json", "w") as f: json.dump({"bar": part["foo"] * 2}, f)
# script中定义image、inputs、outputs、resources@script( image="python:alpine3.6", inputs=OSSArtifact(name="results", path="/mnt/in", key="{{workflow.name}}/results"), outputs=OSSArtifact( name="total", path="/mnt/out/total.json", archive=NoneArchiveStrategy(), key="{{workflow.name}}/total.json" ),)def reduce() -> None: # 计算每个parts的bar的值的总和。 import json import os
os.mkdir("/mnt/out")
total = 0






请到「今天看啥」查看全文