Incorrect handling of second-to-last chunk using (streaming) bulk helpers #3206

@ludovicofrizziero

Description

Elasticsearch supports bulk uploading of data, handled in this package by helpers.streaming_bulk and helpers.async_streaming_bulk. Both helpers accept two parameters that control chunking: chunk_size and/or max_chunk_bytes.
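
For context, here is a minimal usage sketch of streaming_bulk showing the two parameters (the client URL, index name and document shape are just placeholders):

from elasticsearch import Elasticsearch, helpers

client = Elasticsearch("http://localhost:9200")

def generate_actions():
    # placeholder documents; any action stream behaves the same way
    for i in range(1000):
        yield {"_index": "my-index", "_id": i, "value": i}

for ok, item in helpers.streaming_bulk(
    client,
    generate_actions(),
    chunk_size=500,                     # max number of actions per bulk request
    max_chunk_bytes=100 * 1024 * 1024,  # max size of a bulk request body, in bytes
):
    if not ok:
        print("failed:", item)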

However, the last fully formed chunk (as determined by those parameters) is mishandled because of a logic bug inside helpers._ActionChunker.feed.

The bug causes the current full chunk to be emitted only when the stream generates a new action and a new partial chunk begins, rather than being emitted first, before the chunker starts listening for new actions.

Hence, the bug is 'silent': all data does eventually end up ingested into Elasticsearch, but only if the source action stream ends, which I believe is the most common situation when using these helpers.

Unfortunately, I have a never-ending stream of very small and infrequent batches of actions, so I uncovered this bug after noticing that some data was missing from the database.
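
To illustrate the emission timing, here is a small standalone mimic of the problematic ordering (my own simplified sketch, not the package's actual _ActionChunker): with chunk_size=2, the second action completes a chunk, yet nothing is returned until a third action arrives.

# Simplified mimic of the current ordering: the boundary check runs BEFORE
# the incoming action is appended, so an already-full chunk stays buffered
# until the next action shows up.
class FlawedChunker:
    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.chunk = []

    def feed(self, action):
        ret = None
        if self.chunk and len(self.chunk) == self.chunk_size:
            ret = self.chunk
            self.chunk = []
        self.chunk.append(action)
        return ret

chunker = FlawedChunker(chunk_size=2)
print(chunker.feed("a"))  # None
print(chunker.feed("b"))  # None  <- ["a", "b"] is a full chunk, but is not emitted
print(chunker.feed("c"))  # ['a', 'b']  <- emitted only once "c" arrives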

The bug is simple to solve: within _ActionChunker.feed, move the check on the chunk boundaries down so that it runs after the chunk has been updated with the latest incoming action from the stream.

The code as it is right now (package version 9.2.0):

class _ActionChunker:
    ...

    def feed(
        self,
        action: _TYPE_BULK_ACTION_HEADER_WITH_META,
        data: _TYPE_BULK_ACTION_BODY,
    ) -> Optional[
        Tuple[
            List[
                Union[
                    Tuple[_TYPE_BULK_ACTION_HEADER],
                    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
                ]
            ],
            List[bytes],
        ]
    ]:
        ret = None
        action_bytes = b""
        data_bytes: Optional[bytes] = None
        cur_size = 0
        if not isinstance(action, BulkMeta):
            action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
            # +1 to account for the trailing new line character
            cur_size = len(action_bytes) + 1

            if data is not None:
                data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
                cur_size += len(data_bytes) + 1
            else:
                data_bytes = None

        # full chunk, send it and start a new one
        if self.bulk_actions and (
            self.size + cur_size > self.max_chunk_bytes
            or self.action_count == self.chunk_size
            or (action == BulkMeta.flush and self.bulk_actions)
        ):
            ret = (self.bulk_data, self.bulk_actions)
            self.bulk_actions = []
            self.bulk_data = []
            self.size = 0
            self.action_count = 0

        if not isinstance(action, BulkMeta):
            self.bulk_actions.append(action_bytes)
            if data_bytes is not None:
                self.bulk_actions.append(data_bytes)
                self.bulk_data.append((action, data))
            else:
                self.bulk_data.append((action,))

            self.size += cur_size
            self.action_count += 1
        return ret

And the code with the proposed change:

class _ActionChunker:
    ...

    def feed(
        self,
        action: _TYPE_BULK_ACTION_HEADER_WITH_META,
        data: _TYPE_BULK_ACTION_BODY,
    ) -> Optional[
        Tuple[
            List[
                Union[
                    Tuple[_TYPE_BULK_ACTION_HEADER],
                    Tuple[_TYPE_BULK_ACTION_HEADER, _TYPE_BULK_ACTION_BODY],
                ]
            ],
            List[bytes],
        ]
    ]:
        ret = None
        action_bytes = b""
        data_bytes: Optional[bytes] = None
        cur_size = 0
        if not isinstance(action, BulkMeta):
            action_bytes = to_bytes(self.serializer.dumps(action), "utf-8")
            # +1 to account for the trailing new line character
            cur_size = len(action_bytes) + 1

            if data is not None:
                data_bytes = to_bytes(self.serializer.dumps(data), "utf-8")
                cur_size += len(data_bytes) + 1
            else:
                data_bytes = None
       
            # ------ moved up ------
            self.bulk_actions.append(action_bytes)
            if data_bytes is not None:
                self.bulk_actions.append(data_bytes)
                self.bulk_data.append((action, data))
            else:
                self.bulk_data.append((action,))

            self.size += cur_size
            self.action_count += 1
            # ----------------------

        # full chunk, send it and start a new one
        if self.bulk_actions and (
            self.size > self.max_chunk_bytes   # ----------- changed
            or self.action_count >= self.chunk_size   # ---------- changed (more reliable check)
            or (action == BulkMeta.flush and self.bulk_actions)
        ):
            ret = (self.bulk_data, self.bulk_actions)
            self.bulk_actions = []
            self.bulk_data = []
            self.size = 0
            self.action_count = 0

        return ret
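
For comparison, the same standalone mimic with the check moved after the append (mirroring the change above) returns each chunk as soon as it is full:

# Simplified mimic of the proposed ordering: the boundary check runs AFTER
# the incoming action is appended, so a chunk is emitted the moment it fills up.
class FixedChunker:
    def __init__(self, chunk_size):
        self.chunk_size = chunk_size
        self.chunk = []

    def feed(self, action):
        ret = None
        self.chunk.append(action)
        if len(self.chunk) >= self.chunk_size:
            ret = self.chunk
            self.chunk = []
        return ret

chunker = FixedChunker(chunk_size=2)
print(chunker.feed("a"))  # None
print(chunker.feed("b"))  # ['a', 'b']  <- emitted as soon as the chunk is full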
