Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: XStream 返回流同时支持异步迭代 #319

Merged
merged 22 commits into from
Dec 23, 2024

Conversation

ppbl
Copy link
Contributor

@ppbl ppbl commented Dec 6, 2024

why?
目前 XStream 返回一个 AsyncGenerator,支持 for await of 异步迭代,
但是实际开发中遇到响应很长中途中断是很常见的需求,此时通过 XStream 无法取消。

如果返回一个 stream 则可以自定义实现取消逻辑,同时通过给 stream 增加.asyncIterator 属性仍然可以支持异步迭代
(想过 polyfill, 一是编码的时候遇到类型问题不想断言,然后觉得毕竟影响比较大,现在这种方式只影响使用 XStream 的 stream)

Summary by CodeRabbit

  • 新特性
    • 引入了新的 XReadableStream 类型,支持异步迭代功能。
    • 更新了 XStream 函数的签名,支持使用 for await...of 语法进行流的异步迭代。
    • 新增了聊天界面组件,支持流式消息传递和中断功能。
  • 测试
    • 增加了新的测试用例,验证 XStream 函数返回 ReadableStream 实例的功能。
  • 文档
    • stream-cancel 功能添加了简体中文和英文的文档说明。
    • useXChat 文档中新增了中断输出的示例代码。

Copy link
Contributor

coderabbitai bot commented Dec 6, 2024

Important

Review skipped

Review was skipped due to path filters

⛔ Files ignored due to path filters (1)
  • components/useXChat/__tests__/__snapshots__/demo.test.ts.snap is excluded by !**/*.snap

CodeRabbit blocks several paths by default. You can override this behavior by explicitly including those paths in the path filters. For example, including **/dist/** will override the default block on the dist directory, by removing the pattern from both the lists.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

📝 Walkthrough
📝 Walkthrough
📝 Walkthrough

Walkthrough

此拉取请求对 components/x-stream/index.ts 文件进行了修改,主要集中在 XStream 函数及其相关类型上。引入了新类型 XReadableStream<R>,该类型扩展了 ReadableStream<R>,以支持通过 AsyncGenerator<R> 进行异步迭代。更新了 XStream 函数的签名,去除了 async 关键字,并直接在流上实现了异步迭代器,允许消费者使用 for await...of 语法。内部逻辑进行了调整,以支持异步迭代器。对 components/x-stream/__tests__/index.test.tsx 文件中的测试进行了扩展,增加了对 XStream 返回类型的验证。

Changes

文件路径 修改摘要
components/x-stream/index.ts 添加类型 XReadableStream<R>,更新 XStream 函数签名,去除 async 关键字,实现异步迭代器。
components/x-stream/tests/index.test.tsx 新增测试用例以验证 XStream 返回 ReadableStream 实例,扩展了现有测试覆盖范围。
components/useXChat/demo/stream-cancel.md 添加简体中文和英语的文档部分,描述中断流式输出的功能。
components/useXChat/demo/stream-cancel.tsx 新增 App 组件,管理聊天界面,使用模拟可读流流式消息。
components/useXChat/index.en-US.md useXChat 文档中新增示例代码,展示中断输出的功能。
components/useXChat/index.zh-CN.md useXChat 文档中新增示例代码,展示“打断输出”的功能。

Possibly related PRs

Suggested reviewers

  • zombieJ

Poem

在流中跳跃,兔子欢腾,
新的迭代,乐趣无穷。
异步的舞步,轻盈飞扬,
代码如歌,欢快流淌。
让我们一起,探索无边,
兔子与流,心连心间。 🐇✨


Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

github-actions bot commented Dec 6, 2024

Preview is ready

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 5321c19 and 862eace.

📒 Files selected for processing (1)
  • components/x-stream/index.ts (2 hunks)
🔇 Additional comments (3)
components/x-stream/index.ts (3)

140-142: 添加了 XReadableStream 接口以支持异步迭代

接口 XReadableStream 正确地扩展了 ReadableStream,并添加了 [Symbol.asyncIterator]() 方法,使流可以直接进行异步迭代。


189-189: 返回值正确地断言为 XReadableStream

函数返回流,并将其断言为 XReadableStream<Output>,符合预期。


148-148: XStream 函数签名更改,请确保兼容性

XStream 函数由 async function* 修改为普通函数,返回类型变为 XReadableStream<Output>。请确认所有调用 XStream 的地方都已适配新的返回类型,以避免潜在的问题。

请运行以下脚本,检查代码库中对 XStream 的调用,确保调用方已适配新的返回类型:

✅ Verification successful

Let me generate additional verification scripts to check the implementation details of the XStream function.


XStream 函数签名变更已正确适配

根据代码分析,XStream 函数虽然从 async function* 修改为普通函数,但它通过在返回的 stream 对象上显式实现 Symbol.asyncIterator,保持了与之前相同的异步迭代器接口。所有调用处都使用 for await...of 语法进行迭代,完全兼容新的实现方式。

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# 描述:查找所有对 XStream 的调用,并显示其上下文

rg -A 5 "XStream\("

Length of output: 5018


Script:

#!/bin/bash
# Check the implementation of XStream function and its return type
ast-grep --pattern 'function XStream<$_>($_) {
  $$$
}'

# Check if there are any type definitions related to XStream
rg "type.*XStream" -A 2

Length of output: 3644

Comment on lines 173 to 187
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.cancel();
}
};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

在 finally 块中应使用 reader.releaseLock() 而非 reader.cancel()

在异步迭代器的 finally 块中,调用 reader.cancel() 会取消流的读取,并可能导致未读取的数据被丢弃。如果只是想释放读取器的锁,应使用 reader.releaseLock()

请应用以下修改来修正此问题:

  } finally {
-   reader.cancel();
+   reader.releaseLock();
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.cancel();
}
};
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = async function* () {
const reader = this.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
return;
}
yield value;
}
} finally {
reader.releaseLock();
}
};

@YumoImer
Copy link
Collaborator

YumoImer commented Dec 6, 2024

感谢贡献!

Copy link

codecov bot commented Dec 6, 2024

Bundle Report

Changes will decrease total bundle size by 74 bytes (-0.05%) ⬇️. This is within the configured threshold ✅

Detailed changes
Bundle name Size Change
antdx-array-push 156.23kB 74 bytes (-0.05%) ⬇️

@YumoImer
Copy link
Collaborator

YumoImer commented Dec 6, 2024

这个 PR 的实现思路让我学习到了,优雅!

@YumoImer
Copy link
Collaborator

YumoImer commented Dec 9, 2024

@ppbl run test 挂啦

@ppbl
Copy link
Contributor Author

ppbl commented Dec 9, 2024

@ppbl run test 挂啦

异常捕获的问题,加了 finally 块,splitPart 里面的错误就捕获不到直接抛出来了

@ppbl
Copy link
Contributor Author

ppbl commented Dec 9, 2024

@YumoImer

现在的测试错误是
it('throws an error when key-value separator is not ":"', async () => { ...
这个测试,试了一下因为 try finally 的方式无法很好的把错误抛出去

目前改成 AsyncIterator 对象的形式可以解决

 (stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
    const reader = this.getReader();
    return {
      async next() {
        const { done, value } = await reader.read();
        return {
          done,
          value: value!,
        };
      },
      async return() {
        await reader.cancel();
        return {
          done: true,
          value: null,
        };
      },
    };
  };

这样的话带来的问题是
it('supports custom transform streams', 这个测试失败,原因是 不能像在 generator 里面一样写 return 或者 while 里写 break
之前. if (!value) continue; 这行代码无法兼容,导致 custom-part1|custom-part2| 会返回['custom-part1', 'custom-part2', '']

不过我个人感觉这个也可以在外部自行处理

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (2)
components/x-stream/index.ts (2)

146-146: 函数签名修改合理,建议更新文档

移除 async 关键字的修改符合新的实现方式。建议在函数文档中补充说明返回的流对象同时支持标准流操作和异步迭代的特性。


171-192: 建议添加性能优化

考虑添加缓冲机制以提高性能,特别是在处理大量数据时。

  (stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
    const reader = this.getReader();
+   const buffer: Output[] = [];
+   const bufferSize = 1024; // 可配置的缓冲区大小
    return {
      async next() {
        try {
+         if (buffer.length > 0) {
+           return {
+             done: false,
+             value: buffer.shift()!,
+           };
+         }
          const { done, value } = await reader.read();
+         if (value && Array.isArray(value)) {
+           buffer.push(...value.slice(1));
+           return {
+             done: false,
+             value: value[0],
+           };
+         }
          return {
            done,
            value: value ?? null,
          };
        } catch (error) {
          reader.releaseLock();
          throw error;
        }
      },
      // ... 其他方法保持不变
    };
  };
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 862eace and 75fda70.

📒 Files selected for processing (2)
  • components/x-stream/__tests__/index.test.tsx (1 hunks)
  • components/x-stream/index.ts (2 hunks)
🔇 Additional comments (3)
components/x-stream/__tests__/index.test.tsx (1)

66-68: 需要验证空值处理的必要性

这个改动表明新的流实现可能会产生空值。虽然过滤掉空值可以让测试通过,但这可能掩盖了底层实现的问题。

建议:

  1. 确认空值是否是预期行为
  2. 如果是预期的,请在代码注释中说明原因
  3. 如果不是预期的,建议在流的实现中处理,而不是在消费端过滤
✅ Verification successful

空值处理是合理的实现

根据代码分析,空值检查是合理的:

  1. 这个空值检查是在使用自定义 transformStream 的测试用例中,而不是默认的 SSE 处理流程
  2. 从实现来看,XStream 支持可选的自定义 transformStream,允许用户定义自己的数据转换逻辑
  3. 自定义转换流可能产生空值是完全合理的,这取决于具体的业务需求

建议:

  • 在测试用例中添加注释,说明空值检查的目的是为了支持自定义转换流的灵活性
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# 检查其他测试文件中是否有类似的空值处理模式
rg -l 'if.*value.*push' --type ts

Length of output: 35


Script:

#!/bin/bash
# 搜索流实现中的相关代码
ast-grep --pattern 'class $_ implements Stream {
  $$$
}'

# 查找测试文件中的相关代码模式
rg -A 5 -B 5 'result.push.*value' --type ts

# 查找可能的空值处理相关代码
rg -A 3 'null.*value|undefined.*value' --type ts

Length of output: 4089


Script:

#!/bin/bash
# 检查自定义转换流的实现
ast-grep --pattern 'const customTransform = $$$'

# 查找流实现中的相关代码
rg -A 5 'class.*Stream.*{' --type ts

# 检查转换流的实现
rg -A 10 'transform.*Stream' --type ts

Length of output: 8571

components/x-stream/index.ts (2)

140-141: 类型定义清晰且符合预期

XReadableStream 类型很好地结合了 ReadableStreamAsyncIterable 的特性,使流既支持标准的流操作,又支持异步迭代。


171-192: ⚠️ Potential issue

异步迭代器实现需要改进

当前实现存在以下问题:

  1. return() 方法中使用 cancel() 可能导致未读取的数据丢失
  2. 缺少错误处理机制
  3. 没有适当的资源清理

建议应用以下修改:

  (stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
    const reader = this.getReader();
    return {
      async next() {
+       try {
          const { done, value } = await reader.read();
          return {
            done,
            value: value!,
          };
+       } catch (error) {
+         reader.releaseLock();
+         throw error;
+       }
      },
      async return() {
-       await reader.cancel();
+       reader.releaseLock();
        return {
          done: true,
          value: null,
        };
      },
+     async throw(error) {
+       reader.releaseLock();
+       throw error;
+     },
    };
  };

components/x-stream/index.ts Outdated Show resolved Hide resolved
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (3)
components/x-stream/index.ts (3)

140-141: 建议改进类型定义的类型安全性

建议避免使用 any 类型,可以使用更具体的类型或泛型约束来提高类型安全性。

-type XReadableStream<R = any> = ReadableStream<R> & AsyncIterable<R>;
+type XReadableStream<R = unknown> = ReadableStream<R> & AsyncIterable<R>;

Line range hint 146-170: 建议增强错误处理和错误信息

当前的错误处理可以更加完善:

  1. 错误信息应该包含实际接收到的类型信息
  2. 建议添加对 transformStream 的类型验证
  if (!(readableStream instanceof ReadableStream)) {
-   throw new Error('The options.readableStream must be an instance of ReadableStream.');
+   throw new Error(
+     `options.readableStream 必须是 ReadableStream 的实例,但收到了 ${Object.prototype.toString.call(readableStream)}`
+   );
  }

+ if (transformStream && !(transformStream instanceof TransformStream)) {
+   throw new Error(
+     `options.transformStream 必须是 TransformStream 的实例,但收到了 ${Object.prototype.toString.call(transformStream)}`
+   );
+ }

Line range hint 146-192: 建议考虑添加流的取消信号支持

目前的实现虽然支持了异步迭代,但在实际应用中可能需要支持更细粒度的取消控制。建议考虑添加对 AbortSignal 的支持。

建议在 XStreamOptions 接口中添加可选的 signal 参数:

export interface XStreamOptions<Output> {
  readableStream: ReadableStream<Uint8Array>;
  transformStream?: TransformStream<string, Output>;
  signal?: AbortSignal;
}

这样可以让上层应用更灵活地控制流的生命周期,特别是在需要提前取消长时间运行的流处理时。

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Reviewing files that changed from the base of the PR and between 75fda70 and 9c28380.

📒 Files selected for processing (1)
  • components/x-stream/index.ts (2 hunks)

Comment on lines 171 to 190
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
const reader = this.getReader();
return {
async next() {
const { done, value } = await reader.read();
return {
done,
value: value ?? null,
};
},
async return() {
await reader.cancel();
return {
done: true,
value: null,
};
},
};
};
Copy link
Contributor

@coderabbitai coderabbitai bot Dec 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

建议优化资源清理逻辑

当前的异步迭代器实现存在以下问题:

  1. 没有正确处理读取器的锁定释放
  2. 在错误发生时可能会导致资源泄漏

建议按照以下方式修改实现:

  (stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
    const reader = this.getReader();
    return {
      async next() {
-       const { done, value } = await reader.read();
-       return {
-         done,
-         value: value ?? null,
-       };
+       try {
+         const { done, value } = await reader.read();
+         return {
+           done,
+           value: value ?? null,
+         };
+       } catch (error) {
+         reader.releaseLock();
+         throw error;
+       }
      },
      async return() {
-       await reader.cancel();
+       try {
+         await reader.cancel();
+       } finally {
+         reader.releaseLock();
+       }
        return {
          done: true,
          value: null,
        };
      },
    };
  };

这样的修改可以:

  1. 确保在发生错误时释放读取器锁
  2. 保证在迭代器结束时正确清理资源
  3. 防止资源泄漏
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
const reader = this.getReader();
return {
async next() {
const { done, value } = await reader.read();
return {
done,
value: value ?? null,
};
},
async return() {
await reader.cancel();
return {
done: true,
value: null,
};
},
};
};
/** support async iterator */
(stream as XReadableStream<Output>)[Symbol.asyncIterator] = function () {
const reader = this.getReader();
return {
async next() {
try {
const { done, value } = await reader.read();
return {
done,
value: value ?? null,
};
} catch (error) {
reader.releaseLock();
throw error;
}
},
async return() {
try {
await reader.cancel();
} finally {
reader.releaseLock();
}
return {
done: true,
value: null,
};
},
};
};

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的核心问题是:需不需要替用户将流资源释放?

我认为在网络请求的场景,这样做没问题。

其他场景你有相关经验吗?或者,这块你如何考虑的?@ppbl

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YumoImer,您好。即使在非网络请求的场景下,确保正确释放读取器的锁定仍然是重要的。未释放的读取器可能会导致资源泄漏,特别是在长时间运行的应用程序中。通过在发生错误或迭代器结束时释放锁定,可以确保资源被及时清理,避免潜在的问题。

因此,我建议在实现中加入对读取器锁定的释放逻辑,以提高代码的健壮性和可靠性。

Copy link
Contributor Author

@ppbl ppbl Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的核心问题是:需不需要替用户将流资源释放?

我认为在网络请求的场景,这样做没问题。

其他场景你有相关经验吗?或者,这块你如何考虑的?@ppbl

你是指的 reader.cancel() 方法吗,我刚才去看了下 web-streams-polyfill 的实现,有一个 preventCancel 选项,我们也可以参考添加一个,如果用户传 true 则可以只调用 reader.releaseLock();

 const stream = XStream({
    readableStream: response.body,
    preventCancel: true
  })

这样的话,其实现在的实现完全是可以向前兼容的,可以等需要的时候随时支持类似 preventCancel 以及更多的选项~

你看看呢~ @YumoImer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我的观点是倾向于尽量向标准看齐,也可以暴露一个 values 方法可以显式的传参数,直接用 for..await..of 迭代的时候就用默认的参数

👍 这个点我也认同~

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯那现在是要同步支持 values 方法吗,需要的话我加一下试试

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

最终肯定还是你来决定咯@YumoImer

不是的~你的决定很重要!我觉得我们要抒发观点后,达成一致再执行~@ppbl

看起来对于「通过标准的 stream.values 的形式来支持这个默认 cancel 流的行为」我们可以再议「比如新起一个 PR」?

我不想默认处理这个逻辑的理由:

  1. 虽然是合适的时机调用了 cancel 方法,但感觉还是替用户做了选择。如果用户不希望这个流被取消,那我们就需要新增参数来支持。
  2. 已经给用户返回了 stream 对象,用户完全可以自己控制这个行为。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我明白你的意思了,那异步迭代就先保持目前的逻辑吧,取消的可以后续再说~

Copy link
Collaborator

@YumoImer YumoImer Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

可以的~取消这个行为还是辛苦你来如何 😄 另外写个 RFC 我们充分讨论下

[RFC] XStream support ... options

@YumoImer
Copy link
Collaborator

之前. if (!value) continue; 这行代码无法兼容,导致 custom-part1|custom-part2| 会返回['custom-part1', 'custom-part2', '']
不过我个人感觉这个也可以在外部自行处理

这里我最初设计的时候认为「空串不应该触发迭代」所以这里是个 break change

@YumoImer
Copy link
Collaborator

明天我再想想怎么处理好些。先睡了~

@ppbl
Copy link
Contributor Author

ppbl commented Dec 11, 2024

之前. if (!value) continue; 这行代码无法兼容,导致 custom-part1|custom-part2| 会返回['custom-part1', 'custom-part2', '']
不过我个人感觉这个也可以在外部自行处理

这里我最初设计的时候认为「空串不应该触发迭代」所以这里是个 break change

测试恢复了, reader.cancel() 前面加个 await 完美解决~

Copy link

codecov bot commented Dec 11, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 91.38%. Comparing base (876f7d4) to head (075ff2a).
Report is 3 commits behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #319      +/-   ##
==========================================
- Coverage   91.48%   91.38%   -0.10%     
==========================================
  Files          66       66              
  Lines        1468     1451      -17     
  Branches      395      383      -12     
==========================================
- Hits         1343     1326      -17     
  Misses        125      125              

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@YumoImer YumoImer requested a review from a team December 16, 2024 07:40
components/x-stream/index.ts Outdated Show resolved Hide resolved
components/x-stream/index.ts Outdated Show resolved Hide resolved
components/x-stream/index.ts Outdated Show resolved Hide resolved
components/x-stream/index.ts Outdated Show resolved Hide resolved
components/x-stream/index.ts Outdated Show resolved Hide resolved
@YumoImer
Copy link
Collaborator

评论了一波,辛苦看下~感谢~ @ppbl

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (2)
components/x-stream/index.ts (2)

140-141: 类型定义清晰,建议添加文档注释

类型定义正确地结合了 ReadableStreamAsyncGenerator,符合设计目标。

建议添加 JSDoc 文档注释来说明这个类型的用途:

+/**
+ * @description 支持异步迭代的可读流类型
+ * @template R - 流中的数据类型,默认为 SSEOutput
+ */
type XReadableStream<R = SSEOutput> = ReadableStream<R> & AsyncGenerator<R>;

156-169: 建议增强类型断言的安全性

当前的类型转换实现可以更加类型安全。

建议添加类型守卫函数:

function isXReadableStream<T>(stream: ReadableStream<T>): stream is XReadableStream<T> {
  return stream instanceof ReadableStream;
}

然后修改类型断言:

- ) as XReadableStream<Output>;
+ ) as ReturnType<typeof XStream<Output>>;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 98e44d8 and 7fabb27.

📒 Files selected for processing (1)
  • components/x-stream/index.ts (2 hunks)
🔇 Additional comments (2)
components/x-stream/index.ts (2)

146-146: 函数签名修改合理

移除 async 关键字并返回 XReadableStream 的修改符合新的类型系统设计,同时保持了向后兼容性。


171-184: 🛠️ Refactor suggestion

建议增强错误处理机制

当前的异步迭代器实现缺少错误处理机制。根据之前的讨论,虽然不自动处理资源清理,但仍建议添加基本的错误处理。

建议修改实现:

  stream[Symbol.asyncIterator] = async function* () {
    const reader = this.getReader();
    
+   try {
      while (true) {
        const { done, value } = await reader.read();
        
        if (done) break;
        if (!value) continue;
        
        yield value;
      }
+   } catch (error) {
+     // 让错误继续向上传播,用户可以在自己的 try-catch 中处理清理逻辑
+     throw error;
+   }
  };

Likely invalid or redundant comment.

@YumoImer YumoImer requested a review from a team December 17, 2024 12:44
@YumoImer
Copy link
Collaborator

我这边没问题了~ @ant-design/ant-design-x 大家帮看一下

Copy link
Collaborator

@YumoImer YumoImer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

感谢贡献!

@zombieJ
Copy link
Member

zombieJ commented Dec 18, 2024

加个用例?

@ppbl
Copy link
Contributor Author

ppbl commented Dec 18, 2024

加个用例?

测试用例还是 demo 用例?pr 的主要改动是把 stream 返回了出去,新增的用例就是标准 ReadableStream 用例~

至于直接的目的是为了支持取消流,这个用例可能是一个组合用例,后面看看怎么加一下?😊

@YumoImer
Copy link
Collaborator

YumoImer commented Dec 18, 2024

加个用例?

测试用例还是 demo 用例?pr 的主要改动是把 stream 返回了出去,新增的用例就是标准 ReadableStream 用例~

至于直接的目的是为了支持取消流,这个用例可能是一个组合用例,后面看看怎么加一下?😊

我想了下,加一个「支持取消流 demo 用例」& 加一个「返回 stream 的测试用例」。

对于开发者用户:方便通过文档了解 XStream 返回的 stream 如何使用。

对于 Contributors 们,未来对于 XStream 的返回变动也有用例保障。

然后这两个用例都是和本 PR 变化相关的,就在这个 PR 内完成吧。辛苦 @ppbl 看下~

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (1)
components/x-stream/__tests__/index.test.tsx (1)

101-112: 测试用例实现正确且符合需求

测试用例验证了 XStream 返回 ReadableStream 实例的新功能,这与 PR 的目标保持一致。测试数据的选择也很合适,包含了事件和数据字段。

不过建议添加以下测试场景:

  1. 验证返回的流是否正确支持取消操作
  2. 验证在取消后是否正确清理了资源

建议添加如下测试用例:

it('should support cancellation', async () => {
  const stream = XStream({
    readableStream: new ReadableStream({
      async start(controller) {
        // 模拟长响应
        for (let i = 0; i < 1000; i++) {
          controller.enqueue(new TextEncoder().encode(`event: message\ndata: ${i}\n\n`));
          await new Promise(resolve => setTimeout(resolve, 10));
        }
        controller.close();
      },
    }),
  });

  const reader = stream.getReader();
  // 读取几个值后取消
  await reader.read();
  await reader.read();
  await reader.cancel();
  
  // 验证取消后的状态
  expect(reader.closed).resolves.toBeUndefined();
});
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7fabb27 and 56ed94f.

📒 Files selected for processing (1)
  • components/x-stream/__tests__/index.test.tsx (1 hunks)

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (3)
components/useXChat/demo/stream-cancel.tsx (3)

58-58: 建议优化空对象参数模式

当前的空对象解构模式可能会引起困惑。建议明确声明预期的参数类型。

-request: async ({}, { onSuccess, onUpdate }) => {
+request: async ({ message }, { onSuccess, onUpdate }) => {
🧰 Tools
🪛 Biome (1.9.4)

[error] 58-58: Unexpected empty object pattern.

(lint/correctness/noEmptyPattern)


76-78: 建议增加类型安全性

解析 JSON 数据时缺少类型检查。建议添加类型定义和验证。

+interface StreamData {
+  id: string;
+  content: string;
+}
+
+function isValidStreamData(data: unknown): data is StreamData {
+  return (
+    typeof data === 'object' &&
+    data !== null &&
+    'id' in data &&
+    'content' in data &&
+    typeof (data as StreamData).content === 'string'
+  );
+}

-        const data = JSON.parse(value.data);
+        const data = JSON.parse(value.data);
+        if (!isValidStreamData(data)) {
+          throw new Error('Invalid stream data format');
+        }
         current += data.content || '';

32-49: 建议优化模拟数据生成逻辑

当前的模拟数据生成方式可以更加灵活。建议将延迟时间和内容块作为可配置参数。

-function mockReadableStream() {
+function mockReadableStream({
+  delay = 300,
+  chunks = contentChunks,
+}: {
+  delay?: number;
+  chunks?: string[];
+} = {}) {
   const sseChunks: string[] = [];

-  for (let i = 0; i < contentChunks.length; i++) {
+  for (let i = 0; i < chunks.length; i++) {
     const sseEventPart = 
-      `event: message\ndata: {"id":"${i}","content":"${contentChunks[i]}"}\n\n`;
+      `event: message\ndata: {"id":"${i}","content":"${chunks[i]}"}\n\n`;
     sseChunks.push(sseEventPart);
   }

   return new ReadableStream({
     async start(controller) {
       for (const chunk of sseChunks) {
-        await new Promise((resolve) => setTimeout(resolve, 300));
+        await new Promise((resolve) => setTimeout(resolve, delay));
         controller.enqueue(new TextEncoder().encode(chunk));
       }
       controller.close();
     },
   });
 }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 56ed94f and 3599871.

📒 Files selected for processing (4)
  • components/useXChat/demo/stream-cancel.md (1 hunks)
  • components/useXChat/demo/stream-cancel.tsx (1 hunks)
  • components/useXChat/index.en-US.md (1 hunks)
  • components/useXChat/index.zh-CN.md (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • components/useXChat/demo/stream-cancel.md
🧰 Additional context used
🪛 LanguageTool
components/useXChat/index.zh-CN.md

[uncategorized] ~24-~24: A punctuation mark might be missing here.
Context: ...c="./demo/stream-cancel.tsx">打断输出 多项建议<...

(AI_EN_LECTOR_MISSING_PUNCTUATION)

🪛 Biome (1.9.4)
components/useXChat/demo/stream-cancel.tsx

[error] 58-58: Unexpected empty object pattern.

(lint/correctness/noEmptyPattern)

🔇 Additional comments (2)
components/useXChat/index.zh-CN.md (1)

24-24: 文档更新正确且完整

新增的示例代码条目与现有格式保持一致,清晰地展示了流式输出中断的功能。

🧰 Tools
🪛 LanguageTool

[uncategorized] ~24-~24: A punctuation mark might be missing here.
Context: ...c="./demo/stream-cancel.tsx">打断输出
多项建议<...

(AI_EN_LECTOR_MISSING_PUNCTUATION)

components/useXChat/index.en-US.md (1)

23-23: 英文文档更新与中文文档保持一致

新增的示例代码条目正确展示了流式输出中断功能,保持了文档的双语一致性。

components/useXChat/demo/stream-cancel.tsx Show resolved Hide resolved
components/useXChat/demo/stream-cancel.tsx Show resolved Hide resolved
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

♻️ Duplicate comments (1)
components/useXChat/demo/stream-cancel.tsx (1)

75-85: ⚠️ Potential issue

建议增加 try/catch 来完善错误处理

若在读取流时发生异常,目前的 while 循环会直接中断。可以在循环外增加 try/catch 以捕获并处理异常,提升健壮性并使用户可感知错误。

🧹 Nitpick comments (1)
components/useXChat/demo/stream-cancel.tsx (1)

64-64: 避免使用空对象模式来解构未使用的参数

static analysis 工具提示此处的空对象模式不符合规范。若首个参数确实无需使用,建议使用命名占位符或下划线代替,以提高可读性并避免误解:

- request: async ({}, { onSuccess, onUpdate }) => {
+ request: async (_unused, { onSuccess, onUpdate }) => {
🧰 Tools
🪛 Biome (1.9.4)

[error] 64-64: Unexpected empty object pattern.

(lint/correctness/noEmptyPattern)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3599871 and 6831da6.

📒 Files selected for processing (1)
  • components/useXChat/demo/stream-cancel.tsx (1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
components/useXChat/demo/stream-cancel.tsx

[error] 64-64: Unexpected empty object pattern.

(lint/correctness/noEmptyPattern)

components/useXChat/demo/stream-cancel.tsx Outdated Show resolved Hide resolved
@ppbl
Copy link
Contributor Author

ppbl commented Dec 18, 2024

@YumoImer @zombieJ 辛苦看下~

@YumoImer
Copy link
Collaborator

@ppbl lint 挂了,辛苦看下~

@YumoImer
Copy link
Collaborator

@ppbl lint 挂了,辛苦看下~

不好意思打扰了,今天我想发布一个版本,应该就剩下一个 lint 的问题了,看看今天上个车如何😄?@ppbl

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
components/useXChat/demo/stream-cancel.tsx (2)

17-30: 建议增强模拟数据的类型定义和文档说明

为了提高代码的可维护性和可读性,建议:

  1. contentChunks 添加类型定义
  2. 添加注释说明数据的用途和格式
+// 模拟的消息内容片段,用于演示流式响应
+const contentChunks: readonly string[] = [
   'He',
   'llo',
   // ... 其他内容
];

113-113: 建议优化取消操作的加载状态处理

当前的取消操作没有考虑加载状态的处理,建议在取消操作时更新相关状态。

-        onCancel={() => abortRef.current()}
+        onCancel={() => {
+          abortRef.current();
+          // 可以考虑添加取消状态的处理
+          agent.setRequesting(false);
+        }}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 6831da6 and b84a579.

📒 Files selected for processing (1)
  • components/useXChat/demo/stream-cancel.tsx (1 hunks)
🧰 Additional context used
🪛 Biome (1.9.4)
components/useXChat/demo/stream-cancel.tsx

[error] 64-64: Unexpected empty object pattern.

(lint/correctness/noEmptyPattern)

🔇 Additional comments (1)
components/useXChat/demo/stream-cancel.tsx (1)

75-85: 建议增强错误处理机制

当前的流读取逻辑缺少错误处理,可能导致运行时异常。

-      while (reader) {
+      try {
+        while (reader) {
           const { value, done } = await reader.read();
           if (done) {
             onSuccess(current);
             break;
           }
           if (!value) continue;
           const data = JSON.parse(value.data);
           current += data.content || '';
           onUpdate(current);
+        }
+      } catch (error) {
+        console.error('Stream reading error:', error);
+        onSuccess(current);
+      }

components/useXChat/demo/stream-cancel.tsx Outdated Show resolved Hide resolved
components/useXChat/demo/stream-cancel.tsx Show resolved Hide resolved
@ppbl
Copy link
Contributor Author

ppbl commented Dec 20, 2024

@YumoImer 修复了~ 不好意思

@YumoImer
Copy link
Collaborator

@ppbl 需要更新下这个 demo 的测试用例快照~

● renders components/useXChat/demo/stream-cancel.tsx correctly

@YumoImer
Copy link
Collaborator

All modified and coverable lines are covered by tests ✅

先合并了, 覆盖率下降是因为整体代码行数提高。

@YumoImer YumoImer merged commit 02d7729 into ant-design:main Dec 23, 2024
10 of 11 checks passed
@ppbl ppbl deleted the feat/x-stream branch December 24, 2024 02:38
@coderabbitai coderabbitai bot mentioned this pull request Dec 25, 2024
@YumoImer YumoImer linked an issue Dec 25, 2024 that may be closed by this pull request
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

如何实现打断输出功能?
4 participants