In the previous article, "Flink 1.15 Source Code Analysis: DispatcherResourceManagerComponent", we saw where WebMonitorEndpoint is created and started:
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#create
```java
// Build a thread pool that executes the requests WebMonitorEndpoint receives from clients
final ScheduledExecutorService executor =
        WebMonitorEndpoint.createExecutorService(
                configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
                configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
                "DispatcherRestEndpoint");
// Initialize the MetricFetcher (default update interval: 10 s; 0 disables fetching)
final long updateInterval =
        configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher =
        updateInterval == 0
                ? VoidMetricFetcher.INSTANCE
                : MetricFetcherImpl.fromConfiguration(
                        configuration,
                        metricQueryServiceRetriever,
                        dispatcherGatewayRetriever,
                        executor);
// Create WebMonitorEndpoint, one of the three core components
webMonitorEndpoint =
        restEndpointFactory.createRestEndpoint(
                configuration,
                dispatcherGatewayRetriever,
                resourceManagerGatewayRetriever,
                blobServer,
                executor,
                metricFetcher,
                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                fatalErrorHandler);
// Start WebMonitorEndpoint
log.debug("Starting Dispatcher REST endpoint.");
webMonitorEndpoint.start();
```
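This snippet makes two decisions. It creates a scheduled pool for request handling, and it falls back to a do-nothing MetricFetcher when the update interval is 0. The standalone Java sketch below illustrates both. All names in it (RestExecutorSketch, MetricUpdater, createExecutor, chooseUpdater) are hypothetical. The thread-naming scheme is also an assumption and does not reproduce what Flink's createExecutorService actually produces.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class RestExecutorSketch {
    // Hypothetical stand-in for MetricFetcher: something that can refresh metrics.
    interface MetricUpdater {
        void update();
    }

    // Null object used when fetching is disabled (interval == 0), like VoidMetricFetcher.INSTANCE.
    static final MetricUpdater NO_OP = () -> { };

    // Scheduled pool whose threads get a name prefix and a priority (naming scheme is assumed).
    static ScheduledExecutorService createExecutor(int numThreads, int priority, String poolName) {
        AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory =
                runnable -> {
                    Thread thread =
                            new Thread(runnable, poolName + "-thread-" + counter.incrementAndGet());
                    thread.setPriority(priority);
                    thread.setDaemon(true);
                    return thread;
                };
        return new ScheduledThreadPoolExecutor(numThreads, factory);
    }

    // An interval of 0 selects the shared no-op updater instead of the real one.
    static MetricUpdater chooseUpdater(long updateIntervalMillis, Runnable realUpdate) {
        return updateIntervalMillis == 0 ? NO_OP : realUpdate::run;
    }

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService executor =
                createExecutor(2, Thread.NORM_PRIORITY, "DispatcherRestEndpoint");
        try {
            String name = executor.submit(() -> Thread.currentThread().getName()).get();
            System.out.println(name); // DispatcherRestEndpoint-thread-1
        } finally {
            executor.shutdownNow();
        }
        chooseUpdater(0, () -> System.out.println("fetching")).update(); // prints nothing
        chooseUpdater(10_000, () -> System.out.println("fetching")).update(); // prints "fetching"
    }
}
```

Returning one shared no-op object, as VoidMetricFetcher.INSTANCE does, means callers never need a null check when metric fetching is disabled.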
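In this article we walk through how WebMonitorEndpoint is built and started, in detail.
WebMonitorEndpoint is built by restEndpointFactory. Which restEndpointFactory is used depends on how the cluster is launched: each entrypoint creates its DispatcherResourceManagerComponentFactory with its own restEndpointFactory.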

Next, taking StandaloneSessionClusterEntrypoint as an example, let's see how restEndpointFactory is initialized.
1. StandaloneSessionClusterEntrypoint creates a DefaultDispatcherResourceManagerComponentFactory
org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint#createDispatcherResourceManagerComponentFactory
```java
@Override
protected DefaultDispatcherResourceManagerComponentFactory
        createDispatcherResourceManagerComponentFactory(Configuration configuration) {
    return DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory(
            StandaloneResourceManagerFactory.getInstance());
}
```
2. createSessionComponentFactory creates the factories for all three core components
org.apache.flink.runtime.entrypoint.component.DefaultDispatcherResourceManagerComponentFactory#createSessionComponentFactory
```java
public static DefaultDispatcherResourceManagerComponentFactory createSessionComponentFactory(
        ResourceManagerFactory<?> resourceManagerFactory) {
    return new DefaultDispatcherResourceManagerComponentFactory(
            DefaultDispatcherRunnerFactory.createSessionRunner(
                    SessionDispatcherFactory.INSTANCE),
            resourceManagerFactory,
            SessionRestEndpointFactory.INSTANCE);
}
```
So restEndpointFactory is SessionRestEndpointFactory.INSTANCE, a RestEndpointFactory that creates a DispatcherRestEndpoint:
```java
/** {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}. */
public enum SessionRestEndpointFactory implements RestEndpointFactory<DispatcherGateway> {
    INSTANCE;

    @Override
    public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
            Configuration configuration,
            LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
            LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
            TransientBlobService transientBlobService,
            ScheduledExecutorService executor,
            MetricFetcher metricFetcher,
            LeaderElectionService leaderElectionService,
            FatalErrorHandler fatalErrorHandler)
            throws Exception {
        final RestHandlerConfiguration restHandlerConfiguration =
                RestHandlerConfiguration.fromConfiguration(configuration);
        // Create the DispatcherRestEndpoint
        return new DispatcherRestEndpoint(
                dispatcherGatewayRetriever,
                configuration,
                restHandlerConfiguration,
                resourceManagerGatewayRetriever,
                transientBlobService,
                executor,
                metricFetcher,
                leaderElectionService,
                RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
                fatalErrorHandler);
    }
}
```
The DispatcherRestEndpoint it creates is the REST endpoint of the Dispatcher:
```java
/** REST endpoint for the {@link Dispatcher} component. */
public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway> {
    // ......
}
```
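The pattern here has two parts. A stateless enum singleton implements a factory interface, and the entrypoint decides which singleton is plugged into the component factory. The sketch below reproduces only that shape. It uses hypothetical, simplified types (RestEndpointFactorySketch, SessionRestEndpointFactorySketch, ComponentFactorySketch), and a String stands in for the endpoint that gets created.

```java
// Hypothetical, simplified stand-in for RestEndpointFactory.
interface RestEndpointFactorySketch {
    String createRestEndpoint(String bindAddress);
}

// Stateless enum singleton, in the style of SessionRestEndpointFactory.INSTANCE.
enum SessionRestEndpointFactorySketch implements RestEndpointFactorySketch {
    INSTANCE;

    @Override
    public String createRestEndpoint(String bindAddress) {
        // The real factory builds a DispatcherRestEndpoint; here we only describe it.
        return "DispatcherRestEndpoint(" + bindAddress + ")";
    }
}

final class ComponentFactorySketch {
    private final RestEndpointFactorySketch restEndpointFactory;

    private ComponentFactorySketch(RestEndpointFactorySketch restEndpointFactory) {
        this.restEndpointFactory = restEndpointFactory;
    }

    // Plays the role of createSessionComponentFactory: the session setup picks the session factory.
    static ComponentFactorySketch createSessionComponentFactory() {
        return new ComponentFactorySketch(SessionRestEndpointFactorySketch.INSTANCE);
    }

    String create(String bindAddress) {
        return restEndpointFactory.createRestEndpoint(bindAddress);
    }

    public static void main(String[] args) {
        System.out.println(createSessionComponentFactory().create("localhost:8081"));
        // DispatcherRestEndpoint(localhost:8081)
    }
}
```

The JVM creates the enum constant once, and it holds no state, so every cluster of the same type can reuse it. Switching deployment modes then only means plugging a different factory into the component factory.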

webMonitorEndpoint.start() actually runs org.apache.flink.runtime.rest.RestServerEndpoint#start. Its first part builds the router and registers the handlers:
```java
// 1. Create the Router, which parses client requests and finds the matching handler
final Router router = new Router();
// 2. Register the handlers
// 2.1 Initialize the handlers
final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
handlers = initializeHandlers(restAddressFuture);
// 2.2 Sort the handlers so that URLs with path parameters come after literal URLs
/* sort the handlers such that they are ordered the following:
 * /jobs
 * /jobs/overview
 * /jobs/:jobid
 * /jobs/:jobid/config
 * /:*
 */
Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
// 2.3 After sorting, checkAllEndpointsAndHandlersAreUnique ensures each URL maps to exactly one handler
checkAllEndpointsAndHandlersAreUnique(handlers);
// 2.4 Register the handlers with the router
handlers.forEach(handler -> registerHandler(router, handler, log));
```
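The sketch below illustrates the ordering shown in the comment, with literal segments before path parameters and the catch-all /:* last, together with a uniqueness check. Both the comparator and the check are hypothetical re-implementations written for this illustration. They are not Flink's RestHandlerUrlComparator or checkAllEndpointsAndHandlersAreUnique, and they only reproduce the documented order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HandlerOrderingSketch {
    // Literal segment < path parameter (":jobid") < catch-all (":*").
    static int rank(String segment) {
        if (segment.equals(":*")) {
            return 2;
        }
        return segment.startsWith(":") ? 1 : 0;
    }

    // Compare segment by segment; a URL that is a prefix of another sorts first.
    static final Comparator<String> URL_ORDER =
            (a, b) -> {
                String[] sa = a.substring(1).split("/");
                String[] sb = b.substring(1).split("/");
                for (int i = 0; i < Math.min(sa.length, sb.length); i++) {
                    int byRank = Integer.compare(rank(sa[i]), rank(sb[i]));
                    if (byRank != 0) {
                        return byRank;
                    }
                    int byName = sa[i].compareTo(sb[i]);
                    if (byName != 0) {
                        return byName;
                    }
                }
                return Integer.compare(sa.length, sb.length);
            };

    static List<String> sortUrls(List<String> urls) {
        List<String> sorted = new ArrayList<>(urls);
        sorted.sort(URL_ORDER);
        return sorted;
    }

    // Each "METHOD URL" pair may be registered only once.
    static void checkUnique(List<String> methodAndUrls) {
        Set<String> seen = new HashSet<>();
        for (String key : methodAndUrls) {
            if (!seen.add(key)) {
                throw new IllegalStateException("Duplicate REST endpoint: " + key);
            }
        }
    }

    public static void main(String[] args) {
        List<String> urls =
                Arrays.asList("/:*", "/jobs/:jobid/config", "/jobs", "/jobs/:jobid", "/jobs/overview");
        System.out.println(sortUrls(urls));
        // [/jobs, /jobs/overview, /jobs/:jobid, /jobs/:jobid/config, /:*]
        checkUnique(Arrays.asList("GET /jobs", "GET /jobs/overview")); // passes
    }
}
```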
RestServerEndpoint#start then sets up the Netty server and binds the REST port:
```java
// 3.1 Initialize the ChannelInitializer that builds each connection's pipeline
ChannelInitializer<SocketChannel> initializer =
        new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws ConfigurationException {
                RouterHandler handler = new RouterHandler(router, responseHeaders);
                // SSL should be the first handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline()
                            .addLast(
                                    "ssl",
                                    new RedirectingSslHandler(
                                            restAddress,
                                            restAddressFuture,
                                            sslHandlerFactory));
                }
                ch.pipeline()
                        .addLast(new HttpServerCodec())
                        .addLast(new FileUploadHandler(uploadDir))
                        .addLast(
                                new FlinkHttpObjectAggregator(
                                        maxContentLength, responseHeaders));
                for (InboundChannelHandlerFactory factory :
                        inboundChannelHandlerFactories) {
                    Optional<ChannelHandler> channelHandler =
                            factory.createHandler(configuration, responseHeaders);
                    if (channelHandler.isPresent()) {
                        ch.pipeline().addLast(channelHandler.get());
                    }
                }
                ch.pipeline()
                        .addLast(new ChunkedWriteHandler())
                        .addLast(handler.getName(), handler)
                        .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };
// 3.2 Create the Netty ServerBootstrap with a boss group and a worker group
NioEventLoopGroup bossGroup =
        new NioEventLoopGroup(
                1, new ExecutorThreadFactory("flink-rest-server-netty-boss"));
// nThreads = 0 lets Netty pick its default number of worker threads
NioEventLoopGroup workerGroup =
        new NioEventLoopGroup(
                0, new ExecutorThreadFactory("flink-rest-server-netty-worker"));
bootstrap = new ServerBootstrap();
bootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childHandler(initializer);
// 3.3 Bind the REST endpoint
// 3.3.1 Get the configured port range
Iterator<Integer> portsIterator;
try {
    portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
} catch (IllegalConfigurationException e) {
    throw e;
} catch (Exception e) {
    throw new IllegalArgumentException(
            "Invalid port range definition: " + restBindPortRange);
}
// 3.3.2 Handle port conflicts: try each port in turn until one can be bound
int chosenPort = 0;
while (portsIterator.hasNext()) {
    try {
        chosenPort = portsIterator.next();
        final ChannelFuture channel;
        // Bind to address:port and obtain the channel
        if (restBindAddress == null) {
            channel = bootstrap.bind(chosenPort);
        } else {
            channel = bootstrap.bind(restBindAddress, chosenPort);
        }
        serverChannel = channel.syncUninterruptibly().channel();
        break;
    } catch (final Exception e) {
        // syncUninterruptibly() throws checked exceptions via Unsafe
        // continue if the exception is due to the port being in use, fail early
        // otherwise
        if (!(e instanceof java.net.BindException)) {
            throw e;
        }
    }
}
if (serverChannel == null) {
    throw new BindException(
            "Could not start rest endpoint on any port in port range "
                    + restBindPortRange);
}
log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
final String advertisedAddress;
if (bindAddress.getAddress().isAnyLocalAddress()) {
    advertisedAddress = this.restAddress;
} else {
    advertisedAddress = bindAddress.getAddress().getHostAddress();
}
port = bindAddress.getPort();
log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
restAddressFuture.complete(restBaseUrl);
state = State.RUNNING;
```
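The port-probing loop above follows a common pattern. It expands the configured range and tries each port in turn. Ports that fail with a BindException are skipped, and any other error aborts the loop. The sketch below shows that pattern using plain java.net.ServerSocket instead of Netty. PortRangeBindingSketch and its methods are hypothetical. parsePortRange is a simplified stand-in for NetUtils.getPortRangeFromString and accepts a single port, an a-b range, or a comma-separated list of both.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;

class PortRangeBindingSketch {
    // Simplified parser: "8081", "50100-50200", or a comma-separated mix of both.
    static List<Integer> parsePortRange(String spec) {
        List<Integer> ports = new ArrayList<>();
        for (String part : spec.split(",")) {
            String entry = part.trim();
            int dash = entry.indexOf('-');
            if (dash < 0) {
                ports.add(Integer.parseInt(entry));
            } else {
                int from = Integer.parseInt(entry.substring(0, dash).trim());
                int to = Integer.parseInt(entry.substring(dash + 1).trim());
                if (from > to) {
                    throw new IllegalArgumentException("Invalid port range definition: " + spec);
                }
                for (int port = from; port <= to; port++) {
                    ports.add(port);
                }
            }
        }
        return ports;
    }

    // Try each port in order; skip ports already in use, fail fast on any other error.
    static ServerSocket bindFirstAvailable(String spec) throws IOException {
        for (int port : parsePortRange(spec)) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(port));
                return socket;
            } catch (BindException e) {
                socket.close(); // port in use: try the next one
            } catch (IOException e) {
                socket.close();
                throw e;
            }
        }
        throw new BindException("Could not bind on any port in port range " + spec);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(parsePortRange("50100-50102,50105")); // [50100, 50101, 50102, 50105]
        try (ServerSocket socket = bindFirstAvailable("0")) { // 0 = any free port
            System.out.println("bound to port " + socket.getLocalPort());
        }
    }
}
```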
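At the end of start(), RestServerEndpoint calls the startInternal() hook, which subclasses implement:
```java
/**
 * Hook to start sub class specific services.
 *
 * @throws Exception if an error occurred
 */
protected abstract void startInternal() throws Exception;
```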
Now let's look at the subclass's implementation of startInternal:
org.apache.flink.runtime.webmonitor.WebMonitorEndpoint#startInternal
```java
@Override
public void startInternal() throws Exception {
    // 1. Leader election
    leaderElectionService.start(this);
    // 2. Schedule the periodic ExecutionGraph cache cleanup
    startExecutionGraphCacheCleanupTask();
    if (hasWebUI) {
        log.info("Web frontend listening at {}.", getRestBaseUrl());
    }
}
```
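RestServerEndpoint and WebMonitorEndpoint divide the work using the template-method pattern: the base class runs the shared bootstrap, then calls the startInternal() hook. The minimal sketch below uses hypothetical classes (RestServerEndpointSketch, WebMonitorEndpointSketch) that only record the order of the steps. They do not perform the real work.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: shared bootstrap in start(), subclass-specific work in startInternal().
abstract class RestServerEndpointSketch {
    final List<String> steps = new ArrayList<>();
    private boolean running;

    final synchronized void start() throws Exception {
        if (running) {
            throw new IllegalStateException("The endpoint has already been started.");
        }
        steps.add("register handlers");
        steps.add("bind port");
        running = true;
        startInternal(); // subclass hook, called last
    }

    protected abstract void startInternal() throws Exception;
}

class WebMonitorEndpointSketch extends RestServerEndpointSketch {
    @Override
    protected void startInternal() {
        steps.add("start leader election");
        steps.add("schedule ExecutionGraph cache cleanup");
    }

    public static void main(String[] args) throws Exception {
        WebMonitorEndpointSketch endpoint = new WebMonitorEndpointSketch();
        endpoint.start();
        System.out.println(endpoint.steps);
        // [register handlers, bind port, start leader election, schedule ExecutionGraph cache cleanup]
    }
}
```

Declaring start() final keeps the bootstrap order fixed, and subclasses can only add work at the single extension point.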
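The HighAvailabilityServices implementation is chosen according to the configured high-availability type. WebMonitorEndpoint's leaderElectionService is obtained from it when the endpoint is created, through this argument:
```java
highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
```
In standalone mode (no HA), StandaloneHaServices returns a StandaloneLeaderElectionService:
```java
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    synchronized (lock) {
        checkNotShutdown();
        return new StandaloneLeaderElectionService();
    }
}
```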
StandaloneLeaderElectionService#start does not run a real election. It makes the contender the leader directly, and here the contender is WebMonitorEndpoint.
```java
@Override
public void start(LeaderContender newContender) throws Exception {
    if (contender != null) {
        // Service was already started
        throw new IllegalArgumentException(
                "Leader election service cannot be started multiple times.");
    }
    contender = Preconditions.checkNotNull(newContender);
    // directly grant leadership to the given contender
    contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);
}
```
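The minimal sketch below captures this standalone behaviour using hypothetical types (StandaloneContenderSketch, StandaloneElectionSketch). The service refuses to start twice, and it grants leadership immediately with a fixed session ID. The fixed ID new UUID(0, 0) is an assumption chosen for the sketch as a stand-in for HighAvailabilityServices.DEFAULT_LEADER_ID.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical stand-in for LeaderContender.
interface StandaloneContenderSketch {
    void grantLeadership(UUID leaderSessionId);
}

class StandaloneElectionSketch {
    // Fixed session ID assumed for the sketch (stands in for DEFAULT_LEADER_ID).
    static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

    private StandaloneContenderSketch contender;

    void start(StandaloneContenderSketch newContender) {
        if (contender != null) {
            throw new IllegalArgumentException(
                    "Leader election service cannot be started multiple times.");
        }
        contender = Objects.requireNonNull(newContender);
        // No competition: the only contender becomes leader right away.
        contender.grantLeadership(DEFAULT_LEADER_ID);
    }

    public static void main(String[] args) {
        StandaloneElectionSketch service = new StandaloneElectionSketch();
        service.start(id -> System.out.println("granted " + id));
        // granted 00000000-0000-0000-0000-000000000000
    }
}
```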
With HA enabled, the service comes from the HA services base class instead:
org.apache.flink.runtime.highavailability.AbstractHaServices#getClusterRestEndpointLeaderElectionService
```java
@Override
public LeaderElectionService getClusterRestEndpointLeaderElectionService() {
    // Implemented by subclasses: create the leader election service
    return createLeaderElectionService(getLeaderPathForRestServer());
}
```
The subclass implementation, here ZooKeeperHaServices:
```java
// org.apache.flink.runtime.highavailability.zookeeper.ZooKeeperHaServices#createLeaderElectionService
@Override
protected LeaderElectionService createLeaderElectionService(String leaderPath) {
    return ZooKeeperUtils.createLeaderElectionService(getCuratorFramework(), leaderPath);
}

// Create a DefaultLeaderElectionService
// org.apache.flink.runtime.util.ZooKeeperUtils#createLeaderElectionService
public static DefaultLeaderElectionService createLeaderElectionService(
        final CuratorFramework client, final String path) {
    return new DefaultLeaderElectionService(createLeaderElectionDriverFactory(client, path));
}
```
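DefaultLeaderElectionService then starts the leader election, and the contender passed in here is WebMonitorEndpoint.
With ZooKeeper HA, Flink's leader election is built on the Curator framework. A leader election driver (leaderElectionDriver) is created for each contender. Once the election completes, the driver is called back: isLeader when the contender obtains leadership, and notLeader when it later loses leadership it held.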
```java
// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#start
@Override
public final void start(LeaderContender contender) throws Exception {
    checkNotNull(contender, "Contender must not be null.");
    Preconditions.checkState(leaderContender == null, "Contender was already set.");
    synchronized (lock) {
        running = true;
        /*
         When called from WebMonitorEndpoint, the contender is the DispatcherRestEndpoint
         When called from ResourceManager, the contender is the ResourceManager
         When called from DispatcherRunner, the contender is the DispatcherRunner
        */
        leaderContender = contender;
        // Create a leader election driver (leaderElectionDriver) for each contender
        leaderElectionDriver =
                leaderElectionDriverFactory.createLeaderElectionDriver(
                        this,
                        new LeaderElectionFatalErrorHandler(),
                        leaderContender.getDescription());
        LOG.info("Starting DefaultLeaderElectionService with {}.", leaderElectionDriver);
    }
}
```

ZooKeeperLeaderElectionDriverFactory creates a ZooKeeperLeaderElectionDriver. A LeaderElectionDriver is responsible for running the leader election and storing the leader information.
```java
// org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriverFactory#createLeaderElectionDriver
@Override
public ZooKeeperLeaderElectionDriver createLeaderElectionDriver(
        LeaderElectionEventHandler leaderEventHandler, // the DefaultLeaderElectionService object
        FatalErrorHandler fatalErrorHandler, // new LeaderElectionFatalErrorHandler()
        String leaderContenderDescription)
        throws Exception {
    return new ZooKeeperLeaderElectionDriver(
            client, path, leaderEventHandler, fatalErrorHandler, leaderContenderDescription);
}

public ZooKeeperLeaderElectionDriver(
        CuratorFramework client,
        String path,
        LeaderElectionEventHandler leaderElectionEventHandler, // the DefaultLeaderElectionService passed in
        FatalErrorHandler fatalErrorHandler,
        String leaderContenderDescription)
        throws Exception {
    checkNotNull(path);
    this.client = checkNotNull(client);
    this.connectionInformationPath = ZooKeeperUtils.generateConnectionInformationPath(path);
    this.leaderElectionEventHandler = checkNotNull(leaderElectionEventHandler);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.leaderContenderDescription = checkNotNull(leaderContenderDescription);
    leaderLatchPath = ZooKeeperUtils.generateLeaderLatchPath(path);
    leaderLatch = new LeaderLatch(client, leaderLatchPath);
    this.cache =
            ZooKeeperUtils.createTreeCache(
                    client,
                    connectionInformationPath,
                    this::retrieveLeaderInformationFromZooKeeper);
    running = true;
    // Start the election: Curator's LeaderLatch on top of ZooKeeper provides the election
    leaderLatch.addListener(this);
    leaderLatch.start();
    /*
     Once the election has started, the driver is notified of the outcome:
     1. When it obtains leadership, this class's isLeader method is called back
     2. When it loses leadership it previously held, this class's notLeader method is called back
     Each contender has its own election driver
    */
    cache.start();
    client.getConnectionStateListenable().addListener(listener);
}
```
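Curator's LeaderLatch keeps the contenders in ZooKeeper and notifies each LeaderLatchListener through isLeader() and notLeader(). The in-memory simulation below shows this callback protocol without needing a ZooKeeper cluster. Its classes are hypothetical, and the queue only mimics the latch ordering. It does not implement Curator's algorithm. Note that B never receives notLeader, because it never held leadership.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// The same two callbacks as Curator's LeaderLatchListener, redeclared so the sketch has no dependencies.
interface LatchListenerSketch {
    void isLeader();

    void notLeader();
}

// Hypothetical in-memory stand-in for a latch path in ZooKeeper:
// participants queue up in join order and the head of the queue is the leader.
class InMemoryLatchSketch {
    private final Deque<LatchListenerSketch> participants = new ArrayDeque<>();

    synchronized void join(LatchListenerSketch listener) {
        participants.addLast(listener);
        if (participants.peekFirst() == listener) {
            listener.isLeader();
        }
    }

    synchronized void leave(LatchListenerSketch listener) {
        boolean wasLeader = participants.peekFirst() == listener;
        participants.remove(listener);
        if (wasLeader) {
            listener.notLeader(); // leadership it held is lost
            LatchListenerSketch next = participants.peekFirst();
            if (next != null) {
                next.isLeader(); // the next contender in line takes over
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        InMemoryLatchSketch latch = new InMemoryLatchSketch();
        LatchListenerSketch a = new RecordingListenerSketch("A", events);
        LatchListenerSketch b = new RecordingListenerSketch("B", events);
        latch.join(a);
        latch.join(b);
        latch.leave(a);
        System.out.println(events); // [A:isLeader, A:notLeader, B:isLeader]
    }
}

// Records callbacks as "name:callback" strings.
class RecordingListenerSketch implements LatchListenerSketch {
    private final String name;
    private final List<String> events;

    RecordingListenerSketch(String name, List<String> events) {
        this.name = name;
        this.events = events;
    }

    @Override
    public void isLeader() {
        events.add(name + ":isLeader");
    }

    @Override
    public void notLeader() {
        events.add(name + ":notLeader");
    }
}
```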
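Here, leaderLatch.start() kicks off the election. As described above, a callback is invoked once the election completes, so let's look at this class's isLeader method: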
```java
@Override
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership(UUID.randomUUID());
}
```
isLeader hands a fresh random session ID to DefaultLeaderElectionService#onGrantLeadership:
```java
// org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService#onGrantLeadership
@Override
@GuardedBy("lock")
public void onGrantLeadership(UUID newLeaderSessionId) {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = newLeaderSessionId;
            clearConfirmedLeaderInformation();
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }
            /*
             There are 4 kinds of LeaderContender:
             1. Dispatcher = DefaultDispatcherRunner
             2. JobMaster = JobManagerRunnerImpl
             3. ResourceManager = ResourceManager
             4. WebMonitorEndpoint = WebMonitorEndpoint
            */
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}
```
Next we step into leaderContender.grantLeadership. Since this is WebMonitorEndpoint's election, we look at WebMonitorEndpoint's implementation:
```java
@Override
public void grantLeadership(final UUID leaderSessionID) {
    log.info(
            "{} was granted leadership with leaderSessionID={}",
            getRestBaseUrl(),
            leaderSessionID);
    leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}
```
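Nothing special here: it simply confirms leadership with its REST address. Next, step into leaderElectionService.confirmLeadership and choose the DefaultLeaderElectionService implementation: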
```java
@Override
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
    if (LOG.isDebugEnabled()) {
        LOG.debug(
                "Confirm leader session ID {} for leader {}.", leaderSessionID, leaderAddress);
    }
    checkNotNull(leaderSessionID);
    synchronized (lock) {
        if (hasLeadership(leaderSessionID)) {
            if (running) {
                // Confirm the leader information and write it through the driver (to ZooKeeper in ZooKeeper HA mode)
                confirmLeaderInformation(leaderSessionID, leaderAddress);
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Ignoring the leader session Id {} confirmation, since the "
                                    + "LeaderElectionService has already been stopped.",
                            leaderSessionID);
                }
            }
        } else {
            // Received an old confirmation call
            if (!leaderSessionID.equals(this.issuedLeaderSessionID)) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug(
                            "Receive an old confirmation call of leader session ID {}, "
                                    + "current issued session ID is {}",
                            leaderSessionID,
                            issuedLeaderSessionID);
                }
            } else {
                LOG.warn(
                        "The leader session ID {} was confirmed even though the "
                                + "corresponding JobManager was not elected as the leader.",
                        leaderSessionID);
            }
        }
    }
}
```
Now into confirmLeaderInformation:
```java
@GuardedBy("lock")
private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
    confirmedLeaderSessionID = leaderSessionID;
    confirmedLeaderAddress = leaderAddress;
    leaderElectionDriver.writeLeaderInformation(
            LeaderInformation.known(confirmedLeaderSessionID, confirmedLeaderAddress));
}
```
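The grant/confirm handshake traced above can be condensed into a small model. The sketch below is a simplified, hypothetical re-creation, and GrantableSketch, ElectionServiceSketch and RestEndpointContenderSketch are not Flink classes. The service remembers the session ID it issued, and the contender confirms using that ID and its address. Only a matching ID leads to a write of the leader information. A BiConsumer stands in for leaderElectionDriver.writeLeaderInformation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical stand-in for LeaderContender.
interface GrantableSketch {
    void grantLeadership(UUID leaderSessionId);
}

// Simplified model of the issue/confirm bookkeeping in DefaultLeaderElectionService.
class ElectionServiceSketch {
    // Stand-in for leaderElectionDriver.writeLeaderInformation (a ZooKeeper write in HA mode).
    private final BiConsumer<UUID, String> leaderInformationWriter;
    private GrantableSketch contender;
    private UUID issuedLeaderSessionId;
    private UUID confirmedLeaderSessionId;
    private String confirmedLeaderAddress;
    private boolean running;

    ElectionServiceSketch(BiConsumer<UUID, String> leaderInformationWriter) {
        this.leaderInformationWriter = Objects.requireNonNull(leaderInformationWriter);
    }

    synchronized void start(GrantableSketch newContender) {
        contender = Objects.requireNonNull(newContender);
        running = true;
    }

    // Called by the driver when it wins the election (cf. isLeader -> onGrantLeadership).
    synchronized void onGrantLeadership(UUID newLeaderSessionId) {
        if (!running) {
            return;
        }
        issuedLeaderSessionId = newLeaderSessionId;
        confirmedLeaderSessionId = null;
        confirmedLeaderAddress = null;
        contender.grantLeadership(newLeaderSessionId);
    }

    synchronized boolean hasLeadership(UUID leaderSessionId) {
        return running && leaderSessionId.equals(issuedLeaderSessionId);
    }

    // The contender confirms with the session ID it was granted; other IDs are ignored.
    synchronized void confirmLeadership(UUID leaderSessionId, String leaderAddress) {
        if (hasLeadership(leaderSessionId)) {
            confirmedLeaderSessionId = leaderSessionId;
            confirmedLeaderAddress = leaderAddress;
            leaderInformationWriter.accept(confirmedLeaderSessionId, confirmedLeaderAddress);
        }
    }

    public static void main(String[] args) {
        List<String> writes = new ArrayList<>();
        ElectionServiceSketch service = new ElectionServiceSketch((id, address) -> writes.add(address));
        service.start(new RestEndpointContenderSketch(service, "http://localhost:8081"));
        service.onGrantLeadership(UUID.randomUUID()); // grant -> confirm -> write
        service.confirmLeadership(UUID.randomUUID(), "http://stale:8081"); // unknown ID: ignored
        System.out.println(writes); // [http://localhost:8081]
    }
}

// Like WebMonitorEndpoint#grantLeadership: confirm right away with its REST URL.
class RestEndpointContenderSketch implements GrantableSketch {
    private final ElectionServiceSketch service;
    private final String restBaseUrl;

    RestEndpointContenderSketch(ElectionServiceSketch service, String restBaseUrl) {
        this.service = service;
        this.restBaseUrl = restBaseUrl;
    }

    @Override
    public void grantLeadership(UUID leaderSessionId) {
        service.confirmLeadership(leaderSessionId, restBaseUrl);
    }
}
```

Checking the session ID on confirmation protects against a slow contender confirming a leadership round that has already been superseded.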
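As we can see, once WebMonitorEndpoint wins the leader election, it does nothing more than write its own information (leader session ID and REST address) to ZooKeeper through the election driver.
Once this information has been written, WebMonitorEndpoint has finished starting.
WebMonitorEndpoint's startup process is not complicated. In summary, it does the following: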