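TaskManager is the worker node at runtime and provides the Slot compute resources a Job needs to run. Every Task instance submitted by the JobManager runs on a TaskManager.
When the cluster is started with bin/start-cluster.sh, the TaskManager startup script runs as well and invokes TaskManagerRunner, whose sole purpose is to create the TaskManager.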
/**
 * Starting the TaskManager really means initializing the internal services of TaskExecutor.
 * Once they are initialized, the TaskExecutor (acting as the TaskManager) is up.
 */
public static void runTaskManager(Configuration configuration, ResourceID resourceId) throws Exception {
    // Create the TaskManagerRunner (its constructor also creates the TaskExecutor).
    // The last argument is a factory for TaskExecutorToServiceAdapter (an implementation of the TaskExecutorService interface).
    final TaskManagerRunner taskManagerRunner = new TaskManagerRunner(configuration, resourceId, TaskManagerRunner::createTaskExecutorService);
    // This calls TaskExecutorToServiceAdapter#start(),
    // which ends up in TaskExecutor#start(). Because TaskExecutor extends RpcEndpoint, this starts the RpcServer of the TaskExecutor.
    // Starting the RpcServer in turn invokes onStart() of the RpcEndpoint subclass, i.e. TaskExecutor#onStart().
    taskManagerRunner.start();
}
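While the TaskManagerRunner is being constructed, the TaskExecutor is created. TaskExecutor is the underlying implementation of the TaskManager, so everything the TaskManager can do is provided by TaskExecutor.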
/**
 * Creates the TaskExecutor and hands it to a field of the TaskExecutorService, which "legitimately holds" it.
 */
public static TaskExecutorService createTaskExecutorService(
        Configuration configuration,
        ResourceID resourceID,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        BlobCacheService blobCacheService,
        boolean localCommunicationOnly,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    // Create the TaskExecutor instance (a plain new TaskExecutor under the hood).
    final TaskExecutor taskExecutor = startTaskManager(
        configuration,
        resourceID,
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        metricRegistry,
        blobCacheService,
        localCommunicationOnly,
        fatalErrorHandler);
    // Wrap the TaskExecutor in a TaskExecutorToServiceAdapter, which keeps it in a field.
    // TaskExecutorToServiceAdapter implements TaskExecutorService; this wrapping is what lets TaskManagerRunner start the TaskExecutor.
    return TaskExecutorToServiceAdapter.createFor(taskExecutor);
}
The newly created TaskExecutor is held in a field of TaskExecutorToServiceAdapter, which in turn is wrapped by TaskManagerRunner. Through this chain of wrapping, TaskManagerRunner "legitimately holds" the TaskExecutor instance.
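The holding chain described above is essentially the adapter pattern. The following is a minimal sketch of that idea, not Flink's actual classes: every name prefixed with Sketch is hypothetical, and each class is reduced to a single start() call.

```java
// Hypothetical, simplified stand-in for the TaskExecutor.
class SketchTaskExecutor {
    private boolean started;

    void start() {
        started = true;
    }

    boolean isStarted() {
        return started;
    }
}

// Hypothetical service abstraction; the runner depends only on this interface.
interface SketchTaskExecutorService {
    void start();
}

// Adapter: implements the service interface and delegates to the wrapped executor.
final class SketchTaskExecutorAdapter implements SketchTaskExecutorService {
    private final SketchTaskExecutor taskExecutor;

    private SketchTaskExecutorAdapter(SketchTaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    static SketchTaskExecutorAdapter createFor(SketchTaskExecutor taskExecutor) {
        return new SketchTaskExecutorAdapter(taskExecutor);
    }

    @Override
    public void start() {
        taskExecutor.start();
    }
}

// The runner holds the service, so it reaches the executor only through the adapter.
final class SketchRunner {
    private final SketchTaskExecutorService service;

    SketchRunner(SketchTaskExecutorService service) {
        this.service = service;
    }

    void start() {
        service.start();
    }
}
```

Calling start() on the runner reaches the executor through the adapter, which is the same indirection the walkthrough describes for TaskManagerRunner#start().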
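Once the TaskManagerRunner exists, its start() method is called. Because of the holding chain analyzed above, TaskManagerRunner#start() ultimately calls TaskExecutor#start(). TaskExecutor extends RpcEndpoint, so this starts the RpcServer of the TaskExecutor, and starting the RpcServer invokes onStart() of the RpcEndpoint subclass, that is, TaskExecutor#onStart().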
/**
 * Starting the TaskManager essentially means executing TaskExecutor#start().
 * TaskExecutor extends RpcEndpoint, so starting it means starting its RpcServer.
 * Starting the RpcServer invokes RpcEndpoint#onStart(), i.e. TaskExecutor#onStart().
 */
@Override
public void onStart() throws Exception {
    try {
        // Starting the TaskExecutor boils down to initializing its internal services.
        startTaskExecutorServices();
    } catch (Exception e) {
        final TaskManagerException exception = new TaskManagerException(String.format("Could not start the TaskExecutor %s", getAddress()), e);
        onFatalError(exception);
        throw exception;
    }
    startRegistrationTimeout();
}

/**
 * Once all internal services are initialized, the TaskExecutor (acting as the TaskManager) has started.
 */
private void startTaskExecutorServices() throws Exception {
    try {
        // Use the LeaderRetrievalService to set up the RPC connection between TaskExecutor and ResourceManager;
        // the TaskManager's resource information is reported to the ResourceManager as part of this.
        // The listener is invoked once when the TaskExecutor starts, and afterwards only when the ResourceManager leader address changes.
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
        // Start the TaskSlotTable, which manages the Slot compute resources inside the TaskManager.
        taskSlotTable.start(new SlotActionsImpl(), getMainThreadExecutor());
        // Start the JobLeaderService, which the TaskExecutor uses to find the leader JobManager for RPC communication.
        jobLeaderService.start(getAddress(), getRpcService(), haServices, new JobLeaderListenerImpl());
        // FileCache stores the files that Tasks fetch from the PermanentBlobService during execution, placing them under the /tmp_/ path.
        // If a Task has been unregistered for more than 5s, its temporary file is cleared.
        fileCache = new FileCache(taskManagerConfiguration.getTmpDirectories(), blobCacheService.getPermanentBlobService());
    } catch (Exception e) {
        handleStartTaskExecutorServicesException(e);
    }
}
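Once all internal services of the TaskExecutor are initialized, the TaskExecutor (acting as the TaskManager) has finished starting. The ResourceManager then receives the registration from the TaskManager and manages the resources it provides centrally.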
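To make the start() to onStart() callback chain and the initialization order easier to follow, here is a simplified sketch. It assumes a synchronous endpoint that records events in a list; SketchEndpoint and SketchExecutorEndpoint are hypothetical names, and the real RpcEndpoint is considerably more involved.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified stand-in for an RPC endpoint base class.
abstract class SketchEndpoint {
    final List<String> events = new ArrayList<>();

    // Starting the endpoint "starts the server" and then runs the subclass hook.
    final void start() {
        events.add("rpc-server-started");
        try {
            onStart();
        } catch (Exception e) {
            // Stand-in for a fatal error handler.
            events.add("fatal: " + e.getMessage());
        }
    }

    protected abstract void onStart() throws Exception;
}

// Initializes its services in the same order as the walkthrough above.
final class SketchExecutorEndpoint extends SketchEndpoint {
    private final boolean failOnSlotTable;

    SketchExecutorEndpoint(boolean failOnSlotTable) {
        this.failOnSlotTable = failOnSlotTable;
    }

    @Override
    protected void onStart() throws Exception {
        events.add("leader-retrieval");
        if (failOnSlotTable) {
            // Simulated startup failure, to show that later services are skipped.
            throw new Exception("slot table failed");
        }
        events.add("slot-table");
        events.add("job-leader-service");
        events.add("file-cache");
        events.add("registration-timeout");
    }
}
```

A failure in any service aborts the remaining initialization and is routed to the error path, which mirrors the try/catch around startTaskExecutorServices().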
Summary:
When the TaskExecutor starts (that is, when its internal services are initialized), the first step is to establish the RPC connection between the TaskExecutor and the ResourceManager.
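// Set up the RPC connection between TaskExecutor and ResourceManager; the TaskManager's resource information is reported as part of this.
// The listener is invoked once when the TaskExecutor starts, and afterwards only when the ResourceManager leader address changes.
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());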
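Through ResourceManagerLeaderListener, the LeaderRetrievalService can deliver the leader address of the ResourceManager; the callback itself is defined by the LeaderRetrievalListener interface. The listener is invoked once when the TaskExecutor starts, and afterwards only when the leader address of the ResourceManager changes.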
/**
 * The listener for leader changes of the resource manager.
 * It is used when the leader address of the ResourceManager changes;
 * the LeaderRetrievalListener interface defines the callback methods.
 */
private final class ResourceManagerLeaderListener implements LeaderRetrievalListener {
    @Override
    public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
        // When the ResourceManager leader address changes, run the block below asynchronously.
        runAsync(
            // Re-establish the RPC connection to the ResourceManager using the new leader address.
            () -> notifyOfNewResourceManagerLeader(
                leaderAddress,
                ResourceManagerId.fromUuidOrNull(leaderSessionID)));
    }

    /**
     * Error handling.
     */
    @Override
    public void handleError(Exception exception) {
        onFatalError(exception);
    }
}
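The "once at start, then only on change" behavior of the listener can be illustrated with a small observer sketch. It is an assumption-laden simplification: SketchLeaderRetriever and SketchLeaderListener are hypothetical, everything runs on one thread, and leader election is simulated by calling leaderElected() directly.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical callback interface, modeled loosely on a leader retrieval listener.
interface SketchLeaderListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

// Hypothetical retrieval service that remembers the current leader.
final class SketchLeaderRetriever {
    private SketchLeaderListener listener;
    private String currentAddress;
    private UUID currentSessionId;

    // Registering the listener immediately reports the known leader, if there is one.
    void start(SketchLeaderListener listener) {
        this.listener = Objects.requireNonNull(listener);
        if (currentAddress != null) {
            listener.notifyLeaderAddress(currentAddress, currentSessionId);
        }
    }

    // Simulates an election result; the listener is notified only if something changed.
    void leaderElected(String address, UUID sessionId) {
        boolean changed = !Objects.equals(address, currentAddress)
            || !Objects.equals(sessionId, currentSessionId);
        currentAddress = address;
        currentSessionId = sessionId;
        if (changed && listener != null) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }
}
```

Re-announcing the same leader does not trigger the listener, so the reconnect logic behind it runs only when the address or session id really differs.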
With the changed leader address, a new connectable ResourceManager address can be built, and with that address the RPC connection to the ResourceManager can be re-established.
/**
 * Re-establishes the RPC connection to the ResourceManager using its changed leader address.
 */
private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) {
    // Build the new ResourceManager connection address from the new leader address and id.
    resourceManagerAddress = createResourceManagerAddress(newLeaderAddress, newResourceManagerId);
    // Re-establish the RPC connection to the ResourceManager.
    reconnectToResourceManager(new FlinkException(String.format("ResourceManager leader changed to new address %s", resourceManagerAddress)));
}
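Reconnecting to the ResourceManager takes three steps (close the old connection, start the registration timeout, try to connect again):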
/**
 * Re-establishes the RPC connection to the ResourceManager leader.
 */
private void reconnectToResourceManager(Exception cause) {
    // Close the "old" RPC connection to the previous ResourceManager leader.
    // On the first start of the TaskExecutor there is nothing to close.
    closeResourceManagerConnection(cause);
    // Start the registration timeout so the TaskExecutor does not wait indefinitely for the ResourceManager.
    startRegistrationTimeout();
    // Try to connect to the ResourceManager again.
    tryConnectToResourceManager();
}
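The three steps can be traced with a small state sketch. It is only an illustration under simplifying assumptions: SketchReconnector is a hypothetical class, a connection is represented by a plain string, and each step just records what it did.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that records the reconnect steps in order.
final class SketchReconnector {
    final List<String> steps = new ArrayList<>();
    private String resourceManagerAddress;
    private String connection; // null means "no connection yet"

    // Entry point: a new leader address always triggers a full reconnect.
    void notifyOfNewLeader(String newAddress) {
        resourceManagerAddress = newAddress;
        reconnect();
    }

    private void reconnect() {
        closeConnection();          // step 1
        startRegistrationTimeout(); // step 2
        tryConnect();               // step 3
    }

    // Step 1: a no-op on first start, because nothing is connected yet.
    private void closeConnection() {
        if (connection != null) {
            steps.add("close " + connection);
            connection = null;
        }
    }

    // Step 2: arm the timeout before attempting to connect.
    private void startRegistrationTimeout() {
        steps.add("start-timeout");
    }

    // Step 3: connect only if a leader address is known.
    private void tryConnect() {
        if (resourceManagerAddress != null) {
            connection = resourceManagerAddress;
            steps.add("connect " + connection);
        }
    }
}
```

On the first notification only the timeout and connect steps are recorded; a later leader change additionally closes the previous connection first.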
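The most important part is the TaskExecutor reconnecting to the ResourceManager: the core action is to create the physical connection between the TaskExecutor and the ResourceManager and then start it.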
/**
 * Re-establishes the RPC connection from the TaskExecutor to the ResourceManager.
 */
private void tryConnectToResourceManager() {
    if (resourceManagerAddress != null) {
        // A leader address is available, so start establishing the RPC connection.
        connectToResourceManager();
    }
}

/**
 * Re-establishes the RPC connection from the TaskExecutor to the ResourceManager.
 */
private void connectToResourceManager() {
    assert(resourceManagerAddress != null);
    assert(establishedResourceManagerConnection == null);
    assert(resourceManagerConnection == null);
    log.info("Connecting to ResourceManager {}.", resourceManagerAddress);
    // Create the TaskExecutorRegistration, which carries the basic information about the TaskExecutor.
    final TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(
        getAddress(),
        getResourceID(),
        taskManagerLocation.dataPort(),
        hardwareDescription,
        taskManagerConfiguration.getDefaultSlotResourceProfile(),
        taskManagerConfiguration.getTotalResourceProfile()
    );
    // TaskExecutorToResourceManagerConnection is the physical connection between TaskExecutor and ResourceManager.
    resourceManagerConnection =
        new TaskExecutorToResourceManagerConnection(
            log,
            getRpcService(),
            taskManagerConfiguration.getRetryingRegistrationConfiguration(),
            resourceManagerAddress.getAddress(),
            resourceManagerAddress.getResourceManagerId(),
            getMainThreadExecutor(),
            new ResourceManagerRegistrationListener(),
            taskExecutorRegistration);
    // Start the physical connection between TaskExecutor and ResourceManager.
    resourceManagerConnection.start();
}
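At this point the RPC connection between the TaskExecutor and the ResourceManager has been created, and the ResourceManager can receive the TaskExecutor's registration through ResourceManagerGateway. Once registration succeeds, the TaskExecutor sets up the heartbeat connection between the TaskManager and the ResourceManager and uses it to send its SlotReport to the ResourceManager.
The physical connection between the TaskExecutor and the ResourceManager has been created above, and TaskExecutorToResourceManagerConnection#start() has been called (in fact the start() method of its parent class RegisteredRpcConnection).
Since the TaskExecutor registers at the ResourceManager through TaskExecutorToResourceManagerConnection#start(), there must be dedicated methods that receive the ResourceManager's response.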
/**
 * After the TaskExecutor has registered at the ResourceManager, it receives the ResourceManager's response here.
 */
@Override
protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess success) {
    log.info("Successful registration at resource manager {} under registration id {}.",
        getTargetAddress(), success.getRegistrationId());
    // Notify the TaskExecutor that it registered successfully at the ResourceManager.
    registrationListener.onRegistrationSuccess(this, success);
}

@Override
protected void onRegistrationFailure(Throwable failure) {
    log.info("Failed to register at resource manager {}.", getTargetAddress(), failure);
    registrationListener.onRegistrationFailure(failure);
}
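The hand-off from a registration attempt to a success or failure callback can be sketched as follows. This is not how RegisteredRpcConnection is implemented: the sketch is synchronous, the bounded retry count is an assumption made so the example terminates, and SketchRegisteredConnection and SketchRegistrationListener are hypothetical names.

```java
import java.util.function.Supplier;

// Hypothetical listener that receives the outcome of a registration.
interface SketchRegistrationListener {
    void onRegistrationSuccess(String registrationId);

    void onRegistrationFailure(Throwable failure);
}

// Hypothetical connection that retries a registration call and reports the result.
final class SketchRegisteredConnection {
    private final Supplier<String> registerCall;
    private final int maxAttempts;
    private final SketchRegistrationListener listener;

    SketchRegisteredConnection(
            Supplier<String> registerCall,
            int maxAttempts,
            SketchRegistrationListener listener) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        this.registerCall = registerCall;
        this.maxAttempts = maxAttempts;
        this.listener = listener;
    }

    // Tries to register up to maxAttempts times, then notifies the listener exactly once.
    void start() {
        String registrationId = null;
        Throwable lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                registrationId = registerCall.get();
                break;
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        if (registrationId != null) {
            listener.onRegistrationSuccess(registrationId);
        } else {
            listener.onRegistrationFailure(lastFailure);
        }
    }
}
```

The connection object owns the retry logic, while the listener only sees the final outcome, which is the same division of labor as between the connection class and its registrationListener above.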
The RegistrationConnectionListener interface receives the outcome of the TaskExecutor's registration at the ResourceManager; the implementation used here is TaskExecutor's inner class ResourceManagerRegistrationListener.
/**
 * After the TaskExecutor has registered at the ResourceManager, it receives the ResourceManager's response.
 * Success or failure of the registration is reported back to the TaskExecutor through this listener.
 */
private final class ResourceManagerRegistrationListener implements RegistrationConnectionListener<TaskExecutorToResourceManagerConnection, TaskExecutorRegistrationSuccess> {
    @Override
    public void onRegistrationSuccess(TaskExecutorToResourceManagerConnection connection, TaskExecutorRegistrationSuccess success) {
        // Extract the registration details.
        final ResourceID resourceManagerId = success.getResourceManagerId();
        final InstanceID taskExecutorRegistrationId = success.getRegistrationId();
        final ClusterInformation clusterInformation = success.getClusterInformation();
        // Take the ResourceManagerGateway out of the physical connection.
        final ResourceManagerGateway resourceManagerGateway = connection.getTargetGateway();
        // Asynchronously establish the connection between TaskExecutor and ResourceManager.
        runAsync(
            () -> {
                // filter out outdated connections
                //noinspection ObjectEquality
                if (resourceManagerConnection == connection) {
                    try {
                        establishResourceManagerConnection(
                            resourceManagerGateway,
                            resourceManagerId,
                            taskExecutorRegistrationId,
                            clusterInformation);
                    } catch (Throwable t) {
                        log.error("Establishing Resource Manager connection in Task Executor failed", t);
                    }
                }
            });
    }

    @Override
    public void onRegistrationFailure(Throwable failure) {
        onFatalError(failure);
    }
}
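The identity check on resourceManagerConnection guards against a success callback that belongs to a connection which has already been replaced. A minimal sketch of that guard, with hypothetical names (SketchConnection, SketchExecutorState) and no threading, might look like this:

```java
// Hypothetical connection handle; identity, not content, distinguishes instances.
final class SketchConnection {
    final String target;

    SketchConnection(String target) {
        this.target = target;
    }
}

// Hypothetical holder of the executor's connection state.
final class SketchExecutorState {
    private SketchConnection pending;
    private String established;

    // Opening a new connection replaces whatever connection was pending before.
    SketchConnection connectTo(String target) {
        pending = new SketchConnection(target);
        return pending;
    }

    // The callback may arrive late; it is honored only for the current connection object.
    void onRegistrationSuccess(SketchConnection connection) {
        if (pending == connection) { // reference comparison on purpose
            established = connection.target;
        }
    }

    String established() {
        return established;
    }
}
```

A late callback for the first connection is silently dropped once a second connection has taken its place, so stale results cannot overwrite the current state.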
Once the TaskExecutor has registered successfully at the ResourceManager, the listener delivers the success message to the TaskExecutor, which then calls the following method to report its SlotReport to the ResourceManager.
/**
 * Once the TaskExecutor has registered successfully at the ResourceManager, the listener delivers the success message.
 * The TaskExecutor then calls this method to report its SlotReport to the ResourceManager.
 */
private void establishResourceManagerConnection(
        ResourceManagerGateway resourceManagerGateway,
        ResourceID resourceManagerResourceId,
        InstanceID taskExecutorRegistrationId,
        ClusterInformation clusterInformation) {
    // Report the SlotReport to the ResourceManager.
    // The SlotReport describes the Slot resources this TaskExecutor currently has.
    final CompletableFuture<Acknowledge> slotReportResponseFuture = resourceManagerGateway.sendSlotReport(
        getResourceID(),
        taskExecutorRegistrationId,
        taskSlotTable.createSlotReport(getResourceID()),
        taskManagerConfiguration.getTimeout());
    // If sending the SlotReport fails, reconnect to the ResourceManager.
    slotReportResponseFuture.whenCompleteAsync(
        (acknowledge, throwable) -> {
            if (throwable != null) {
                // Reporting the SlotReport failed: reconnect to the ResourceManager.
                reconnectToResourceManager(new TaskManagerException("Failed to send initial slot report to ResourceManager.", throwable));
            }
        }, getMainThreadExecutor());
    // monitor the resource manager as heartbeat target
    // Use the HeartbeatManager to set up the heartbeat connection between TaskExecutor and ResourceManager.
    // The TaskExecutor reports heartbeats to the ResourceManager periodically.
    resourceManagerHeartbeatManager.monitorTarget(resourceManagerResourceId, new HeartbeatTarget<TaskExecutorHeartbeatPayload>() {
        @Override
        public void receiveHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {
            // the TaskManager won't send heartbeat requests to the ResourceManager
        }
    });
    // set the propagated blob server address
    final InetSocketAddress blobServerAddress = new InetSocketAddress(
        clusterInformation.getBlobServerHostname(),
        clusterInformation.getBlobServerPort());
    blobCacheService.setBlobServerAddress(blobServerAddress);
    // EstablishedResourceManagerConnection keeps the connection information between TaskExecutor and ResourceManager;
    // it is stored inside the TaskExecutor.
    establishedResourceManagerConnection = new EstablishedResourceManagerConnection(
        resourceManagerGateway,
        resourceManagerResourceId,
        taskExecutorRegistrationId);
    // Before trying to connect, the TaskExecutor started a registration timeout (so it would not wait indefinitely for the ResourceManager).
    // Stop it now: the connection is established and the SlotReport has been sent to the ResourceManager.
    stopRegistrationTimeout();
}
The TaskExecutor reports its SlotReport to the ResourceManager through ResourceManagerGateway. If the report fails, it reconnects to the ResourceManager. Finally it stops the registration timeout that was started before the connection attempt (to keep the TaskExecutor from waiting indefinitely for registration): the connection has been established and the SlotReport has been sent, so the timeout is no longer needed.
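The registration timeout is started in reconnectToResourceManager() and stopped in establishResourceManagerConnection(), but its internals are not shown in this walkthrough. One common way to make such a timeout safe against stale timer callbacks is a token check, sketched below. This is an assumption for illustration, not a copy of TaskExecutor: SketchRegistrationTimeout and its methods are hypothetical, and the timer is replaced by an explicit fire() call so the example stays deterministic.

```java
import java.util.UUID;

// Hypothetical timeout guard: only the most recently issued token may fire.
final class SketchRegistrationTimeout {
    private UUID currentTimeoutId;
    private boolean timedOut;

    // Arms a new timeout and returns its token; any older token becomes stale.
    UUID start() {
        currentTimeoutId = UUID.randomUUID();
        return currentTimeoutId;
    }

    // Disarms the timeout, for example after a successful registration.
    void stop() {
        currentTimeoutId = null;
    }

    // Would be called by a timer when the duration elapses.
    void fire(UUID timeoutId) {
        if (timeoutId.equals(currentTimeoutId)) {
            timedOut = true;
        }
    }

    boolean hasTimedOut() {
        return timedOut;
    }
}
```

With this scheme, stopping the timeout does not need to cancel a scheduled task: a timer that fires later simply presents a token that no longer matches.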
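In the end, the TaskExecutor is running, its connection to the ResourceManager is established, and its SlotReport has been reported to the ResourceManager. The ResourceManager can now manage the TaskManager centrally: after a job is submitted to the cluster, Slot compute resources are allocated from the registered TaskManagers and Tasks run on the designated TaskManager.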
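As a rough picture of what "managing the reported Slots centrally" can mean, the sketch below keeps a registry of free slots per TaskManager and hands them out on request. It is a toy model under stated assumptions: SketchSlotRegistry is hypothetical, a slot report is reduced to a slot count, and the first-fit allocation policy is chosen only for simplicity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical registry on the resource manager side.
final class SketchSlotRegistry {
    // TaskManager id -> indexes of the slots that are still free on it.
    private final Map<String, Deque<Integer>> freeSlots = new LinkedHashMap<>();

    // Records the slots a TaskManager reports; a repeated report replaces the previous one.
    void receiveSlotReport(String taskManagerId, int numberOfSlots) {
        Deque<Integer> slots = new ArrayDeque<>();
        for (int i = 0; i < numberOfSlots; i++) {
            slots.add(i);
        }
        freeSlots.put(taskManagerId, slots);
    }

    // Allocates the first free slot in registration order, if any is left.
    Optional<String> allocateSlot() {
        for (Map.Entry<String, Deque<Integer>> entry : freeSlots.entrySet()) {
            Integer slot = entry.getValue().poll();
            if (slot != null) {
                return Optional.of(entry.getKey() + "#" + slot);
            }
        }
        return Optional.empty();
    }
}
```

Only TaskManagers that have sent a report appear in the registry, which is why registration and the initial SlotReport must complete before any Task can be placed.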