Skip to content

[Feature] 建议改进 requestId 异步处理方式,使用 context.Context 实现更符合 Go 风格的异步模型 #9

Open
@Lensual

Description

@Lensual

当前问题

目前 SDK 通过 requestId 与 EventHandler 的方式来实现异步调用的响应处理。但在上层业务逻辑中存在一些痛点:

  1. 每次调用都需要手动生成和管理 requestId, requestId 与回调之间的映射需要手动管理,多线程下需要加锁,易出错,且不易调试。
  2. 目前实现中涉及用户自己从 Go 向 C 层传递 requestId 非托管内存,这在使用上是不安全的,可能存在访问非法内存的问题。(栈内存指针可以,go的堆内存指针不行,因为go的gc会在后台移动内存)
  3. 不符合 Go 的惯用写法,难以配合 context.Context 实现取消、超时等控制逻辑。

改进建议

建议由Go SDK来包装处理带 requestId 的方法。
用户调用方式改为如下:

ctx := context.Background()
result,err := rtmClient.Subscribe(context.Background(), “my-channel”, options).Await()
if(err!=nil){
  panic(err)
}
print(result.xxx)
  • 定义RtmAsyncContext,用于承载异步方法的上下文
  • 定义一个map,管理requestId和上下文的映射
  • 在调用rtm对应的方法时,创建上下文,并写入到map
  • 在EventHandler处,查找map并将event写入到上下文,通知context完成
  • 将带有requestId相关的方法,返回值改为RtmAsyncContext,用户可以选择等待或不等待上下文完成
  • 上下文中保留requestId,让用户可以兼容旧的EventHandle的方式来处理

参考实现

context.go

type RtmAsyncContext interface {
	context.Context
	RequestId() uint64
	ReturnCode() int
	Await() (any, error)            // 这个any可以改泛型,或者将带requestId的Event声明成接口
}

var _ RtmAsyncContext = (*rtmAsyncContext)(nil)

type rtmAsyncContext struct {
	context.Context
	parentCtx  context.Context
	requestId  uint64
	returnCode int
	cancel     context.CancelCauseFunc
	result     any
}

func newRtmAsyncContext(ctx context.Context, requestId uint64, returnCode int) *rtmAsyncContext {
	parentCtx := ctx
	ctx, cancel := context.WithCancelCause(parentCtx)

	if returnCode != 0 {
		err := errors.New(agRtm.GetErrorReason(returnCode))
		cancel(err)
	}

	return &rtmAsyncContext{
		Context:    ctx,
		parentCtx:  parentCtx,
		requestId:  requestId,
		returnCode: returnCode,
		cancel:     cancel,
		result:     nil,
	}
}

// RequestId implements [RtmAsyncContext].
func (ctx *rtmAsyncContext) RequestId() uint64 {
	return ctx.requestId
}

// ReturnCode implements [RtmAsyncContext].
func (ctx *rtmAsyncContext) ReturnCode() int {
	return ctx.returnCode
}

var rtmAsyncContextSuccess = errors.New("success")

// Await implements [RtmAsyncContext].
func (ctx *rtmAsyncContext) Await() (any, error) {
	select {
	case <-ctx.parentCtx.Done():
		return nil, ctx.parentCtx.Err()
	case <-ctx.Done():
		err := context.Cause(ctx)
		if !errors.Is(err, rtmAsyncContextSuccess) {
			return nil, err
		}
		return ctx.result, nil
	}
}

RtmEventHandlerBridge.go

//需要找个地方放
+ reqIdMap            map[uint64]*rtmAsyncContext 
+ reqIdMapLock     sync.Mutex

//export cgo_RtmEventHandlerBridge_onSubscribeResult
func cgo_RtmEventHandlerBridge_onSubscribeResult(_ *C.C_RtmEventHandlerBridge, userData unsafe.Pointer,
	requestId C.uint64_t, channelName *C.char, errorCode C.enum_C_RTM_ERROR_CODE) {

	if userData == nil {
		return
	}

+	reqIdMapLock.Lock()
+	rtmAsyncCtx, ok := reqIdMap[requestId]
+	delete(reqIdMap, requestId)
+	reqIdMapLock.Unlock()
+	if (ok) {
+		rtmAsyncCtx.event = OnSubscribeResultEvent{} //TODO 这个参数需要额外包装一下
+		rtmCallCtx.cancel(rtmAsyncContextSuccess)
+	}

	bridge := (*RtmEventHandlerBridge)(userData)
	bridge.handler.OnSubscribeResult(
		uint64(requestId),
		C.GoString(channelName),
		RTM_ERROR_CODE(errorCode),
	)
}

IAgoraRtmClient.go

// - func (this_ *IRtmClient) Subscribe(channelName string, option *SubscribeOptions, requestId *uint64) int {
func (this_ *IRtmClient) Subscribe(ctx context.Context, channelName string, option *SubscribeOptions) *RtmAsyncContext {
	cChannelName := C.CString(channelName)
	defer C.free(unsafe.Pointer(cChannelName))

	reqIdMapLock.Lock() //加锁,防止eventHandler过早被调用
	var requestId uint64
	ret := int(C.agora_rtm_client_subscribe(unsafe.Pointer(this_),
		cChannelName,
		(*C.struct_C_SubscribeOptions)(option),
		(*C.uint64_t)(unsafe.Pointer(&requestId)),
	))
	retCtx := newRtmAsyncContext(
		Context: ctx,
		requestId: requestId,
		returnCode: ret,
	)
	reqIdMap[requestId] = retCtx
	reqIdMapLock.Unlock()
	return retCtx
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions