注册流程:
1. 主节点持有所有注册到本节点的 slave 节点的信息;主节点启动时,会同时启动一个定时器不停调用 salve 节点的 GetStatusServlet 接口获取 slave 节点的最新状态。
2. slave 节点启动时调用 master 节点的 RegisterSlaveServlet 接口;往 master 持有的slave节点的集合中添加上自己;
3. master节点上的RegisterSlaveServlet接口接收slave的信息,往 master 持有的slave节点的集合中添加此slave;
保持心跳:
主节点启动时,会同时启动一个定时器不停调用 salve 节点的 GetStatusServlet 接口获取 slave 节点的最新状态。
任务运行时搭配,RemoteStep:
RegisterTransServlet:往本地trans集合里添加一条记录;
PrepareExecutionTransServlet:trans.prepareExecution;Prepares previously loaded transformation for execution.
Method is used for preparing previously uploaded transformation for execution by its name.
StartExecutionTransServlet :trans.startThreads();Starts transformation. If the transformation is not ready, an error is returned.
ExecuteTransServlet:trans.prepareExecution + trans.startThreads;Executes transformation from the specified repository.
param:rep+user+pass+trans
RunTransServlet:trans.execute( null )->prepareExecution( arguments );+startThreads(); Execute transformation from enterprise repository.
param:trans
StartTransServlet: trans.execute( null )->prepareExecution( arguments );+startThreads(); Executes transformation previously uploaded to Carte server.
trans 类解析:
TransSplitter#splitOriginalTransformation
public static void executeClustered( final TransSplitter transSplitter,
final TransExecutionConfiguration executionConfiguration )
public void prepareExecution( String[] arguments ) throws KettleException中调用 StepInitThread,StepInitThread中调用stepmata的init方法初始化step,
StepMeta 中持有,List remoteInputSteps;和List remoteOutputSteps;BaseStep中的init方法中会初始化这些RemoteStep;
再次阶段会把所有的output socket打开;也就是所有的SocketServer打开;
step 跑起来之后,不停调用 processRow 方法,step 在 processRow 方法中获取 remote input 的数据,在尝试获取数据之前,调用BaseStep@openRemoteInputStepSocketsOnce方法,Opens socket connections to the remote input steps of this step.
在此方法中socket.connect( new InetSocketAddress( realHostname, portNumber ), 5000 );