Skip to content

Commit 5ab2413

Browse files
Merge pull request #8 from ilikeorangutans/check_project
Check project
2 parents 6eb8368 + 1b9b814 commit 5ab2413

19 files changed

+290
-98
lines changed

azkaban/client.go

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,13 +86,13 @@ func (c *Client) FetchExecutionJobLog(executionID int64, jobID string, offset in
8686
return log, err
8787
}
8888

89-
func (c *Client) FlowExecutions(project, flow string) (Executions, error) {
89+
func (c *Client) FlowExecutions(project, flow string, paginator Paginator) (Executions, error) {
9090
params := make(map[string]string)
9191
params["ajax"] = "fetchFlowExecutions"
9292
params["project"] = project
9393
params["flow"] = flow
94-
params["start"] = "0"
95-
params["length"] = "20"
94+
params["start"] = strconv.Itoa(paginator.Offset)
95+
params["length"] = strconv.Itoa(paginator.Length)
9696

9797
executions := ExecutionsList{}
9898
if err := c.requestAndDecode("GET", "manager", params, &executions); err != nil {
@@ -257,15 +257,6 @@ func findExecutions(n *htmlx.Node) ([]FlowExecution, error) {
257257
return executions, nil
258258
}
259259

260-
func getAttribute(n *htmlx.Node, name string) string {
261-
for _, a := range n.Attr {
262-
if a.Key == name {
263-
return a.Val
264-
}
265-
}
266-
return ""
267-
}
268-
269260
func findElementsOfType(n *htmlx.Node, t string) []*htmlx.Node {
270261
var result []*htmlx.Node
271262
if n.Type == htmlx.ElementNode && n.Data == t {
@@ -281,7 +272,7 @@ func findElementsOfType(n *htmlx.Node, t string) []*htmlx.Node {
281272

282273
func findElementWithID(n *htmlx.Node, id string) *htmlx.Node {
283274
if hasAttribute(n, "id", id) {
284-
return n;
275+
return n
285276
}
286277
for c := n.FirstChild; c != nil; c = c.NextSibling {
287278
if result := findElementWithID(c, id); result != nil {

azkaban/executions.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ type ajaxExecRepo struct {
1515
}
1616

1717
func (r *ajaxExecRepo) ListExecutions(proj Project, f Flow, p Paginator) (Executions, error) {
18-
return r.azkaban.FlowExecutions(proj.Name, f.FlowID)
18+
return r.azkaban.FlowExecutions(proj.Name, f.FlowID, p)
1919
}

azkaban/flow_predicates.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package azkaban
2+
3+
import (
4+
"log"
5+
"regexp"
6+
"strings"
7+
)
8+
9+
type FlowPredicate func(Flow) bool
10+
11+
func MatchesAnyFlow() FlowPredicate {
12+
return func(_ Flow) bool { return true }
13+
}
14+
func MatchesFlowName(input string) FlowPredicate {
15+
if len(input) > 0 {
16+
regexString := input
17+
if strings.HasPrefix(regexString, "/") && strings.HasSuffix(regexString, "/") {
18+
regex, err := regexp.Compile(regexString[1 : len(input)-1])
19+
if err != nil {
20+
log.Fatal(err)
21+
}
22+
return func(f Flow) bool { return regex.MatchString(f.FlowID) }
23+
} else {
24+
return func(f Flow) bool { return strings.HasPrefix(f.FlowID, input) }
25+
}
26+
} else {
27+
return func(f Flow) bool { return true }
28+
}
29+
30+
}
31+
32+
// MatchesAll returns a FlowPredicate that returns true if a flow matches all the provided predicates.
33+
func MatchesAll(predicates ...FlowPredicate) FlowPredicate {
34+
return func(f Flow) bool {
35+
for _, p := range predicates {
36+
if !p(f) {
37+
return false
38+
}
39+
}
40+
return true
41+
}
42+
}

azkaban/flows.go

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
package azkaban
22

3+
import "sort"
4+
5+
type Flows []Flow
6+
7+
func (f Flows) Len() int { return len(f) }
8+
func (f Flows) Swap(i, j int) { f[i], f[j] = f[j], f[i] }
9+
func (f Flows) Less(i, j int) bool { return f[i].FlowID < f[j].FlowID }
10+
311
type FlowRepository interface {
412
// Flow returns a flow instance for the given name and project
513
Flow(Project, string) (Flow, Project, error)
6-
ListFlows(Project) ([]Flow, error)
14+
ListFlows(Project, FlowPredicate) (Flows, error)
715
}
816

917
func NewFlowRepository(client *Client) FlowRepository {
@@ -16,8 +24,22 @@ type ajaxFlowRepoImpl struct {
1624
azkaban *Client
1725
}
1826

19-
func (r *ajaxFlowRepoImpl) ListFlows(proj Project) ([]Flow, error) {
20-
return r.azkaban.ListFlows(proj.Name)
27+
func (r *ajaxFlowRepoImpl) ListFlows(proj Project, predicate FlowPredicate) (Flows, error) {
28+
flows, err := r.azkaban.ListFlows(proj.Name)
29+
if err != nil {
30+
return nil, err
31+
}
32+
33+
var result Flows
34+
for _, flow := range flows {
35+
if predicate(flow) {
36+
result = append(result, flow)
37+
}
38+
}
39+
40+
sort.Sort(result)
41+
42+
return result, nil
2143
}
2244

2345
func (r *ajaxFlowRepoImpl) Flow(proj Project, flowID string) (Flow, Project, error) {

azkaban/health.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,15 @@ func (h Health) Colored() string {
1818
}
1919
}
2020

21+
func (h Health) IsHealthy() bool {
22+
switch h {
23+
case Healthy:
24+
return true
25+
default:
26+
return false
27+
}
28+
}
29+
2130
const (
2231
Healthy Health = "healthy"
2332
Concerning = "concerning"

azkaban/paginator.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,11 @@ type Paginator struct {
55
Offset int
66
}
77

8+
func NMostRecent(n int) Paginator {
9+
return Paginator{
10+
Length: n,
11+
Offset: 0,
12+
}
13+
}
14+
815
var TenMostRecent = Paginator{Length: 10, Offset: 0}

azkaban/projects.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
package azkaban
22

3+
import (
4+
"errors"
5+
"fmt"
6+
)
7+
38
type ProjectRepository interface {
49
ListProjects() ([]Project, error)
10+
ByName(name string) (Project, error)
511
}
612

713
func NewProjectRepository(client *Client) ProjectRepository {
@@ -14,6 +20,20 @@ type projectRepoImpl struct {
1420
client *Client
1521
}
1622

23+
func (r *projectRepoImpl) ByName(name string) (Project, error) {
24+
projects, err := r.ListProjects()
25+
if err != nil {
26+
return Project{}, err
27+
}
28+
29+
for _, p := range projects {
30+
if p.Name == name {
31+
return p, nil
32+
}
33+
}
34+
return Project{}, errors.New(fmt.Sprintf("no project with name %s", name))
35+
}
36+
1737
func (r *projectRepoImpl) ListProjects() ([]Project, error) {
1838
return r.client.ListProjects()
1939
}

azkaban/status.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import (
99
type Status string
1010

1111
func (s Status) Colored() string {
12-
return s.ColorFunc()((fmt.Sprintf("%-9s", s)))
12+
return s.ColorFunc()(fmt.Sprintf("%-9s", s))
1313
}
1414

1515
func (s Status) ColorFunc() func(string, ...interface{}) string {

cli/check_cmd.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package cli
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
)
6+
7+
func NewCheckCmd(context Context) *cobra.Command {
8+
checkCmd := &cobra.Command{
9+
Use: "check",
10+
Aliases: []string{"c"},
11+
Short: "checks things",
12+
}
13+
14+
checkCmd.AddCommand(newCheckFlowCmd(context))
15+
checkCmd.AddCommand(newCheckProjectCmd(context))
16+
17+
return checkCmd
18+
}

cli/check_flow_cmd.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,8 @@ import (
1212
"strings"
1313
)
1414

15-
func NewCheckFlowCmd(context Context) *cobra.Command {
16-
checkCmd := &cobra.Command{
17-
Use: "check",
18-
Aliases: []string{"c"},
19-
Short: "checks things",
20-
}
21-
22-
checkFlowCmd := &cobra.Command{
15+
func newCheckFlowCmd(context Context) *cobra.Command {
16+
return &cobra.Command{
2317
Use: "flow",
2418
Aliases: []string{"f"},
2519
Short: "check a given flow",
@@ -110,9 +104,6 @@ func NewCheckFlowCmd(context Context) *cobra.Command {
110104
}
111105
},
112106
}
113-
checkCmd.AddCommand(checkFlowCmd)
114-
115-
return checkCmd
116107
}
117108

118109
type FlowStatusChecker struct {
@@ -140,7 +131,7 @@ func (h FlowStatusChecker) printSchedule() error {
140131
func (h FlowStatusChecker) printFlowStatus() (status FlowStatus, err error) {
141132
fmt.Printf("Checking status of %s %s...\n", h.project.Name, h.flow.FlowID)
142133
client := h.client
143-
executions, err := client.FlowExecutions(h.project.Name, h.flow.FlowID)
134+
executions, err := client.FlowExecutions(h.project.Name, h.flow.FlowID, azkaban.TenMostRecent)
144135
if err != nil {
145136
return status, err
146137
}

0 commit comments

Comments
 (0)