Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions data/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package data

import (
"database/sql"

"github.com/deis/workflow-manager/types"
)

// Cluster is an interface for managing a persistent cluster record
type Cluster interface {
Get(*sql.DB, string) (types.Cluster, error)
Set(*sql.DB, string, types.Cluster) (types.Cluster, error)
Checkin(*sql.DB, string, types.Cluster) (sql.Result, error)
FilterByAge(*sql.DB, *ClusterAgeFilter) ([]types.Cluster, error)
}
150 changes: 150 additions & 0 deletions data/cluster_age_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package data

import (
"fmt"
"strings"
"time"

"github.com/deis/workflow-manager-api/rest"
)

type keyAndTime struct {
key string
time time.Time
}

func (k keyAndTime) String() string {
return fmt.Sprintf("%s (%s)", k.key, k.time)
}

// ErrImpossibleFilter is the error returned when a caller tries to create a new ClusterAgeFilter
// with parameters that would create a filter that is guaranteed to produce no results. One such
// "impossible" query is a "created before" time is after a "created " time that is after (i.e.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar could be improved for this sentence... reads a bit awkward as is...

// an impossible filter). See the documentation on ClusterAgeFilter for examples of impossible
// filters
type ErrImpossibleFilter struct {
vals []keyAndTime
reason string
}

// Error is the error interface implementation
func (e ErrImpossibleFilter) Error() string {
strs := make([]string, len(e.vals))
for i := 0; i < len(e.vals); i++ {
strs[i] = e.vals[i].String()
}
return fmt.Sprintf("impossible filter for keys/times (%s): %s", strings.Join(strs, ", "), e.reason)
}

// ClusterAgeFilter is the struct used to filter on cluster ages. It represents the conjunction
// of all of its fields. For example:
//
// created_time<=CreatedBefore
// AND
// created_time>=CreatedAfter
// AND
// checked_in_time<=CheckedInBefore
// AND
// checked_in_time>=CheckedInAfter
type ClusterAgeFilter struct {
CheckedInBefore time.Time
CheckedInAfter time.Time
CreatedBefore time.Time
CreatedAfter time.Time
}

// NewClusterAgeFilter returns a new ClusterAgeFilter if the given times can result in a valid
// query that would return clusters. If not, returns nil and an ErrImpossibleFilter error
func NewClusterAgeFilter(
checkedInBefore,
checkedInAfter,
createdBefore,
createdAfter time.Time,
) (*ClusterAgeFilter, error) {
candidate := ClusterAgeFilter{
CheckedInBefore: checkedInBefore,
CheckedInAfter: checkedInAfter,
CreatedBefore: createdBefore,
CreatedAfter: createdAfter,
}
if err := candidate.checkValid(); err != nil {
return nil, err
}
return &candidate, nil
}

func (c ClusterAgeFilter) checkValid() error {
if c.CreatedBefore.After(c.CheckedInBefore) {
// you can't have clusters that were checked in before they were created
return ErrImpossibleFilter{
vals: []keyAndTime{
keyAndTime{key: rest.CreatedBeforeQueryStringKey, time: c.CreatedBefore},
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
},
reason: fmt.Sprintf(
"%s needs to be greater than or equal to %s",
rest.CheckedInBeforeQueryStringKey,
rest.CreatedBeforeQueryStringKey,
),
}
} else if c.CheckedInAfter.After(c.CheckedInBefore) || c.CheckedInAfter.Equal(c.CheckedInBefore) {
// you can't have clusters that were checked in before time T-1
// and at the same time checked in after time T+1
return ErrImpossibleFilter{
vals: []keyAndTime{
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
keyAndTime{key: rest.CheckedInAfterQueryStringKey, time: c.CheckedInAfter},
},
reason: fmt.Sprintf(
"%s needs to be greater than %s",
rest.CheckedInBeforeQueryStringKey,
rest.CheckedInAfterQueryStringKey,
),
}
} else if c.CreatedAfter.After(c.CreatedBefore) || c.CreatedAfter.Equal(c.CreatedBefore) {
// you can't have clusters that were created after time T+1
// and at the same time created before time T-1
return ErrImpossibleFilter{
vals: []keyAndTime{
keyAndTime{key: rest.CreatedAfterQueryStringKey, time: c.CreatedAfter},
keyAndTime{key: rest.CreatedBeforeQueryStringKey, time: c.CreatedBefore},
},
reason: fmt.Sprintf(
"%s needs to be greater than %s",
rest.CreatedBeforeQueryStringKey,
rest.CreatedAfterQueryStringKey,
),
}
} else if c.CheckedInBefore.Before(c.CreatedAfter) || c.CheckedInBefore.Equal(c.CreatedAfter) {
// you can't have clusters that were checked in before time T-1
// and at the same time created after time T+1
return ErrImpossibleFilter{
vals: []keyAndTime{
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
keyAndTime{key: rest.CreatedAfterQueryStringKey, time: c.CreatedAfter},
},
reason: fmt.Sprintf(
"%s needs to be after %s",
rest.CheckedInBeforeQueryStringKey,
rest.CreatedAfterQueryStringKey,
),
}
}
return nil
}

func (c ClusterAgeFilter) checkedInBeforeTimestamp() string {
return c.CheckedInBefore.Format(StdTimestampFmt)
}

func (c ClusterAgeFilter) checkedInAfterTimestamp() string {
return c.CheckedInAfter.Format(StdTimestampFmt)
}

func (c ClusterAgeFilter) createdBeforeTimestamp() string {
return c.CreatedBefore.Format(StdTimestampFmt)
}

func (c ClusterAgeFilter) createdAfterTimestamp() string {
return c.CreatedAfter.Format(StdTimestampFmt)
}
48 changes: 48 additions & 0 deletions data/cluster_age_filter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package data

import (
"testing"
"time"
)

func TestNewClusterAgeFilter(t *testing.T) {
type testCase struct {
chB time.Time
chA time.Time
crB time.Time
crA time.Time
err bool
}

testCases := []testCase{
// checked in time test cases
testCase{chB: timeNow(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: true},
testCase{chB: timeNow(), chA: timeFuture(), crB: timeFuture(), crA: timeNow(), err: true},
testCase{chB: timePast(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: true},
testCase{chB: timeFuture(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: false},
// create time test cases
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timeNow(), err: true},
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timeFuture(), err: true},
testCase{chB: timeFuture(), chA: timeNow(), crB: timePast(), crA: timeNow(), err: true},
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timePast(), err: false},
}

for i, testCase := range testCases {
filter, err := NewClusterAgeFilter(testCase.chB, testCase.chA, testCase.crB, testCase.crA)
if testCase.err && err == nil {
t.Errorf("expected error on iteration %d but got none", i)
continue
} else if !testCase.err && err != nil {
t.Errorf("expected no error on iteration %d but got %s", i, err)
continue
}
if filter == nil && err == nil {
t.Errorf("got no error but resulting filter was nil on iteration %d", i)
continue
}
if filter != nil && err != nil {
t.Errorf("got an error but resulting filter was not nil on iteration %d", i)
continue
}
}
}
119 changes: 119 additions & 0 deletions data/cluster_from_db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package data

import (
"database/sql"
"encoding/json"
"fmt"
"log"

"github.com/deis/workflow-manager/components"
"github.com/deis/workflow-manager/types"
)

// ClusterFromDB fulfills the Cluster interface
type ClusterFromDB struct{}

// Get method for ClusterFromDB, the actual database/sql.DB implementation
func (c ClusterFromDB) Get(db *sql.DB, id string) (types.Cluster, error) {
row := getDBRecord(db, clustersTableName, []string{clustersTableIDKey}, []string{id})
rowResult := ClustersTable{}
if err := row.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
return types.Cluster{}, err
}
cluster, err := components.ParseJSONCluster(rowResult.data)
if err != nil {
log.Println("error parsing cluster")
return types.Cluster{}, err
}
return cluster, nil
}

// Set method for ClusterFromDB, the actual database/sql.DB implementation
func (c ClusterFromDB) Set(db *sql.DB, id string, cluster types.Cluster) (types.Cluster, error) {
var ret types.Cluster // return variable
js, err := json.Marshal(cluster)
if err != nil {
fmt.Println("error marshaling data")
}
row := getDBRecord(db, clustersTableName, []string{clustersTableIDKey}, []string{id})
var result sql.Result
// Register the "latest checkin" with the primary cluster record
rowResult := ClustersTable{}
if err := row.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
result, err = newClusterDBRecord(db, id, js)
if err != nil {
log.Println(err)
}
} else {
result, err = updateClusterDBRecord(db, id, js)
if err != nil {
log.Println(err)
}
}
affected, err := result.RowsAffected()
if err != nil {
log.Println("failed to get affected row count")
}
if affected == 0 {
log.Println("no records updated")
} else if affected == 1 {
ret, err = c.Get(db, id)
if err != nil {
return types.Cluster{}, err
}
} else if affected > 1 {
log.Println("updated more than one record with same ID value!")
}
return ret, nil
}

// Checkin method for ClusterFromDB, the actual database/sql.DB implementation
func (c ClusterFromDB) Checkin(db *sql.DB, id string, cluster types.Cluster) (sql.Result, error) {
js, err := json.Marshal(cluster)
if err != nil {
fmt.Println("error marshaling data")
}
result, err := newClusterCheckinsDBRecord(db, id, now(), js)
if err != nil {
log.Println("cluster checkin db record not created", err)
return nil, err
}
return result, nil
}

// FilterByAge returns a slice of clusters whose various time fields match the requirements
// in the given filter. Note that the filter's requirements are a conjunction, not a disjunction
func (c ClusterFromDB) FilterByAge(db *sql.DB, filter *ClusterAgeFilter) ([]types.Cluster, error) {
query := fmt.Sprintf(`SELECT DISTINCT clusters.*
FROM clusters, clusters_checkins
WHERE clusters_checkins.cluster_id = clusters.cluster_id
GROUP BY clusters_checkins.cluster_id
HAVING MIN(clusters_checkins.created_at) > '%s'
AND MIN(clusters_checkins.created_at) < '%s'
AND MIN(clusters_checkins.created_at) > '%s'
AND MAX(clusters_checkins.created_at) < '%s'`,
filter.createdAfterTimestamp(),
filter.createdBeforeTimestamp(),
filter.checkedInAfterTimestamp(),
filter.checkedInBeforeTimestamp(),
)

rows, err := db.Query(query)
if err != nil {
return nil, err
}

clusters := []types.Cluster{}
for rows.Next() {
rowResult := ClustersTable{}
if err := rows.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
return nil, err
}
cluster, err := components.ParseJSONCluster(rowResult.data)
if err != nil {
return nil, err
}
clusters = append(clusters, cluster)
}
return clusters, nil
}
Loading