Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

机器学习场景的使用的最佳实践是什么(实用案例推荐) #935

Open
rockyzhengwu opened this issue Jun 8, 2022 · 18 comments
Labels
documentation Improvements or additions to documentation

Comments

@rockyzhengwu
Copy link

rockyzhengwu commented Jun 8, 2022

场景

服务端是一个 PyTorch C++ 实现的 CTR 预估的服务,预估之前需要去 KV(类似 redis) 里读取特征作为 CTR 预估的输入。一个客户端请求可能读一次,也可能读两次 KV 。

详细流程可能是这样: 拿到请求体 -> 解析 -> 读 kv -> 产生模型输入 -> CTR 模型 -> calibration 模型 -> 其他逻辑 -> 返回 。

现在的实现

看了下 demo 然后就写了大概这样的逻辑

static WFFacilities::WaitGroup wait_group(1);

void sig_handler(int signo) { wait_group.done(); }

int main(int argc, char *argv[]) {
  // ....   something init

  unsigned short port = 8083;
  signal(SIGINT, sig_handler);
  signal(SIGTERM, sig_handler);
  WFHttpServer server(process);
  if (server.start(port) == 0) {
    wait_group.wait();
    server.stop();
  } else {
    perror("Cannot start server");
    exit(1);
  }
  return 0;
}

void process(WFHttpTask *server_task) {
  protocol::HttpRequest *req = server_task->get_req();
  protocol::HttpResponse *resp = server_task->get_resp();
  long long seq = server_task->get_task_seq();
  
  resp->set_http_version("HTTP/1.1");
  resp->set_status_code("200");

  std::string body;
  auto uri = req->get_request_uri();
  if (std::strcmp(uri, "/cvr") == 0) {
    const void *body;
    size_t size;
    req->get_parsed_body(&body, &size);
    std::string req_body = static_cast<const char *>(body);
    std::string response_body = "";
    MODL_MANAGER->predict(std::move(req_body),  response_body);
    resp->append_output_body(response_body.data(), response_body.size());
  } else {
    resp->set_status_code("404");
  }
}

遇到的问题

由于在 MODL_MANAGER->predict(std::move(req_body), response_body); 里面实现了等待读取 KV 的逻辑和模型预估的部分。但读 KV 有可能会卡住(看了很久还不是很理解这个问题)。
想不占线程的读 KV,不知道怎么实现合适。一番讨论发现从我开始用 workflow 的时候就好像用错了,似乎在最开始就应该把逻辑分成不同的 task 。所以想是不是有什么最佳的实践。

补充一下, workflow 线上用了一年多了,在我要读 kv 之前都没任何问题, 50 ms 的预估时间,之前尝试过很多都不能很好的解决超时, workflow 可以..... 目前 p99 30ms cpu 打满基本不超时。

@Barenboim
Copy link
Contributor

Barenboim commented Jun 8, 2022

你好。这个里面好几个问题。我一个个回复。
首先,如果你的整个predict过程是一个纯计算,那么,最好的实现是吧predict包装成一个计算任务,并且push_back到server task所在的series。这么做比直接在process里计算要好一些,可以让计算运行在计算线程,而不是占用网络线程。示例:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void process(WFHttpTask *task)
{
    void *body;
    size_t size;

    task->get_req()->get_parsed_body(&body, &size);
    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([task](WFGoTask *predict_task) {
        std::string *response_body = (std::string *)task->user_data;
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(predict_task);
}

很多用户误认为process函数结束就是服务处理流程结束了。其实,服务处理流程是series而不是process函数,否者,我们无法实现全异步的server了。这个示例,我们产生一个计算任务(predict_task)并添加到series,在计算任务的callback里填写了resp。由于想要最佳实践,这里用了append_output_body_nocopy,并且在server task的callback里释放response_body。
这里就涉及了多个调用的时机。predict_fn调用显然不在process函数线程里,而是process函数结束之后,在某个计算线程里被调用。而predict_task的callback在predict_fn之后在同一个线程里被执行,我们在这里填写resp,并且使用nocopy接口,因为response_body在回复结束之前,都还没有被delete。
还有一个函数,就是server task的callback。server task的callback是回复结束之后被调用(这和http client task一致,都是http交互完成调用callback),在这里我们释放了response_body。
当然,如果不那么要求性能,这个实现可以简单一些。比如不用_nocopy接口。

@Barenboim
Copy link
Contributor

Barenboim commented Jun 8, 2022

第二个问题,你需要在计算中插入通讯。我不知道你的kv是什么协议的通讯,我们就假设是redis(自定义协议的话可以自己实现,参考相关文档)。如果"解析"部分计算量不大,直接在process里算就可以了。我们假设这个计算量确实不大,那么,现在就是先访问redis再predict计算,然后填写回复。可以这么实现:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    predict_task->set_callback([](WFGoTask *predict_task) {
        WFHttpTask *server_task = (WFHttpTask *)series_of(predict_task)->get_context(); // 取回server task
        std::string *response_body = (std::string *)server_task->user_data;
        server_task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });

    series_of(task)->push_back(predict_task); // 把计算任务放进series
}

void process(WFHttpTask *task)
{
    SeriesWork *series = series_of(task);

    std::string *response_body = new std::string;
    task->user_data = response_body;
    task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });

    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    redis_task->get_req()->set_request(....);

    series->set_context(task);    // 把server_task指针放在series上下文
    series->push_back(redis_task);
}

这个就构成了一个网络任务+计算任务的处理流程。整个过程无需等待,也没有占有网络线程进行复杂的计算。如果在redis任务之前,还需要进行比较复杂的计算,同理增加一个计算任务即可。
但是,我们发现,如果处理流程过于复杂,你就不的不让每个任务callback里发起下一步操作。这会让代码很零碎。特别是,当A任务是功能X的最后一个任务,任务B是功能Y的第一个任务。因为X功能之后执行Y,就需要让A的callback去创建B,这是很不合理的。那么,对于这种复杂的需求,就可以使用我们的模块任务。

@Barenboim
Copy link
Contributor

当若干个任务完成一个特定的功能,就可以使用模块任务了。例如,我们认为redis和predict构成一个完整的功能,那么上例中最不合理的就是让predict_task的callback去填写response。用模块可以这样改造:

void predict_fn(const void *body, size_t size, std::string *response_body)
{
    ....
}

void redis_callback(WFRedisTask *task)
{ 
    protocol::RedisResponse *resp = task->get_resp();
    // read body and size from resp.

    std::string *response_body = new std::string;
    WFGoTask *predict_task = WFTaskFactory::create_go_task("predict", predict_fn, body, size, response_body);
    // 不再需要给predict_task设置callback了。耦合性降低。计算结果放在series context。
    series_of(task)->set_context(response_body);
    series_of(task)->push_back(predict_task); // 把计算任务放进series。这个series其实是module内的sub_series
}

void process(WFHttpTask *task)
{
    WFRedisTask *redis_task = WFTaskFactory::create_redis_task(url, 0, redis_callback);
    WFModuleTask *module = WFTaskFactory::create_module_task(redis_task, [task](WFModuleTask *mod) {
        std::string *response_body = mod->sub_series()->get_context();  // 这里是sub_series不是series_of
        task->user_data = response_body;
        task->set_callback([](WFHttpTask *task) { delete (std::string *)task->user_data; });
        task->get_resp()->append_output_body_nocopy(response_body->c_str(), response_body->size());
    });
    series_of(task)->push_back(module);
}

可以看一下module的文档:https://github.com/sogou/workflow/blob/master/docs/about-module.md

@Barenboim
Copy link
Contributor

总结

  • server处理流程是整个series,不是process函数。不要在process函数里等待或进行复杂计算。
  • 如果处理流程是一个纯计算,包装成计算任务push到series里,在任务callback里填写resp。
  • 如果是计算和通讯结合,那么,在每一个任务callback,创建并push下一个任务,最后填写resp。
  • 逻辑复杂的server,可以使用模块任务(WFModuleTask)进行模块级封装,降低任务之间的耦合。

@Barenboim Barenboim added the documentation Improvements or additions to documentation label Jun 8, 2022
@rockyzhengwu
Copy link
Author

@Barenboim 明白了,感谢这么优秀的项目,特别耐心的解答

@Barenboim Barenboim changed the title 机器学习场景的使用的最佳实践是什么 机器学习场景的使用的最佳实践是什么(实用案例推荐) Jun 9, 2022
@MaybeShewill-CV

This comment was marked as resolved.

@Barenboim
Copy link
Contributor

先回答小疑问。运行中的计算任务是你的一个函数,显然框架无法中断你执行中的函数。这个我们也许可以考虑搞一个进程任务(感觉有点像actor模式了),时间到了直接kill听起来很刺激,起码是个理论上可行的方案。
我们内部有workflow的低代码框架,就是可以通过一个配置自动生成workflow代码。这里可以给计算任务设置超时或中断。但这个肯定需要在函数里加入检查点,让函数可以中途自行返回。代码写成死循环是没有办法的。
actor模式可以想一下。

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 感谢解答。下来去了解下actor哈:)

@Barenboim

This comment was marked as resolved.

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

@Barenboim
Copy link
Contributor

Barenboim commented Jun 10, 2022

@Barenboim 是的哈 用了太多timer和sleep来模拟,确实有点乱:) **series << xx << xx 学到了, 哈哈哈:)

不是**series哈,是**task。*task得到task的引用,task对象的'*'运算符,得到task所在series的引用,所以可以:
**task << task1 << task2

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 嗯嗯 哈哈哈 键盘敲错了 ==!

@Barenboim
Copy link
Contributor

Barenboim commented Jun 13, 2022

@MaybeShewill-CV 你看一下这个新功能:#938

class WFTaskFactory
{
    /* Create 'Go' task with running time limit in seconds plus nanoseconds.
     * If time exceeded, state WFT_SYS_STATE_ERROR and error ETIMEDOUT will be got in callback. */
    template<class FUNC, class... ARGS>
    static WFGoTask *create_timedgo_task(time_t seconds, long nanoseconds,
                                         const std::string& queue_name,
                                         FUNC&& func, ARGS&&... args);
};

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 好滴,感觉这样很方便了,可以傻瓜式编程了。明天抽空试试看:)

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

@Barenboim
Copy link
Contributor

@Barenboim 抽空测试了一个demo,没有问题,用起来非常方便,赞。下来会再多测测,有问题再反馈哈:)

主要是从你的demo上看出来,计算时间限制好像用户非常有需求,所以就想到了实现这个功能。然后发现go task上加这个功能非常的自然,因为任务的类型是确定的。在实现的过程中,也帮我们优化了一些核心代码。果然用户是我们发展的原动力。

@Barenboim
Copy link
Contributor

@MaybeShewill-CV 现在thread task也支持加超时了。

template<class INPUT, class OUTPUT>
class WFThreadTaskFactory
{
public:
    static T *create_thread_task(time_t seconds, long nanoseconds,
                                 const std::string& queue_name,
                                 std::function<void (INPUT *, OUTPUT *)> routine,
                                 std::function<void (T *)> callback);
};

@MaybeShewill-CV
Copy link
Contributor

@Barenboim 点赞! 回去看看之前几个thread_task的任务能不能更新下:)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation
Projects
None yet
Development

No branches or pull requests

3 participants