From df92cc7181b8a80980169c37484803a50777f46d Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Wed, 16 Oct 2024 15:00:05 +0530 Subject: [PATCH] fix: mysql results table commands and add tests --- config.test_mysql.toml | 43 ++++++++++++++++++ config.test.toml => config.test_pg.toml | 4 +- internal/resultbackends/sqldb/sqldb.go | 59 ++++++++++++++----------- sql/mysql/test.mysql.sql | 26 +++++++++++ sql/{ => pg}/test.postgres.sql | 0 5 files changed, 105 insertions(+), 27 deletions(-) create mode 100644 config.test_mysql.toml rename config.test.toml => config.test_pg.toml (98%) create mode 100644 sql/mysql/test.mysql.sql rename sql/{ => pg}/test.postgres.sql (100%) diff --git a/config.test_mysql.toml b/config.test_mysql.toml new file mode 100644 index 0000000..5d4aaa2 --- /dev/null +++ b/config.test_mysql.toml @@ -0,0 +1,43 @@ +[app] +log_level = "DEBUG" +default_job_ttl = "60s" + +[job_queue.broker] +type = "redis" +addresses = ["localhost:6379"] +password = "" +db = 1 +max_active = 50 +max_idle = 20 +dial_timeout = "1s" +read_timeout = "1s" +write_timeout = "1s" + +[job_queue.state] +type = "redis" +addresses = ["localhost:6379"] +password = "" +db = 1 +max_active = 50 +max_idle = 20 +dial_timeout = "1s" +read_timeout = "1s" +write_timeout = "1s" +expiry = "30s" +meta_expiry = "3600s" + +# Results database configuration (MySQL) +[results.my_results] +type = "mysql" +dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb" +max_idle = 10 +max_active = 100 +connect_timeout = "10s" +results_table = "results_%s" + +[db.my_db] +type = "mysql" +dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb" +max_idle = 10 +max_active = 100 +connect_timeout = "10s" diff --git a/config.test.toml b/config.test_pg.toml similarity index 98% rename from config.test.toml rename to config.test_pg.toml index e2f2363..5ad2009 100644 --- a/config.test.toml +++ b/config.test_pg.toml @@ -6,7 +6,7 @@ default_job_ttl = "60s" type = "redis" addresses = ["localhost:6379"] password = "" -db = 1 +db = 2 max_active = 50 max_idle = 20 dial_timeout = "1s" @@ -17,7 +17,7 @@ write_timeout = "1s" type = "redis" addresses = ["localhost:6379"] password = "" -db = 1 +db = 2 max_active = 50 max_idle = 20 dial_timeout = "1s" diff --git a/internal/resultbackends/sqldb/sqldb.go b/internal/resultbackends/sqldb/sqldb.go index 554c4bb..3cb7266 100644 --- a/internal/resultbackends/sqldb/sqldb.go +++ b/internal/resultbackends/sqldb/sqldb.go @@ -187,7 +187,7 @@ func (w *SQLDBResultSet) WriteCols(cols []string) error { return err } - return err + return nil } // WriteRow writes an individual row from a result set to the backend. @@ -234,7 +234,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins ) for i := range cols { - colNameHolder[i] = fmt.Sprintf(`"%s"`, cols[i]) + colNameHolder[i] = s.quoteIdentifier(cols[i]) // This will be filled by the driver. if s.opt.DBType == dbTypePostgres { @@ -247,37 +247,35 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins var ( fields = make([]string, len(cols)) - typ = "" - unlogged = "" + typ string + unlogged string ) for i := 0; i < len(cols); i++ { typ = colTypes[i].DatabaseTypeName() - switch colTypes[i].DatabaseTypeName() { - case "INT2", "INT4", "INT8", // Postgres - "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": // MySQL + switch typ { + case "INT2", "INT4", "INT8", "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": typ = "BIGINT" - case "FLOAT4", "FLOAT8", // Postgres - "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC": // MySQL + case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC": typ = "DECIMAL" - case "TIMESTAMP", // Postgres, MySQL - "DATETIME": // MySQL + case "TIMESTAMP", "DATETIME": typ = "TIMESTAMP" - case "DATE": // Postgres, MySQL + case "DATE": typ = "DATE" - case "BOOLEAN": // Postgres, MySQL + case "BOOLEAN": typ = "BOOLEAN" - case "JSON", "JSONB": // Postgres + case "JSON", "JSONB": + if s.opt.DBType == dbTypePostgres { + typ = "JSONB" + } else { + typ = "JSON" + } + case "_INT4", "_INT8", "_TEXT": if s.opt.DBType != dbTypePostgres { typ = "TEXT" } - // _INT4, _INT8, _TEXT represent array types in Postgres - case "_INT4": // Postgres - typ = "_INT4" - case "_INT8": // Postgres - typ = "_INT8" - case "_TEXT": // Postgres - typ = "_TEXT" + case "VARCHAR": + typ = "VARCHAR(255)" default: typ = "TEXT" } @@ -286,7 +284,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins typ += " NOT NULL" } - fields[i] = fmt.Sprintf(`"%s" %s`, cols[i], typ) + fields[i] = fmt.Sprintf("%s %s", s.quoteIdentifier(cols[i]), typ) } // If the DB is Postgres, optionally create an "unlogged" table that disables @@ -297,9 +295,20 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins } return insertSchema{ - dropTable: `DROP TABLE IF EXISTS "%s";`, - createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s);`, unlogged, strings.Join(fields, ",")), - insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, strings.Join(colNameHolder, ","), + dropTable: fmt.Sprintf("DROP TABLE IF EXISTS %s;", s.quoteIdentifier("%s")), + createTable: fmt.Sprintf("CREATE %s TABLE IF NOT EXISTS %s (%s);", unlogged, s.quoteIdentifier("%s"), strings.Join(fields, ",")), + insertRow: fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", + s.quoteIdentifier("%s"), + strings.Join(colNameHolder, ","), strings.Join(colValHolder, ",")), } } + +// quoteIdentifier quotes an identifier (table or column name) based on the database type +func (s *SqlDB) quoteIdentifier(name string) string { + if s.opt.DBType == dbTypePostgres { + return fmt.Sprintf(`"%s"`, name) + } + // MySQL uses backticks + return fmt.Sprintf("`%s`", name) +} diff --git a/sql/mysql/test.mysql.sql b/sql/mysql/test.mysql.sql new file mode 100644 index 0000000..4a0c513 --- /dev/null +++ b/sql/mysql/test.mysql.sql @@ -0,0 +1,26 @@ +-- test.sql +-- concurrency parameter is associated with the queue. +-- Once a queue is set to a particular concurrency, it cannot be changed. +-- In the below example both `get_profit_summary` and `get_profit_entries` use +-- a common queue with concurrency = 5. It is okay to pass concurrency +-- again in `get_profit_entries` as long as it is the same as the one defined initially (5) + +-- name: get_profit_summary +-- db: my_db +-- concurrency: 5 +-- queue: test +SELECT SUM(amount) AS total, entry_date FROM entries WHERE user_id = ? GROUP BY entry_date; + +-- name: get_profit_entries +-- db: my_db +-- queue: test +SELECT * FROM entries WHERE user_id = ?; + +-- name: get_profit_entries_by_date +-- queue: test +SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?; + +-- name: slow_query +-- db: my_db +-- queue: test +SELECT SLEEP(?); \ No newline at end of file diff --git a/sql/test.postgres.sql b/sql/pg/test.postgres.sql similarity index 100% rename from sql/test.postgres.sql rename to sql/pg/test.postgres.sql