| 首先,一个基本概念就是Spark应用程序从开始提交到task执行分了很多层。 
    应用调度器。主要是资源管理器,比如standalone,yarn等负责Spark整个应用的调度和集群资源的管理。job调度器。spark  的算子分为主要两大类,transform和action,其中每一个action都会产生一个job。这个job需要在executor提供的资源池里调度执行,当然并不少直接调度执行job。stage划分及调度。job具体会划分为若干stage,这个就有一个基本的概念就是宽依赖和窄依赖,宽依赖就会划分stage。stage也需要调度执行,从后往前划分,从前往后调度执行。task切割及调度。stage往下继续细化就是会根据不太的并行度划分出task集合,这个就是在executor上调度执行的基本单元,目前的调度默认是一个task一个cpu。Spark Streaming 的job生成是周期性的。当前job的执行时间超过生成周期就会产生job  累加。累加一定数目的job后有可能会导致应用程序失败。这个主要原因是由于FIFO的调度模式和Spark Streaming的默认单线程的job执行机制 3.Spark Streaming job生成 这个源码主要入口是StreamingContext#JobScheduler#JobGenerator对象,内部有个RecurringTimer,主要负责按照批处理时间周期产生GenrateJobs事件,当然在存在windows的情况下,该周期有可能不会生成job,要取决于滑动间隔,有兴趣自己去揭秘,浪尖星球里分享的视频教程里讲到了。具体代码块如下 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator") 
 我们直接看其实现代码块: eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {       override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)        override protected def onError(e: Throwable): Unit = {         jobScheduler.reportError("Error in job generator", e)       }     }     eventLoop.start() 
 event处理函数是processEvent方法 /** Processes all events */   private def processEvent(event: JobGeneratorEvent) {     logDebug("Got event " + event)     event match {       case GenerateJobs(time) => generateJobs(time)       case ClearMetadata(time) => clearMetadata(time)       case DoCheckpoint(time, clearCheckpointDataLater) =>         doCheckpoint(time, clearCheckpointDataLater)       case ClearCheckpointData(time) => clearCheckpointData(time)     }   } 
 在接受到GenerateJob事件的时候,会执行generateJobs代码,就是在该代码内部产生和调度job的。 /** Generate jobs and perform checkpointing for the given `time`.  */   private def generateJobs(time: Time) {     // Checkpoint all RDDs marked for checkpointing to ensure their lineages are     // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).     ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")     Try {       jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch       graph.generateJobs(time) // generate jobs using allocated block     } match {       case Success(jobs) =>         val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))       case Failure(e) =>         jobScheduler.reportError("Error generating jobs for time " + time, e)         PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)     }     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))   } 
 (编辑:源码网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |