-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #56 from radekg/dockertest
Add YugabyteDB Dockertest integration and utilities. #14
- Loading branch information
Showing
10 changed files
with
1,372 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
# YugabyteDB Test Kit | ||
|
||
YugabyteDB for embedding in go tests. Depends on Docker. | ||
|
||
## Usage | ||
|
||
Individual packages contain tests showing exact usage patterns for your own tests. | ||
|
||
## Example | ||
|
||
Run three masters with three TServers inside of the test and query YSQL on one of the TServers: | ||
|
||
```go | ||
package myprogram | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/hashicorp/go-hclog" | ||
"github.com/radekg/yugabyte-db-go-client/testutils/common" | ||
"github.com/radekg/yugabyte-db-go-client/testutils/master" | ||
"github.com/radekg/yugabyte-db-go-client/testutils/tserver" | ||
"github.com/radekg/yugabyte-db-go-client/client/implementation" | ||
"github.com/radekg/yugabyte-db-go-client/configs" | ||
|
||
// Postgres library: | ||
_ "github.com/lib/pq" | ||
) | ||
|
||
func TestClusterIntegration(t *testing.T) { | ||
|
||
masterTestCtx := master.SetupMasters(t, &common.TestMasterConfiguration{ | ||
ReplicationFactor: 3, | ||
MasterPrefix: "mytest", | ||
}) | ||
defer masterTestCtx.Cleanup() | ||
|
||
client, err := implementation.MasterLeaderConnectedClient(&configs.CliConfig{ | ||
MasterHostPort: masterTestCtx.MasterExternalAddresses(), | ||
OpTimeout: time.Duration(time.Second * 5), | ||
}, hclog.Default()) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
defer client.Close() | ||
|
||
common.Eventually(t, 15, func() error { | ||
listMastersPb, err := client.ListMasters() | ||
if err != nil { | ||
return err | ||
} | ||
t.Log(" ==> Received master list", listMastersPb) | ||
return nil | ||
}) | ||
|
||
// start a TServer: | ||
tserver1Ctx := tserver.SetupTServer(t, masterTestCtx, &common.TestTServerConfiguration{ | ||
TServerID: "my-tserver-1", | ||
}) | ||
defer tserver1Ctx.Cleanup() | ||
|
||
// start a TServer: | ||
tserver2Ctx := tserver.SetupTServer(t, masterTestCtx, &common.TestTServerConfiguration{ | ||
TServerID: "my-tserver-2", | ||
}) | ||
defer tserver2Ctx.Cleanup() | ||
|
||
// start a TServer: | ||
tserver3Ctx := tserver.SetupTServer(t, masterTestCtx, &common.TestTServerConfiguration{ | ||
TServerID: "my-tserver-3", | ||
}) | ||
defer tserver3Ctx.Cleanup() | ||
|
||
common.Eventually(t, 15, func() error { | ||
listTServersPb, err := client.ListTabletServers(&configs.OpListTabletServersConfig{}) | ||
if err != nil { | ||
return err | ||
} | ||
t.Log(" ==> Received TServer list", listTServersPb) | ||
return nil | ||
}) | ||
|
||
// try YSQL connection: | ||
t.Logf("connecting to YSQL at 127.0.0.1:%s", tserver1Ctx.TServerExternalYSQLPort()) | ||
db, sqlOpenErr := sql.Open("postgres", fmt.Sprintf("host=127.0.0.1 port=%s user=%s password=%s dbname=%s sslmode=disable", | ||
tserver1Ctx.TServerExternalYSQLPort(), "yugabyte", "yugabyte", "yugabyte")) | ||
if sqlOpenErr != nil { | ||
t.Fatal("failed connecting to YSQL, reason:", sqlOpenErr) | ||
} | ||
defer db.Close() | ||
t.Log("connected to YSQL") | ||
|
||
common.Eventually(t, 15, func() error { | ||
rows, sqlQueryErr := db.Query("select table_name from information_schema.tables") | ||
if sqlQueryErr != nil { | ||
return sqlQueryErr | ||
} | ||
nRows := 0 | ||
for { | ||
if !rows.Next() { | ||
break | ||
} | ||
nRows = nRows + 1 | ||
} | ||
t.Log("selected", nRows, " rows from YSQL") | ||
return nil | ||
}) | ||
|
||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,195 @@ | ||
package common | ||
|
||
import ( | ||
"errors" | ||
"fmt" | ||
"net" | ||
"os" | ||
"strings" | ||
"sync" | ||
"testing" | ||
"time" | ||
|
||
"github.com/ory/dockertest/v3" | ||
dc "github.com/ory/dockertest/v3/docker" | ||
) | ||
|
||
// GetEnvOrDefault returns the value of an environment variable | ||
// or fallback value, if environment variable is undefined or empty. | ||
func GetEnvOrDefault(key, fallback string) string { | ||
value := os.Getenv(key) | ||
if len(value) == 0 { | ||
return fallback | ||
} | ||
return value | ||
} | ||
|
||
// -- Random port supplier. | ||
|
||
// RandomPortSupplier wraps the functionality for random port handling in tests. | ||
type RandomPortSupplier interface { | ||
Cleanup() | ||
Discover() error | ||
DiscoveredHost() (string, bool) | ||
DiscoveredPort() (string, bool) | ||
} | ||
|
||
type listenerPortSupplier struct { | ||
closed bool | ||
discovered bool | ||
discoveredHost string | ||
discoveredPort string | ||
listener net.Listener | ||
lock *sync.Mutex | ||
} | ||
|
||
// NewRandomPortSupplier creates an initialized instance of a random port supplier. | ||
func NewRandomPortSupplier() (RandomPortSupplier, error) { | ||
listener, err := net.Listen("tcp", "127.0.0.1:0") | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &listenerPortSupplier{ | ||
lock: &sync.Mutex{}, | ||
listener: listener, | ||
}, nil | ||
} | ||
|
||
func (l *listenerPortSupplier) Cleanup() { | ||
l.lock.Lock() | ||
defer l.lock.Unlock() | ||
if !l.closed { | ||
l.listener.Close() | ||
l.closed = true | ||
} | ||
} | ||
|
||
func (l *listenerPortSupplier) Discover() error { | ||
l.lock.Lock() | ||
defer l.lock.Unlock() | ||
if l.closed { | ||
return errors.New("was-closed") | ||
} | ||
host, port, err := net.SplitHostPort(l.listener.Addr().String()) | ||
if err != nil { | ||
return err | ||
} | ||
l.discoveredHost = host | ||
l.discoveredPort = port | ||
l.discovered = true | ||
return nil | ||
} | ||
|
||
func (l *listenerPortSupplier) DiscoveredHost() (string, bool) { | ||
l.lock.Lock() | ||
defer l.lock.Unlock() | ||
return l.discoveredHost, l.discovered | ||
} | ||
|
||
func (l *listenerPortSupplier) DiscoveredPort() (string, bool) { | ||
l.lock.Lock() | ||
defer l.lock.Unlock() | ||
return l.discoveredPort, l.discovered | ||
} | ||
|
||
// WaitForContainerExit0 waits for the container to exist with code 0. | ||
func WaitForContainerExit0(t *testing.T, pool *dockertest.Pool, containerID string) error { | ||
finalState := "not started" | ||
finalStatus := "" | ||
|
||
benchMigrateStart := time.Now() | ||
chanSuccess := make(chan struct{}, 1) | ||
chanError := make(chan error, 1) | ||
|
||
go func() { | ||
poolRetryErr := pool.Retry(func() error { | ||
containers, _ := pool.Client.ListContainers(dc.ListContainersOptions{All: true}) | ||
for _, container := range containers { | ||
if container.ID == containerID { | ||
time.Sleep(time.Millisecond * 50) | ||
if container.State == "running" { | ||
return errors.New("still running") | ||
} | ||
if container.State == "restarting" { | ||
t.Logf("container %s is restarting with status '%s'...", containerID, container.Status) | ||
time.Sleep(time.Second) | ||
continue | ||
} | ||
finalState = container.State | ||
finalStatus = container.Status | ||
return nil | ||
} | ||
} | ||
return errors.New("no container") | ||
}) | ||
if poolRetryErr == nil { | ||
close(chanSuccess) | ||
return | ||
} | ||
chanError <- poolRetryErr | ||
}() | ||
|
||
select { | ||
case <-chanSuccess: | ||
t.Logf("container %s finished successfully after: %s", containerID, time.Now().Sub(benchMigrateStart).String()) | ||
case receivedError := <-chanError: | ||
return receivedError | ||
case <-time.After(time.Second * 10): | ||
return fmt.Errorf("container %s complete within timeout", containerID) | ||
} | ||
|
||
if finalState != "exited" { | ||
return fmt.Errorf("expected container %s to be in state exited but received: '%s'", containerID, finalState) | ||
} | ||
// it was exited, ... | ||
if !strings.HasPrefix(strings.ToLower(finalStatus), "exited (0)") { | ||
return fmt.Errorf("expected container %s to exit with status 0, received full exit message: '%s'", containerID, finalStatus) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// CompareStringSlices cpmpares if two string slices have the same length and same values. | ||
func CompareStringSlices(t *testing.T, this, that []string) { | ||
if len(this) != len(that) { | ||
t.Fatalf("expected did not match received: '%v' vs '%v'", this, that) | ||
} | ||
for idx, exp := range this { | ||
if exp != that[idx] { | ||
t.Fatalf("expected did not match received at index %d: '%v' vs '%v'", idx, exp, that[idx]) | ||
} | ||
} | ||
} | ||
|
||
// StringSlicesEqual compares two slices without failing the test. | ||
func StringSlicesEqual(t *testing.T, this, that []string) bool { | ||
if len(this) != len(that) { | ||
return false | ||
} | ||
for idx, exp := range this { | ||
if exp != that[idx] { | ||
return false | ||
} | ||
} | ||
return true | ||
} | ||
|
||
// -- | ||
|
||
// Eventually retries the request maximum number of tries and stops on success, or error. | ||
func Eventually(t *testing.T, maxTimes int, f func() error, messageLabels ...interface{}) { | ||
nTries := 0 | ||
for { | ||
if nTries > maxTimes { | ||
t.Fatal(append([]interface{}{fmt.Sprintf("Failed running block %d times", maxTimes)}, messageLabels...)...) | ||
} | ||
if err := f(); err != nil { | ||
nTries = nTries + 1 | ||
t.Log(append(messageLabels, err.Error())...) | ||
<-time.After(time.Second) | ||
continue | ||
} | ||
|
||
break | ||
} | ||
} |
Oops, something went wrong.