Startup entry point
CliFrontend.main -> cli.parseParameters -> (ACTION_RUN) run(params) -> executeProgram -> invokeInteractiveModeForExecution
-> callMainMethod() {
    mainMethod = entryClass.getMethod("main", String[].class);
    mainMethod.invoke(null, (Object) args);
}
--> SocketWindowWordCount.main() {
    final String hostname;
    final int port;
    try {
        final ParameterTool params = ParameterTool.fromArgs(args);
        hostname = params.has("hostname") ? params.get("hostname") : "localhost";
        port = params.getInt("port");
    } catch (Exception e) {
        return;
    }
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> text = env.socketTextStream(hostname, port, "\n");
    DataStream<WordWithCount> windowCounts = text
        .flatMap(new FlatMapFunction<String, WordWithCount>() {
            @Override
            public void flatMap(String value, Collector<WordWithCount> out) {
                for (String word : value.split("\\s")) {
                    out.collect(new WordWithCount(word, 1L));
                }
            }
        })
        .keyBy(value -> value.word)
        .timeWindow(Time.seconds(5))
        .reduce(new ReduceFunction<WordWithCount>() {
            @Override
            public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                return new WordWithCount(a.word, a.count + b.count);
            }
        });
    windowCounts.print().setParallelism(1);
    env.execute("Socket Window WordCount");
}
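To try this example end to end, start a text server first (for example `nc -l 9000` on Linux), then submit the job with `--port 9000`; the job connects to that socket, counts words in 5-second windows, and prints the counts with parallelism 1.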
--> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment is the execution entry point of a Flink application and provides several important mechanisms:
1. Methods such as readTextFile(), socketTextStream(), createInput(), and addSource() for hooking up data sources.
2. setParallelism() for setting the program's parallelism.
3. It manages an ExecutionConfig object, which governs job-execution behavior, plus a Configuration object for other settings (items 2 and 3 are illustrated in the sketch right after this list).
4. It manages a List<Transformation<?>> transformations member variable, which stores the Transformation produced by each operator; stitching these Transformations together logically yields the StreamGraph (Transformation -> StreamOperator -> StreamNode).
5. It provides the execute() method for submitting the job; the overload that actually runs takes a StreamGraph as its argument.
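As a quick illustration of items 2 and 3, this minimal sketch exercises the corresponding public API (the concrete option calls are arbitrary examples, not anything the trace below depends on):
    import org.apache.flink.api.common.ExecutionConfig;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);                    // item 2: default parallelism for all operators
    ExecutionConfig config = env.getConfig(); // item 3: per-job execution behavior
    config.setAutoWatermarkInterval(200);     // e.g. tune watermark emission
    config.disableGenericTypes();             // e.g. fail fast on Kryo fallback serialization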
--> env.socketTextStream -> addSource() {
    TypeInformation<OUT> resolvedTypeInfo = getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
    boolean isParallel = function instanceof ParallelSourceFunction;
    clean(function);
    final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, resolvedTypeInfo, sourceOperator, isParallel, sourceName);
}
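A custom source goes through this same path. Because the (hypothetical) anonymous class below does not implement ParallelSourceFunction, isParallel above is false and the resulting DataStreamSource is pinned to parallelism 1:
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    DataStreamSource<Long> nums = env.addSource(new SourceFunction<Long>() {
        private volatile boolean running = true;
        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long n = 0;
            while (running) {       // emit numbers until cancelled
                ctx.collect(n++);
                Thread.sleep(10);
            }
        }
        @Override
        public void cancel() {
            running = false;
        }
    });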
--> text.flatMap (turns the operator into a Transformation and adds it to the env's transformations collection) {
    TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper), getType(), Utils.getCallLocationName(), true);
    return flatMap(flatMapper, outType);
    --> flatMap() {
        return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
    }
    ---> doTransform() {
        transformation.getOutputType();
        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(this.transformation, operatorName, operatorFactory, outTypeInfo, environment.getParallelism());
        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);
        getExecutionEnvironment().addOperator(resultTransform);
        return returnStream;
    }
}
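Putting the pieces together: each DataStream wraps a Transformation, each new operator creates a Transformation that points at its input, and the environment just accumulates them. A stripped-down hypothetical model of that pattern (all class names here are simplified stand-ins, not Flink's own):
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical mini-model of the env/transformation pattern traced above.
    class MiniEnv {
        final List<MiniTransformation> transformations = new ArrayList<>();
        void addOperator(MiniTransformation t) { transformations.add(t); } // mirrors addOperator(resultTransform)
    }

    class MiniTransformation {
        final String name;
        final MiniTransformation input; // null for sources; mirrors OneInputTransformation's input
        MiniTransformation(String name, MiniTransformation input) { this.name = name; this.input = input; }
    }

    class MiniStream {
        final MiniEnv env;
        final MiniTransformation transformation;
        MiniStream(MiniEnv env, MiniTransformation t) { this.env = env; this.transformation = t; }

        // mirrors doTransform(): wrap the new Transformation and register it with the env
        MiniStream transform(String name) {
            MiniTransformation result = new MiniTransformation(name, this.transformation);
            env.addOperator(result);
            return new MiniStream(env, result);
        }
    }
Walking transformations and following each input pointer reproduces the operator DAG; that walk is exactly what getStreamGraph() performs next.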
-> env.execute() (submit for execution)
StreamGraph
env.execute() -> StreamGraph sg = getStreamGraph(jobName); -> getStreamGraphGenerator().setJobName(jobName).generate() {
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }
    --> transformOneInputTransform() {
        streamGraph.addOperator(transform.getId(), slotSharingGroup, transform.getCoLocationGroupKey(), transform.getOperatorFactory(), transform.getInputType(), transform.getOutputType(), transform.getName());
    }
    --> addOperator -> addNode() {
        StreamNode vertex = new StreamNode(vertexID, slotSharingGroup, coLocationGroup, operatorFactory, operatorName, new ArrayList<OutputSelector<?>>(), vertexClass);
        streamNodes.put(vertexID, vertex);
    }
    for (Integer inputId : inputIds) {
        streamGraph.addEdge(inputId, transform.getId(), 0);
    }
    -> addEdge -> addEdgeInternal() {
        StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, typeNumber, outputNames, partitioner, outputTag, shuffleMode);
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
}
Summary:
1. Create the upstream and downstream vertices: StreamNode upstreamNode | StreamNode downstreamNode.
2. Build a StreamEdge from the two vertices: StreamEdge edge = new StreamEdge(upstreamNode, downstreamNode, ...).
3. Add the new StreamEdge to the upstream StreamNode's out-edges: getStreamNode(edge.getSourceId()).addOutEdge(edge);
   (open question: why not simply call upstreamNode.addOutEdge(edge)?)
4. Add the new StreamEdge to the downstream StreamNode's in-edges: getStreamNode(edge.getTargetId()).addInEdge(edge);
(A toy sketch of this node/edge bookkeeping follows.)
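A minimal hypothetical sketch of the same bookkeeping (simplified stand-in classes, not Flink's):
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical mini-model of StreamGraph's node/edge bookkeeping.
    class MiniStreamGraph {
        static class Node {
            final int id;
            final List<Edge> inEdges = new ArrayList<>();
            final List<Edge> outEdges = new ArrayList<>();
            Node(int id) { this.id = id; }
        }
        static class Edge {
            final int sourceId, targetId;
            Edge(int sourceId, int targetId) { this.sourceId = sourceId; this.targetId = targetId; }
        }

        final Map<Integer, Node> streamNodes = new HashMap<>();

        void addNode(int id) { streamNodes.put(id, new Node(id)); } // mirrors addNode()
        void addEdge(int sourceId, int targetId) {                  // mirrors addEdgeInternal()
            Edge edge = new Edge(sourceId, targetId);
            streamNodes.get(edge.sourceId).outEdges.add(edge);      // upstream out-edge
            streamNodes.get(edge.targetId).inEdges.add(edge);       // downstream in-edge
        }
    }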
JobGraph
execute(sg); -> executeAsync -> AbstractSessionClusterExecutor.execute() {
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
    clusterClient.submitJob(jobGraph)
}
-> PipelineExecutorUtils.getJobGraph(pipeline, configuration) -> FlinkPipelineTranslationUtil.getJobGraph
-> pipelineTranslator.translateToJobGraph -> streamGraph.getJobGraph -> StreamingJobGraphGenerator.createJobGraph
-> new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph() {
    setChaining(hashes, legacyHashes);
    setPhysicalEdges();
    setSlotSharingAndCoLocation();
}
--> setChaining() {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, 0, new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, streamGraph));
    }
}
---> createChain() {
    for (StreamEdge outEdge : currentNode.getOutEdges()) {
        if (isChainable(outEdge, streamGraph)) {
            chainableOutputs.add(outEdge);
        } else {
            nonChainableOutputs.add(outEdge);
        }
    }
    for (StreamEdge chainable : chainableOutputs) {
        transitiveOutEdges.addAll(createChain(chainable.getTargetId(), chainIndex + 1, chainInfo));
    }
    for (StreamEdge nonChainable : nonChainableOutputs) {
        transitiveOutEdges.add(nonChainable);
        createChain(nonChainable.getTargetId(), 0, chainInfo.newChain(nonChainable.getTargetId()));
    }
    StreamConfig config = currentNodeId.equals(startNodeId)
        ? createJobVertex(startNodeId, chainInfo)
        : new StreamConfig(new Configuration());
    for (StreamEdge edge : transitiveOutEdges) {
        connect(startNodeId, edge);
    }
}
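The recursion above is a depth-first walk over the StreamGraph: chainable edges stay inside the current chain (chainIndex + 1), while non-chainable edges close the chain and seed a new one. A simplified hypothetical skeleton of that traversal, reusing the MiniStreamGraph model from earlier with a pluggable chainability test (the real createChain also builds per-node StreamConfigs and dedupes visited vertices):
    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Predicate;

    // Hypothetical, simplified skeleton of the createChain DFS above.
    // Returns the transitive non-chainable out-edges of the chain rooted at currentNodeId.
    class MiniChainer {
        final MiniStreamGraph graph;
        final Predicate<MiniStreamGraph.Edge> isChainable; // stands in for isChainable(edge, streamGraph)

        MiniChainer(MiniStreamGraph graph, Predicate<MiniStreamGraph.Edge> isChainable) {
            this.graph = graph;
            this.isChainable = isChainable;
        }

        List<MiniStreamGraph.Edge> createChain(int currentNodeId) {
            List<MiniStreamGraph.Edge> transitiveOutEdges = new ArrayList<>();
            for (MiniStreamGraph.Edge outEdge : graph.streamNodes.get(currentNodeId).outEdges) {
                if (isChainable.test(outEdge)) {
                    // stay inside the current chain and keep walking
                    transitiveOutEdges.addAll(createChain(outEdge.targetId));
                } else {
                    // chain boundary: record the edge and start a fresh chain at the target
                    transitiveOutEdges.add(outEdge);
                    createChain(outEdge.targetId);
                }
            }
            return transitiveOutEdges;
        }
    }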
isChainable() {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
    return downStreamVertex.getInEdges().size() == 1
        && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
        && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) {
            StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory();
            StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory();
            if (downStreamOperator == null || upStreamOperator == null) {
                return false;
            }
            if (upStreamOperator.getChainingStrategy() == ChainingStrategy.NEVER
                    || downStreamOperator.getChainingStrategy() != ChainingStrategy.ALWAYS) {
                return false;
            }
        }
        && (edge.getPartitioner() instanceof ForwardPartitioner)
        && edge.getShuffleMode() != ShuffleMode.BATCH
        && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
        && streamGraph.isChainingEnabled();
}
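Tying the two sketches together, here is a hypothetical usage of MiniChainer on a three-node graph where only the first edge is treated as chainable (say, a forward edge with equal parallelism) and the second is not (say, a keyBy); the second edge then becomes a chain boundary and, later, a JobEdge:
    public class MiniChainDemo {
        public static void main(String[] args) {
            MiniStreamGraph g = new MiniStreamGraph();
            g.addNode(1); g.addNode(2); g.addNode(3);
            g.addEdge(1, 2); // pretend: forward, same parallelism -> chainable
            g.addEdge(2, 3); // pretend: keyBy/rebalance here -> not chainable

            MiniChainer chainer = new MiniChainer(g, edge -> edge.sourceId == 1);
            // The source chain {1,2} ends at the 2->3 edge.
            System.out.println(chainer.createChain(1).size()); // prints 1
        }
    }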
createJobVertex() {
    StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
    JobVertexID jobVertexId = new JobVertexID(hash);
    if (chainedInputOutputFormats.containsKey(streamNodeId)) {
        jobVertex = new InputOutputFormatVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
        chainedInputOutputFormats.get(streamNodeId).write(new TaskConfig(jobVertex.getConfiguration()));
    } else {
        jobVertex = new JobVertex(chainedNames.get(streamNodeId), jobVertexId, operatorIDPairs);
    }
    jobGraph.addVertex(jobVertex);
}
connect(startNodeId, edge) {
    JobEdge jobEdge;
    if (isPointwisePartitioner(partitioner)) {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.POINTWISE, resultPartitionType);
    } else {
        jobEdge = downStreamVertex.connectNewDataSetAsInput(headVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType);
    }
}
----> connectNewDataSetAsInput() {
    IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
    JobEdge edge = new JobEdge(dataSet, this, distPattern);
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}
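So in the JobGraph, producers and consumers are never linked directly: the upstream JobVertex owns an IntermediateDataSet, and a JobEdge connects that data set to the downstream vertex. A hypothetical mini-model of this triple (simplified stand-in classes):
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical mini-model of the JobVertex / IntermediateDataSet / JobEdge wiring.
    class MiniJobVertex {
        final List<MiniDataSet> producedDataSets = new ArrayList<>();
        final List<MiniJobEdge> inputs = new ArrayList<>();

        // mirrors connectNewDataSetAsInput(): `this` is the downstream consumer
        MiniJobEdge connectNewDataSetAsInput(MiniJobVertex producer) {
            MiniDataSet dataSet = new MiniDataSet(producer);   // createAndAddResultDataSet
            producer.producedDataSets.add(dataSet);
            MiniJobEdge edge = new MiniJobEdge(dataSet, this);
            this.inputs.add(edge);                             // downstream records its input
            dataSet.consumers.add(edge);                       // data set records its consumer
            return edge;
        }
    }

    class MiniDataSet {
        final MiniJobVertex producer;
        final List<MiniJobEdge> consumers = new ArrayList<>();
        MiniDataSet(MiniJobVertex producer) { this.producer = producer; }
    }

    class MiniJobEdge {
        final MiniDataSet source;
        final MiniJobVertex target;
        MiniJobEdge(MiniDataSet source, MiniJobVertex target) { this.source = source; this.target = target; }
    }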
==================================================================================================================================
-> clusterClient.submitJob(jobGraph) {
    // 1. serialize the JobGraph into a temp file
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
        final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
        try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
            objectOut.writeObject(jobGraph);
        }
        return jobGraphFile;
    });
    // 2. collect the serialized JobGraph plus all user jars as the files to upload
    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
        List<String> jarFileNames = new ArrayList<>(8);
        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
        Collection<FileUpload> filesToUpload = new ArrayList<>(8);
        filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
        for (Path jar : jobGraph.getUserJars()) {
            jarFileNames.add(jar.getName());
            filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
        }
        ...
    });
    // 3. send the submit request (with retries), then delete the temp file
    requestAndFileUploads -> sendRetriableRequest(JobSubmitHeaders.getInstance(), EmptyMessageParameters.getInstance(), requestAndFileUploads.f0, requestAndFileUploads.f1, isConnectionProblemOrServiceUnavailable());
    Files.delete(jobGraphFile);
}
--> sendRetriableRequest -> restClient.sendRequest() {
    final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
    httpRequest.writeTo(channel);
}
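The serialization step is plain Java: the JobGraph travels to the cluster as a java.io.Serializable blob inside a multipart upload. A self-contained sketch of the same write-then-upload-then-delete pattern (generic Java, no Flink types; the uploader callback is a hypothetical stand-in for the REST multipart request):
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    import java.nio.file.Files;
    import java.nio.file.Path;

    public class SerializeToTempFile {
        // Writes any Serializable payload to a temp file, hands the file to an
        // uploader, and deletes it afterwards -- the shape of submitJob's file handling.
        static void writeUploadDelete(Serializable payload, java.util.function.Consumer<Path> uploader)
                throws IOException {
            Path file = Files.createTempFile("flink-jobgraph", ".bin");
            try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(file))) {
                out.writeObject(payload);
            }
            try {
                uploader.accept(file);   // e.g. add the file to the multipart REST request
            } finally {
                Files.delete(file);      // mirrors Files.delete(jobGraphFile)
            }
        }
    }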
ExecutionGraph
Continuing from the above:
httpRequest.writeTo(channel);
The request goes to the Netty server inside the WebMonitorEndpoint, where JobSubmitHandler ultimately handles it
(see section 2.1, the WebMonitorEndpoint startup source-code analysis, for details).
JobSubmitHandler.handleRequest() -> DispatcherGateway.submitJob -> internalSubmitJob -> persistAndRunJob
-> runJob() {
    final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
    FunctionUtils.uncheckedFunction(this::startJobManagerRunner)
}
createJobManagerRunner -> createJobManagerRunner -> new JobManagerRunnerImpl (responsible for starting the JobMaster)
-> jobMasterFactory.createJobMasterService -> new JobMaster
-> this.schedulerNG = createScheduler(jobManagerJobMetricGroup); -> createInstance -> new DefaultScheduler
-> super -> SchedulerBase() {
    this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
    -> ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker);
}
--> createExecutionGraph() -> ExecutionGraphBuilder.buildGraph() {
    executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    executionGraph.attachJobGraph(sortedTopology);
} -> attachJobGraph() {
    for (JobVertex jobVertex : topologiallySorted) {
        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }
        ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, maxPriorAttemptsHistoryLength, rpcTimeout, globalModVersion, createTimestamp);
        ejv.connectToPredecessors(this.intermediateResults);
        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(
                String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]", jobVertex.getID(), ejv, previousTask));
        }
        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet = this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(
                    String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]", res.getId(), res, previousDataSet));
            }
        }
        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }
}
--> ejv.connectToPredecessors(this.intermediateResults) {
    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        this.inputs.add(ires);
        for (int i = 0; i < parallelism; i++) {
            ExecutionVertex ev = taskVertices[i];
            ev.connectSource(num, ires, edge, consumerIndex);
        }
    }
}
----> connectSource() {
    edges = connectAllToAll(sourcePartitions, inputNumber) {
        for (int i = 0; i < sourcePartitions.length; i++) {
            IntermediateResultPartition irp = sourcePartitions[i];
            edges[i] = new ExecutionEdge(irp, this, inputNumber);
        }
        return edges;
    }
}
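Under ALL_TO_ALL, every ExecutionVertex of the consumer links to every IntermediateResultPartition of the producer, so edge counts multiply. A hypothetical standalone sketch of that expansion (simplified types; the real POINTWISE counterpart also handles uneven parallelism ratios):
    // Hypothetical sketch: all-to-all edge expansion between a producer's
    // result partitions and the consumer's subtasks (as in connectAllToAll).
    class MiniExecutionEdge {
        final int partitionIndex;   // which IntermediateResultPartition
        final int consumerSubtask;  // which consumer ExecutionVertex
        MiniExecutionEdge(int partitionIndex, int consumerSubtask) {
            this.partitionIndex = partitionIndex;
            this.consumerSubtask = consumerSubtask;
        }
    }

    class AllToAllDemo {
        public static void main(String[] args) {
            int producerParallelism = 4, consumerParallelism = 3;
            int edges = 0;
            for (int subtask = 0; subtask < consumerParallelism; subtask++) {
                for (int p = 0; p < producerParallelism; p++) {
                    new MiniExecutionEdge(p, subtask); // one ExecutionEdge per pair
                    edges++;
                }
            }
            System.out.println(edges); // 4 * 3 = 12 edges for this ALL_TO_ALL input
        }
    }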
startJobManagerRunner -> jobManagerRunner.start(); -> leaderElectionService.start(this); -> grantLeadership
-> verifyJobSchedulingStatusAndStartJobManager -> startJobMaster -> jobMasterService.start
-> startJobExecution() {
    startJobMasterServices();
    resetAndStartScheduler();
}
The JobMaster registers with the ResourceManager and the TaskManagers and maintains heartbeats with them
startJobMasterServices() -> startHeartbeatServices()
private void startHeartbeatServices() {
    taskManagerHeartbeatManager = heartbeatServices
        .createHeartbeatManagerSender(resourceId, new TaskManagerHeartbeatListener(), getMainThreadExecutor(), log);
    resourceManagerHeartbeatManager = heartbeatServices
        .createHeartbeatManager(resourceId, new ResourceManagerHeartbeatListener(), getMainThreadExecutor(), log);
}
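The sender/receiver split matters: createHeartbeatManagerSender produces the active side that periodically requests heartbeats, while createHeartbeatManager produces the passive side that only answers and watches for timeouts. A minimal hypothetical sender built on ScheduledExecutorService, just to illustrate the pattern (not Flink's API):
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Hypothetical "active" heartbeat sender: fire a heartbeat request at a fixed
    // interval, and declare the target dead if no response arrives within the timeout.
    class MiniHeartbeatSender {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private volatile long lastResponseMillis = System.currentTimeMillis();

        void start(Runnable requestHeartbeat, long intervalMillis, long timeoutMillis, Runnable onTimeout) {
            scheduler.scheduleAtFixedRate(() -> {
                if (System.currentTimeMillis() - lastResponseMillis > timeoutMillis) {
                    onTimeout.run();           // e.g. mark the TaskManager as lost
                } else {
                    requestHeartbeat.run();    // cf. jobMasterGateway.heartbeatFromResourceManager(resourceID)
                }
            }, 0, intervalMillis, TimeUnit.MILLISECONDS);
        }

        void onHeartbeatResponse() {
            lastResponseMillis = System.currentTimeMillis();
        }
    }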
reconnectToResourceManager -> tryConnectToResourceManager -> connectToResourceManager
-> resourceManagerConnection.start() {
    createNewRegistration() -> generateRegistration
    newRegistration.startRegistration() -> register -> invokeRegistration -> registerJobManager
    -> registerJobMasterInternal() {
        JobManagerRegistration jobManagerRegistration = new JobManagerRegistration(jobId, jobManagerResourceId, jobMasterGateway);
        jobManagerRegistrations.put(jobId, jobManagerRegistration);
        jmResourceIdRegistrations.put(jobManagerResourceId, jobManagerRegistration);
        jobManagerHeartbeatManager.monitorTarget() {
            public void requestHeartbeat(ResourceID resourceID, Void payload) {
                jobMasterGateway.heartbeatFromResourceManager(resourceID);
            }
        }
    }
}
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
The JobMaster starts requesting slots and deploying tasks
resetAndStartScheduler -> startScheduling
-> startAllOperatorCoordinators
-> startSchedulingInternal() {
    schedulingStrategy.startScheduling(); -> allocateSlotsAndDeploy() {
        final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments = allocateSlots(executionVertexDeploymentOptions);
        waitForAllSlotsAndDeploy(deploymentHandles);
    }
}
Slot management (allocation and release) source-code analysis
allocateSlots
Task deployment and submission
waitForAllSlotsAndDeploy