-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
基于wf现有架构,实现full-duplex 通信---channel #873
Comments
这么牛逼…… |
好家伙!我直呼好家伙! |
还是大佬牛逼,打造的wf生态,很好玩~~~ |
|
你好用户,你的代码真是惊艳到我们了,非常感谢参与我们生态开发!
|
关于乱序的问题,考虑到已经在wfchannel->channel_fanout_msg_in里面进行重组了,重组的逻辑类似tcp数据重组,我在每条msg进来的时候加了个seq。以此为排序依据。排序在处理 chanel_send_one 逻辑里面,is_channel = 2表明异步或者有其他现场正在ing写,这次不写下次在写。这里又个逻辑写的时候“必须”一个个msg写操作。 最后一个问题加了CONN_STATE_ESTABLISHED,主要的逻辑这操作session 的in和out上 |
其实新现实的WFChannel,只是留下Channel的名字,和之前CommChannel 的抽象已经没有任何逻辑联系了 |
嗯嗯,我也看了代码,和我们的CommChannel没有关系。CommChannel那个模块我也并不是特别喜欢,所以我现在试图通过协议叠加的方式来实现流式的传输。目前我们PackageWrapper的第一个应该可能用在MySQL binlog上。 |
考虑到channel_fanout_msg_in/channel_fanout_msg_out中有数据竞态问题,后续想用condwaittask进行优化了,组成任务流形式的排序。可以完全去掉mutex咯~,还请大佬,多拍拍砖 |
嗯嗯,wait这块,你也可以看看我们的WFConditional和WFResourcePool。WFResourcePool是对WFConditional的一种组织形式,但WFConditaionl也可以独立使用,通过WFTaskFactory产生,唤醒方调用它的signal操作就可以。 |
嗯嗯,其实WFCondition , WFCondtask的逻辑我搬的websocket分支里面,当时改完下了,发现WFConditional和WFResourcePool已经有,功能还有点重复了=-= |
其实这边的第三个问题是在Communicator::create_message 中利用CONN_STATE_ESTABLISHED 和session->message_in() 中的new_channel_msg_session 来解决所谓粘包问题。这个地方我有这样的思考:channel粘包数据,也是channel的next一个数据,wf架构只管message_in就可以,这个地方我使用new_channel_msg_session 来解决粘包,这样从架构上感觉更直观一些~ |
我们的PackageWrapper,就是一个message_in。它的append实现是: int PackageWrapper::append(const void *buf, size_t *size)
{
int ret = this->ProtocolWrapper::append(buf, size);
if (ret > 0)
{
this->msg = this->next(this->msg);
if (this->msg)
{
this->renew();
ret = 0;
}
}
return ret;
} Wrapper构造的时候传进来一个msg(比如一个Websocket frame),这一行: int ret = this->ProtocolWrapper::append(buf, size); 调用实际msg的append操作。当这个append返回>0时,一个msg操作接收完成,虚函数PackageWrapper::next()被调用。如果next返回NULL,则整个package接收完成。如果next返回下一个msg,则PackageWrapper::append()返回>0,继续收消息。 |
嗯,这个场景确实不太适合。wfChannel的设计场景,就是既要让channel在poller中append,同时也要让msg在工作线程中handle。 |
想了一下,关于shutdown的问题,之前逻辑是依据read = 0/timeout/ send error 来被动执行的。缺乏主动shutdown的场景,刚刚加了一下直接利用poller_del来shutdown。这样的话,代码改动比较少,也能复用wf架构close fd的逻辑。大佬,有空闲的话,拍拍🧱~~ |
CommChannel的shutdown主要是为了解决竞争问题,因为send的一瞬间,你不能确定连接会不会被关闭并释放。所以,只要连接建立成功,需要在上面增加一个ref,在shutdown时减去。这样无论如何不会出现send时连接对象不存在的问题。 |
嗯嗯,我在wfchannel中也引入的类似的做法。复用connEntry中ref,在channel也引入了一个ref(作为msg级别的) |
py测试工具留个简单记录 py_cli:
py_srv:
|
对于chanel send one操作,如果数据很大,需要异步写。你现在的逻辑正确吗?我原来的做法是dup一个fd。 |
嗯,现在想法还是保持poller一致,fd在异步同一时间只能读,或只能写。不过还是保持写优先 上面的设想,会在极端情况下会造成read延时,不过现阶段还不用考虑,后续的话在考虑是用dup,还是改poller同时支持读写来优化吧 |
好家伙!我直呼好家伙! 我是说楼主的头像。。。 |
🤣🤣🤣🤦 |
@gnblao 麻烦打开一下你项目的issue啊。 |
|
大佬,关于channel功能,基本开发完了。我有想法推到你们仓库,不然在我那边基本没人用了哈~抽空帮忙review下,看看是否合适~~ |
我可以review,也可以想办法帮你宣传。但是这个代码合进来不太现实啊,我们主分支尽量只保持必要的功能(我自己都拒绝了自己的一个channel修改)。作为一个增强功能的fork,也是很符合github的玩法的。 |
了解,主分支尽量简洁~~ |
实在是很不好意思啊,到时候我在主页上加个显眼的链接到你那边吧,有这个需求的用户肯定会使用的。 |
大佬创新设计一个架构不容易,我从中也白嫖了不少好东西,继续大佬学习,哈哈哈~ |
目的是为了更好的让wf支持,full-duplex 通信协议
websocket,quic,http2。。。。
想法:
#833
channel逻辑
https://github.com/gnblao/workflow/tree/channel
现有在channe的基础上实现两种协议
具体参考
https://github.com/gnblao/workflow/blob/channel/docs/about-channel.md
The text was updated successfully, but these errors were encountered: