IT数码 购物 网址 头条 软件 日历 阅读 图书馆
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
图片批量下载器
↓批量下载图片,美女图库↓
图片自动播放器
↓图片自动播放器↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁
 
   -> 大数据 -> flink standalone 客户端提交源码分析 -> 正文阅读

[大数据]flink standalone 客户端提交源码分析

启动入口

CliFrontend.main ->  cli.parseParameters  -> ACTION_RUN run(params); -> executeProgram -> invokeInteractiveModeForExecution
 -> callMainMethod(){
   mainMethod = entryClass.getMethod("main", String[].class);
   mainMethod.invoke(null, (Object) args);
 }
 --> SocketWindowWordCount.main(){
 		/*************************************************
		 * TODO 
		 *  注释: 解析 host 和 port
		 */
		// the host and the port to connect to
		final String hostname;
		final int port;
		try {
			final ParameterTool params = ParameterTool.fromArgs(args);
			hostname = params.has("hostname") ? params.get("hostname") : "localhost";
			port = params.getInt("port");
		} catch(Exception e) {
			return;
		}

		/*************************************************
		 * TODO 
		 *  注释: 获取 StreamExecutionEnvironment
		 *  它呢,还是 跟 Spark 中的 SparkContext 还是有区别的!
		 */
		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*************************************************
		 * TODO 
		 *  注释: 加载数据源得到数据抽象:DataStream
		 *  其实最终,只是创建了一个 DataStreamSource 对象,然后把 SourceFunction(StreamOperator)和 StreamExecutionEnvironment
		 *  设置到了 DataStreamSource 中, DataStreamSource 是 DataStream 的子类
		 *  -
		 *  DataStream 的主要分类:
		 *  	DataStreamSource	流数据源
		 *  	DataStreamSink		流数据目的地
		 *  	KeyedStream			按key分组的数据流
		 *  	DataStream			普通数据流
		 *  -
		 *  关于函数理解:
		 *  	Function			传参
		 *  	Operator			Graph 中抽象概念
		 *  	Transformation		一种针对流的逻辑操作
		 *	 最终: Function ---> Operator ---> Transformation
		 */
		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream(hostname, port, "\n");

		// parse the data, group it, window it, and aggregate the counts
		DataStream<WordWithCount> windowCounts = text

			// TODO 注释: 讲算子生成 Transformation 加入到 Env 中的 transformations 集合中
			.flatMap(new FlatMapFunction<String, WordWithCount>() {
				@Override
				public void flatMap(String value, Collector<WordWithCount> out) {
					for(String word : value.split("\\s")) {
						out.collect(new WordWithCount(word, 1L));
					}
				}
			})

			// TODO 注释: 依然创建一个 DataStream(KeyedStream)
			.keyBy(value -> value.word)
			.timeWindow(Time.seconds(5))

			// TODO 注释:
			.reduce(new ReduceFunction<WordWithCount>() {
				@Override
				public WordWithCount reduce(WordWithCount a, WordWithCount b) {
					return new WordWithCount(a.word, a.count + b.count);
				}
			});

		// print the results with a single thread, rather than in parallel
		windowCounts.print().setParallelism(1);

		/*************************************************
		 * TODO 
		 *  注释: 提交执行
		 */
		env.execute("Socket Window WordCount");
 }
--> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironmentFlink 应用程序的执行入口,提供了一些重要的操作机制:
1、提供了 readTextFile(), socketTextStream(), createInput(), addSource() 等方法去对接数据源
2、提供了 setParallelism() 设置程序的并行度
3StreamExecutionEnvironment 管理了 ExecutionConfig 对象,该对象负责Job执行的一些行为配置管理。
   还管理了 Configuration 管理一些其他的配置
4StreamExecutionEnvironment 管理了一个 List<Transformation<?>> transformations
成员变量,该成员变量,主要用于保存 Job 的各种算子转化得到的 Transformation,把这些Transformation 按照逻辑拼接起来,就能得到 StreamGraghTransformation ->StreamOperator -> StreamNode5StreamExecutionEnvironment 提供了 execute() 方法主要用于提交 Job 执行。该方法接收的参数就是:StreamGraph

--> env.socketTextStream -> addSource(){
		/*************************************************
		 * TODO 
		 *  注释: 获取输出数据类型
		 */
		TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
		// TODO 注释: 判断是否是并行
		boolean isParallel = function instanceof ParallelSourceFunction;
		clean(function);
		/*************************************************
		 * TODO 
		 *  注释: 构建 SourceOperator
		 *  它是 SourceFunction 的子类,也是  StreamOperator 的子类
		 */
		final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
		/*************************************************
		 * TODO 
		 *  注释: 返回 DataStreamSource
		 *  关于这个东西的抽象有四种:
		 *  1、DataStream
		 *  2、KeyedDataStream
		 *  3、DataStreamSource
		 *  4、DataStreamSink
		 */
		return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}
--> text.flatMap(讲算子生成 Transformation 加入到 Env 中的 transformations 集合中){
		/*************************************************
		 * TODO 
		 *  注释: 通过反射拿到 算子的类型
		 */
		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes();

		/*************************************************
		 * TODO 
		 *  注释: 算子执行的真正操作逻辑是: 将算子构建成 Transformation 加入 Env 中的 transformation 中的
		 *  transformations 集合中。将来执行 StreamGraph 生成的时候,会将 Transformation 变成 Operator
		 *  -
		 *  flatMap 到最后,还是构建一个 DataStream (SingleOutputStreamOperator)对象返回,然后将 Transformation 加入到
		 *  transformations 集合中,等待将来提交的之后,构建成 StreamGraph
		 */
		return flatMap(flatMapper, outType);
		
		--> flatMap(){
				/*************************************************
				 * TODO 
				 *  注释: flink把每一个算子transform成一个对流的转换
				 *  并且注册到执行环境中,用于生成StreamGraph
				 *  -
				 *  第一步:用户代码里定义的UDF会被当作其基类对待,然后交给 StreamFlatMap 这个 operator 做进一步包装。
				 *  事实上,每一个Transformation都对应了一个StreamOperator。
				 *  -
				 *  flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后交给输出流的过程
				 *  -
				 *  StreamFlatMap 是一个 Function 也是一个 StreamOperator
				 *  -
				 *  StreamFlatMap = StreamOperator
				 *  flatMapper = Function
				 *  -最终调用 transform 方法来把 StreamFlatMap 这种StreamOperator 转换成 Transformation
				 *  最终加入到 StreamExectiionEnvironment 的 List<Transformation<?>> transformations
				 */
				return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
		}
		---> doTransform(){
				// read the output type of the input Transform to coax out errors about MissingTypeInfo
				transformation.getOutputType();
		
				/*************************************************
				 * TODO 
				 *  注释: 构建: OneInputTransformation
				 *  由于 flatMap 这个操作只接受一个输入,所以再被进一步包装为 OneInputTransformation
				 */
				OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, operatorName, operatorFactory, outTypeInfo,environment.getParallelism());
		
				/*************************************************
				 * TODO 
				 *  注释: 构建: SingleOutputStreamOperator
				 */
				@SuppressWarnings({"unchecked", "rawtypes"}) SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
		
				/*************************************************
				 * TODO 重点
				 *  注释: 把 Operator 注册到执行环境中,用于生成 StreamGraph
				 *  最后,将该 transformation 注册到执行环境中,当执行 generate 方法时,生成 StreamGraph 图结构。
				 */
				getExecutionEnvironment().addOperator(resultTransform);
		
				/*************************************************
				 * TODO 
				 *  注释:
				 *  SingleOutputStreamOperator 也是 DataStream 的子类,也就是返回了一个新的 DataStream
				 * 	然后调用新的 DataStream 的某一个算子,又生成新的 StreamTransformation,
				 * 	继续加入到 StreamExecutionEnvironment 的 transformations
				 */
				return returnStream;
		}
}
-> env.execute(提交执行)

StreamGraph

env.execute() -> StreamGraph sg = getStreamGraph(jobName); ->  getStreamGraphGenerator().setJobName(jobName).generate(){
		/*************************************************
		 * TODO 
		 *  注释: 执行各种算子的 transformation: 由 算子 生成 Transformation 来构建 StreamGraph
		 *  当时在执行各种算子的时候,就已经把算子转换成对应的 Transformation 放入 transformations 集合中了
		 *  自底向上(先遍历 input transformations) 对转换树的每个 transformation 进行转换
		 */
		for(Transformation<?> transformation : transformations) {

			// TODO 注释: 从 Env 对象中,把 Transformation 拿出来,然后转换成 StreamNode
			// TODO 注释: Function --> Operator --> Transformation --> StreamNode
			transform(transformation);
		}
		--> transformOneInputTransform(){
		// 1. 生成 streamNode
		/*************************************************
		 * TODO 
		 *  注释: 添加一个 Operator(实际上 StreamGraph 端会添加一个 StreamNode)
		 */
		streamGraph
			.addOperator(transform.getId(), slotSharingGroup, transform.getCoLocationGroupKey(), transform.getOperatorFactory(), transform.getInputType(),transform.getOutputType(), transform.getName());
		}
		-->addOperator -> addNode(){
				/*************************************************
				 * TODO 
				 *  注释: 生成一个 StreamNode
				 */
				StreamNode vertex = new StreamNode(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(),vertexClass);
		
				/*************************************************
				 * TODO 
				 *  注释: 添加一个 StreamNode
				 *
				 *   这个 vertexID 就是 Transformation id  transform.getId()
				 *   Transformation id 在创建 Transformation 时生成
				 *   即一个Transformation 对应一个 StreamNode
				 */
				streamNodes.put(vertexID, vertex);
		}
		// 2. 生成 StreamEdge
		/*************************************************
		 * TODO 
		 *  注释: 定义该 SreamNode 的 入边
		 */
		for(Integer inputId : inputIds) {

			/*************************************************
			 * TODO 
			 *  注释: 设置当前 StreamNode 和 上游所有 StreamNode 之间的 StreamEdge
			 *            inputId           上游
			 *            transform.getId() 下游
			 */
			streamGraph.addEdge(inputId, transform.getId(), 0);
		}
		-> addEdge -> addEdgeInternal(){
					/*************************************************
			 * TODO 
			 *  注释: 构建 StreamNode 之间的 边(StreamEdge) 对象
			 */
			StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);

			// TODO 注释: 给 上游 StreamNode 设置 出边
			getStreamNode(edge.getSourceId()).addOutEdge(edge);

			// TODO 注释: 给 下游 StreamNode 设置 入边
			getStreamNode(edge.getTargetId()).addInEdge(edge);
		}
}

 总结:
1、生成上游顶点和下游顶点  StreamNode upstreamNode  | StreamNode downstreamNode
2、根据上下游顶点生成 StreamEdge  StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode...)
3、将成的StreamEdge 加入上游StreamNode 的 出边   getStreamNode(edge.getSourceId()).addOutEdge(edge);
	  为啥不直接用  upstreamNode.addOutEdge(edge);
4、将成的StreamEdge 加入下游StreamNode 的 入边   getStreamNode(edge.getTargetId()).addInEdge(edge);

JobGraph

execute(sg); -> executeAsync -> AbstractSessionClusterExecutor.execute(){
        // 1. 将streamgraph优化得到jobgraph
		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
		// 2. 调用RestClient中的netty 客户端进行提交 到 服务端执行
		// 通过 channel 把请求数据,发给 WebMonitorEndpoint 中的 JobSubmitHandler 来执行处理
		clusterClient.submitJob(jobGraph)
}
// 1. 将streamgraph优化得到jobgraph
-> PipelineExecutorUtils.getJobGraph(pipeline, configuration) -> FlinkPipelineTranslationUtil.getJobGraph
  -> pipelineTranslator.translateToJobGraph -> streamGraph.getJobGraph -> StreamingJobGraphGenerator.createJobGraph
   -> new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph(){
   		/*************************************************
		 * TODO 
		 *  注释: 重点: 设置 Chaining, 将可以 Chain 到一起的 StreamNode Chain 在一起,
		 *  这里会生成相应的 JobVertex 、JobEdge 、 IntermediateDataSet 对象
		 *  把能 chain 在一起的 Operator 都合并了,变成了 OperatorChain
		 *  -
		 *  大致逻辑:
		 *  这里的逻辑大致可以理解为,挨个遍历节点:
		 *  1、如果该节点是一个 chain 的头节点,就生成一个 JobVertex,
		 *  2、如果不是头节点,就要把自身配置并入头节点,然后把头节点和自己的出边相连;
		 *  对于不能chain的节点,当作只有头节点处理即可
		 *  -
		 *  作用:
		 *  能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
		 */
		setChaining(hashes, legacyHashes);

		// TODO 注释: 设置 PhysicalEdges
		// TODO 注释: 将每个 JobVertex 的入边集合也序列化到该 JobVertex 的 StreamConfig 中
		// TODO 注释: 出边集合,已经在 上面的代码中,已经搞定了。
		setPhysicalEdges();

		// TODO 注释: 设置 SlotSharingAndCoLocation
		setSlotSharingAndCoLocation();
   }
   --> setChaining(){
	   /*************************************************
		 * TODO 
		 *  注释:
		 *  1、一个 StreamNode 也可以认为是 做了 chain 动作  StreamNode -> JobVertex
		 *  2、两个 StreamNode 做了 chain 动作  StreamNode + StreamNode -> JobVertex
		 */
		// TODO 注释: 处理每个 StreamNode
		for(Integer sourceNodeId : streamGraph.getSourceIDs()) {

			/*************************************************
			 * TODO 
			 *  注释: 把能 chain 在一起的 StreamNode 都 chain 在一起
			 */
			createChain(sourceNodeId, 0, new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
		}
   }
   ---> createChain(){
		   /*************************************************
			 * TODO  ->
			 *  注释: 判断是否可以 chain 在一起!
			 *  当前这个地方做的事情,只是当前这个 StreamNode 和它的直接下游 StreamNode
			 */
			for(StreamEdge outEdge : currentNode.getOutEdges()) {

				/*************************************************
				 * TODO  重点 1  isChainable
				 *  注释: 判断一个 StreamGraph 中的一个 StreamEdge 链接的上下游 Operator(StreamNode) 是否可以 chain 在一起
				 *
				 */
				if(isChainable(outEdge, streamGraph)) {
					// TODO 注释: 加入可 chain 集合
					chainableOutputs.add(outEdge);
				} else {
					// TODO 注释: 加入不可 chain 集合
					nonChainableOutputs.add(outEdge);
				}
			}
			// TODO 注释: 把可以 chain 在一起的 StreamEdge 两边的 Operator chain 在一个形成一个 OperatorChain
			for(StreamEdge chainable : chainableOutputs) {
				// TODO 注释: 递归 chain
				// TODO 注释: 如果可以 chain 在一起的话,这里的 chainIndex 会加 1
				transitiveOutEdges.addAll(createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
			}
			// 不能chain在一起的
			for(StreamEdge nonChainable : nonChainableOutputs) {
				transitiveOutEdges.add(nonChainable);
				// TODO 注释: 不能 chain 一起的话,这里的 chainIndex 是从 0 开始算的,后面也肯定会走到 createJobVertex 的逻辑
				createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
			}
			
			/*************************************************
			 * TODO -> 重点 2 createJobVertex
			 *  注释: 把chain在一起的多个 Operator 创建成一个 JobVertex
			 *  如果当前节点是 chain 的起始节点, 则直接创建 JobVertex 并返回 StreamConfig, 否则先创建一个空的 StreamConfig
			 *  createJobVertex 函数就是根据 StreamNode 创建对应的 JobVertex, 并返回了空的 StreamConfig
			 *
			 * --总结:
			 * 	 StreamGraph -> JobGrahph
			 * 	 判断哪些StreamEge可以执行优化(chain),将 多个StreanNode 并成一个 JobVertex
			 *
			 * 	 StreamNode_A -> (StreamNode_B -> StreamNode_C)
			 * 	     B,C chain在一起,  startNodeId = B   // Integer startNodeId = chainInfo.getStartNodeId();
			 * 	        当 currentNodeId = B   则 B 创建 JobVertex
			 *          当 currentNodeId = C   则 C 不创建 JobVertex
			 */
			StreamConfig config = currentNodeId.equals(startNodeId) ?
				// TODO ->
				createJobVertex(startNodeId, chainInfo) : new StreamConfig(new Configuration());
			
			// TODO 注释: chain 在一起的多条边 connect 在一起
			for(StreamEdge edge : transitiveOutEdges) {
				/**
				 *  重点 3  根据 StreamNode和 StreamEdge 生成  JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连
				 * TODO ->
				 */
				connect(startNodeId, edge);
			}
	}
	//  重点 1  isChainable
	isChainable(){
	    // TODO 注释: 获取上游 SourceVertex
		StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
		// TODO 注释: 获取下游 TargetVertex
		StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
		
		/*************************************************
		 * TODO 
		 *  注释: 判断是否能 chain 在一起
		 */
   // TODO 条件1. 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入) A -> B  B A 一一对应 如果shuffle类,那么B的入度就 >= 2
		return downStreamVertex.getInEdges().size() == 1

			// TODO 注释: 条件2. 上下游算子实例处于同一个SlotSharingGroup中
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)

			// TODO -> 注释: 这里面有 3 个条件    条件 345
			&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph){
			    // TODO 注释: 获取 上游 Operator
	        	StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
				// TODO 注释: 获取 下游 Operator
				StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
				// TODO 注释:条件3、前后算子不为空  如果上下游有一个为空,则不能进行 chain    
				if(downStreamOperator == null || upStreamOperator == null) {
					return false;
				}
		
				/*************************************************
				 *  条件4、上游算子的链接策略是 always 或者 head
				 *  条件5、下游算子的链接策略必须是 always
				 */
				if(upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER ||
					downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
					return false;
				}
			}

			// TODO 注释:条件6 两个算子间的物理分区逻辑是ForwardPartitioner
			//  (无shuffle,当前节点的计算数据,只会发给自己 one to one 如上游50个task 计算完直接发送给下游50个task)
			&& (edge.getPartitioner() instanceof ForwardPartitioner)

			// TODO 注释:条件7 两个算子间的shuffle方式不等于批处理模式
			&& edge.getShuffleMode() != ShuffleMode.BATCH

			// TODO 注释:条件8 上下游算子实例的并行度相同
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()

			// TODO 注释:条件9 启动了 chain
			&& streamGraph.isChainingEnabled();
	}
	// 重点 2 createJobVertex
	createJobVertex(){
	    // TODO 注释: 获取 startStreamNode
		StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
		// TODO 注释: 生成一个 JobVertexID
		JobVertexID jobVertexId = new JobVertexID(hash);
		// JobVertex 初始化
        if(chainedInputOutputFormats.containsKey(streamNodeId)) {
			jobVertex = new InputOutputFormatVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
			chainedInputOutputFormats.get(streamNodeId).write(new TaskConfig(jobVertex.getConfiguration()));
		} else {
			// TODO 注释: 创建一个 JobVertex
			jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
		}
		// 将生成好的 JobVertex 加入到: JobGraph
		jobGraph.addVertex(jobVertex);
	}
	// 重点 3  根据 StreamNode和 StreamEdge 生成  JobEge 和 IntermediateDataSet 用来将JobVertex和JobEdge相连
	connect(startNodeId, edge){
	    //生成JobEdge
	    JobEdge jobEdge;
		if(isPointwisePartitioner(partitioner)) {
			jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, resultPartitionType);
		} else {
			// TODO -> 创建 IntermediateDataSet
			jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
		}
	}
	----> connectNewDataSetAsInput(){
		// TODO -> input是JobVertex  即 JobVertex 创建 IntermediateDataSet
		IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
		// TODO 创建 JobEdge
		JobEdge edge = new JobEdge(dataSet, this, distPattern);
		this.inputs.add(edge);
		// TODO  IntermediateDataSet -> JobEdge
		dataSet.addConsumer(edge);
		return edge;
		// 至此形成流图     JobVertex -> IntermediateDataSet -> JobEdge
	}
==================================================================================================================================
// 2. 调用RestClient中的netty 客户端进行提交 到 服务端执行
-> clusterClient.submitJob(jobGraph){
		/*************************************************
		 * TODO 
		 *  注释: 先持久化: 把 JobGragh 持久化到磁盘文件形成 jobGraphFile
		 *  1、持久化 JobGragh 的前缀:flink-jobgraph
		 *  2、持久化 JobGragh 的后缀:.bin
		 *  当我们把 JobGraph 持久化了之后,变成了一个文件: jobGraphFile
		 *  然后其实,在提交 JobGraph 到 Flink 集群运行的时候,其实提交的就是: 这个文件!
		 *  将来,最终是有 FLink 集群的 WebMonitor(JobSubmitHandler) 去接收请求来执行处理
		 *  JobSubmitHandler 在执行处理的第一件事情: 把传送过来的这个文件反序列化得到 JobGraph 这个对象
		 */
		CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
			try {
				final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
				try(ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
					objectOut.writeObject(jobGraph);

		/*************************************************
		 * TODO 
		 *  注释: 等待持久化完成之后,然后加入带上传文件系列
		 *  补充: thenApply 接收一个函数作为参数,使用该函数处理上一个 CompletableFuture 调用的结果,并返回一个具有处理结果的 Future 对象。
		 */
		CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
			List<String> jarFileNames = new ArrayList<>(8);
			List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
			Collection<FileUpload> filesToUpload = new ArrayList<>(8);

			// TODO 注释: 加入待上传的文件系列
			filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

			for(Path jar : jobGraph.getUserJars()) {
				jarFileNames.add(jar.getName());
				// 上传
				filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
			}
		
		// ->  TODO -> 注释:sendRetriableRequest() 提交   真正提交
			requestAndFileUploads -> sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), requestAndFileUploads.f0,
				requestAndFileUploads.f1, isConnectionProblemOrServiceUnavailable()));
		
		// TODO 注释: 等 sendRetriableRequest 提交完成之后,删除生成的 jobGraghFile
				Files.delete(jobGraphFile);
}
--> sendRetriableRequest -> restClient.sendRequest(){
		/*************************************************
		 * TODO
		 *  注释: 通过 Netty 客户端发送请求给 Netty 服务端
		 */
		final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
		/*************************************************
		 * TODO ->
		 *  注释: 发送请求 到 WebMonitorEndpoint 的 Netty 服务端
		 *  最终由: JobSubmitHandler 来执行处理
		 */
		httpRequest.writeTo(channel);
}

ExecutionGraph

接上文  
httpRequest.writeTo(channel); 
发送请求 到 WebMonitorEndpointNetty 服务端 最终 JobSubmitHandler 来执行处理
具体参考 2.1 启动 webMonitorEndpoint 源码分析
 JobSubmitHandler.handleRequest() -> DispatcherGateway.submitJob -> internalSubmitJob -> persistAndRunJob
  -> runJob(){
  		/*************************************************
		 * TODO ->
		 *  注释: 创建 JobManagerRunner
		 *  在这里面会做一件重要的事情:
		 *  1、创建 JobMaster 实例
		 *  2、在创建 JobMaster 的时候,同时会把 JobGraph 编程 ExecutionGraph
		 *  -
		 *  严格来说,是启动 JobMaster, 那么这个地方的名字,就应该最好叫做: createJobMasterRunner
		 *  Flink 集群的一两个主从架构:
		 *  1、资源管理:  ResourceManager + TaskExecutor
		 *  2、任务运行:  JobMaster + StreamTask
		 */
		final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
			
			/*************************************************
			 * TODO  ->
			 *  注释: 提交任务 == start JobManagerRunner
			 */
		 FunctionUtils.uncheckedFunction(this::startJobManagerRunner)

  }
  // 重点1:createJobManagerRunner 创建 JobMaster , 将JobGraph 转换成 ExecutionGraph
  createJobManagerRunner -> createJobManagerRunner -> new JobManagerRunnerImpl(负责启动 JobMaster) 
   -> jobMasterFactory.createJobMasterService -> new JobMaster
    -> this.schedulerNG = createScheduler(jobManagerJobMetricGroup); -> createInstance -> new DefaultScheduler
      -> super -> SchedulerBase(){
        /*************************************************
		 * TODO  核心代码
		 *  注释: 获取 ExecutionGraph(成员变量)
		 *  读取 JobGraph 转换成 ExecutionGraph
		 */
		this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
		-> ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
      }
   --> createExecutionGraph() -> ExecutionGraphBuilder.buildGraph(){
   		/*************************************************
		 * TODO 
		 *  注释: 设置 ExecutionGraph 的一些基本属性
		 *  1、JsonPlanGenerator.generatePlan(jobGraph) 根据 JobGraph 生成一个 JsonPlan
		 *  2、executionGraph.setJsonPlan(JsonPlan) 把 JsonPlan 设置到 ExecutionGraph
		 *
		 *  -> generatePlan
		 */
	     executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
	     /*************************************************
		 * TODO 
		 *  注释: 核心
		 *  ExecutionGraph 事实上只是改动了 JobGraph 的每个节点,而没有对整个拓扑结构进行变动,
		 *  所以代码里只是挨个遍历 jobVertex 并进行处理
		 */
		executionGraph.attachJobGraph(sortedTopology);
   } -> attachJobGraph(){
   		/*************************************************
		 * TODO 
		 *  注释: 遍历所有的 JobVertex
		 */
		for(JobVertex jobVertex : topologiallySorted) {

			if(jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
				this.isStoppable = false;
			}

			/*************************************************
			 * TODO 
			 *  注释: 一个 JobVertex 对应的创建一个 ExecutionJobVertex
			 */
			// create the execution job vertex and attach it to the graph
			ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, maxPriorAttemptsHistoryLength, rpcTimeout, globalModVersion, createTimestamp);

			/*************************************************
			 * TODO 
			 *  注释: 处理 JobEdge 和 IntermediateResult 和 ExecutionJobVertex中的 ExecutionVertex
			 *  对每个 JobEdge,获取对应的 IntermediateResult,并记录到本节点的输入上
			 *  最后,把每个 ExecutorVertex 和对应的 IntermediateResult 关联起来
			 */
			ejv.connectToPredecessors(this.intermediateResults);

			/*************************************************
			 * TODO 
			 *  注释: 将生成好的 ExecutionJobVertex 加入到 ExecutionGraph 中
			 */
			ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);

			if(previousTask != null) {
				throw new JobException(
					String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask));
			}

			/*************************************************
			 * TODO 
			 *  注释: 将当前 JobVertex 的输入 IntermediateResult 加入到 intermediateResults map 中
			 */
			for(IntermediateResult res : ejv.getProducedDataSets()) {
				IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
				if(previousDataSet != null) {
					throw new JobException(
						String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), res, previousDataSet));
				}
			}

			/*************************************************
			 * TODO 
			 *  注释: 使用创建顺序保存的 ExecutionJobVertex
			 */
			this.verticesInCreationOrder.add(ejv);

			// TODO 注释: 总并行度
			this.numVerticesTotal += ejv.getParallelism();

			// TODO 注释: ExecutionJobVertex 加入 newExecJobVertices List 中
			newExecJobVertices.add(ejv);
		}
   } 
   --> ejv.connectToPredecessors(this.intermediateResults){
   		/*************************************************
		 * TODO 
		 *  注释: 遍历每个 JobEdge
		 */
		for(int num = 0; num < inputs.size(); num++) {
			// TODO 注释: 遍历到一个 JobEdge
			JobEdge edge = inputs.get(num);
			// TODO 注释: 获取到 JobEdge 链接的 IntermediateResult
			IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
			// TODO 注释: 将当前 IntermediateResult 作为 ExecutionJobVertex 的输入
			// TODO 注释: 加入 inputs 集合,作为 ExecutionJobVertex 的输入
			this.inputs.add(ires);
			/*************************************************
			 * TODO 
			 *  注释: 根据并行度来设置 ExecutionVertex
			 */
			for(int i = 0; i < parallelism; i++) {
				ExecutionVertex ev = taskVertices[i];
				ev.connectSource(num, ires, edge, consumerIndex);
			}
		}
   }
   ----> connectSource(){
       edges = connectAllToAll(sourcePartitions, inputNumber){
       	 for(int i = 0; i < sourcePartitions.length; i++) {
			IntermediateResultPartition irp = sourcePartitions[i];
			/**
			 * this 这里就是ExecutionVertex
			 * 得到流图 IntermediateResultPartition -> ExecutionEdge -> ExecutionVertex
			 */
			edges[i] = new ExecutionEdge(irp, this, inputNumber);
		}
		/**
		 * 并行化后
		 *     IntermediateResult(2个partition)      ExecutionEdge      ExecutionJobVertex
		 *                                           ExecutionEdge -------- ExecutionVertex
		 *     IntermediateResultPartition
		 *                                           ExecutionEdge -------- ExecutionVertex
		 */

		return edges;
       }
   }
	/*************************************************
	 * TODO
	 *  注释: 一个 ExecutionVertex 就对应到 到时候真正执行的 StreamTask 一个
	 *  正常来说,一个StrewamTask 也需要申请得到一个 Slot
	 *
	 *  --总结:
	 *    JobGraph -> ExecutionGraph
	 *    将jobVertex并行化 -> 变成多个ExectionVertex
	 */

  // 重点2:startJobManagerRunner  启动 JobMaster
  startJobManagerRunner -> jobManagerRunner.start(); -> leaderElectionService.start(this); -> grantLeadership
   -> verifyJobSchedulingStatusAndStartJobManager -> startJobMaster -> jobMasterService.start 
     -> startJobExecution(){
     	/*************************************************
		 * TODO 
		 *  注释: 初始化一些必要的服务组件
		 *  JobMaster 的注册和心跳
		 */
		startJobMasterServices();
		/*************************************************
		 * TODO 
		 *  注释: 开始调度执行
		 *  JobMaster 调度 StreamTask 去运行
		 *
		 *  TODO 1.slot管理(申请和释放)
		 *       2.task部署和提交
		 */
		resetAndStartScheduler();
     }

JobMaster 向 ResourceManager 和 TaskManager 注册和维持心跳

startJobMasterServices()  -> startHeartbeatServices()
	/*************************************************
	 * TODO  第一步 启动心跳服务
	 *  注释:  TaskManager 上已经启动好了一个 JobMaster
	 *  1、JobMaster 需要向: ResourceManager 心跳汇报
	 *  2、JobMaster 需要向: TaskManager 维持心态
	 */
	private void startHeartbeatServices() {
		/*************************************************
		 * TODO 
		 *  注释: 创建心跳服务: taskManagerHeartbeatManager
		 */
		taskManagerHeartbeatManager = heartbeatServices
			.createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);
		/*************************************************
		 * TODO 
		 *  注释: 创建心跳服务: resourceManagerHeartbeatManager
		 */
		resourceManagerHeartbeatManager = heartbeatServices
			.createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), getMainThreadExecutor(), log);
	}
 		/*************************************************
		 * TODO  第二步
		 *  注释: JobMaster 连接 ResourceManager
		 *  主要的目的是为了: 向 ResourceManager 注册该 JobMaster
		 *  申请 Slot
		 */
       reconnectToResourceManager  -> tryConnectToResourceManager -> connectToResourceManager 
         -> resourceManagerConnection.start(){
           // 创建注册
           createNewRegistration() -> generateRegistration
           // 开始注册
           newRegistration.startRegistration() -> register -> invokeRegistration -> registerJobManager
             -> registerJobMasterInternal(){
             /*************************************************
			 * TODO 
			 *  注释: 执行注册
			 */
			JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, jobMasterGateway);
			jobManagerRegistrations.put(jobId, jobManagerRegistration);
			jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);

			// 维持心跳
			jobManagerHeartbeatManager.monitorTarget(){
			public void requestHeartbeat(ResourceID resourceID, Void payload) {
				/*************************************************
				 * TODO 
				 *  注释: 维持 JobMaster 和 ResourceManager 之间的心跳
				 */
				jobMasterGateway.heartbeatFromResourceManager(resourceID);
			}
		  }
        }

		/*************************************************
		 * TODO  第三步 监听 ResourceManager
		 *  注释: 工作准备就绪,请尝试与资源管理器建立连接
		 *  注册 start() 方法的参数:
		 *  1、ResourceManagerLeaderListener 是 LeaderRetrievalListener 的子类
		 *  2、NodeCacheListener 是 curator 提供的监听器,当指定的 zookeeper znode 节点数据发生改变,则会接收到通知
		 *     回调 nodeChanged() 方法
		 *  3、在 nodeChanged() 会调用对应的 LeaderRetrievalListener 的 notifyLeaderAddress() 方法
		 *  4、resourceManagerLeaderRetriever 的实现类是: LeaderRetrievalService的实现类:ZooKeeperLeaderRetrievalService
		 *  5、resourceManagerLeaderRetriever 进行监听,当发生变更的时候,就会回调:ResourceManagerLeaderListener 的 notifyLeaderAddress 方法
		 */
		resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());

JobMaster 开始申请 Slot,并且部署 Task

resetAndStartScheduler -> startScheduling 
  -> startAllOperatorCoordinators
  -> startSchedulingInternal(){
  		/*************************************************
		 * TODO 
		 *  注释: 调度
		 *  流式作业默认调度方式: schedulingStrategy = EagerSchedulingStrategy
		 *  1、EagerSchedulingStrategy(主要用于流式作业,所有顶点(ExecutionVertex)同时开始调度)
		 *  2、LazyFromSourcesSchedulingStrategy(主要用于批作业,从 Source 开始开始调度,其他顶点延迟调度)调度
		 */
		schedulingStrategy.startScheduling();  -> allocateSlotsAndDeploy(){
			/*************************************************
			 * TODO  -> 重点
			 *  注释: 申请Slot 核心入口
			 *  参数: 待调度执行的 ExecutionVertex 集合
			 *
			 *  1、slot会明确的表示出来,该slot 是属于哪个taskExecutor中的第几个slot
			 *  2、ExecutionVertex 代表是ExecutionGraph中的一个执行顶点
			 *  ExecutionGraph中的任何一个执行顶点都和申请到的所有slots中的某一个slot产生了映射关系
			 *  ExecutionVertex ===> slot ===> TaskExecutor ===> Index
			 *
			 */
			final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
		/*************************************************
		 * TODO 
		 *  注释: 部署运行
		 *  1、申请到了 slot
		 *  2、构件好了 Handler
		 *  3、执行部署
		 */
		waitForAllSlotsAndDeploy(deploymentHandles);
	}
 } 

Slot 管理(申请和释放)源码解析

allocateSlots

Task 部署和提交

waitForAllSlotsAndDeploy
  大数据 最新文章
实现Kafka至少消费一次
亚马逊云科技:还在苦于ETL?Zero ETL的时代
初探MapReduce
【SpringBoot框架篇】32.基于注解+redis实现
Elasticsearch:如何减少 Elasticsearch 集
Go redis操作
Redis面试题
专题五 Redis高并发场景
基于GBase8s和Calcite的多数据源查询
Redis——底层数据结构原理
上一篇文章      下一篇文章      查看所有文章
加:2021-10-17 12:03:34  更:2021-10-17 12:05:42 
 
开发: C++知识库 Java知识库 JavaScript Python PHP知识库 人工智能 区块链 大数据 移动开发 嵌入式 开发工具 数据结构与算法 开发测试 游戏开发 网络协议 系统运维
教程: HTML教程 CSS教程 JavaScript教程 Go语言教程 JQuery教程 VUE教程 VUE3教程 Bootstrap教程 SQL数据库教程 C语言教程 C++教程 Java教程 Python教程 Python3教程 C#教程
数码: 电脑 笔记本 显卡 显示器 固态硬盘 硬盘 耳机 手机 iphone vivo oppo 小米 华为 单反 装机 图拉丁

360图书馆 购物 三丰科技 阅读网 日历 万年历 2025年1日历 -2025/1/18 6:07:06-

图片自动播放器
↓图片自动播放器↓
TxT小说阅读器
↓语音阅读,小说下载,古典文学↓
一键清除垃圾
↓轻轻一点,清除系统垃圾↓
图片批量下载器
↓批量下载图片,美女图库↓
  网站联系: qq:121756557 email:121756557@qq.com  IT数码