Support of partitioning/checkpointing/event-logging #748
base: main
Conversation
Summary of Changes
Hello @cyruszhang, I'm Gemini Code Assist[^1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request significantly upgrades Data-Juicer's capabilities for large-scale data processing by implementing a comprehensive system for partitioning, checkpointing, and event logging. These changes are designed to enhance the platform's fault tolerance, improve scalability, provide in-depth observability, and offer greater flexibility in managing data processing workflows, especially within the Ray execution environment.
Highlights
- Enhanced Fault Tolerance and Resumption: This PR introduces a robust partitioning and checkpointing system for Data-Juicer, enabling jobs to automatically resume from failures. It prevents the need to re-run entire pipelines by saving intermediate states, significantly improving reliability for large dataset processing.
- Comprehensive Observability with Event Logging: A new, universal event logging system has been integrated, providing real-time monitoring, detailed progress tracking, and comprehensive audit trails. This system logs various events, including job, partition, operation, and checkpoint activities, enhancing debugging and operational visibility across all executor types.
- Optimized Data Handling and Storage: The system now supports configurable intermediate data storage formats (Parquet, Arrow, JSONL) with compression, allowing users to balance performance, storage efficiency, and human readability. It also includes an intelligent auto-configuration system for optimal partition sizing based on data modality and processing complexity.
- Flexible Job Management and Configuration: New configuration options and command-line arguments are added for fine-grained control over partitioning, checkpointing strategies (e.g., `every_op`, `every_partition`, `every_n_ops`), and file retention policies. It also introduces job-specific directories and automatic generation of resumption commands for easier job management (see the sketch after this list).
- New Partitioned Ray Executor: A `ray_partitioned` executor type is introduced, leveraging Ray's distributed capabilities with the new partitioning, checkpointing, and event logging features to provide a highly scalable and fault-tolerant processing solution for large datasets.
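For a rough picture of how these pieces could be wired together, the following Python snippet sketches a configuration for the new executor. The key names (`executor_type`, `checkpoint_strategy`, `storage_format`, and so on) merely echo the features listed above; they are assumptions for illustration, not the PR's exact configuration schema:

```python
# Hypothetical configuration sketch -- key names are assumptions based on the
# features described in this PR, not the actual schema.
config = {
    "executor_type": "ray_partitioned",    # the new executor introduced here
    "checkpoint_strategy": "every_n_ops",  # or "every_op" / "every_partition"
    "checkpoint_n_ops": 3,                 # hypothetical knob for "every_n_ops"
    "storage_format": "parquet",           # or "arrow" / "jsonl" (human-readable)
    "partition_size": "auto",              # let the optimizer pick by modality
    "event_log_dir": "./outputs/events",   # hypothetical event-log location
}
```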
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e., commenting on the pull request page) using either `/gemini <command>` or `@gemini-code-assist <command>`. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments or fill out our survey to provide feedback.
You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
[^1]: Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Code Review
This pull request introduces a major new feature: a partitioned Ray executor with checkpointing and event logging capabilities. This is a significant step towards making Data-Juicer more robust, scalable, and observable for large-scale data processing. The changes are extensive, including new core components, configuration options, and comprehensive documentation.
The overall design is solid, with a clear separation of concerns for partitioning, checkpointing, and event logging. The use of a mixin for event logging is a good pattern for reusability across different executors. The automatic partition size optimization is a great usability improvement.
However, the review identified several critical and high-severity issues that need to be addressed:
- There is significant code duplication and conflicting implementations for event logging between `PartitionedRayExecutor` and `EventLoggingMixin`.
- A new file `partitioned_executor_base.py` appears to be dead code and should be removed.
- There are a couple of correctness bugs in the event logging and partition size optimizer that could lead to runtime errors or incorrect behavior.
I've provided detailed comments and suggestions for each issue. Addressing these will greatly improve the maintainability, correctness, and clarity of the new implementation. Once these issues are resolved, this will be a fantastic addition to Data-Juicer.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Code Review
This pull request introduces a complete partitioning, checkpointing, and event logging system that greatly strengthens Data-Juicer's fault tolerance, scalability, and observability. The changes are extensive yet well structured, covering new executors, configuration management, and job utilities. In particular, the new `ray_partitioned` executor, together with configurable checkpoint strategies and detailed event logs, lays a solid foundation for enterprise-grade data processing. The new management tools, such as job snapshot analysis and monitoring, also greatly improve the user experience.
However, I found a few problems, most notably a critical issue in the directory resolution logic that could prevent users from using flexible storage paths as intended. In addition, the command-line argument parser used for job resumption is not robust, and one test case contains an error. Addressing these issues will make this excellent contribution even better.
```python
def _parse_cli_to_config(cli_args: list) -> dict:
    """Parse CLI arguments into config dictionary format."""
    config = {}

    i = 0
    while i < len(cli_args):
        arg = cli_args[i]

        if arg.startswith("--"):
            key = arg[2:]  # Remove '--'

            # Check if next arg is a value (not another flag)
            if i + 1 < len(cli_args) and not cli_args[i + 1].startswith("--"):
                value = cli_args[i + 1]

                # Try to parse as different types
                if value.lower() in ["true", "false"]:
                    config[key] = value.lower() == "true"
                elif value.isdigit():
                    config[key] = int(value)
                elif value.replace(".", "").isdigit():
                    config[key] = float(value)
                else:
                    config[key] = value

                i += 2  # Skip both key and value
            else:
                # Boolean flag (no value)
                config[key] = True
                i += 1
        else:
            i += 1

    return config
```
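As an aside, the `isdigit`-based coercion above rejects negative numbers and mis-classifies strings such as `1.2.3` (the dot-stripped digit check passes, then `float()` raises). A more defensive alternative, shown purely as a sketch, attempts the conversions directly:

```python
def _coerce_value(value: str):
    """Best-effort type coercion for CLI values (illustrative sketch only)."""
    if value.lower() in ("true", "false"):
        return value.lower() == "true"
    for cast in (int, float):
        try:
            return cast(value)  # handles negatives and plain floats
        except ValueError:
            pass
    return value  # fall back to the raw string
```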
/gemini review
Code Review
This PR introduces a complete partitioning, checkpointing, and event logging system that greatly strengthens Data-Juicer's fault tolerance, scalability, and observability when processing large datasets. This is a very important feature, and both the design and the implementation are quite thorough. The main highlights include:
- Partitioned executor: the new `PartitionedRayExecutor` can split a dataset into multiple partitions for parallel processing and can handle operators that need global information (such as deduplication).
- Flexible checkpointing: multiple checkpoint strategies are supported (`every_op`, `every_n_ops`, `manual`), with partition-level checkpoints that allow a job to resume from the most recent checkpoint after a failure, greatly reducing recomputation cost.
- Comprehensive event logging and job management: a Spark-style event logging system records detailed events during job execution. On top of it, utilities for job snapshots, monitoring, and stopping are provided, along with a reliable job resumption mechanism.
- Automated resource optimization: a new resource-aware partition optimizer automatically configures optimal partition sizes and worker counts based on data modality and system resources, simplifying user configuration.
The code is well structured, with mixins (`EventLoggingMixin`, `DAGExecutionMixin`) used to decouple and reuse functionality. Detailed documentation, demos, and unit tests have also been added for the new features, which is essential for the project's maintainability.
During the review I found a few issues that need attention, mainly in configuration handling and the core execution logic; see the comments below for details. Overall, this is a high-quality, powerful PR that will significantly improve Data-Juicer's enterprise-grade capabilities.
```python
if self.pipeline_dag:
    self._pre_execute_operations_with_dag_monitoring(group_ops, partition_id=partition_id)
else:
    # Fallback to manual logging without DAG
    for op_idx, op in enumerate(group_ops):
        self._log_event(
            event_type=EventType.OP_START,
            message=f"Starting operation: {op._name}",
            operation_name=op._name,
            operation_idx=start_idx + op_idx,
            partition_id=partition_id,
        )

    # Execute operations
    current_dataset = current_dataset.process(group_ops)

# Post-execute DAG monitoring (log operation completion events)
if self.pipeline_dag:
    self._post_execute_operations_with_dag_monitoring(group_ops, partition_id=partition_id)
else:
    # Fallback to manual logging without DAG
    for op_idx, op in enumerate(group_ops):
        self._log_event(
            event_type=EventType.OP_COMPLETE,
            message=f"Completed operation: {op._name}",
            operation_name=op._name,
            operation_idx=start_idx + op_idx,
            partition_id=partition_id,
        )
```
In the `_process_with_checkpointing` method, the `current_dataset.process(group_ops)` call is mistakenly placed inside the `else` branch of `if self.pipeline_dag:`. This means that when DAG monitoring is enabled (i.e., `self.pipeline_dag` is truthy), the operations in the partition (`group_ops`) are never executed: the code logs the operation-start events but then skips the actual data processing step. This is a critical bug that breaks the executor's core processing logic. The `dataset.process` call should be moved out of the `if/else` block:
```python
# Pre-execute DAG monitoring (log operation start events)
if self.pipeline_dag:
    self._pre_execute_operations_with_dag_monitoring(group_ops, partition_id=partition_id)
else:
    # Fallback to manual logging without DAG
    for op_idx, op in enumerate(group_ops):
        self._log_event(
            event_type=EventType.OP_START,
            message=f"Starting operation: {op._name}",
            operation_name=op._name,
            operation_idx=start_idx + op_idx,
            partition_id=partition_id,
        )

# Execute operations
current_dataset = current_dataset.process(group_ops)

# Post-execute DAG monitoring (log operation completion events)
if self.pipeline_dag:
    self._post_execute_operations_with_dag_monitoring(group_ops, partition_id=partition_id)
else:
    # Fallback to manual logging without DAG
    for op_idx, op in enumerate(group_ops):
        self._log_event(
            event_type=EventType.OP_COMPLETE,
            message=f"Completed operation: {op._name}",
            operation_name=op._name,
            operation_idx=start_idx + op_idx,
            partition_id=partition_id,
        )
```
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
HYLcool left a comment
Please resolve pre-commit issues as well.
```python
if partition_id != 0:
    # Partitioned executor - pass partition_id
    self._log_operation_with_dag_context(op_name, op_idx, "op_start", partition_id=partition_id)
else:
    # Non-partitioned executor
    self._log_operation_with_dag_context(op_name, op_idx, "op_start")
```
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default value of `partition_id` is 0 in the method `_log_operation_with_dag_context`, so maybe this if-else is not necessary?
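If that default holds, both branches collapse into one unconditional call; a minimal sketch, assuming the signature discussed in this comment:

```python
# Sketch: with partition_id defaulting to 0, passing it through unconditionally
# covers both the partitioned and non-partitioned cases.
self._log_operation_with_dag_context(op_name, op_idx, "op_start", partition_id=partition_id)
```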
```python
if partition_id != 0:
    # Partitioned executor - pass partition_id
    self._log_operation_with_dag_context(
        op_name,
        op_idx,
        "op_complete",
        partition_id=partition_id,
        duration=0.0,
        input_rows=0,
        output_rows=0,
    )
else:
    # Non-partitioned executor
    self._log_operation_with_dag_context(
        op_name, op_idx, "op_complete", duration=0.0, input_rows=0, output_rows=0
    )
```
Same as the previous comment: the default value of `partition_id` is 0 in the method `_log_operation_with_dag_context`.
```python
            operation_name=op._name,
            operation_idx=start_idx + op_idx,
            partition_id=partition_id,
        )
```
This if-else might not be correct. Please double-check.
Maybe lines 619-634 should have their indentation reduced.
design doc (internal): https://aliyuque.antfin.com/ah7ri9/zdesop/qw3tm08a5wcqx446
Overview
The Data-Juicer partitioning, checkpointing, and event logging system provides a comprehensive solution for processing large datasets, with fault tolerance, scalability, and full observability.
Design Motivation
Ray already provides some fault tolerance (the actor persistence mechanism and task-level retry logic), and Ray-DLC adds better failure handling and self-healing, but several systemic problems remain:
- Whole-dataset execution: Ray processes the entire dataset as a single unit; if a small part fails, the whole OP stage, or even the whole pipeline, fails.
- No progress recovery: because the pipeline runs as one unit, a failure in any part forces a complete rerun.
- No user-configurable, fine-grained fault tolerance, leaving little flexibility.
- Data persistence and mapping: currently missing; actors could provide an entry point, but the DJ framework does not yet support it.
- Insufficient observability: Ray only exposes cluster state; better visibility into the state of DJ jobs is still missing.
We therefore aim to solve all of these problems with an integrated set of partitioning, checkpointing, and event logging mechanisms.
Key Features
- Fault tolerance: automatic recovery from failures using checkpoints (see the sketch after this list)
- Scalability: partition-based processing for datasets of any size
- Observability: comprehensive event logging and real-time monitoring
- Performance: optimized storage formats and parallel processing
- Flexibility: configurable partitioning and checkpointing strategies
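To make the checkpointing strategies concrete, here is a minimal, self-contained Python sketch of partition-level checkpointing with an `every_n_ops`-style policy. The store and function names are invented for illustration; they are not the PR's actual API:

```python
class InMemoryCheckpointStore:
    """Toy checkpoint store keyed by partition_id (illustration only)."""

    def __init__(self):
        self._store = {}

    def save(self, partition_id, op_idx, data):
        self._store[partition_id] = (op_idx, data)

    def latest(self, partition_id):
        # Returns (last checkpointed op index, data); -1 means no checkpoint yet.
        return self._store.get(partition_id, (-1, None))


def process_partition(partition, ops, ckpt, partition_id, every_n_ops=2):
    """Apply ops to one partition, checkpointing every N ops (sketch)."""
    resume_idx, saved = ckpt.latest(partition_id)
    if resume_idx >= 0:
        partition = saved  # resume from the checkpoint, not the raw input
    for idx, op in enumerate(ops):
        if idx <= resume_idx:
            continue  # already applied before the failure
        partition = op(partition)
        if (idx + 1) % every_n_ops == 0:
            ckpt.save(partition_id, idx, partition)
    return partition
```

Replacing the modulo test with `True` corresponds to `every_op`, while saving only once at the end of the partition approximates `every_partition`.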
DJ will provide the partitioning and checkpointing logic, giving users an explicit hook into fault tolerance, flexible restart options, lineage support at the level of small shards, and more complete, fine-grained job observability, laying a solid foundation for enterprise-grade DJ services. This does not conflict with the fault tolerance and compute scalability that Ray and Ray-DLC focus on.
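Likewise, to illustrate how an event log could back the monitoring and job-snapshot tooling described above, here is a sketch of a consumer that summarizes a JSONL event log. The file location and field names are assumptions for illustration, not the schema this PR actually emits:

```python
import json
from collections import Counter


def summarize_event_log(path: str) -> Counter:
    """Tally event types from a JSONL event log (hypothetical schema)."""
    counts = Counter()
    with open(path) as f:
        for line in f:
            if line.strip():
                counts[json.loads(line).get("event_type", "unknown")] += 1
    return counts


# Hypothetical output: Counter({'op_start': 12, 'op_complete': 12, 'checkpoint': 4})
print(summarize_event_log("outputs/events/job_events.jsonl"))
```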