Ability to parse json with optional node fields when applying explode #1360

@ai-ignatyev

Description

Suppose we have JSON records in which a nested node is optional (it may be None), and we want to apply explode to them:

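import datachain as dc

def get_json(x: int) -> dict:
    # Even x: populated node; odd x: the node is None (the optional case).
    node = {'leaf_field': x} if x % 2 == 0 else None
    return {'node_field': node}

# Ten records with a single dict-typed 'json' column.
chain = dc.read_records(
    [ {'json': get_json(i)} for i in range(10) ],
    schema={'json': dict}
)
# Explode the 'json' column, sampling up to 10 rows for schema inference.
chain = chain.explode(
    'json',
    column='json',
    schema_sample_size=10
)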

chain.show(10)
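For reference, the rows in this example alternate between a populated node and None. The standalone snippet below (plain Python, no datachain required) repeats get_json from above and lists which rows carry a null node; the names records and none_rows are only illustrative.

```python
def get_json(x: int) -> dict:
    # Same helper as in the reproduction: odd x yields a None node.
    node = {'leaf_field': x} if x % 2 == 0 else None
    return {'node_field': node}


# Build the same ten payloads that are passed to read_records.
records = [get_json(i) for i in range(10)]

# Indices of the rows whose node is missing (None).
none_rows = [i for i, rec in enumerate(records) if rec['node_field'] is None]

print(records[0])
print(records[1])
print(none_rows)
```

So half of the sampled rows contain a null node_field, which is the value the validator rejects in the error that follows.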

In this case we get the following error:

Processed: 0 rows [00:00, ? rows/s]
============== Error in user code: 'Mapper' ==============
Traceback (most recent call last):
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/udf.py", line 158, in process
    return self._func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/dc/datachain.py", line 498, in json_to_model
    return model.model_validate(json_dict)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/pydantic/main.py", line 705, in model_validate
    return cls.__pydantic_validator__.validate_python(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.ValidationError: 1 validation error for JsonExplodedModel
node_field
  Input should be a valid dictionary or instance of ArrowDataModel_node_field [type=model_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/model_type
==========================================================
Traceback (most recent call last): 
  File "/home/aignatyev/main.py", line 17, in <module>
    chain.show(10)
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/dc/datachain.py", line 1901, in show
    df = dc.to_pandas(flatten, include_hidden=include_hidden)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/dc/datachain.py", line 1878, in to_pandas
    results = self.results(include_hidden=include_hidden)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/dc/datachain.py", line 1372, in results
    return list(self._leaf_values(include_hidden=include_hidden))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/dc/datachain.py", line 1323, in _leaf_values
    with self._query.ordered_select(*db_signals).as_iterable() as rows:
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.12/contextlib.py", line 137, in __enter__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 1352, in as_iterable
    query = self.apply_steps().select()
            ^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 1298, in apply_steps
    result = step.apply(
             ^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 637, in apply
    self.populate_udf_table(udf_table, query)
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 531, in populate_udf_table
    process_udf_outputs(
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 351, in process_udf_outputs
    warehouse.insert_rows(udf_table, _insert_rows(), batch_size=batch_size)
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/data_storage/sqlite.py", line 722, in insert_rows
    for row_chunk in batched(rows, batch_size):
                     ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/utils.py", line 255, in batched
    yield from (tuple(batch) for batch in _dynamic_batched_core(iterable, batch_size))
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/utils.py", line 255, in <genexpr>
    yield from (tuple(batch) for batch in _dynamic_batched_core(iterable, batch_size))
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/utils.py", line 237, in _dynamic_batched_core
    for item in iterable:
                ^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/query/dataset.py", line 342, in _insert_rows
    for udf_output in udf_results:
                      ^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/udf.py", line 83, in run
    yield from self.inner.run(
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/udf.py", line 401, in run
    result_objs = self.process_safe(udf_args)
                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/aignatyev/.venv/lib/python3.12/site-packages/datachain/lib/udf.py", line 298, in process_safe
    raise DataChainError(
datachain.lib.utils.DataChainError: Error in user code in class 'Mapper': 1 validation error for JsonExplodedModel
node_field
  Input should be a valid dictionary or instance of ArrowDataModel_node_field [type=model_type, input_value=None, input_type=NoneType]
    For further information visit https://errors.pydantic.dev/2.11/v/model_type
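The validation error can be reproduced with pydantic alone, which may help narrow down the requested behavior. The sketch below is not datachain's generated code: Node, StrictModel, LenientModel and error_types are hypothetical stand-ins. StrictModel mimics a nested field that must be a dict or model instance (as the message for JsonExplodedModel suggests), and LenientModel shows the same field declared as optional.

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class Node(BaseModel):
    # Hypothetical stand-in for the inferred nested node model.
    leaf_field: Optional[int] = None


class StrictModel(BaseModel):
    # Mirrors the failing case: the nested node must be a dict or Node.
    node_field: Node


class LenientModel(BaseModel):
    # The behavior requested in this issue: the nested node may be None.
    node_field: Optional[Node] = None


def error_types(model: type[BaseModel], payload: dict) -> list[str]:
    """Return pydantic error type codes for payload, or [] if it validates."""
    try:
        model.model_validate(payload)
    except ValidationError as exc:
        return [err["type"] for err in exc.errors()]
    return []


strict_errors = error_types(StrictModel, {"node_field": None})
lenient_errors = error_types(LenientModel, {"node_field": None})

print(strict_errors)   # same error type as in the traceback above
print(lenient_errors)  # no errors when the node is optional
```

Under this assumption, the request amounts to having explode mark nested node fields as optional in the inferred model when the sampled rows contain None for them.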

Metadata

Labels: enhancement
