Skip to content

Latest commit

 

History

History
147 lines (123 loc) · 19.4 KB

conn.md

File metadata and controls

147 lines (123 loc) · 19.4 KB

Flink数据通信

从数据源开始分析数据通信的整个过程,SourceFunction接口中的内部接口SourceContext的collect()方法用于发射数据,其实现类 NonTimestampContext的collect()方法直接调用了output对象的collect方法,它是Output<StreamRecord>类型,它的实际 类型是CountingOutput类型,这是一个包装类型,是对Output的包装,并在此基础上增加了收集元素数量的numRecordsOut的Counter 类型的监控变量,collect()方法中调用了numRecordsOut.inc()方法来对元素数量进行自增,从而实现了对收集元素数量的监控。NonTimestampContext 的CountingOutput封装的output的真正类型是RecordWriterOutput类型,其collect()方法会直接过滤掉输出到其它旁路input的数 据,而对于输出到非旁路input的数据则直接使用pushToRecordWriter()方法进行序列化代理,并将数据传递给recordWriter。

RecordWriter会对数据进行序列化,然后写到缓存中,它是一个接口,有两个定义为final的实现类:

  • BroadcastRecordWriter:主要用于广播模式,维护了多个下游channel,在发送时会将数据发往下游所有的channel中;
  • ChannelSelectorRecordWriter:它是一个通用的面向记录的运行时结果输出器,通过channelSelector对象来判断需要将数据发往 下游哪个channel中,keyby算子中就用到了这种RecordWriter。

以ChannelSelectorRecordWriter类为例进行分析,在其emit()方法中,其调用了父类RecordWriter的方法。传入了两个参数,分别是 需要发送的消息和调用channelSelector.selectChannel()方法根据消息和下游channel的对应关系得到的目标channel的编号。

在父类RecordWriter的emit()方法中首先使用序列化器将消息序列化,然后调用copyFromSerializerToTargetChannel()方法从序列化 器中复制数据到目标channel,深入的来分析下这个方法:首先调用serializer的reset()方法将其内部的data buffer position重置为0, 这样就可以将序列化后的结果拷贝到多个目标buffer中去。通过getBufferBuilder()获取目标channel的bufferBuilder,bufferBuilder 维护了MemorySegment内存片段,这是Flink进行内存管理的基本单位,Flink中的内存管理依赖于它并由此实现堆内堆外内存的管理。在RecordWriter 中有一个bufferBuilder的数组,其长度与下游channel的数目相同,它以channel ID为下标存储着与channel对应的bufferBuilder,如果 该channel对应的bufferBuilder还不存在就会先调用requestNewBufferBuilder申请新的bufferBuilder,否则就会直接使用当前已经存在 的。在获得BufferBuilder后会复制serializer中的数据写入到bufferBuilder中,并循环判断写入的buffer是否已满,如果已满则将BufferBuilder 标记为已满,并判断数据是否写入完毕。如果已经完毕就表明写入已经完成,将当前BufferBuilder对目标target channel标记为空,并跳出循环。 如果未写入完毕,则申请新的buffer进行写入,因此一条数据可能会被写入多个buffer中去,在申请到新的buffer后需要再复制serializer中的数 据到新的bufferBuilder中,这样即可保证只要数据没有写入完毕,那么在循环判断时写入的buffer都是已满。在数据已经写入buffer中后,判断 flushAlways是否为true,若是则对目标channel的数据进行一次flush。

以ChannelSelectorRecordWriter类为例来分析getBufferBuilder()的实现,在获取目标channel的bufferBuilder时,判断bufferBuilders 数组对应targetChannel下标的BufferBuilder是否为空,如果不为空就直接返回,否则就调用requestNewBufferBuilder()申请新的buffer。 在ChannelSelectorRecordWriter类的构造方法中,只是创建了bufferBuilders数组但并没有赋值,只有在第一次getBufferBuilder()时才会 创建,因此它是懒加载的。再来看下requestNewBufferBuilder()方法的实现,首先进行必要的验证,只有targetChannel对应的buffer为空或数据 已经写入完毕才能进行下面的逻辑。通过调用RecordWriter类的requestNewBufferBuilder()方法申请或是获取目标分区的bufferBuilder,然后 创建BufferConsumer用于读取BufferBuilder写入的数据,并将bufferConsumer添加到对应下标的ResultSubpartition中,最后返回该BufferBuilder。

ResultSubpartition是一个抽象类,它有两个具体的实现类,分别是PipelinedSubpartition和BoundedBlockingSubpartition。PipelinedSubpartition 类用于流场景下的数据消费,其内部维护着该Subpartition的所有buffer。消费者可以通过调用createReadView()方法创建一个PipelinedSubpartitionView 来消费数据,在创建时除了需要传递父Subpartition也就是调用的对象实例外,还需要传递一个BufferAvailabilityListener对象,用于当buffer中 有数据时的回调,因此只要其中有数据就能及时通知下游进行消费,这也是其适用于流式数据处理场景的原因之一。

BoundedBlockingSubpartition类适用于批处理场景下的数据消费,写入的数据存放在属性BoundedData中,采用的是先写入后消费的模式,支持一次 写入,多次消费。BoundedData是个接口,它有三个实现类,分别对应着不同的数据存放方式,包括保存在文件系统的FileChannelBoundedData、保存 在内存的MemoryMappedBoundedData和同时保存在文件系统及内存的FileChannelMemoryMappedBoundedData。

上游写入数据的流程基本已经分析完毕了,接下来就要说到下游消费写入的数据了。

前述说过数据的消费是通过ResultSubPartition调用createReadView()方法,在实现类之一的PipelinedSubpartition类中,其首先用buffers 作为同步对象进行同步确保该同步块同一时间只有一个线程能够访问以保证线程安全,确保不会重复创建read view。然后判断subpartition还没有被释 放,并且该subpartition的read view为空以确保数据还没有被消费。如果判断通过,则创建read view,并判断是否有数据需要消费,如果是则通知 下游消费者进行消费。ResultPartitionManager负责维护当前已经创建和消费的分区,在其createSubpartitionView()方法中调用了createReadView() 方法,这个方法非常简单,ResultPartition类在其setup()方法中会调用partitionManager类的registerResultPartition()方法将分区注册, 其实就是将待注册的分区id和待注册的分区存放到registeredPartitions这个map中,然后在创建view时就能通过分区id从registeredPartitions 中获取到对应的ResultPartition,并创建一个subpartition view。

继续来往上追溯,发现LocalInputChannel和CreditBasedSequenceNumberingViewReader这两个类调用了createSubpartitionView()方法。 LocalInputChannel负责从本地请求一个subPartition view,而CreditBasedSequenceNumberingViewReader则是一个用于支持基于credit反压的 网络场景下的subpartition view的简单封装。查看其requestSubpartitionView()方法发现其非常简单,就是单纯的创建了一个subpartitionView。 继续往上追踪,来到了PartitionRequestServerHandler类的channelRead0方法(emmmm...看到这个是不是有些熟悉?是的,在基于credit的背压机制 中也遇到了它)。它在上游发送端执行,根据接收到消息的类型,作出相应的响应,如果接收到的消息类型是PartitionRequest,就会创建一个CreditBasedSequenceNumberingViewReader 类的实例,并调用其requestSubpartitionView()方法创建对应的subPartitionView,并将reader加入到outboundQueue中。

outboundQueue是一个PartitionRequestQueue类的对象,就是由它来负责处理partition request。每次partition request都会在PartitionRequestServerHandler 中创建一个NetworkSequenceViewReader对象,然后给每个reader分配SubPartitionView(调用requestSubpartitionView)。最后调用notifyReaderCreated 把reader加入到PartitionRequestQueue的allReaders中。PartitionRequestQueue监听下游channel是否可以写入,当下游channel可写时会调用 channelWritabilityChanged()方法,它会直接调用writeAndFlushNextMessageIfPossible()方法,该方法的逻辑是:如果有致命异常或是channel 不可写,就直接返回。否则就从allReaders中取出一个reader,如果取出的reader为空就直接返回,否则就从reader中获取buffer。如果buffer为空,并且 buffer没有被释放,则跳过本次循环重新开始新的循环,否则获取异常信息并将异常信息组装并抛出。否则,如果channel没有被从可用的读取队列中移除,则重新 将其添加到队列中,如果它仍然可用,则将消息包装为BufferResponse类型,等待写入和清空数据完成后再继续处理下一个buffer。

那么PartitionRequest是在哪里被发送的呢?这就要说到NettyPartitionRequestClient类的requestSubpartition()方法了。它会先判断TCP channel不 处于关闭状态,然后将inputChannel加入到clientHandler中。clientHandler是一个map类型的变量,是CreditBasedPartitionRequestClientHandler 类的实例,内部维护着input channel ID与channel的对应关系,在读取消息时需要从其中根据channel ID获取到channel对象本身。然后创建PartitionRequest 对象,并创建发送PartitionRequest请求成功之后的回调函数,其中是发送遇到错误时的一些异常处理功能。然后判断是否需要延迟发送,如果不需要延迟发送,则 直接通过tcpChannel将PartitionRequest请求发送出去,并添加回调函数。否则如果需要延迟发送,则调用eventLoop的schedule()方法进行延迟发送。那么是谁 调用了NettyPartitionRequestClient类的requestSubpartition()方法呢?继续向上追踪就来到了RemoteInputChannel的requestSubPartition()方法, 它的逻辑是如果partitionRequestClient为空,则会预先通过connectionManager创建一个client,然后调用其requestSubpartition()方法发送partitionRequest。 SingleInputGate类的requestPartitions()方法调用了上述方法,它首先会判断该partition是否已经被请求过,由于只能请求一次,所以它会先循环所有的inputChannels, 并请求对应的subPartition,并在第一次调用该方法后就会将requestedPartitionsFlag设置为true,以防重复调用。SingleInputGate继承于InputGate, 而InputGate的作用就是从intermediate result中读取数据到task中,operatorChain之间会使用intermediate result来作为中间结果的缓存,它在执行 时会使用ResultPartition来存放数据,其会根据数据分区条件分为一个或多个ResultSubpartition,每一个ResultSubpartition都对应下游的一个InputGate, InputGate会读取ResultSubpartition的内容。

那么数据究竟是怎么样被读取的呢?让我们再来分析下StreamTask的processInput()方法。它会调用inputProcessor的processInput()方法,该方法会返回 一个状态,如果状态表示的是还有输入数据且recordWriter依然可用,则返回,如果表示的是输入结束,则会将mailboxLoopRunning设置为false,并停止运行。 否则表示的是recordWriter不可用,则会在inputGate的recordWriter或inputProcessor恢复可用之后异步调用default action执行恢复操作。在这其中 最重要的是inputProcessor的processInput()方法,inputProcessor有三个实现类,分别是:StreamOneInputProcessor、StreamTwoInputProcessor 和StreamTwoInputSelectableProcessor。我们以StreamOneInputProcessor的processInput()方法为例来进行分析,它调用了input.emitNext()方法 并返回一个状态,如果返回的是输入结束则会调用operatorChain的endHeadOperatorInput()方法。

input也有两个具体的实现类,分别是StreamTaskSourceInput和StreamTaskNetworkInput,如果该StreamTask是数据源,则实现类为StreamTaskSourceInput, 否则就是StreamTaskNetworkInput,表示需要通过网络读取数据。我们分析下网络情况下的emitNext()方法,由于经过了网络,所以它需要从buffer的memorySegment 中反序列化出数据,然后判断buffer是否已经被消费了,如果为true表示buffer可以被回收。如果已经读取到了完整的记录,则处理从buffer中反序列化后得到的数据, 并返回还有更多数据待处理。如果当前记录的反序列化器中没有数据,或是从反序列化器中读取的数据还不完整,则还需要从CheckpointInputGate中读取数据,如果从 其中读取到数据,表明缓存中存在数据,需要调用processBufferOrEvent()方法将缓存中的memorySegment传递给currentRecordDeserializer进行反序列化得 到数据。否则,判断CheckpointInputGate是否已经结束,是则直接返回数据输入结束,如果为否就返回没有可用数据。

processBufferOrEvent()方法会首先判断缓存中获取的是buffer还是event,如果是buffer就获取buffer对应的channel ID,如果指定过channel ID则根据 channel ID获取对应的反序列化器,如果获取到的反序列化器不为null,就设置反序列化器要读取的buffer是inputGate得到的buffer。那么,如果是event呢? 它获取得到的event,并判断如果event不是EndOfPartitionEvent,就会立即清除channel对应的反序列化器,并将recordDeserializers对应channel ID 的引用置为空。

那么buffer是如何从inputGate中获取到的呢?来看一下checkpointedInputGate.pollNext()方法。CheckpointedInputGate负责检查数据流中的checkpoint barrier,调用对应的barrierHandler来决定是否触发checkpoint操作,它包装了InputGateWithMetrics,InputGateWithMetrics又包装了SingleInputGate。 InputGateWithMetrics负责监控接收到的数据,并统计输入数据的总字节数,作为metrics进行上报。而最终是调用了SingleInputGate.pollNext()方法,它用非阻塞 的方式调用getNextBufferOrEvent()方法,其中还有一个getNext()方法,是采用阻塞的方式调用getNextBufferOrEvent()方法。getNextBufferOrEvent()方法的 逻辑是,如果接收到所有分区的终止事件则返回空,如果input gate已经被关闭,则抛出CancelTaskException异常。否则调用waitAndGetNextData()方法读取数据,传 入一个blocking参数来决定是否阻塞,如果读取到数据则将数据封装为bufferOrEvent并返回。在waitAndGetNextData()方法中,它先根据传入的blocking参数决定采用 阻塞或非阻塞的方式获取channel,如果获取不到channel则返回空。否则,获取input channel的缓存中的数据,如果能获取到数据且还有更多的数据,则将inputChannel 加入到inputChannelsWithData队列中,并采用一个BitSet记录下哪些inputChannel已经加入到了该队列中。如果该队列为空,则设置其为不可用状态。最后,返回包装后 的结果。

每一个InputGate都包含一个或多个InputChannel。其中InputChannel又分为2种:LocalInputChannel负责从本地的ResultSubPartition读取数据,而 RemoteInputChannel负责从远程其他节点的ResultSubpartition读取数据。先来分析一下LocalInputChannel的getNextBuffer()方法,它先获取requestSubPartition() 方法得到的subpartitionView,如果subpartitionView为空,则需要再次检查subpartitionView。如果此时另一个线程正在调用requestSubpartition() 方法,那么checkAndWaitForSubpartitionView()方法会被阻塞直到requestSubpartition执行完毕。然后获取缓存的数据,如果数据为空,则判断subpartitionView 是否为空,是则抛出异常,否则返回空。更新从buffer中读取的字节数,并自增已读取的缓存数,最终将读取的数据封装返回,这就是从本地读取数据的方法。再来分析 下RemoteInputChannel的getNextBuffer()方法,它与本地获取数据不同的是它需要从receivedBuffers队列中获取buffer,而不是直接从subpartitionView 获取,其余的与本地获取逻辑类似。

receivedBuffers队列中的缓存数据是在onBuffer()方法中被加入,该方法的逻辑是如果releaseAllResources()方法还未被调用,资源还没有被释放,则对 sequenceNumber进行检查,判断添加buffer之前队列是否为空,随后添加缓存数据到队列中,由于缓存中已放入数据,所以标记缓存不需要回收。自增sequenceNumber, 如果添加buffer之前队列为空,则需要通知对应的inputGate已有数据不再为空。如果需要提前分配buffer,则还需要锁定bufferQueue,并根据后续所需的buffer 数和初始预留的buffer数计算出需要的buffer数目。如果可用的buffer数比需要的buffer数少,且状态不是等待请求浮动buffer的状态,则表示需要为bufferQueue 增加浮动的buffer。此时将申请一个buffer,并将其加入浮动buffer队列中,否则如果申请不到buffer,就表明已经没有足够的buffer,需要注册一个监听器,并 将状态标记为等待请求浮动buffer的状态。最后,如果本次操作请求的buffer数量大于0,且如果unannouncedCredit在增加numRequestedBuffers之前的值为0 (unannouncedCredit为未告知上游生产者的credit,用于数据反压),表明需要通知上游这里有可用credit,可以接收数据。

上面在申请buffer失败时注册了一个监听器,那么当监听器监听到buffer创建成功时调用什么方法呢?这就是notifyBufferAvailable()方法,它会首先确保正处于 等待浮动buffer的状态,并确保资源还未被释放,且可用buffer数目比所需的buffer数目少才会进行下面的流程。在后续的流程中,它会首先将浮动buffer添加到 bufferQueue中,如果此时可用buffer数与所需buffer数一致表示不再需要申请新的buffer,否则返回状态表示仍需要新的buffer。如果unannouncedCredit在 增加1个可用buffer之前为0,就需要通知上游,下游已经可以接收数据。

那么问题来了,onBuffer()方法又是何时被调用的呢?答案是在CreditBasedPartitionRequestClientHandler类的decodeBufferOrEvent()方法中调用。 这个方法负责处理接收到的数据,这里的数据的类型可能为buffer,也可能是event。具体的逻辑是,如果是buffer,且如果从netty接收到的字节数为0,则调用 RemoteInputChannel的onEmptyBuffer()方法并返回。否则,请求一个空的buffer,如果请求成功,则将从netty中读取到的数据填充到buffer中,并根据 是否压缩的配置控制是否进行压缩,然后调用inputChannel.onBuffer()方法进行处理。否则如果请求空buffer不成功,且inputChannel已经被释放,则取消 请求方法。否则,抛出状态异常。如果是event呢?则会创建一个memSeg,其中的数据是event的内容,再将其包装进networkBuffer对象,通过onBuffer()方法 交给RemoteInputChannel进行处理。最后,释放netty使用到的buffer。

那么,decodeBufferOrEvent()方法又是在何处被调用的呢?答案是在decodeMsg()方法中。在该方法中,它会先判断msg是否是NettyMessage.BufferResponse 类,如果是则获取接收此buffer的input channel,如果channel为空则释放buffer,取消request,并返回。否则,调用decodeBufferOrEvent()方法。该方法 中还有一些异常代码的处理。这个方法是在netty框架的channelRead()方法中调用,也就是说它从netty中读取数据,进行反序列化,申请buffer,并将数据存放到buffer 中去,最终被task读取到,这就是整个数据的读取流程,至此已经分析完毕了。