如何将rdd和dstream进行spark rdd join详解

查看: 73248|回复: 13
Spark中文手册4:Spark之基本概念(2)
主题帖子积分
金牌会员, 积分 6962, 距离下一级还需 3038 积分
金牌会员, 积分 6962, 距离下一级还需 3038 积分
本帖最后由 pig2 于
18:54 编辑
1、什么是DStream转换?
2、什么是窗口计算?
3、怎样最有效的将发生数据到外部系统?
本文接前篇:
DStream中的转换(transformation)和RDD类似,transformation允许从输入DStream来的数据被修改。DStreams支持很多在RDD中可用的transformation算子。一些常用的算子如下所示:
& &Transformation& &Meaningmap(func)利用函数func处理原DStream的每个元素,返回一个新的DStreamflatMap(func)与map相似,但是每个输入项可用被映射为0个或者多个输出项filter(func)返回一个新的DStream,它仅仅包含源DStream中满足函数func的项repartition(numPartitions)通过创建更多或者更少的partition改变这个DStream的并行级别(level of parallelism)union(otherStream)返回一个新的DStream,它包含源DStream和otherStream的联合元素count()通过计算源DStream中每个RDD的元素数量,返回一个包含单元素(single-element)RDDs的新DStreamreduce(func)利用函数func聚集源DStream中每个RDD的元素,返回一个包含单元素(single-element)RDDs的新DStream。函数应该是相关联的,以使计算可以并行化countByValue()这个算子应用于元素类型为K的DStream上,返回一个(K,long)对的新DStream,每个键的值是在原DStream的每个RDD中的频率。reduceByKey(func,&&[numTasks])当在一个由(K,V)对组成的DStream上调用这个算子,返回一个新的由(K,V)对组成的DStream,每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数join(otherStream,&&[numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, (V, W))对的新DStreamcogroup(otherStream,&&[numTasks])当应用于两个DStream(一个包含(K,V)对,一个包含(K,W)对),返回一个包含(K, Seq[V], Seq[W])的元组transform(func)通过对源DStream的每个RDD应用RDD-to-RDD函数,创建一个新的DStream。这个可以在DStream中的任何RDD操作中使用updateStateByKey(func)利用给定的函数更新DStream的状态,返回一个新&state&的DStream。最后两个transformation算子需要重点介绍一下:
UpdateStateByKey操作updateStateByKey操作允许不断用新信息更新它的同时保持任意状态。你需要通过两步来使用它
定义状态-状态可以是任何的数据类型定义状态更新函数-怎样利用更新前的状态和从输入流里面获取的新值更新状态
让我们举个例子说明。在例子中,你想保持一个文本数据流中每个单词的运行次数,运行次数用一个state表示,它的类型是整数def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {
& & val newCount = ...&&// add the new values with the previous running count to get the new count
& & Some(newCount)
}复制代码
这个函数被用到了DStream包含的单词上import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second
val conf = new SparkConf().setMaster(&local[2]&).setAppName(&NetworkWordCount&)
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream(&localhost&, 9999)
// Split each line into words
val words = lines.flatMap(_.split(& &))
// Count each word in each batch
val pairs = words.map(word =& (word, 1))
val runningCounts = pairs.updateStateByKey[Int](updateFunction _)复制代码
更新函数将会被每个单词调用,newValues拥有一系列的1(从 (词, 1)对而来),runningCount拥有之前的次数。要看完整的代码,见 Transform操作transform操作(以及它的变化形式如transformWith)允许在DStream运行任何RDD-to-RDD函数。它能够被用来应用任何没在DStream API中提供的RDD操作(It can be used to apply any RDD operation that is not exposed in theDStream API)。 例如,连接数据流中的每个批(batch)和另外一个数据集的功能并没有在DStream API中提供,然而你可以简单的利用transform方法做到。如果你想通过连接带有预先计算的垃圾邮件信息的输入数据流 来清理实时数据,然后过了它们,你可以按如下方法来做:val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform(rdd =& {
&&rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
&&...
})复制代码
事实上,你也可以在transform方法中用和算法
窗口(window)操作Spark Streaming也支持窗口计算,它允许你在一个滑动窗口数据上应用transformation算子。下图阐明了这个滑动窗口。
11.png (39.98 KB, 下载次数: 7)
20:13 上传
如上图显示,窗口在源DStream上滑动,合并和操作落入窗内的源RDDs,产生窗口化的DStream的RDDs。在这个具体的例子中,程序在三个时间单元的数据上进行窗口操作,并且每两个时间单元滑动一次。 这说明,任何一个窗口操作都需要指定两个参数:
窗口长度:窗口的持续时间滑动的时间间隔:窗口操作执行的时间间隔
这两个参数必须是源DStream的批时间间隔的倍数。下面举例说明窗口操作。例如,你想扩展前面的用来计算过去30秒的词频,间隔时间是10秒。为了达到这个目的,我们必须在过去30秒的pairs DStream上应用reduceByKey 操作。用方法reduceByKeyAndWindow实现。// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) =& (a + b), Seconds(30), Seconds(10))复制代码
一些常用的窗口操作如下所示,这些操作都需要用到上文提到的两个参数:窗口长度和滑动的时间间隔
& &Transformation& &Meaningwindow(windowLength,&&slideInterval)基于源DStream产生的窗口化的批数据计算一个新的DStreamcountByWindow(windowLength,&&slideInterval)返回流中元素的一个滑动窗口数reduceByWindow(func,&&windowLength, slideInterval)返回一个单元素流。利用函数func聚集滑动时间间隔的流的元素创建这个单元素流。函数必须是相关联的以使计算能够正确的并行计算。reduceByKeyAndWindow(func,&&windowLength, slideInterval, [numTasks])应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每一个key的值均由给定的reduce函数聚集起来。注意:在默认情况下,这个算子利用了Spark默认的并发任务数去分组。你可以用numTasks参数设置不同的任务数reduceByKeyAndWindow(func,&&invFunc, windowLength, slideInterval, [numTasks])A more efficient&&version of the above reduceByKeyAndWindow() where the reduce value of each&&window is calculated incrementally using the reduce values of the previous&&window. This is done by reducing the new data that enter the sliding window,&&and &inverse reducing& the old data that leave the window. An&&example would be that of &adding& and &subtracting&&&counts of keys as the window slides. However, it is applicable to only&&&invertible reduce functions&, that is, those reduce functions&&which have a corresponding &inverse reduce& function (taken as&&parameter invFunc. Like in reduceByKeyAndWindow, the number of reduce tasks&&is configurable through an optional argument.&&
countByValueAndWindow(windowLength,&&slideInterval, [numTasks])应用到一个(K,V)对组成的DStream上,返回一个由(K,V)对组成的新的DStream。每个key的值都是它们在滑动窗口中出现的频率。&& && && &&
DStreams上的输出操作输出操作允许DStream的操作推到如数据库、文件系统等外部系统中。因为输出操作实际上是允许外部系统消费转换后的数据,它们触发的实际操作是DStream转换。目前,定义了下面几种输出操作:&&Output&&Operation&&Meaningprint()在DStream的每个批数据中打印前10条元素,这个操作在开发和调试中都非常有用。在Python API中调用pprint()。saveAsObjectFiles(prefix,&&[suffix])保存DStream的内容为一个序列化的文件SequenceFile。每一个批间隔的文件的文件名基于prefix和suffix生成。&prefix-TIME_IN_MS[.suffix]&,在Python API中不可用。saveAsTextFiles(prefix,&&[suffix])保存DStream的内容为一个文本文件。每一个批间隔的文件的文件名基于prefix和suffix生成。&prefix-TIME_IN_MS[.suffix]&saveAsHadoopFiles(prefix,&&[suffix])保存DStream的内容为一个hadoop文件。每一个批间隔的文件的文件名基于prefix和suffix生成。&prefix-TIME_IN_MS[.suffix]&,在Python API中不可用。foreachRDD(func)在从流中生成的每个RDD上应用函数func的最通用的输出操作。这个函数应该推送每个RDD的数据到外部系统,例如保存RDD到文件或者通过网络写到数据库中。需要注意的是,func函数在驱动程序中执行,并且通常都有RDD action在里面推动RDD流的计算。
利用foreachRDD的设计模式dstream.foreachRDD是一个强大的原语,发送数据到外部系统中。然而,明白怎样正确地、有效地用这个原语是非常重要的。下面几点介绍了如何避免一般错误。
经常写数据到外部系统需要建一个连接对象(例如到远程服务器的TCP连接),用它发送数据到远程系统。为了达到这个目的,开发人员可能不经意的在Spark驱动中创建一个连接对象,但是在Spark worker中 尝试调用这个连接对象保存记录到RDD中,如下:
&&dstream.foreachRDD(rdd =& {
& && &val connection = createNewConnection()&&// executed at the driver
& && &rdd.foreach(record =& {
& && && & connection.send(record) // executed at the worker
& && &})
&&})复制代码
这是不正确的,因为这需要先序列化连接对象,然后将它从driver发送到worker中。这样的连接对象在机器之间不能传送。它可能表现为序列化错误(连接对象不可序列化)或者初始化错误(连接对象应该 在worker中初始化)等等。正确的解决办法是在worker中创建连接对象。
然而,这会造成另外一个常见的错误-为每一个记录创建了一个连接对象。例如:
&&dstream.foreachRDD(rdd =& {
& && &rdd.foreach(record =& {
& && && & val connection = createNewConnection()
& && && & connection.send(record)
& && && & connection.close()
& && &})
&&})复制代码
通常,创建一个连接对象有资源和时间的开支。因此,为每个记录创建和销毁连接对象会导致非常高的开支,明显的减少系统的整体吞吐量。一个更好的解决办法是利用rdd.foreachPartition方法。 为RDD的partition创建一个连接对象,用这个两件对象发送partition中的所有记录。 dstream.foreachRDD(rdd =& {
& && &rdd.foreachPartition(partitionOfRecords =& {
& && && & val connection = createNewConnection()
& && && & partitionOfRecords.foreach(record =& connection.send(record))
& && && & connection.close()
& && &})
&&})复制代码
这就将连接对象的创建开销分摊到了partition的所有记录上了。
最后,可以通过在多个RDD或者批数据间重用连接对象做更进一步的优化。开发者可以保有一个静态的连接对象池,重复使用池中的对象将多批次的RDD推送到外部系统,以进一步节省开支。
&&dstream.foreachRDD(rdd =& {
& && &rdd.foreachPartition(partitionOfRecords =& {
& && && & // ConnectionPool is a static, lazily initialized pool of connections
& && && & val connection = ConnectionPool.getConnection()
& && && & partitionOfRecords.foreach(record =& connection.send(record))
& && && & ConnectionPool.returnConnection(connection)&&// return to the pool for future reuse
& && &})
&&})复制代码
需要注意的是,池中的连接对象应该根据需要延迟创建,并且在空闲一段时间后自动超时。这样就获取了最有效的方式发生数据到外部系统。其它需要注意的地方:
输出操作通过懒执行的方式操作DStreams,正如RDD action通过懒执行的方式操作RDD。具体地看,RDD actions和DStreams输出操作接收数据的处理。因此,如果你的应用程序没有任何输出操作或者 用于输出操作dstream.foreachRDD(),但是没有任何RDD action操作在dstream.foreachRDD()里面,那么什么也不会执行。系统仅仅会接收输入,然后丢弃它们。默认情况下,DStreams输出操作是分时执行的,它们按照应用程序的定义顺序按序执行。
相关内容:
Spark中文手册1-编程指南
Spark中文手册2:Spark之一个快速的例子
Spark中文手册3:Spark之基本概念
Spark中文手册5:Spark之基本概念(3)
Spark中文手册6:Spark-sql由入门到精通
Spark中文手册7:Spark-sql由入门到精通【续】
Spark中文手册8:spark GraphX编程指南(1)
Spark中文手册9:spark GraphX编程指南(2)
Spark中文手册10:spark部署:提交应用程序及独立部署模式
Spark中文手册11:Spark 配置指南
本帖被以下淘专辑推荐:
& |主题: 11, 订阅: 6
欢迎加入about云群 、 ,云计算爱好者群,关注
主题帖子积分
中级会员, 积分 890, 距离下一级还需 110 积分
中级会员, 积分 890, 距离下一级还需 110 积分
敢死队撒的发生飞洒
主题帖子积分
高级会员, 积分 1647, 距离下一级还需 3353 积分
高级会员, 积分 1647, 距离下一级还需 3353 积分
不错~& && && && &
主题帖子积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
主题帖子积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
主题帖子积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
主题帖子积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
主题帖子积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
高级会员, 积分 1219, 距离下一级还需 3781 积分
主题帖子积分
中级会员, 积分 553, 距离下一级还需 447 积分
中级会员, 积分 553, 距离下一级还需 447 积分
主题帖子积分
中级会员, 积分 429, 距离下一级还需 571 积分
中级会员, 积分 429, 距离下一级还需 571 积分
积极上进,爱好学习
经常参与各类话题的讨论,发帖内容较有主见
长期对论坛的繁荣而不断努力,或多次提出建设性意见
为论坛做出突出贡献的会员
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by【Spark十二】Spark任务调度和作业执行流程初步 - bit1129的博客 - ITeye博客
博客分类:
任务调度以及作业执行流程是Spark的核心,本文不进行源码级别的探究,只是概述Spark的核心组件、它们的作用以及它们如何协作以完成计算作业。
Spark核心组件
SparkContext
DAGScheduler
TaskScheduler
BlockManager
BlockTracker
ShuffleTracker
Spark集群架构概览
在上面这幅图片中,用户将任务提交给Driver,Driver将任务分发到所有的Worker节点(Driver最好跟Work节点在同一个局域网内,以使得任务的分发和结果回送更快)。Worker节点根据Driver提交过来的任务,算出位于本地的那部分数据,然后对它进行计算(这就是数据本地性的概念)。具体的做法是,Workder首先将数据加载到内存(如果内存中没有的话),形成RDD(所以,RDD存在于内存中),然后对RDD进行接下来的计算。
在这个架构中,Driver和Worker构成了Master/Slave架构,Driver负责分发任务,以及等待任务结果
另外一副架构图,在这个图中,Master和Driver是分开的,实际上是否会在一起呢?
spark计算速度快的原因
1.基于内存计算
2.general computation graph --即DAG,Worker对DAG进行优化,然后提交给TaskScheduler去执行。这里的问题是DAG何时构造,由谁构造,DAG的数据结构如何,DAG包含哪些信息,这个暂时放这里。目前要了解的重点是,DAG是提交给TaskScheduler等待调度执行。
上图中,自己编写程序提交到Master上,而Master是由四大部分组成(RDD Graph,Scheduler,Block Tracker以及Shuffle Tracker),启动RDD Graph就是DAG,它会提交给Task Scheduler任务调度器等待调度执行,具体执行时,Task Scheduler会把任务提交到Worker节点上。Block Tracker用于记录计算数据在Worker节点上的块信息。Shuffle Blocker用于记录RDD在计算过程中遇到Shuffle过程时会进行物化,那么Shuffle Tracker用于记录这些物化的RDD的存放信息
上面浅绿色的四个圆柱形框构成一个RDD,每个圆柱框都是一个Partition,每个Partition分配一个任务来执行。浅绿色圆柱框内的绿色矩形框表示实施RDD操作后的数据集,比如对于Task1,先对Partion执行map操作,再执行filter操作得到两个绿色矩形框。
因为map操作或者filter是对RDD进行调用的,所以,RDD中的Partition都会执行相同的动作序列,每个操作结束时,每个Partition都会产生一个数据集,这些数据集对应一个RDD,如MappedRDD,FilteredRDD。这样,就形成了RDD Graph,如上图中的八个绿色框,上面四个框形成一个RDD,下面四个框形成一个RDD。
作业与任务调度
DAGScheduler
1.三个输入元素
1.target RDD是什么RDD是初始RDD还是包含了所有的RDD,比如rdd.map().filter()操作,target RDD是什么
2.针对partition的function指的是什么,比如rdd.map().filter()操作,是map和filter函数都包括吗?
具体的,DAG Scheduler完成下面三个任务:
1.为每个Job(一个Action类型的操作如collect,count)分割Stage,同时决定最佳路径。DAGScheduler会记录哪个RDD或者Stage会被物化,从而寻找一个最佳调度方案。
2.将TaskSet提交给Task Tracker
3.重新提交输出lost的Stage
2. DAGScheduler优化
1.stage的操作是pipleline的
比如,stage内有5个操作,Spark的做法是1+1+1+1+1=5,而对于Hadoop而言,它的做法是1+1=2, 2+1=3,3+1=4,4+1=5,即每计算一步就先存入HDFS,然后后面的操作再从HDFS上都出来,因此IO消耗非常大。
2. 基于Partition选择最小化的join算法,减少Shuffle操作
在Hadoop中,Shuffle相当于Barrier(Join等待合并结果),Reduce操作需要等待Map操作完全执行完
3. 重用RDD Cache过的数据
因为DAGScheduler知道哪些RDD和Stage已经物化过,所以DAGScheduler在执行路径上,会尽可能的使用已经缓存过的数据
从上图中可以看到,AB位于同一个Stage,CDE位于同一个Stage。AB和CDE的结果做Join是产生了一个新的Stage
如下两个阶段一定会产生Stage
1.从数据源加载数据形成RDD时,一定会有Stage的产生
2.进行Shuffle,即有宽依赖的时候一定有Stage的产生,所以上面的DE应该产生一个Stage
Job执行流程
浏览: 347908 次
来自: 北京
关于第一个reduceByKey对应的cache,shuffl ...
看了你的文章,updateStateByKey 这个方式的使用 ...
棒极啦,解决了我的问题。
你好,这个代码生成主要在,那个地方使用。
看楼主这么厉害的样子,请问楼主如何知道类库的版本呢?比如g++ ...spark中如何将JavaDStream&String&转化为JavaRDD&String&? - 知乎4被浏览1344分享邀请回答1添加评论分享收藏感谢收起javaDstream.foreachRDD(new VoidFunction&JavaRDD&String&&() {
public void call(JavaRDD&String& o) throws Exception {
System.out.println("处理逻辑");
0添加评论分享收藏感谢收起}

我要回帖

更多关于 spark 两个rdd join 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信