想研读下spark的spark 源码分析,怎么搭阅读和调试的环境

想研读下spark的源码,怎么搭阅读和调试的环境-中国学网-中国IT综合门户网站
> 想研读下spark的源码,怎么搭阅读和调试的环境
想研读下spark的源码,怎么搭阅读和调试的环境
转载 编辑:李强
为了帮助网友解决“想研读下spark的源码,怎么搭阅读和调”相关的问题,中国学网通过互联网对“想研读下spark的源码,怎么搭阅读和调”相关的解决方案进行了整理,用户详细问题包括:RT,我想知道:想研读下spark的源码,怎么搭阅读和调试的环境,具体解决方案如下:解决方案1:所以怎么看源码都无所谓。另外,还需要做一些其他的插件配置(python应该说这个和是不是Spark项目没什么关系,我直接依赖了编译好的包就不会报错了。你也可以使用Eclipse看。idea我不熟;我使用的是后者;sbt&#47,导入即可,向Databricks的开发者看齐,在spark目录下执行&quot。推荐使用前者,你熟悉就好。但是子项目之间的依赖会有点问题;,把Spark项目当maven工程导入,我也看有的Committer用vim看spark代码的,会自动生成, sbt等)和环境设置.idea项目;sbt gen-idea&quot。建议你使用intellij idea,会报错,而且这和是不是Spark项目也没什么关系,纯读源码的话也勉强可以跟踪和调试,Eclipse有scala IDE。通过对数据库的索引,我们还为您准备了:看你想研究什么功能模块,从最关键的类一步步跟踪下去,看它是如何实现功能的。===========================================WORD文档背景色就是在格式菜单下点击背景就可以更换了。===========================================
本文欢迎转载,转载请注明:转载自中国学网: []
用户还关注
可能有帮助阅读spark源代码是学习scala语言非常不错的方法,spark源代码阅读体验也非常良好,同时还可以了解云计算框架的分布式调度和资源管理,一举多得!本文针对spark源代码v1.0.0版本进行分析
下面看一个wordcount的spark程序
package spark.example
import org.apache.spark._
import SparkContext._
object WordCount {
def main(args: Array[String]) {
val sc = new SparkContext(args(0), "WordCount",
System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
val inputFile = sc.textFile("README.md")
val countResult = inputFile.flatMap(line =& line.split(" "))
.map(word =& (word, 1))
.reduceByKey(_ + _)
countResult foreach println
12345678910111213141516
package spark.example import org.apache.spark._import SparkContext._ object WordCount {&&def main(args: Array[String]) {&&&&val sc = new SparkContext(args(0), "WordCount",&&&&&&System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))&&&&val inputFile = sc.textFile("README.md")&&&&val countResult = inputFile.flatMap(line =& line.split(" "))&&&&&&&&&&&&&&&&&&&&&&.map(word =& (word, 1))&&&&&&&&&&&&&&&&&&&&&&.reduceByKey(_ + _)&&&&countResult foreach println&&}}
从上面代码可以看出首先实例化一个SparkContext实例sc,然后执行SparkContext的方法textFile载入一个文本文件(该方法的功能类似于open一个文件);然后对该载入的文件执行flatMap、map、reduceByKey方法,完成mapreduce计算,最后执行foreach方法将计算结果打印出来。
wordcuont程序详解-SparkContext实例化
下面先看看SparkContext实例化spark做了些什么工作
val sc = new SparkContext(args(0), "WordCount",
System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
val sc = new SparkContext(args(0), "WordCount",&&System.getenv("SPARK_HOME"),SparkContext.jarOfClass(this.getClass))
进入到SparkContext,该实例化程序化调用SparkContext的如下方法,完整源代码见:
private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =
this(master, appName, sparkHome, jars, Map(), Map())
&&private[spark] def this(master: String, appName: String, sparkHome: String, jars: Seq[String]) =&&&&this(master, appName, sparkHome, jars, Map(), Map())
而该调用最终会调用SparkContext的构造函数实例化SparkContext
class SparkContext(config: SparkConf) extends Logging {
private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()
def this() = this(new SparkConf())
6263646566676869
class SparkContext(config: SparkConf) extends Logging {&&&private[spark] var preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()&&&&def this() = this(new SparkConf())......}
看看SparkContext实例化主要干了什么事情,首先读取配置文件,建立处理event消息事件的LiveListenerBus,设置spark程序运行的环境变量env,
private[spark] val conf = config.clone()
conf.validateSettings()
* Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
* changed at runtime.
def getConf: SparkConf = conf.clone()
if (!conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
if (!conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
if (conf.getBoolean("spark.logConf", false)) {
logInfo("Spark configuration:\n" + conf.toDebugString)
// Set Spark driver host and port system properties
conf.setIfMissing("spark.driver.host", Utils.localHostName())
conf.setIfMissing("spark.driver.port", "0")
val jars: Seq[String] =
conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val files: Seq[String] =
conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")
// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = "spark-" + randomUUID.toString()
conf.set("spark.tachyonStore.folderName", tachyonFolderName)
val isLocal = (master == "local" || master.startsWith("local["))
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
// An asynchronous listener bus for Spark events
private[spark] val listenerBus = new LiveListenerBus
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.create(
"&driver&",
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal,
listenerBus = listenerBus)
SparkEnv.set(env)
156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210
&&private[spark] val conf = config.clone()&&conf.validateSettings()&&&/**&& * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be&& * changed at runtime.&& */&&def getConf: SparkConf = conf.clone()&&&if (!conf.contains("spark.master")) {&&&&throw new SparkException("A master URL must be set in your configuration")&&}&&if (!conf.contains("spark.app.name")) {&&&&throw new SparkException("An application name must be set in your configuration")&&}&&&if (conf.getBoolean("spark.logConf", false)) {&&&&logInfo("Spark configuration:\n" + conf.toDebugString)&&}&&&// Set Spark driver host and port system properties&&conf.setIfMissing("spark.driver.host", Utils.localHostName())&&conf.setIfMissing("spark.driver.port", "0")&&&val jars: Seq[String] =&&&&conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten&&&val files: Seq[String] =&&&&conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten&&&val master = conf.get("spark.master")&&val appName = conf.get("spark.app.name")&&&// Generate the random name for a temp folder in Tachyon&&// Add a timestamp as the suffix here to make it more safe&&val tachyonFolderName = "spark-" + randomUUID.toString()&&conf.set("spark.tachyonStore.folderName", tachyonFolderName)&&&val isLocal = (master == "local" || master.startsWith("local["))&&&if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")&&&// An asynchronous listener bus for Spark events&&private[spark] val listenerBus = new LiveListenerBus&&&// Create the Spark execution environment (cache, map output tracker, etc)&&private[spark] val env = SparkEnv.create(&&&&conf,&&&&"&driver&",&&&&conf.get("spark.driver.host"),&&&&conf.get("spark.driver.port").toInt,&&&&isDriver = true,&&&&isLocal = isLocal,&&&&listenerBus = listenerBus)&&SparkEnv.set(env)
启动LiveListenerBus
// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (conf.getBoolean("spark.eventLog.enabled", false)) {
val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
logger.start()
listenerBus.addListener(logger)
Some(logger)
} else None
// At this point, all relevant SparkListeners have been registered, so begin releasing events
listenerBus.start()
248249250251252253254255256257258259
&&// Optionally log Spark events&&private[spark] val eventLogger: Option[EventLoggingListener] = {&&&&if (conf.getBoolean("spark.eventLog.enabled", false)) {&&&&&&val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)&&&&&&logger.start()&&&&&&listenerBus.addListener(logger)&&&&&&Some(logger)&&&&} else None&&}&&&// At this point, all relevant SparkListeners have been registered, so begin releasing events&&listenerBus.start()
然后实例化taskScheduler DAGScheduler,为后续任务调度做好准备
// Create and start the scheduler
private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)
@volatile private[spark] var dagScheduler: DAGScheduler = _
dagScheduler = new DAGScheduler(this)
case e: Exception =& throw
new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
taskScheduler.start()
306307308309310311312313314315316317318
&&// Create and start the scheduler&&private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)&&@volatile private[spark] var dagScheduler: DAGScheduler = _&&try {&&&&dagScheduler = new DAGScheduler(this)&&} catch {&&&&case e: Exception =& throw&&&&&&new SparkException("DAGScheduler cannot be initialized due to %s".format(e.getMessage))&&}&&&// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's&&// constructor&&taskScheduler.start()
最终会返回一个SparkContext的实例sc,下图是sc的数据结构信息
后续继续分析剩余代码
Post navigation关于spark代码的阅读 可以看我的这个回答也可以直接看我的日志
我也答一下这个题目吧,我自己看过HDFS以及HDFS Raid的源码,其他的偶尔也看一下。个人感觉大致以下一些步骤吧:1. 看官方网站的描述,知道项目的定位、功能、常见用例。2. 搜集文档,与项目相关的论文、整体架构文档、某些重要feature的文档等,如果你愿意像 所说的那样email项目主要contributors要一些文档的话,也可以。另外,知名的hadoop和spark的技术博客一定不要错过,比如Yahoo!、Cloudera、Hortonworks、Databricks等的官方博客。对于不明白的feature,可以考虑先去StackOverflow上找找相关信息,给自己普及一下基础知识。3. 下载项目源码,看一下源码的layout,了解相关功能在哪个目录下。使用有效地IDE开始分析。4. 如果已有别人的源码分析,搞过来看看。比如HDFS、MR的分析已经有书存在了。5. 搭建一个单机的系统,run起来,看看日志,打开debug模式跟踪一下。6. 自己整理分析笔记之类的,输出有助于更好的思考。ps:建议1,2先行。3,4,5可能需要交叉进行。-------------------------------------------------------------以上公司名字可能拼写错误,大概能知道是那个就行。欢迎大家补充,我也很希望学习到高效的项目分析、源码分析的方法。
不要直接读hadoop的源代码,他文档缺失的厉害,很累。正确的其实是去读mapr的源代码,文档全面,mapr就是hadoop的一个fork,更好理解的
数据挖掘,hadoop、spark性能分析,android app业余开发者,转载请注明来自:&&&&&&&&&&&&&&&&&&&&&&&
研究Spark源码也有一段时间了,一直都是直接看代码,没有调试。虽然带着思路去看源代码已经能够帮助我们去了解Spark了;但是很多细节从字面上是看不出来的,如果我能够通过运行时调试验证我的想法,或者能够查看某个类中变量和结构在运行时是什么岂不是更好?好,我们今天就来实现这个想法。
动手之前,我已经在网上找了关于spark调试的方法,要么就是local模式的,要么就是写的很模糊。spark local模式和其他分布式模式有很大不同,虽然可以在local模式下进行debug,但有很多东西只有在分布式模式下才有用,本文主要是介绍在Spark Standalone模式下如何调试Driver、Master、Worker和Executor(yarn模式比较复杂,还需要结合yarn的debug模式才能搞定,但研究standalone已经可以搞清楚spark的大部分原理了)。
一、主要思想
1、spark-class: 像Master、Worker、Driver都是通过spark-class脚本进行启动各自的jvm(Driver其实是间接由spark-class启动,提交程序时以spark-class启动SparkSubmit,然后SparkSubmit以反射的形式调用Driver的main方法从而实现driver的运行)。&
spark-class中设置了各jvm的参数,所以可以在这些参数中加入debug的相关参数。&
2、Executor即CoarseGrainedExecutorBackend的参数在spark-class脚本中修改没用,因为其不是通过spark-class启动的,其是被ExecutorRunner启动,在buildCommandSeq-&buildJavaOpts对相应参数进行设置,比如固定MaxPermSize=128m等。可以在SparkConf中设置spark.executor.extraJavaOptions参数。
二、前提要求
本文假定你已经掌握或完成了以下内容:
1、已经完成了一个Spark Standalone的集群(大小不重要,能用就行,不需要hdfs的支持),并且能够顺利启动和运行
2、IntelliJ IDEA、Scala插件、Java JDK、Scala SDK都已经安装和配置完成
3、拥有java开发基础&
三、新建测试项目
启动IntelliJ IDEA,选择New Project,然后选择Scala,点击下一步
输入项目名称和参数继续下一步:
在src目录下新建一个scala类对象:RemoteDebug,我们将用这个类来做测试。
&输入以下代码(相信你看出来了,就是官网的计算π的例子):
object RemoteDebug {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(&Spark Pi&).setMaster(&spark://Master:7077&)
.setJars(List(&F:\\Spark\\SparkRemoteDebug\\out\\artifacts\\SparkRemoteDebug_jar\\SparkRemoteDebug.jar&))
val spark = new SparkContext(conf)
val slices = if (args.length & 0) args(0).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =&
val x = random * 2 - 1
val y = random * 2 - 1
if (x * x + y * y & 1) 1 else 0
}.reduce(_ + _)
println(&Pi is roughly & + 4.0 * count / n)
spark.stop()
注意以下两点:
1、”setMaster(&spark://Master:7077&)&”不能忘,因为我们是要在Standalone集群上运行的,“Master“就是master所在的主机名,如果你没有在本机配置“Master”指向集群中的master机器IP的话,请直接使用IP,如:spark://192.168.1.103:7070。
2、”setJars(List(&F:\\Spark\\SparkRemoteDebug\\out\\artifacts\\SparkRemoteDebug_jar\\SparkRemoteDebug.jar&))“,告诉Spark 集群我们要提交的作业的代码在哪里,也就是我们包含我们程序的Jar包的路径,记住路径中千万别包含中文,不然会出错(血的教训)。
首先给项目添加Spark的依赖jar以及源码zip,选择项目,按下F4,就会弹出下面的配置窗体:
接下来配置我们的程序打包:
好,让我们来看看我们的测试结果。启动Spark集群成功后,你应该可以看到SparkUI界面(以下截图是我的环境):
为我们的程序添加一个启动项:
好,点击旁边的绿色小三角启动按钮,启动我们的程序,查看运行结果:
至此,我们的程序已经能够正常在集群上运行,并返回结果了,下一步我们就来看看怎么调试Driver、Master和Worker以及Executor。&
三、调试Spark Standalone
1、修改Master配置。
首先,我们停止我们的spark Cluster,因为我们需要修改一一些参数,打开Master所在机器的spark-class文件进行编辑,记得先备份哦
找到并修改为以下内容:
找到以下内容:
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=8002,server=y,suspend=n&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=8003,server=y,suspend=n&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
2、修改Worker配置
同理(文件位置参考Master机器),找到以下内容并修改:
找到以下内容:
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
PARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.worker.Worker')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS -Xdebug -Xrunjdwp:transport=dt_socket,address=8009,server=y,suspend=n&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS=&$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS&
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
3、重新启动Spark Cluster
查看Master和Worker的日志
好,可以看到,我们的Master和Worker都已经启动成功,并且按照我们的配置监听各自的端口,下面我们就通过程序来调试它们。&
4、开启调试Master和Worker
回到我们的idea中,添加两个Remote启动项
重要的时刻来了,我们先启动调试Master,并加上属于Master代码的断点:
&可以看到,idea已经连接到了我们Cluster中的Master机器的8002端口,而这正是我们在集群中配置的端口。同理启动Slave1(Worker)
为了能够调试Executor,我们得修改一下我们前面写的代码,修改后的代码如下:
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(&Spark Pi&).setMaster(&spark://Master:7077&)
.setJars(List(&F:\\Spark\\SparkRemoteDebug\\out\\artifacts\\SparkRemoteDebug_jar\\SparkRemoteDebug.jar&))
.set(&spark.executor.extraJavaOptions&, &-Xdebug -Xrunjdwp:transport=dt_socket,address=8005,server=y,suspend=n&)
println(&sleep begin.&)
Thread.sleep(10000) //等待10s,让有足够时间启动driver的remote debug
println(&sleep end.&)
val spark = new SparkContext(conf)
val slices = if (args.length & 0) args(0).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =&
val x = random * 2 - 1
val y = random * 2 - 1
if (x * x + y * y & 1) 1 else 0
}.reduce(_ + _)
println(&Pi is roughly & + 4.0 * count / n)
spark.stop()
最后,我们来测试一下我们的成果:
Executor(CoarseGrainedExecutorBackend)是运行在Worker上的另一个JVM进程,貌似我这次实验并没有进入断点,等哪天找到方法,再补上。
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:39333次
排名:千里之外
原创:19篇
转载:97篇
(1)(3)(5)(17)(8)(4)(5)(4)(16)(15)(6)(4)(1)(5)(1)(8)(12)(1)}

我要回帖

更多关于 openfire spark 源码 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信