GCS resumable upload chunker bug #3108

@jroper

While browsing the source code for the GCS resumable upload implementation, I noticed a bug. This code is supposed to split the stream into ByteString chunks of the same size, with any remaining bytes in the last element of the stream. Here is the code:

  private def chunker(chunkSize: Int) = Flow[ByteString].map(Some(_)).concat(Source.single(None)).statefulMapConcat {
    () =>
      val chunkBuilder = ByteString.newBuilder
      bytes =>
        bytes.fold(Some(chunkBuilder.result()).filter(_.nonEmpty).toList) { bytes =>
          chunkBuilder ++= bytes
          if (chunkBuilder.length < chunkSize) {
            Nil
          } else if (chunkBuilder.length == chunkSize) {
            val chunk = chunkBuilder.result()
            chunkBuilder.clear()
            chunk :: Nil
          } else { // chunkBuilder.length > chunkSize
            val result = chunkBuilder.result()
            chunkBuilder.clear()
            val (chunk, init) = result.splitAt(chunkSize)
            chunkBuilder ++= init
            chunk :: Nil
          }
        }
  }

The issue is in the last branch: if the chunkBuilder buffer holds multiple chunks' worth of bytes, all of them should be emitted, but this code emits only the first chunk. The additional chunks won't be emitted until the next ByteString is received, and even then only one chunk is emitted per incoming element. Possible consequences include unbounded buffer growth (if the ByteStrings coming in to the chunker are consistently larger than the chunk size) and a final chunk that is larger, potentially much larger, than the chunk size.
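
One possible fix, as a minimal sketch rather than a proposed patch, is to keep splitting the buffer until less than one chunk remains, so every complete chunk is emitted as soon as it is available. The names `ChunkerSketch` and `drain` are hypothetical and not part of the existing code, and the imports assume the `akka` package namespace; adjust them if the project uses a different one.

```scala
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import scala.annotation.tailrec

// Hypothetical sketch: object and helper names are illustrative only.
object ChunkerSketch {

  // Splits off every complete chunk from `buffer`.
  // Returns (complete chunks in order, leftover bytes shorter than chunkSize).
  def drain(buffer: ByteString, chunkSize: Int): (List[ByteString], ByteString) = {
    @tailrec
    def loop(rest: ByteString, acc: List[ByteString]): (List[ByteString], ByteString) =
      if (rest.length < chunkSize) (acc.reverse, rest)
      else {
        val (chunk, tail) = rest.splitAt(chunkSize)
        loop(tail, chunk :: acc)
      }
    loop(buffer, Nil)
  }

  def chunker(chunkSize: Int): Flow[ByteString, ByteString, NotUsed] = {
    // Guard against a non-terminating loop in drain.
    require(chunkSize > 0, "chunkSize must be positive")
    Flow[ByteString]
      .map(Some(_))
      .concat(Source.single(None)) // None marks the end of the stream
      .statefulMapConcat { () =>
        var buffer = ByteString.empty
        elem =>
          elem match {
            case Some(bytes) =>
              // Emit all complete chunks now; keep only the remainder.
              val (chunks, leftover) = drain(buffer ++ bytes, chunkSize)
              buffer = leftover
              chunks
            case None =>
              // End of stream: flush whatever is left, if anything.
              if (buffer.nonEmpty) buffer :: Nil else Nil
          }
      }
  }
}
```

With this shape the buffer never holds `chunkSize` bytes or more after an element has been processed, so the final flushed chunk is always smaller than the chunk size.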
