Skip to content

Commit 3aa75b1

Browse files
jackfrancisAaron Schlesinger
authored andcommitted
feat(*): implement cluster age filtering endpoint
This commit includes the data layer, handler & server, documentation and all tests. It has been squashed from 38 commits
1 parent bbdf19d commit 3aa75b1

18 files changed

+884
-109
lines changed

data/cluster.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package data
2+
3+
import (
4+
"database/sql"
5+
6+
"github.com/deis/workflow-manager/types"
7+
)
8+
9+
// Cluster is an interface for managing a persistent cluster record
10+
type Cluster interface {
11+
Get(*sql.DB, string) (types.Cluster, error)
12+
Set(*sql.DB, string, types.Cluster) (types.Cluster, error)
13+
Checkin(*sql.DB, string, types.Cluster) (sql.Result, error)
14+
FilterByAge(*sql.DB, *ClusterAgeFilter) ([]types.Cluster, error)
15+
}

data/cluster_age_filter.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package data
2+
3+
import (
4+
"fmt"
5+
"strings"
6+
"time"
7+
8+
"github.com/deis/workflow-manager-api/rest"
9+
)
10+
11+
type keyAndTime struct {
12+
key string
13+
time time.Time
14+
}
15+
16+
func (k keyAndTime) String() string {
17+
return fmt.Sprintf("%s (%s)", k.key, k.time)
18+
}
19+
20+
// ErrImpossibleFilter is the error returned when a called tries to create a new ClusterAgeFilter
21+
// with parameters that would create a filter that is guaranteed to produce no results. One such
22+
// "impossible" query is a "created before" time is after a "created " time that is after (i.e.
23+
// an impossible filter). One
24+
// example of an impossible
25+
type ErrImpossibleFilter struct {
26+
vals []keyAndTime
27+
reason string
28+
}
29+
30+
// Error is the error interface implementation
31+
func (e ErrImpossibleFilter) Error() string {
32+
strs := make([]string, len(e.vals))
33+
for i := 0; i < len(e.vals); i++ {
34+
strs[i] = e.vals[i].String()
35+
}
36+
return fmt.Sprintf("impossible filter for keys/times (%s): %s", strings.Join(strs, ", "), e.reason)
37+
}
38+
39+
// ClusterAgeFilter is the struct used to filter on cluster ages. It represents the conjunction
40+
// of all of its fields. For example:
41+
//
42+
// created_time<=CheckedInBefore
43+
// AND
44+
// created_time>=CheckedInAfter
45+
// AND
46+
// checked_in_time<=CheckedInBefore
47+
// AND
48+
// checked_in_time>=CheckedInAfter
49+
type ClusterAgeFilter struct {
50+
CheckedInBefore time.Time
51+
CheckedInAfter time.Time
52+
CreatedBefore time.Time
53+
CreatedAfter time.Time
54+
}
55+
56+
// NewClusterAgeFilter returns a new ClusterAgeFilter if the given times can result in a valid
57+
// query that would return clusters. If not, returns nil and an ErrImpossibleFilter error
58+
func NewClusterAgeFilter(
59+
checkedInBefore,
60+
checkedInAfter,
61+
createdBefore,
62+
createdAfter time.Time,
63+
) (*ClusterAgeFilter, error) {
64+
candidate := ClusterAgeFilter{
65+
CheckedInBefore: checkedInBefore,
66+
CheckedInAfter: checkedInAfter,
67+
CreatedBefore: createdBefore,
68+
CreatedAfter: createdAfter,
69+
}
70+
if err := candidate.checkValid(); err != nil {
71+
return nil, err
72+
}
73+
return &candidate, nil
74+
}
75+
76+
func (c ClusterAgeFilter) checkValid() error {
77+
if c.CreatedBefore.After(c.CheckedInBefore) {
78+
// you can't have clusters that were checked in before they were created
79+
return ErrImpossibleFilter{
80+
vals: []keyAndTime{
81+
keyAndTime{key: rest.CreatedBeforeQueryStringKey, time: c.CreatedBefore},
82+
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
83+
},
84+
reason: fmt.Sprintf(
85+
"%s needs to be greater than or equal to %s",
86+
rest.CheckedInBeforeQueryStringKey,
87+
rest.CreatedBeforeQueryStringKey,
88+
),
89+
}
90+
} else if c.CheckedInAfter.After(c.CheckedInBefore) || c.CheckedInAfter.Equal(c.CheckedInBefore) {
91+
// you can't have clusters that were checked in before time T-1
92+
// and at the same time checked in after time T+1
93+
return ErrImpossibleFilter{
94+
vals: []keyAndTime{
95+
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
96+
keyAndTime{key: rest.CheckedInAfterQueryStringKey, time: c.CheckedInAfter},
97+
},
98+
reason: fmt.Sprintf(
99+
"%s needs to be greater than %s",
100+
rest.CheckedInBeforeQueryStringKey,
101+
rest.CheckedInAfterQueryStringKey,
102+
),
103+
}
104+
} else if c.CreatedAfter.After(c.CreatedBefore) || c.CreatedAfter.Equal(c.CreatedBefore) {
105+
// you can't have clusters that were created after time T+1
106+
// and at the same time created before time T-1
107+
return ErrImpossibleFilter{
108+
vals: []keyAndTime{
109+
keyAndTime{key: rest.CreatedAfterQueryStringKey, time: c.CreatedAfter},
110+
keyAndTime{key: rest.CreatedBeforeQueryStringKey, time: c.CreatedBefore},
111+
},
112+
reason: fmt.Sprintf(
113+
"%s needs to be greater than %s",
114+
rest.CreatedBeforeQueryStringKey,
115+
rest.CreatedAfterQueryStringKey,
116+
),
117+
}
118+
} else if c.CheckedInBefore.Before(c.CreatedAfter) || c.CheckedInBefore.Equal(c.CreatedAfter) {
119+
// you can't have clusters that were checked in before time T-1
120+
// and at the same time created after time T+1
121+
return ErrImpossibleFilter{
122+
vals: []keyAndTime{
123+
keyAndTime{key: rest.CheckedInBeforeQueryStringKey, time: c.CheckedInBefore},
124+
keyAndTime{key: rest.CreatedAfterQueryStringKey, time: c.CreatedAfter},
125+
},
126+
reason: fmt.Sprintf(
127+
"%s needs to be after %s",
128+
rest.CheckedInBeforeQueryStringKey,
129+
rest.CreatedAfterQueryStringKey,
130+
),
131+
}
132+
}
133+
return nil
134+
}
135+
136+
func (c ClusterAgeFilter) checkedInBeforeTimestamp() string {
137+
return c.CheckedInBefore.Format(StdTimestampFmt)
138+
}
139+
140+
func (c ClusterAgeFilter) checkedInAfterTimestamp() string {
141+
return c.CheckedInAfter.Format(StdTimestampFmt)
142+
}
143+
144+
func (c ClusterAgeFilter) createdBeforeTimestamp() string {
145+
return c.CreatedBefore.Format(StdTimestampFmt)
146+
}
147+
148+
func (c ClusterAgeFilter) createdAfterTimestamp() string {
149+
return c.CreatedAfter.Format(StdTimestampFmt)
150+
}

data/cluster_age_filter_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package data
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestNewClusterAgeFilter(t *testing.T) {
9+
type testCase struct {
10+
chB time.Time
11+
chA time.Time
12+
crB time.Time
13+
crA time.Time
14+
err bool
15+
}
16+
17+
testCases := []testCase{
18+
// checked in time test cases
19+
testCase{chB: timeNow(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: true},
20+
testCase{chB: timeNow(), chA: timeFuture(), crB: timeFuture(), crA: timeNow(), err: true},
21+
testCase{chB: timePast(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: true},
22+
testCase{chB: timeFuture(), chA: timeNow(), crB: timeFuture(), crA: timeNow(), err: false},
23+
// create time test cases
24+
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timeNow(), err: true},
25+
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timeFuture(), err: true},
26+
testCase{chB: timeFuture(), chA: timeNow(), crB: timePast(), crA: timeNow(), err: true},
27+
testCase{chB: timeFuture(), chA: timeNow(), crB: timeNow(), crA: timePast(), err: false},
28+
}
29+
30+
for i, testCase := range testCases {
31+
filter, err := NewClusterAgeFilter(testCase.chB, testCase.chA, testCase.crB, testCase.crA)
32+
if testCase.err && err == nil {
33+
t.Errorf("expected error on iteration %d but got none", i)
34+
continue
35+
} else if !testCase.err && err != nil {
36+
t.Errorf("expected no error on iteration %d but got %s", i, err)
37+
continue
38+
}
39+
if filter == nil && err == nil {
40+
t.Errorf("got no error but resulting filter was nil on iteration %d", i)
41+
continue
42+
}
43+
if filter != nil && err != nil {
44+
t.Errorf("got an error but resulting filter was not nil on iteration %d", i)
45+
continue
46+
}
47+
}
48+
}

data/cluster_from_db.go

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package data
2+
3+
import (
4+
"database/sql"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
9+
"github.com/deis/workflow-manager/components"
10+
"github.com/deis/workflow-manager/types"
11+
)
12+
13+
// ClusterFromDB fulfills the Cluster interface
14+
type ClusterFromDB struct{}
15+
16+
// Get method for ClusterFromDB, the actual database/sql.DB implementation
17+
func (c ClusterFromDB) Get(db *sql.DB, id string) (types.Cluster, error) {
18+
row := getDBRecord(db, clustersTableName, []string{clustersTableIDKey}, []string{id})
19+
rowResult := ClustersTable{}
20+
if err := row.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
21+
return types.Cluster{}, err
22+
}
23+
cluster, err := components.ParseJSONCluster(rowResult.data)
24+
if err != nil {
25+
log.Println("error parsing cluster")
26+
return types.Cluster{}, err
27+
}
28+
return cluster, nil
29+
}
30+
31+
// Set method for ClusterFromDB, the actual database/sql.DB implementation
32+
func (c ClusterFromDB) Set(db *sql.DB, id string, cluster types.Cluster) (types.Cluster, error) {
33+
var ret types.Cluster // return variable
34+
js, err := json.Marshal(cluster)
35+
if err != nil {
36+
fmt.Println("error marshaling data")
37+
}
38+
row := getDBRecord(db, clustersTableName, []string{clustersTableIDKey}, []string{id})
39+
var result sql.Result
40+
// Register the "latest checkin" with the primary cluster record
41+
rowResult := ClustersTable{}
42+
if err := row.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
43+
result, err = newClusterDBRecord(db, id, js)
44+
if err != nil {
45+
log.Println(err)
46+
}
47+
} else {
48+
result, err = updateClusterDBRecord(db, id, js)
49+
if err != nil {
50+
log.Println(err)
51+
}
52+
}
53+
affected, err := result.RowsAffected()
54+
if err != nil {
55+
log.Println("failed to get affected row count")
56+
}
57+
if affected == 0 {
58+
log.Println("no records updated")
59+
} else if affected == 1 {
60+
ret, err = c.Get(db, id)
61+
if err != nil {
62+
return types.Cluster{}, err
63+
}
64+
} else if affected > 1 {
65+
log.Println("updated more than one record with same ID value!")
66+
}
67+
return ret, nil
68+
}
69+
70+
// Checkin method for ClusterFromDB, the actual database/sql.DB implementation
71+
func (c ClusterFromDB) Checkin(db *sql.DB, id string, cluster types.Cluster) (sql.Result, error) {
72+
js, err := json.Marshal(cluster)
73+
if err != nil {
74+
fmt.Println("error marshaling data")
75+
}
76+
result, err := newClusterCheckinsDBRecord(db, id, now(), js)
77+
if err != nil {
78+
log.Println("cluster checkin db record not created", err)
79+
return nil, err
80+
}
81+
return result, nil
82+
}
83+
84+
// FilterByAge returns a slice of clusters whose various time fields match the requirements
85+
// in the given filter. Note that the filter's requirements are a conjunction, not a disjunction
86+
func (c ClusterFromDB) FilterByAge(db *sql.DB, filter *ClusterAgeFilter) ([]types.Cluster, error) {
87+
query := fmt.Sprintf(`SELECT DISTINCT clusters.*
88+
FROM clusters, clusters_checkins
89+
WHERE clusters_checkins.cluster_id = clusters.cluster_id
90+
GROUP BY clusters_checkins.cluster_id
91+
HAVING MIN(clusters_checkins.created_at) > '%s'
92+
AND MIN(clusters_checkins.created_at) < '%s'
93+
AND MIN(clusters_checkins.created_at) > '%s'
94+
AND MAX(clusters_checkins.created_at) < '%s'`,
95+
filter.createdAfterTimestamp(),
96+
filter.createdBeforeTimestamp(),
97+
filter.checkedInAfterTimestamp(),
98+
filter.checkedInBeforeTimestamp(),
99+
)
100+
101+
rows, err := db.Query(query)
102+
if err != nil {
103+
return nil, err
104+
}
105+
106+
clusters := []types.Cluster{}
107+
for rows.Next() {
108+
rowResult := ClustersTable{}
109+
if err := rows.Scan(&rowResult.clusterID, &rowResult.data); err != nil {
110+
return nil, err
111+
}
112+
cluster, err := components.ParseJSONCluster(rowResult.data)
113+
if err != nil {
114+
return nil, err
115+
}
116+
clusters = append(clusters, cluster)
117+
}
118+
return clusters, nil
119+
}

0 commit comments

Comments
 (0)