• Flink1.14 SourceReader概念入门讲解与源码解析 (三)


    目录

    SourceReader 概念

    SourceReader 源码方法

    void start();

    InputStatus pollNext(ReaderOutput output) throws Exception;

    List snapshotState(long checkpointId);

    CompletableFuture isAvailable();

    void addSplits(List splits);

    参考


    SourceReader 概念

    SourceReader是一个运行在Task Manager上的组件,主要是负责读取 SplitEnumerator 分配的source split。

    SourceReader 提供了一个拉动式(pull-based)处理接口。Flink任务会在循环中不断调用 pollNext(ReaderOutput) 轮询来自 SourceReader 的记录。 pollNext(ReaderOutput) 方法的返回值指示 SourceReader 的状态。

    • MORE_AVAILABLE - SourceReader 有可用的记录。
    • NOTHING_AVAILABLE - SourceReader 现在没有可用的记录,但是将来可能会有记录可用。
    • END_OF_INPUT - SourceReader 已经处理完所有记录,到达数据的尾部。这意味着 SourceReader 可以终止任务了。

    pollNext(ReaderOutput) 会使用 ReaderOutput 作为参数,为了提高性能且在必要情况下, SourceReader 可以在一次 pollNext() 调用中返回多条记录。例如:有时外部系统的工作系统的工作粒度为块。而一个块可以包含多个记录,但是 source 只能在块的边界处设置 Checkpoint。在这种情况下, SourceReader 可以一次将一个块中的所有记录通过 ReaderOutput 发送至下游。

    然而,除非有必要,SourceReader 的实现应该避免在一次 pollNext(ReaderOutput) 的调用中发送多个记录。这是因为对 SourceReader 轮询的任务线程工作在一个事件循环(event-loop)中,且不能阻塞。

    在创建 SourceReader 时,相应的 SourceReaderContext 会提供给 Source,而 Source 则会将对应的上下文传递给 SourceReader 实例。 SourceReader 可以通过 SourceReaderContext 将 SourceEvent 传递给相应的 SplitEnumerator 。 Source 的一个典型设计模式是让 SourceReader 发送它们的本地信息给 SplitEnumerator,后者则会全局性地做出决定。

    SourceReader API 是一个底层(low-level)API,允许用户自行处理分片,并使用自己的线程模型来获取和移交记录。为了帮助实现 SourceReader,Flink 提供了 SourceReaderBase 类,可以显著减少编写 SourceReader 所需要的工作量。

    强烈建议连接器开发人员充分利用 SourceReaderBase 而不是从头开始编写 SourceReader

    这里简单说一下,如何通过 Source 创建 DataStream ,有两种方法(感觉上没啥区别):

    • env.fromSource
    • env.addSource
    1. // fromSource 这个返回的是source
    2. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    3. Source mySource = new MySource(....);
    4. DataStream stream = env.fromSource(
    5. mySource,
    6. WatermarkStrategy.noWatermarks(),// 无水标
    7. "MySourceName");
    8. ..
    9. // addSource 这个返回的是Source function
    10. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    11. DataStream<..> stream = env.addSource(new MySource(...));

    SourceReader 源码方法

    void start();

    判断是否有splits了,如果当前没有已经分配的splits了就发送请求获取。

    1. /** Start the reader. */
    2. void start();
    3. // FileSourceReader的实现
    4. @Override
    5. public void start() {
    6. // we request a split only if we did not get splits during the checkpoint restore
    7. if (getNumberOfCurrentlyAssignedSplits() == 0) {
    8. context.sendSplitRequest(); // 发送split的读取请求给SplitEnumerator,在handleSplitRequest方法中被调用
    9. }
    10. }

    InputStatus pollNext(ReaderOutput output) throws Exception;

    主要负责拉取下一个可读取的记录到SourceOutput,确保这个方法是非阻塞的,并且最好一次调用只输出一条数据。

    1. /**
    2. * Poll the next available record into the {@link SourceOutput}.
    3. *
    4. *

      The implementation must make sure this method is non-blocking.

    5. *
    6. *

      Although the implementation can emit multiple records into the given SourceOutput, it is

    7. * recommended not doing so. Instead, emit one record into the SourceOutput and return a {@link
    8. * InputStatus#MORE_AVAILABLE} to let the caller thread know there are more records available.
    9. *
    10. * @return The InputStatus of the SourceReader after the method invocation.
    11. */
    12. InputStatus pollNext(ReaderOutput output) throws Exception;
    13. // FileSourceReader读取数据的pollNext方法位于父类SourceReaderBase中
    14. @Override
    15. public InputStatus pollNext(ReaderOutput output) throws Exception {
    16. // make sure we have a fetch we are working on, or move to the next
    17. // 获取当前从fetcher中读取到的一批split
    18. // RecordsWithSplitIds代表了从fetcher拉取到SourceReader的数据
    19. // RecordsWithSplitIds可以包含多个split,但是对于FileRecords而言,只代表一个split
    20. RecordsWithSplitIds recordsWithSplitId = this.currentFetch;
    21. if (recordsWithSplitId == null) {
    22. // 如果没有,获取下一批split
    23. recordsWithSplitId = getNextFetch(output);
    24. if (recordsWithSplitId == null) {
    25. // 如果还没有获取到,需要检查后续是否还会有数据到来。
    26. return trace(finishedOrAvailableLater());
    27. }
    28. }
    29. // we need to loop here, because we may have to go across splits
    30. while (true) {
    31. // Process one record.
    32. // 从split中获取下一条记录
    33. final E record = recordsWithSplitId.nextRecordFromSplit();
    34. if (record != null) {
    35. // emit the record.
    36. // 如果获取到数据
    37. // 记录数量计数器加1
    38. numRecordsInCounter.inc(1);
    39. // 发送数据到Output
    40. // currentSplitOutput为当前split对应的下游output
    41. // currentSplitContext.state为reader的读取状态
    42. recordEmitter.emitRecord(record, currentSplitOutput, currentSplitContext.state);
    43. LOG.trace("Emitted record: {}", record);
    44. // We always emit MORE_AVAILABLE here, even though we do not strictly know whether
    45. // more is available. If nothing more is available, the next invocation will find
    46. // this out and return the correct status.
    47. // That means we emit the occasional 'false positive' for availability, but this
    48. // saves us doing checks for every record. Ultimately, this is cheaper.
    49. // 总是发送MORE_AVAILABLE
    50. // 如果真的没有可用数据,下次调用会返回正确的状态
    51. return trace(InputStatus.MORE_AVAILABLE);
    52. } else if (!moveToNextSplit(recordsWithSplitId, output)) {
    53. // 如果本次fetch的split已经全部被读取(本批没有更多的split),读取下一批数据
    54. // The fetch is done and we just discovered that and have not emitted anything, yet.
    55. // We need to move to the next fetch. As a shortcut, we call pollNext() here again,
    56. // rather than emitting nothing and waiting for the caller to call us again.
    57. return pollNext(output);
    58. }
    59. // else fall through the loop
    60. }
    61. }

    getNextFetch方法获取下一批 split 。

    1. @Nullable
    2. private RecordsWithSplitIds getNextFetch(final ReaderOutput output) {
    3. // 检查fetcher是否有错误
    4. splitFetcherManager.checkErrors();
    5. LOG.trace("Getting next source data batch from queue");
    6. // elementsQueue中缓存了fetcher线程获取的split
    7. // 从这个队列中拿出一批split
    8. final RecordsWithSplitIds recordsWithSplitId = elementsQueue.poll();
    9. // 如果队列中没有数据,并且接下来这批split已被读取完毕,返回null
    10. if (recordsWithSplitId == null || !moveToNextSplit(recordsWithSplitId, output)) {
    11. // No element available, set to available later if needed.
    12. return null;
    13. }
    14. // 更新当前的fetch
    15. currentFetch = recordsWithSplitId;
    16. return recordsWithSplitId;
    17. }

    finishedOrAvailableLater 方法检查后续是否还有数据,返回对应的状态。

    1. private InputStatus finishedOrAvailableLater() {
    2. // 检查所有的fetcher是否都已关闭
    3. final boolean allFetchersHaveShutdown = splitFetcherManager.maybeShutdownFinishedFetchers();
    4. // 如果reader不会再接收更多的split,或者所有的fetcher都已关闭
    5. // 返回NOTHING_AVAILABLE,将来可能会有记录可用。
    6. if (!(noMoreSplitsAssignment && allFetchersHaveShutdown)) {
    7. return InputStatus.NOTHING_AVAILABLE;
    8. }
    9. if (elementsQueue.isEmpty()) {
    10. // 如果缓存队列中没有数据,返回END_OF_INPUT
    11. // We may reach here because of exceptional split fetcher, check it.
    12. splitFetcherManager.checkErrors();
    13. return InputStatus.END_OF_INPUT;
    14. } else {
    15. // We can reach this case if we just processed all data from the queue and finished a
    16. // split,
    17. // and concurrently the fetcher finished another split, whose data is then in the queue.
    18. // 其他情况返回MORE_AVAILABLE
    19. return InputStatus.MORE_AVAILABLE;
    20. }
    21. }

    moveToNextSplit 方法前往读取下一个split。

    1. private boolean moveToNextSplit(
    2. RecordsWithSplitIds recordsWithSplitIds, ReaderOutput output) {
    3. // 获取下一个split的ID
    4. final String nextSplitId = recordsWithSplitIds.nextSplit();
    5. if (nextSplitId == null) {
    6. // 如果没获取到,则当前获取过程结束
    7. LOG.trace("Current fetch is finished.");
    8. finishCurrentFetch(recordsWithSplitIds, output);
    9. return false;
    10. }
    11. // 获取当前split上下文
    12. // Map> splitStates它保存了split ID和split的状态
    13. currentSplitContext = splitStates.get(nextSplitId);
    14. checkState(currentSplitContext != null, "Have records for a split that was not registered");
    15. // 获取当前split对应的output
    16. // SourceOperator在从SourceCoordinator获取到分片后会为每个分片创建一个OUtput,currentSplitOutput是当前分片的输出
    17. currentSplitOutput = currentSplitContext.getOrCreateSplitOutput(output);
    18. LOG.trace("Emitting records from fetch for split {}", nextSplitId);
    19. return true;
    20. }

    List snapshotState(long checkpointId);

    主要是负责创建 source 的 checkpoint 。

    1. /**
    2. * Checkpoint on the state of the source.
    3. *
    4. * @return the state of the source.
    5. */
    6. List snapshotState(long checkpointId);
    7. public List snapshotState(long checkpointId) {
    8. List splits = new ArrayList();
    9. this.splitStates.forEach((id, context) -> {
    10. splits.add(this.toSplitType(id, context.state));
    11. });
    12. return splits;
    13. }

    CompletableFuture isAvailable();

    1. /**
    2. * Returns a future that signals that data is available from the reader.
    3. *
    4. *

      Once the future completes, the runtime will keep calling the {@link

    5. * #pollNext(ReaderOutput)} method until that methods returns a status other than {@link
    6. * InputStatus#MORE_AVAILABLE}. After that the, the runtime will again call this method to
    7. * obtain the next future. Once that completes, it will again call {@link
    8. * #pollNext(ReaderOutput)} and so on.
    9. *
    10. *

      The contract is the following: If the reader has data available, then all futures

    11. * previously returned by this method must eventually complete. Otherwise the source might stall
    12. * indefinitely.
    13. *
    14. *

      It is not a problem to have occasional "false positives", meaning to complete a future

    15. * even if no data is available. However, one should not use an "always complete" future in
    16. * cases no data is available, because that will result in busy waiting loops calling {@code
    17. * pollNext(...)} even though no data is available.
    18. *
    19. * @return a future that will be completed once there is a record available to poll.
    20. */
    21. // 创建一个future,表明reader中是否有数据可被读取
    22. // 一旦这个future进入completed状态,Flink一直调用pollNext(ReaderOutput)方法直到这个方法返回除InputStatus#MORE_AVAILABLE之外的内容
    23. // 在这之后,会再次调isAvailable方法获取下一个future。如果它completed,再次调用pollNext(ReaderOutput)。以此类推
    24. public CompletableFuture isAvailable() {
    25. return this.currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : this.elementsQueue.getAvailabilityFuture();
    26. }

    void addSplits(List splits);

    1. /**
    2. * Adds a list of splits for this reader to read. This method is called when the enumerator
    3. * assigns a split via {@link SplitEnumeratorContext#assignSplit(SourceSplit, int)} or {@link
    4. * SplitEnumeratorContext#assignSplits(SplitsAssignment)}.
    5. *
    6. * @param splits The splits assigned by the split enumerator.
    7. */
    8. // 添加一系列splits,以供reader读取。这个方法在SplitEnumeratorContext#assignSplit(SourceSplit, int)或者SplitEnumeratorContext#assignSplits(SplitsAssignment)中调用
    9. void addSplits(List splits);

    其中,SourceReaderBase类的实现,fetcher的作用是从拉取split缓存到SourceReader中。

    1. @Override
    2. public void addSplits(List splits) {
    3. LOG.info("Adding split(s) to reader: {}", splits);
    4. // Initialize the state for each split.
    5. splits.forEach(
    6. s ->
    7. splitStates.put(
    8. s.splitId(), new SplitContext<>(s.splitId(), initializedState(s))));
    9. // Hand over the splits to the split fetcher to start fetch.
    10. splitFetcherManager.addSplits(splits);
    11. }

    addSplits 方法将fetch任务交给 SplitFetcherManager 处理,它的 addSplits 方法如下:

    1. @Override
    2. public void addSplits(List splitsToAdd) {
    3. // 获取正在运行的fetcher
    4. SplitFetcher fetcher = getRunningFetcher();
    5. if (fetcher == null) {
    6. // 如果没有,创建出一个fetcher
    7. fetcher = createSplitFetcher();
    8. // Add the splits to the fetchers.
    9. // 将这个创建出的fetcher加入到running fetcher集合中
    10. fetcher.addSplits(splitsToAdd);
    11. // 启动这个fetcher
    12. startFetcher(fetcher);
    13. } else {
    14. // 如果获取到了正在运行的fetcher,调用它的addSplits方法
    15. fetcher.addSplits(splitsToAdd);
    16. }
    17. }

    最后我们查看SplitFetcheraddSplits方法:

    1. public void addSplits(List splitsToAdd) {
    2. // 将任务包装成AddSplitTask,通过splitReader兼容不同格式数据的读取方式
    3. // 将封装好的任务加入到队列中
    4. enqueueTask(new AddSplitsTask<>(splitReader, splitsToAdd, assignedSplits));
    5. // 唤醒fetcher任务,使用SplitReader读取数据
    6. // Split读取数据并缓存到elementQueue的逻辑位于FetcherTask,不再具体分析
    7. wakeUp(true);
    8. }

    参考

    数据源 | Apache Flink

    Flink 源码之新 Source 架构 - 简书

    Flink新Source架构(下) - 知乎

  • 相关阅读:
    一起Talk Android吧(第五百五十三回:解析Retrofit返回的数据)
    Docker中安装Redis
    IDEA2022.1创建maven项目,规避idea2022新建maven项目卡死,无反应问题
    多租户架构
    2022年双十一百亿补贴,2022年聚划算双11玩法解读
    ABP - 依赖注入(1)
    成为黄金代理,必须考虑到这一点
    Linux open suse15==安装pcre zlib openssl nginx 关防火墙 安装httpd
    接入国家能源平台MQTT应用案例
    语音识别GMM-HMM
  • 原文地址:https://blog.csdn.net/Stray_Lambs/article/details/133794233