a. 单击
car_prices.csv
直接下载测试文件。
(链接:https://x.sm.cn/78i54Zh)
b. 上传测试文件(car_prices.csv)到阿里云对象存储OSS控制台,详情请参见
简单上传
。
(链接:https://x.sm.cn/7MG8aWB)
例如,本文上传的路径为oss://emr-oss-hdfs/spark/car_prices.csv。
-
登录
E-MapReduce控制台
。
(链接:https://x.sm.cn/7x9r7ZN)
-
在左侧导航栏,选择EMR Serverless > Spark。
-
-
在EMR Serverless Spark页面,单击左侧导航栏中的会话管理。
-
-
更多参数详情,请参见
管理SQL会话
。
(链接:https://x.sm.cn/B5cbq28)
-
-
在Notebook 会话页面,单击创建 Notebook 会话。
-
在创建 Notebook 会话页面,单击创建。
更多参数详情,请参见
管理Notebook会话
。
(链接:https://x.sm.cn/CBcxXS2)
3. 新建load_data任务,将测试文件中的数据加载到数据目录中。
a. 在EMR Serverless Spark页面,单击左侧导航栏中的数据开发。
c. 在新建对话框中,输入名称(例如load_data),类型使用Python > Notebook,然后单击确定。
d. 在右上角的未选择 Notebook 会话下拉列表中,选择已创建并启动的Notebook会话实例。
您也可以在下拉列表中选择创建 Notebook 会话,新建一个Notebook会话实例。关于Notebook会话更多介绍,请参见管理Notebook会话。
-
拷贝如下代码至新增的Notebook Python单元格中。
sql = "CREATE DATABASE IF NOT EXISTS serverless_spark"
spark.sql(sql)
#创建一个简单的DataFrame,其中OSS路径需要替换为步骤1中上传的文件路径
url = "oss://emr-oss-hdfs/spark/car_prices.csv"
df = spark.read.csv(url, header=True, inferSchema= True)
df.write.format("paimon").saveAsTable("serverless_spark.ods_car_sales_history")
4. 新建cleaned_data_task任务,对数据进行清洗处理,排除数据记录不全或存在较大误差的数据。
b. 在新建对话框中,输入名称(例如cleaned_data_task),类型使用默认的SparkSQL,然后单击确定。
c. 拷贝如下代码到新增的Spark SQL页签(cleaned_data_task)中。
USE serverless_spark;
--进行数据清洗:去除空值、脏数据和极限范围的记录
CREATE TABLE IF NOT EXISTS cleaned_car_sales AS
SELECT
`make`,
`model`,
`trim`,
`body`,
`transmission`,
`vin`,
`state`,
`color`,
`interior`,
`seller`,
TO_DATE(`saledate`, 'yyyy-MM-dd') AS sale_date,
CAST(`year` AS INT) AS `year`,
CAST(`condition` AS DECIMAL(3,1)) AS `condition`,
CAST(`odometer` AS DECIMAL(10,2)) AS `odomete`,
CAST(`mmr` AS DECIMAL(10,2)) AS `mmr`,
CAST(`sellingprice` AS DECIMAL(10,2)) AS `sellingprice`
FROM ods_car_sales_history
WHERE `year` IS NOT NULL AND `year` != 0
AND `make` IS NOT NULL
AND `model` IS NOT NULL
AND `trim` IS NOT NULL
AND `body` IS NOT NULL
AND `transmission` IS NOT NULL
AND `vin` IS NOT NULL
AND `state` IS NOT NULL
AND `condition` IS NOT NULL AND `condition` != 0
AND `odometer` IS NOT NULL AND `odometer` != 0
AND `color` IS NOT NULL
AND `interior` IS NOT NULL
AND `seller` IS NOT NULL
AND `mmr` IS NOT NULL AND `mmr` != 0
AND `sellingprice` IS NOT NULL AND `sellingprice` != 0
AND `saledate` IS NOT NULL;
d. 在右上角的
未选择 SQL 会话下拉列表中,选择已创建并启动的SQL会话实例。
5. 新建car_summary任务,对明细表cleaned_car_sales进行进一步加工处理,计算不同品牌的销售总额和总车辆数,供后续的业务查询、数据分析使用。
b. 在新建对话框中,输入名称(例如car_summary),类型使用默认的SparkSQL,然后单击确定。
c. 拷贝如下代码到新增的Spark SQL页签(car_summary)中。
-- 计算每个品牌的销售总额和总车辆数
CREATE TABLE IF NOT EXISTS brand_sales_summary AS
SELECT
make,
COUNT(*) AS total_vehicles,
SUM(sellingprice) AS total_sales,
AVG(sellingprice) AS avg_sellingprice,
AVG(condition) AS avg_condition
FROM cleaned_car_sales
GROUP BY make
ORDER BY total_sales DESC;
select * from brand_sales_summary limit 10;
d. 在未选择默认数据库下拉列表中选择一个数据库,在未选择 SQL 会话下拉列表中选择一个已启动的SQL会话实例。
返回结果信息可以在下方的运行结果中查看。如果有异常,则可以在运行问题中查看。
6. 新建数据分析任务,使用Notebook进行数据可视化分析。
b. 在新建对话框中,输入名称(例如数据分析),类型使用Python > Notebook,然后单击确定。
c. 在右上角选择已创建并启动的Notebook会话实例。您也可以在下拉列表中选择创建Notebook会话,新建一个Notebook会话实例。关于Notebook会话更多介绍,请参见管理Notebook会话。
-
拷贝如下代码至新增的Notebook Python单元格中。
import matplotlib.pyplot as plt
df = spark.table("serverless_spark.brand_sales_summary")
# 将 DataFrame 转换为 Pandas DataFrame 以便于绘图
pandas_df = df.toPandas()
plt.figure(figsize=(10, 6))
plt.bar(pandas_df['make'], pandas_df['total_vehicles'])
plt.xlabel('Brand')
plt.ylabel('Total Vehicles')
plt.title('Total Vehicles by Brand')
plt.xticks(rotation=90)
plt.tight_layout()
plt.show()