EMR Serverless Spark 是一款兼容开源 Spark 的高性能 Lakehouse 产品。它为用户提供任务开发、调试、发布、调度和运维等全方位的产品化服务,显著简化了大数据计算的工作流程,使用户能更专注于数据分析和价值提炼。
StarRocks官方提供了Spark Connector用于Spark和StarRocks之间的数据读写,EMR Serverless Spark可以在开发时添加对应的配置连接StarRocks。本文为您介绍在EMR Serverless Spark中实现StarRocks的读取和写入操作。
EMR Serverless Spark 新用户可
免费领取 1000 CU*小时
资源包,快速体验 ETL 开发、任务调度、数据查询与分析全流程。
(链接:https://www.aliyun.com/product/emr/getting-started?utm_content=g_1000402199)
-
已创建Serverless Spark工作空间,详情请参见
创建工作空间
。
(链接:https://x.sm.cn/AN7vPD2)
-
已创建EMR Serverless StarRocks实例,详情请
参见创建实例
。
(链接:https://x.sm.cn/CAbeJiu)
EMR Serverless Spark引擎的版本要求为esr-2.5.0、esr-3.1.0、esr-4.1.0及以上版本。
步骤一:获取 Spark Connector JAR 并上传至OSS
1. 参见
使用Spark Connector读取数据
,选择相应的方式下载对应版本的Spark Connector JAR。
(链接:https://x.sm.cn/v4QPaz)
例如,本文选择直接下载已经编译好的JAR,即从
Maven Central Repository
获取不同版本的Connector JAR包。
(链接:https://x.sm.cn/DLYRbXP)
说明:
Connector JAR
包的命名格式为
starrocks-spark-connec
tor-${spark_version}_${scala_version}-${connector_version}.jar
。例如,您使用的引擎版本为
esr-4.1.0 (Spark 3.5.2, Scala 2.12),想使用
1.1.2
版本的
Connector,则可以选择
starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
2. 将下载的Spark Connector JAR上传至阿里云OSS中,上传操作可以参见
简单上传
。
(链接:https://x.sm.cn/AcdD9UZ)
您可以在
EMR Serverless Starrocks
页面,进入目标StarRocks实例的
实例详情
页面,以获取该实例的专有网络和交换机信息。
(链接:https://x.sm.cn/D5dHUwu)
-
在
EMR Serverless Spark
页面,进入目标Spark工作空间的
网络连接
页面
,
单击
新增网络连接
。
(链接:https://x.sm.cn/FLs7Nq)
-
在
新增网络连接
对话框中,输入
连接名称
,并选择之前获取到的StarRocks实例的专有网络和交换机信息,然后单击
确定
。
更多网络连接信息,请参见
EMR Serverless Spark与其他VPC间网络互通
。
(链接:https://x.sm.cn/AURlOSD)
1. 连接StarRocks实例,详情请参见
通过EMR StarRocks Manager连接StarRocks实例
。
(链接:https://x.sm.cn/JGSgejX)
2. 在
SQL Editor
的
查询列表
页面,单击
文件
或者右侧区域的
图标,然后单击
确认
以新增文件。
3. 在新增的文件中输入以下SQL语句,单击
运行
。
CREATE DATABASE `testdb`;
CREATE TABLE `testdb`.`score_board`
(
`id` int(11) NOT NULL COMMENT "",
`name` varchar(65533) NULL DEFAULT "" COMMENT "",
`score` int(11) NOT NULL DEFAULT "0" COMMENT ""
)
ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`);
通过 Serverless Spark 读写 StarRocks
方式一:使用SQL会话、Notebook会话读写StarRocks
会话类型更多介绍,请参见
会话管理
。
(链接:https://x.sm.cn/8MqzEJP)
▶
SQL 会话
1. 通过Serverless Spark向StarRocks写入数据。
a. 创建SQL会话,详情请参见
管理SQL会话
。
(链接:https://x.sm.cn/CxvVvQR)
创建会话时,选择与 StarRocks Connector 版本对应的引擎版本,在
网络连接
中选择上一步创建好的网络连接,并在
Spark配置
中添加以下参数来加载Spark Connector。
spark.user.defined.jars oss:///path/connector.jar
其中,
oss://
/path/connector.jar
为您步骤一中上传至OSS的Spark Connector的路径。例如,
oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
b. 在
数据开发
页面,创建一个
SQL
>
SparkSQL
类型的任务,然后在右上角选择创建好的SQL会话。更多操作,请参见
SparkSQL开发
。
(链接:https://x.sm.cn/9rcGjIi)
c. 拷贝如下代码到新增的SparkSQL页签中,并根据需要修改相应的参数信息,然后单击
运行
。
CREATE TABLE score_board
USING starrocks
OPTIONS
(
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = ":",
"starrocks.fe.jdbc.url" = "jdbc:mysql://:",
"starrocks.user" = "",
"starrocks.password" = ""
);
INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100);
-
:Serverless StarRocks实例中FE的内网或公网地址。您可以在
实例详情
页面的
FE
详情
区域查看。
-
-
如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见
网络访问与安全设置
。
(链接:https://x.sm.cn/ApMsVuF)
-
:Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过
用户管理
页面新增用户来连接。新增用户操作,请参见
管理用户及数据授权。(链接:https://x.sm.cn/AiIP1PP)
-
2. 通过Serverless Spark查询写入的数据。
在本文示例中,我们是在上述的SparkSQL任务中创建了一个临时视图
test_view
,然后通过该视图查询
score_board
表的数据。拷贝如下代码到新增的
SparkSQL页签中,选中代码后单击
运行选中
。
CREATE TEMPORARY VIEW test_view
USING starrocks
OPTIONS
(
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = ":",
"starrocks.fe.jdbc.url" = "jdbc:mysql://:",
"starrocks.user" = "",
"starrocks.password" = ""
);
SELECT * FROM test_view;
▶
Notebook 会话
1. 通过Serverless Spark向StarRocks写入数据。
a. 创建Notebook会话,详情请参见
管理Notebook会话
。
(链接:https://x.sm.cn/DZ8b4bR)
创建会话时,选择与StarRocks Connector版本对应的引擎版本,在
网络连接
中选择上一步创建好的网络连接,并在
Spark配置
中添加以下参数来加载Spark Connector。
spark.user.defined.jars oss:///path/connector.jar
其中,
oss://
/path/connector.jar
为您步骤一中上传至OSS的Spark Connector的路径。例如,
oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
b. 在
数据开发
页面,选择创建一个
Python
>
Notebook
类型的任务,然后在右上角选择创建的Notebook会话。
c. 拷贝如下代码到新增的Notebook的Python单元格中,单击
运行
。
fe_host = ""
fe_http_port = ""
fe_query_port = ""
user = ""
password = ""
create_table_sql = f"""
CREATE TABLE score_board
USING starrocks
OPTIONS (
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
"starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
"starrocks.user" = "{user}",
"starrocks.password" = "{password}"
)
"""
spark.sql(create_table_sql)
insert_data_sql = """
INSERT INTO `score_board` VALUES (1, "starrocks", 100), (2, "spark", 100)
"""
spark.sql(insert_data_sql)
-
:Serverless StarRocks实例中FE的内网或公网地址。您可以在
实例详情
页面的
FE
详情
区域查看。
-
-
如果使用公网地址,需确保安全组规则允许相应的端口通信,详情请参见
网络访问与安全设置
。
(链接:https://x.sm.cn/BMpwm94)
-
:Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过
用户管理
页面新增用户来连接。新增用户操作,请参见
管理用户及数据授权
。
(链接:https://x.sm.cn/AiIP1PP)
-
2. 通过Serverless Spark查询写入的数据。
在本文示例中,我们新增一个Python单元格,在其中创建了一个临时视图
test_view
,然后通过该视图查询
score_board
表的数据。拷贝如下代码到新增的Python单元格中,然后单击
图标。
create_view_sql=f"""
CREATE TEMPORARY VIEW test_view
USING starrocks
OPTIONS (
"starrocks.table.identifier" = "testdb.score_board",
"starrocks.fe.http.url" = "{fe_host}:{fe_http_port}",
"starrocks.fe.jdbc.url" = "jdbc:mysql://{fe_host}:{fe_query_port}",
"starrocks.user" = "{user}",
"starrocks.password" = "{password}"
)
"""
spark.sql(create_view_sql)
query_sql="SELECT * FROM test_view"
result_df = spark.sql(query_sql)
result_df.show()
方式二:使用 Spark 批任务读写 StarRocks
a. 在EMR Serverless Spark页面,单击左侧的
数据开发
。
b. 在
开发目录
页签下,单击
图标。
c. 在新建对话框中,输入
名称
,类型选择
批任务
>
SQL
,然后单击
确定
。
类型您可以根据实际情况进行调整,本文以SQL为例。更多类型参数介绍,请参见
Application开发
。
(链接:https://x.sm.cn/HUsOvJI)
2. 通过Spark批任务读写StarRocks。
添加队列的具体操作,请参见
管理资源队列
。
(链接:https://x.sm.cn/2xTsBdJ)
b. 在新建的任务开发中,配置以下信息,其余参数无需配置,然后单击
运行
。
参数
|
说明
|
|
本示例所使用的文件为
spark_sql_starrocks.sql
,其内容是SQL会话中的SQL语句,请根据实际情况对具体配置进行替换。在使用之前,您需要先下载该文件并进行相应的修改,然后在
文件管理
页面进行上传。
(链接:https://x.sm.cn/AhcNaCt)
spark_sql_starrocks.sql
参数说明:
-
:Serverless StarRocks实例中FE的HTTP端口(默认为8030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例中FE的查询端口(默认为9030)。您可以在
实例详情
页面的
FE
详情
区域查看。
-
:Serverless StarRocks实例的用户名。默认提供admin用户,具有管理员权限。您也可以通过
用户管理
页面新增用户来连接。新增用户操作,请参见
管理用户及数据授权
。(链接:
https://x.sm.cn/BrfbZoN
)
-
:用户
对应的密码。
|
引擎版本
|
选择与Spark Connector版本对应的引擎版本。
|
网络连接
|
选择前一步创建好的网络连接。
|
|
在
Spark
配置
中添加以下参数来加载Spark Connector。
spark.user.defined.jars oss:
其中,
oss://
/path/connector.jar
为您步骤一中上传至OSS的Spark Connector的路径。例如,
oss://emr-oss/spark/starrocks-spark-connector-3.5_2.12-1.1.2.jar
。
|
a. 您可以在下方的
运行记录
区域,单击操作列的
详情
。
如您对文中所提到的内容及产品有疑问,或者有明确的业务需求,可
点击
「阅读原文」
填写表单联系我们,有专
业的人员来联系您响应您的需求。