Skip to content

Commit

Permalink
fix: mysql results table commands and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshay Kalbhor committed Oct 16, 2024
1 parent e7df3a1 commit df92cc7
Show file tree
Hide file tree
Showing 5 changed files with 105 additions and 27 deletions.
43 changes: 43 additions & 0 deletions config.test_mysql.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
[app]
log_level = "DEBUG"
default_job_ttl = "60s"

[job_queue.broker]
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"

[job_queue.state]
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
max_active = 50
max_idle = 20
dial_timeout = "1s"
read_timeout = "1s"
write_timeout = "1s"
expiry = "30s"
meta_expiry = "3600s"

# Results database configuration (MySQL)
[results.my_results]
type = "mysql"
dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb"
max_idle = 10
max_active = 100
connect_timeout = "10s"
results_table = "results_%s"

[db.my_db]
type = "mysql"
dsn = "root:rootpassword@tcp(127.0.0.1:3306)/mydb"
max_idle = 10
max_active = 100
connect_timeout = "10s"
4 changes: 2 additions & 2 deletions config.test.toml → config.test_pg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ default_job_ttl = "60s"
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
db = 2
max_active = 50
max_idle = 20
dial_timeout = "1s"
Expand All @@ -17,7 +17,7 @@ write_timeout = "1s"
type = "redis"
addresses = ["localhost:6379"]
password = ""
db = 1
db = 2
max_active = 50
max_idle = 20
dial_timeout = "1s"
Expand Down
59 changes: 34 additions & 25 deletions internal/resultbackends/sqldb/sqldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (w *SQLDBResultSet) WriteCols(cols []string) error {
return err
}

return err
return nil
}

// WriteRow writes an individual row from a result set to the backend.
Expand Down Expand Up @@ -234,7 +234,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
)

for i := range cols {
colNameHolder[i] = fmt.Sprintf(`"%s"`, cols[i])
colNameHolder[i] = s.quoteIdentifier(cols[i])

// This will be filled by the driver.
if s.opt.DBType == dbTypePostgres {
Expand All @@ -247,37 +247,35 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins

var (
fields = make([]string, len(cols))
typ = ""
unlogged = ""
typ string
unlogged string
)

for i := 0; i < len(cols); i++ {
typ = colTypes[i].DatabaseTypeName()
switch colTypes[i].DatabaseTypeName() {
case "INT2", "INT4", "INT8", // Postgres
"TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT": // MySQL
switch typ {
case "INT2", "INT4", "INT8", "TINYINT", "SMALLINT", "INT", "MEDIUMINT", "BIGINT":
typ = "BIGINT"
case "FLOAT4", "FLOAT8", // Postgres
"DECIMAL", "FLOAT", "DOUBLE", "NUMERIC": // MySQL
case "FLOAT4", "FLOAT8", "DECIMAL", "FLOAT", "DOUBLE", "NUMERIC":
typ = "DECIMAL"
case "TIMESTAMP", // Postgres, MySQL
"DATETIME": // MySQL
case "TIMESTAMP", "DATETIME":
typ = "TIMESTAMP"
case "DATE": // Postgres, MySQL
case "DATE":
typ = "DATE"
case "BOOLEAN": // Postgres, MySQL
case "BOOLEAN":
typ = "BOOLEAN"
case "JSON", "JSONB": // Postgres
case "JSON", "JSONB":
if s.opt.DBType == dbTypePostgres {
typ = "JSONB"
} else {
typ = "JSON"
}
case "_INT4", "_INT8", "_TEXT":
if s.opt.DBType != dbTypePostgres {
typ = "TEXT"
}
// _INT4, _INT8, _TEXT represent array types in Postgres
case "_INT4": // Postgres
typ = "_INT4"
case "_INT8": // Postgres
typ = "_INT8"
case "_TEXT": // Postgres
typ = "_TEXT"
case "VARCHAR":
typ = "VARCHAR(255)"
default:
typ = "TEXT"
}
Expand All @@ -286,7 +284,7 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
typ += " NOT NULL"
}

fields[i] = fmt.Sprintf(`"%s" %s`, cols[i], typ)
fields[i] = fmt.Sprintf("%s %s", s.quoteIdentifier(cols[i]), typ)
}

// If the DB is Postgres, optionally create an "unlogged" table that disables
Expand All @@ -297,9 +295,20 @@ func (s *SqlDB) createTableSchema(cols []string, colTypes []*sql.ColumnType) ins
}

return insertSchema{
dropTable: `DROP TABLE IF EXISTS "%s";`,
createTable: fmt.Sprintf(`CREATE %s TABLE IF NOT EXISTS "%%s" (%s);`, unlogged, strings.Join(fields, ",")),
insertRow: fmt.Sprintf(`INSERT INTO "%%s" (%s) VALUES (%s)`, strings.Join(colNameHolder, ","),
dropTable: fmt.Sprintf("DROP TABLE IF EXISTS %s;", s.quoteIdentifier("%s")),
createTable: fmt.Sprintf("CREATE %s TABLE IF NOT EXISTS %s (%s);", unlogged, s.quoteIdentifier("%s"), strings.Join(fields, ",")),
insertRow: fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)",
s.quoteIdentifier("%s"),
strings.Join(colNameHolder, ","),
strings.Join(colValHolder, ",")),
}
}

// quoteIdentifier quotes an identifier (table or column name) based on the database type
func (s *SqlDB) quoteIdentifier(name string) string {
if s.opt.DBType == dbTypePostgres {
return fmt.Sprintf(`"%s"`, name)
}
// MySQL uses backticks
return fmt.Sprintf("`%s`", name)
}
26 changes: 26 additions & 0 deletions sql/mysql/test.mysql.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- test.sql
-- concurrency parameter is associated with the queue.
-- Once a queue is set to a particular concurrency, it cannot be changed.
-- In the below example both `get_profit_summary` and `get_profit_entries` use
-- a common queue with concurrency = 5. It is okay to pass concurrency
-- again in `get_profit_entries` as long as it is the same as the one defined initially (5)

-- name: get_profit_summary
-- db: my_db
-- concurrency: 5
-- queue: test
SELECT SUM(amount) AS total, entry_date FROM entries WHERE user_id = ? GROUP BY entry_date;

-- name: get_profit_entries
-- db: my_db
-- queue: test
SELECT * FROM entries WHERE user_id = ?;

-- name: get_profit_entries_by_date
-- queue: test
SELECT * FROM entries WHERE user_id = ? AND timestamp > ? and timestamp < ?;

-- name: slow_query
-- db: my_db
-- queue: test
SELECT SLEEP(?);
File renamed without changes.

0 comments on commit df92cc7

Please sign in to comment.