加入收藏 | 设为首页 | 会员中心 | 我要投稿 大连站长网 (https://www.0411zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 大数据 > 正文

全面分析Apache Spark窗口功能

发布时间:2021-05-25 09:51:38 所属栏目:大数据 来源:互联网
导读:在此博客文章中,我们将深入探讨Apache Spark窗口函数。 您可能也对我之前有关Apache Spark的帖子感兴趣。 使用Apache Spark开始您的旅程-第1部分 使用Apache Spark开始您的旅程-第2部分 Apache Spark开始您的旅程-第3部分 深入研究Apache Spark DateTime

在此博客文章中,我们将深入探讨Apache Spark窗口函数。 您可能也对我之前有关Apache Spark的帖子感兴趣。

使用Apache Spark开始您的旅程-第1部分

使用Apache Spark开始您的旅程-第2部分

Apache Spark开始您的旅程-第3部分

深入研究Apache Spark DateTime函数

在Apache Spark中使用JSON

首先,让我们看看什么是窗口函数以及何时使用它们。 我们在Apache Spark中使用了各种功能,例如月份(从日期返回月份),四舍五入(舍入值)和地板(为给定的输入提供底值)等,这些功能将在每条记录上执行并返回一个值 每条记录。 然后,我们将对一组数据执行各种聚合函数,并为每个组返回一个值,例如sum,avg,min,max和count。 但是,如果我们想对一组数据执行该操作,并且希望对每个记录有一个单一的值/结果怎么办? 在这种情况下,我们可以使用窗口函数。 他们可以定义记录的排名,累积分布,移动平均值,或标识当前记录之前或之后的记录。

让我们使用一些Scala API示例来了解以下窗口函数:

汇总:min, max, avg, count, 和 sum.

排名:rank,dense_rank,percent_rank,row_num和ntile

分析性:cume_dist,lag和lead

自定义边界:rangeBetween和rowsBetween

为便于参考,GitHub上提供了一个以JSON文件格式导出的Zeppelin笔记本和一个Scala文件。

创建Spark DataFrame

现在,我们创建一个示例Spark DataFrame,我们将在整个博客中使用它。 首先,让我们加载所需的库。

import org.apache.spark.sql.expressions.Window 

import org.apache.spark.sql.functions._ 

现在,我们将使用一些虚拟数据创建DataFrame,这些虚拟数据将用于讨论各种窗口函数。

case class Salary(depName: String, empNo: Long, salary: Long) 

 

val empsalary = Seq( 

  Salary("sales", 1, 5000), 

  Salary("personnel", 2, 3900), 

  Salary("sales", 3, 4800), 

  Salary("sales", 4, 4800), 

  Salary("personnel", 5, 3500), 

  Salary("develop", 7, 4200), 

  Salary("develop", 8, 6000), 

  Salary("develop", 9, 4500), 

  Salary("develop", 10, 5200), 

  Salary("develop", 11, 5200)).toDF() 

这是我们的DataFrame的样子:

 

窗口集合函数

让我们看一些聚合的窗口函数,看看它们如何工作。

首先,我们需要定义窗口的规范。 假设我们要根据部门获取汇总数据。 因此,在此示例中,我们将基于部门名称(列:depname)定义窗口。

为聚合函数创建窗口规范

val byDepName = Window.partitionBy("depName") 

在窗口上应用聚合函数

现在,在部门内(列:depname),我们可以应用各种聚合函数。 因此,让我们尝试查找每个部门的最高和最低工资。 在这里,我们仅选择了所需的列(depName,max_salary和min_salary),并删除了重复的记录。

val agg_sal = empsalary 

           .withColumn("max_salary", max("salary").over(byDepName)) 

           .withColumn("min_salary", min("salary").over(byDepName)) 

                 

 

agg_sal.select("depname", "max_salary", "min_salary") 

        .dropDuplicates() 

        .show() 

输出:

+---------+----------+----------+ 

|  depname|max_salary|min_salary| 

+---------+----------+----------+ 

|  develop|      6000|      4200| 

|    sales|      5000|      4800| 

|personnel|      3900|      3500| 

+---------+----------+----------+ 

(编辑:大连站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!