Skip to content

Commit

Permalink
Merge pull request #29265 from taosdata/fix/TD-33275.2
Browse files Browse the repository at this point in the history
fix: add more test cases
  • Loading branch information
guanshengliang authored Dec 23, 2024
2 parents eafbc5e + 72dfc06 commit 5934324
Show file tree
Hide file tree
Showing 9 changed files with 233 additions and 14 deletions.
6 changes: 3 additions & 3 deletions source/libs/catalog/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@ target_link_libraries(
PRIVATE os util transport qcom nodes
)

#if(${BUILD_TEST})
# ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST})
if(${BUILD_TEST} AND NOT ${TD_WINDOWS})
ADD_SUBDIRECTORY(test)
endif()
3 changes: 3 additions & 0 deletions source/libs/executor/src/hashjoin.c
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ int32_t hInnerJoinDo(struct SOperatorInfo* pOperator) {
return code;
}

#ifdef HASH_JOIN_FULL

int32_t hLeftJoinHandleSeqRowRemains(struct SOperatorInfo* pOperator, SHJoinOperatorInfo* pJoin, bool* loopCont) {
bool allFetched = false;
SHJoinCtx* pCtx = &pJoin->ctx;
Expand Down Expand Up @@ -346,4 +348,5 @@ int32_t hLeftJoinDo(struct SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS;
}

#endif

2 changes: 1 addition & 1 deletion source/libs/executor/src/hashjoinoperator.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ int32_t hJoinSetImplFp(SHJoinOperatorInfo* pJoin) {
case JOIN_TYPE_RIGHT: {
switch (pJoin->subType) {
case JOIN_STYPE_OUTER:
pJoin->joinFp = hLeftJoinDo;
//pJoin->joinFp = hLeftJoinDo; TOOPEN
break;
default:
break;
Expand Down
2 changes: 2 additions & 0 deletions source/libs/qcom/src/queryUtil.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ static void processTaskQueue(SQueueInfo *pInfo, SSchedMsg *pSchedMsg) {
}

int32_t initTaskQueue() {
memset(&taskQueue, 0, sizeof(taskQueue));

taskQueue.wrokrerPool.name = "taskWorkPool";
taskQueue.wrokrerPool.min = tsNumOfTaskQueueThreads;
taskQueue.wrokrerPool.max = tsNumOfTaskQueueThreads;
Expand Down
10 changes: 6 additions & 4 deletions source/libs/scheduler/src/schRemote.c
Original file line number Diff line number Diff line change
Expand Up @@ -531,8 +531,8 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("QID:0x%" PRIx64 ",SID:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 " task notify rsp received, code:0x%x",
pParam->queryId, pParam->seriousId, pParam->clientId, pParam->taskId, code);
if (pMsg) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
}
return TSDB_CODE_SUCCESS;
}
Expand All @@ -545,8 +545,8 @@ int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
qDebug("handle %p is broken", pMsg->handle);

if (head->isHbParam) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);

SSchHbCallbackParam *hbParam = (SSchHbCallbackParam *)param;
SSchTrans trans = {.pTrans = hbParam->pTrans, .pHandle = NULL, .pHandleId = 0};
Expand Down Expand Up @@ -1293,6 +1293,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
}
break;
}
/*
case TDMT_SCH_QUERY_HEARTBEAT: {
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
Expand Down Expand Up @@ -1320,6 +1321,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
persistHandle = true;
break;
}
*/
case TDMT_SCH_TASK_NOTIFY: {
ETaskNotifyType* pType = param;
STaskNotifyReq qMsg;
Expand Down
11 changes: 8 additions & 3 deletions source/libs/scheduler/src/schTask.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,6 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode)
}

pTask->failedExecId = pTask->execId;
pTask->failedSeriousId = pTask->seriousId;

int8_t jobStatus = 0;
if (schJobNeedToStop(pJob, &jobStatus)) {
Expand Down Expand Up @@ -438,7 +437,7 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
pTask->waitRetry = true;

if (pTask->delayTimer) {
taosTmrStop(pTask->delayTimer);
UNUSED(taosTmrStop(pTask->delayTimer));
}

schDropTaskOnExecNode(pJob, pTask);
Expand All @@ -452,6 +451,8 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
}

#if 0

int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
int32_t code = 0;

Expand Down Expand Up @@ -593,6 +594,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i

SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
}
#endif

int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
Expand Down Expand Up @@ -759,7 +761,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
(void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);

if (pTask->delayTimer) {
taosTmrStop(pTask->delayTimer);
UNUSED(taosTmrStop(pTask->delayTimer));
}

(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
Expand Down Expand Up @@ -869,6 +871,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
return TSDB_CODE_SUCCESS;
}

#if 0
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
Expand Down Expand Up @@ -900,6 +903,7 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe

return code;
}
#endif

int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs);
Expand Down Expand Up @@ -1376,6 +1380,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {

for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i);
pTask->failedSeriousId = pJob->seriousId - 1;
pTask->seriousId = pJob->seriousId;

SCH_TASK_DLOG("task seriousId set to 0x%" PRIx64, pTask->seriousId);
Expand Down
Loading

0 comments on commit 5934324

Please sign in to comment.