专栏名称: 阿里云大数据AI平台
阿里云大数据AI平台依托阿里领先的云基础设施、大数据和AI工程能力、场景算法技术和多年行业实践,一站式地为企业和开发者提供云原生的大数据和AI能力体系。帮助提升AI应用开发效率,促进AI在产业中规模化落地,激发业务价值。
目录
相关文章推荐
51好读  ›  专栏  ›  阿里云大数据AI平台

最佳实践 | 在 EMR Serverless Spark 中实现 StarRocks 读写操作

阿里云大数据AI平台  · 公众号  ·  · 2025-03-20 09:06

正文

请到「今天看啥」查看全文


EMR Serverless Spark 是一款兼容开源 Spark 的高性能 Lakehouse 产品。它为用户提供任务开发、调试、发布、调度和运维等全方位的产品化服务,显著简化了大数据计算的工作流程,使用户能更专注于数据分析和价值提炼。
StarRocks官方提供了Spark Connector用于Spark和StarRocks之间的数据读写,EMR Serverless Spark可以在开发时添加对应的配置连接StarRocks。本文为您介绍在EMR Serverless Spark中实现StarRocks的读取和写入操作。
EMR Serverless Spark 新用户可 免费领取 1000 CU*小时 资源包,快速体验 ETL 开发、任务调度、数据查询与分析全流程。 (链接:https://www.aliyun.com/product/emr/getting-started?utm_content=g_1000402199)

前提条件

  • 已创建Serverless Spark工作空间,详情请参见 创建工作空间 (链接:https://x.sm.cn/AN7vPD2)
  • 已创建EMR Serverless StarRocks实例,详情请 参见创建实例 (链接:https://x.sm.cn/CAbeJiu)

使用限制

EMR Serverless Spark引擎的版本要求为esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。

操作流程

步骤一:获取 Spark Connector JAR 并上传至OSS

1. 参见 使用Spark Connector读取数据 ,选择相应的方式下载对应版本的Spark Connector JAR。 (链接:https://x.sm.cn/v4QPaz)
例如,本文选择直接下载已经编译好的JAR,即从 Maven Central Repository 获取不同版本的Connector JAR包。 (链接:https://x.sm.cn/DLYRbXP)

说明: Connector JAR 包的命名格式为 starrocks-spark-connec tor-${spark_version}_${scala_version}-${connector_version}.jar 。例如,您使用的引擎版本为 esr-4.1.0 (Spark 3.5.2, Scala 2.12),想使用 1.1.2 版本的 Connector,则可以选择 starrocks-spark-connector-3.5_2.12-1.1.2.jar

2. 将下载的Spark Connector JAR上传至阿里云OSS中,上传操作可以参见 简单上传 (链接:https://x.sm.cn/AcdD9UZ)

步骤二:添加网络连接

1. 获取网络信息。
您可以在 EMR Serverless Starrocks 页面,进入目标StarRocks实例的 实例详情 页面,以获取该实例的专有网络和交换机信息。 (链接:https://x.sm.cn/D5dHUwu)
2. 新增网络连接。
  • EMR Serverless Spark 页面,进入目标Spark工作空间的 网络连接 页面 单击 新增网络连接 (链接:https://x.sm.cn/FLs7Nq)
  • 新增网络连接 对话框中,输入 连接名称 ,并选择之前获取到的StarRocks实例的专有网络和交换机信息,然后单击 确定
更多网络连接信息,请参见 EMR Serverless Spark与其他VPC间网络互通 (链接:https://x.sm.cn/AURlOSD)

步骤三:在 StarRocks 中创建库表

1. 连接StarRocks实例,详情请参见 通过EMR StarRocks Manager连接StarRocks实例 (链接:https://x.sm.cn/JGSgejX)
2. 在 SQL Editor 查询列表 页面,单击 文件 或者右侧区域的 图标,然后单击 确认 以新增文件。
3. 在新增的文件中输入以下SQL语句,单击 运行
CREATE DATABASE `testdb`;
CREATE TABLE `testdb`.`score_board`( `id` int(11) NOT NULL COMMENT "", `name` varchar(65533) NULL DEFAULT "" COMMENT "", `score` int(11) NOT NULL DEFAULT "0" COMMENT "")ENGINE=OLAPPRIMARY KEY(`id`)COMMENT "OLAP"DISTRIBUTED BY HASH(`id`);

通过 Serverless Spark 读写 StarRocks

方式一:使用SQL会话、Notebook会话读写StarRocks

会话类型更多介绍,请参见 会话管理 (链接:https://x.sm.cn/8MqzEJP)

SQL 会话

1. 通过Serverless Spark向StarRocks写入数据。
a. 创建SQL会话,详情请参见 管理SQL会话 (链接:https://x.sm.cn/CxvVvQR)
创建会话时,选择与 StarRocks Connector 版本对应的引擎版本,在 网络连接 中选择上一步创建好的网络连接,并在 Spark配置 中添加以下参数来加载Spark Connector。
spark.user.defined.jars  oss:///path/connector.jar
其中, oss:// /path/connector.jar 为您步骤一中上传至OSS的Spark Connector的路径。例如, oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
b. 在 数据开发 页面,创建一个 SQL > SparkSQL 类型的任务,然后在右上角选择创建好的SQL会话。更多操作,请参见 SparkSQL开发 (链接:https://x.sm.cn/9rcGjIi)
c. 拷贝如下代码到新增的SparkSQL页签中,并根据需要修改相应的参数信息,然后单击 运行
CREATE TABLE score_boardUSING starrocksOPTIONS(  "starrocks.table.identifier" = "testdb.score_board",  "starrocks.fe.http.url" = ":",  "starrocks.fe.jdbc.url" = "jdbc:mysql://:",  "starrocks.user" = "",  "starrocks.password" = "");
INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100);
其中,涉及参数说明如下:
  • :Serverless StarRocks实例中FE的内网或公网地址。您可以在 实例详情 页面的 FE 详情 区域查看。
    • 如果使用内网地址,请确保在同一VPC内。
    • 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见 网络访问与安全设置 (链接:https://x.sm.cn/ApMsVuF)
  • :Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在 实例详情 页面的 FE 详情 区域查看。
  • :Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在 实例详情 页面的 FE 详情 区域查看。
  • :Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过 用户管理 页面新增用户来连接。新增用户操作,请参见 管理用户及数据授权。(链接:https://x.sm.cn/AiIP1PP)
  • :用户 对应的密码。
2. 通过Serverless Spark查询写入的数据。
在本文示例中,我们是在上述的SparkSQL任务中创建了一个临时视图 test_view ,然后通过该视图查询 score_board 表的数据。拷贝如下代码到新增的 SparkSQL页签中,选中代码后单击 运行选中
CREATE TEMPORARY VIEW test_viewUSING starrocksOPTIONS(   "starrocks.table.identifier" = "testdb.score_board",   "starrocks.fe.http.url" = ":",   "starrocks.fe.jdbc.url" = "jdbc:mysql://:",   "starrocks.user" = "",   "starrocks.password" = "");
SELECT * FROM test_view;
返回信息如下图所示。

Notebook 会话

1. 通过Serverless Spark向StarRocks写入数据。
a. 创建Notebook会话,详情请参见 管理Notebook会话 (链接:https://x.sm.cn/DZ8b4bR)
创建会话时,选择与StarRocks Connector版本对应的引擎版本,在 网络连接 中选择上一步创建好的网络连接,并在 Spark配置 中添加以下参数来加载Spark Connector。
spark.user.defined.jars  oss:///path/connector.jar
其中, oss:// /path/connector.jar 为您步骤一中上传至OSS的Spark Connector的路径。例如, oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
b. 在 数据开发 页面,选择创建一个 Python > Notebook 类型的任务,然后在右上角选择创建的Notebook会话。
更多操作,请参见管理Notebook会话。
c. 拷贝如下代码到新增的Notebook的Python单元格中,单击 运行
# 替换为您的Serverless StarRocks配置。fe_host = ""fe_http_port = ""fe_query_port = ""user = ""password = ""
# 创建表create_table_sql = f"""CREATE TABLE score_boardUSING starrocksOPTIONS ( "starrocks.table.identifier" = "testdb.score_board", "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}", "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}", "starrocks.user" = "{user}", "starrocks.password" = "{password}")"""
spark.sql(create_table_sql)
#插入数据insert_data_sql = """INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100)"""
spark.sql(insert_data_sql)
填写示例如下图所示。
其中,涉及参数说明如下:
  • :Serverless StarRocks实例中FE的内网或公网地址。您可以在 实例详情 页面的 FE 详情 区域查看。
    • 如果使用内网地址,请确保在同一VPC内。
    • 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见 网络访问与安全设置 (链接:https://x.sm.cn/BMpwm94)
  • :Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在 实例详情 页面的 FE 详情 区域查看。
  • :Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在 实例详情 页面的 FE 详情 区域查看。
  • :Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过 用户管理 页面新增用户来连接。新增用户操作,请参见 管理用户及数据授权 (链接:https://x.sm.cn/AiIP1PP)
  • :用户 对应的密码。
2. 通过Serverless Spark查询写入的数据。
在本文示例中,我们新增一个Python单元格,在其中创建了一个临时视图 test_view ,然后通过该视图查询 score_board 表的数据。拷贝如下代码到新增的Python单元格中,然后单击 图标。
#创建viewcreate_view_sql=f"""CREATE TEMPORARY VIEW test_viewUSING starrocksOPTIONS (  "starrocks.table.identifier" = "testdb.score_board",  "starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",  "starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",  "starrocks.user" = "{user}",  "starrocks.password" = "{password}")"""spark.sql(create_view_sql)  #查询query_sql="SELECT * FROM test_view"result_df = spark.sql(query_sql)result_df.show()
返回信息如下图所示。

方式二:使用 Spark 批任务读写 StarRocks

1. 创建Spark批任务。
a. 在EMR Serverless Spark页面,单击左侧的 数据开发
b. 在 开发目录 页签下,单击 图标。
c. 在新建对话框中,输入 名称 ,类型选择 批任务 > SQL ,然后单击 确定
类型您可以根据实际情况进行调整,本文以SQL为例。更多类型参数介绍,请参见 Application开发 (链接:https://x.sm.cn/HUsOvJI)
2. 通过Spark批任务读写StarRocks。
a. 在新建的任务开发的右上角选择队列。
添加队列的具体操作,请参见 管理资源队列 (链接:https://x.sm.cn/2xTsBdJ)
b. 在新建的任务开发中,配置以下信息,其余参数无需配置,然后单击 运行

参数

说明

SQL 文件

本示例所使用的文件为 spark_sql_starrocks.sql ,其内容是SQL会话中的SQL语句,请根据实际情况对具体配置进行替换。在使用之前,您需要先下载该文件并进行相应的修改,然后在 文件管理 页面进行上传。 (链接:https://x.sm.cn/AhcNaCt)

spark_sql_starrocks.sql 参数说明:

  • :Serverless StarRocks实例中FE的内网或公网地址。您可以在 实例详情 页面的 FE 详情 区域查看。

    • 如果使用内网地址,请确保在同一VPC内。

    • 如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见 网络访问与安全设置 (链接:https://x.sm.cn/BMpwm94)

  • :Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在 实例详情 页面的 FE 详情 区域查看。

  • :Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在 实例详情 页面的 FE 详情 区域查看。

  • :Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过 用户管理 页面新增用户来连接。新增用户操作,请参见 管理用户及数据授权 。(链接: https://x.sm.cn/BrfbZoN

  • :用户 对应的密码。

引擎版本

选择与Spark Connector版本对应的引擎版本。

网络连接

选择前一步创建好的网络连接。

Spark 配置

Spark 配置 中添加以下参数来加载Spark Connector。

spark.user.defined.jars  oss:///path/connector.jar

其中, oss:// /path/connector.jar 为您步骤一中上传至OSS的Spark Connector的路径。例如, oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar

3. 查看日志信息。
a. 您可以在下方的 运行记录 区域,单击操作列的 详情
b.单击 日志探查 页签,查看该任务的日志信息。
/ END /

更多推荐

图片
图片
图片
如您对文中所提到的内容及产品有疑问,或者有明确的业务需求,可 点击 「阅读原文」 填写表单联系我们,有专 业的人员来联系您响应您的需求。







请到「今天看啥」查看全文