Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix stmt/stmt2 should not insert disordered or duplicate data #29146

Open
wants to merge 5 commits into
base: 3.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions include/common/tdataformat.h
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ typedef struct {
TAOS_MULTI_BIND *bind;
} SBindInfo;
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *pOrdered, bool *pDupTs);

// stmt2 binding
int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int32_t buffMaxLen, initGeosFn igeos,
Expand All @@ -392,7 +392,7 @@ typedef struct {
} SBindInfo2;

int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray);
SArray *rowArray, bool *pOrdered, bool *pDupTs);

#endif

Expand Down
56 changes: 40 additions & 16 deletions source/client/src/clientMain.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ static timezone_t setConnnectionTz(const char* val){

time_t tx1 = taosGetTimestampSec();
char output[TD_TIMEZONE_LEN] = {0};
taosFormatTimezoneStr(tx1, val, tz, output);
code = taosHashPut(pTimezoneNameMap, &tz, sizeof(timezone_t), output, strlen(output) + 1);
code = taosFormatTimezoneStr(tx1, val, tz, output);
if (code == 0){
code = taosHashPut(pTimezoneNameMap, &tz, sizeof(timezone_t), output, strlen(output) + 1);
}
if (code != 0){
tscError("failed to put timezone %s to map", val);
}
Expand All @@ -122,23 +124,23 @@ static timezone_t setConnnectionTz(const char* val){

static int32_t setConnectionOption(TAOS *taos, TSDB_OPTION_CONNECTION option, const char* val){
if (taos == NULL) {
return TSDB_CODE_INVALID_PARA;
return terrno = TSDB_CODE_INVALID_PARA;
}

#ifdef WINDOWS
if (option == TSDB_OPTION_CONNECTION_TIMEZONE){
return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
return terrno = TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
}
#endif

if (option < TSDB_OPTION_CONNECTION_CLEAR || option >= TSDB_MAX_OPTIONS_CONNECTION){
return TSDB_CODE_INVALID_PARA;
return terrno = TSDB_CODE_INVALID_PARA;
}

int32_t code = taos_init();
// initialize global config
if (code != 0) {
return code;
return terrno = code;
}

STscObj *pObj = acquireTscObj(*(int64_t *)taos);
Expand Down Expand Up @@ -208,7 +210,7 @@ static int32_t setConnectionOption(TAOS *taos, TSDB_OPTION_CONNECTION option, co

END:
releaseTscObj(*(int64_t *)taos);
return code;
return terrno = code;
}

int taos_options_connection(TAOS *taos, TSDB_OPTION_CONNECTION option, const void *arg, ...){
Expand Down Expand Up @@ -2166,19 +2168,38 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col
pStmt->semWaited = true;
}

int32_t code = 0;
SSHashObj *hashTbnames = tSimpleHashInit(100, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR));
if (NULL == hashTbnames) {
tscError("stmt2 bind failed: %s", tstrerror(terrno));
return terrno;
}

int32_t code = TSDB_CODE_SUCCESS;
for (int i = 0; i < bindv->count; ++i) {
if (bindv->tbnames && bindv->tbnames[i]) {
if (pStmt->sql.stbInterlaceMode) {
if (tSimpleHashGet(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i])) != NULL) {
code = terrno = TSDB_CODE_PAR_TBNAME_DUPLICATED;
tscError("stmt2 bind failed: %s %s", tstrerror(terrno), bindv->tbnames[i]);
goto out;
}

code = tSimpleHashPut(hashTbnames, bindv->tbnames[i], strlen(bindv->tbnames[i]), NULL, 0);
if (code) {
goto out;
}
}

code = stmtSetTbName2(stmt, bindv->tbnames[i]);
if (code) {
return code;
goto out;
}
}

if (bindv->tags && bindv->tags[i]) {
code = stmtSetTbTags2(stmt, bindv->tags[i]);
if (code) {
return code;
goto out;
}
}

Expand All @@ -2187,26 +2208,29 @@ int taos_stmt2_bind_param(TAOS_STMT2 *stmt, TAOS_STMT2_BINDV *bindv, int32_t col

if (bind->num <= 0 || bind->num > INT16_MAX) {
tscError("invalid bind num %d", bind->num);
terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
return terrno;
code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
goto out;
}

int32_t insert = 0;
(void)stmtIsInsert2(stmt, &insert);
if (0 == insert && bind->num > 1) {
tscError("only one row data allowed for query");
terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
return terrno;
code = terrno = TSDB_CODE_TSC_STMT_BIND_NUMBER_ERROR;
goto out;
}

code = stmtBindBatch2(stmt, bind, col_idx);
if (TSDB_CODE_SUCCESS != code) {
return code;
goto out;
}
}
}

return TSDB_CODE_SUCCESS;
out:
tSimpleHashCleanup(hashTbnames);

return code;
}

int taos_stmt2_exec(TAOS_STMT2 *stmt, int *affected_rows) {
Expand Down
42 changes: 40 additions & 2 deletions source/common/src/tdataformat.c
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,11 @@ static int32_t tBindInfoCompare(const void *p1, const void *p2, const void *para
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `pOrdered` is the pointer to store ordered
* `pDupTs` is the pointer to store duplicateTs
*/
int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *pOrdered, bool *pDupTs) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand All @@ -469,6 +471,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
return terrno;
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -507,6 +510,22 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
code = terrno;
goto _exit;
}

if (pOrdered && pDupTs) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
*pOrdered = true;
*pDupTs = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (*pOrdered && !*pDupTs) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
*pOrdered = (code >= 0);
*pDupTs = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down Expand Up @@ -3235,9 +3254,11 @@ int32_t tColDataAddValueByBind2(SColData *pColData, TAOS_STMT2_BIND *pBind, int3
* `infoSorted` is whether the bind information is sorted by column id
* `pTSchema` is the schema of the table
* `rowArray` is the array to store the rows
* `pOrdered` is the pointer to store ordered
* `pDupTs` is the pointer to store duplicateTs
*/
int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorted, const STSchema *pTSchema,
SArray *rowArray) {
SArray *rowArray, bool *pOrdered, bool *pDupTs) {
if (infos == NULL || numOfInfos <= 0 || numOfInfos > pTSchema->numOfCols || pTSchema == NULL || rowArray == NULL) {
return TSDB_CODE_INVALID_PARA;
}
Expand Down Expand Up @@ -3266,6 +3287,7 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
}
}

SRowKey rowKey, lastRowKey;
for (int32_t iRow = 0; iRow < numOfRows; iRow++) {
taosArrayClear(colValArray);

Expand Down Expand Up @@ -3317,6 +3339,22 @@ int32_t tRowBuildFromBind2(SBindInfo2 *infos, int32_t numOfInfos, bool infoSorte
code = terrno;
goto _exit;
}

if (pOrdered && pDupTs) {
tRowGetKey(row, &rowKey);
if (iRow == 0) {
*pOrdered = true;
*pDupTs = false;
} else {
// no more compare if we already get disordered or duplicate rows
if (*pOrdered && !*pDupTs) {
int32_t code = tRowKeyCompare(&rowKey, &lastRowKey);
*pOrdered = (code >= 0);
*pDupTs = (code == 0);
}
}
lastRowKey = rowKey;
}
}

_exit:
Expand Down
4 changes: 2 additions & 2 deletions source/libs/parser/src/parInsertStmt.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ int32_t qBindStmtStbColsValue(void* pBlock, SArray* pCols, TAOS_MULTI_BIND* bind
// }
}

code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs);

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down Expand Up @@ -745,7 +745,7 @@ int32_t qBindStmtStbColsValue2(void* pBlock, SArray* pCols, TAOS_STMT2_BIND* bin
pBindInfos[c].bytes = pColSchema->bytes;
}

code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols);
code = tRowBuildFromBind2(pBindInfos, boundInfo->numOfBound, colInOrder, *pTSchema, pCols, &pDataBlock->ordered, &pDataBlock->duplicateTs);

qDebug("stmt all %d columns bind %d rows data", boundInfo->numOfBound, rowNum);

Expand Down
4 changes: 4 additions & 0 deletions tests/script/api/makefile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ exe:
# gcc $(CFLAGS) ./stmt2-get-fields.c -o $(ROOT)stmt2-get-fields $(LFLAGS)
# gcc $(CFLAGS) ./stmt2-nohole.c -o $(ROOT)stmt2-nohole $(LFLAGS)
gcc $(CFLAGS) ./stmt-crash.c -o $(ROOT)stmt-crash $(LFLAGS)
gcc $(CFLAGS) ./stmt-insert-dupkeys.c -o $(ROOT)stmt-insert-dupkeys $(LFLAGS)
gcc $(CFLAGS) ./stmt2-insert-dupkeys.c -o $(ROOT)stmt2-insert-dupkeys $(LFLAGS)

clean:
rm $(ROOT)batchprepare
Expand All @@ -47,3 +49,5 @@ clean:
rm $(ROOT)stmt2-get-fields
rm $(ROOT)stmt2-nohole
rm $(ROOT)stmt-crash
rm $(ROOT)stmt-insert-dupkeys
rm $(ROOT)stmt2-insert-dupkeys
Loading