Skip to content

Commit 8c1f046

Browse files
Using cobra and viper now
1 parent 9b54bf3 commit 8c1f046

File tree

16 files changed

+786
-600
lines changed

16 files changed

+786
-600
lines changed

action_handler.go

Lines changed: 0 additions & 38 deletions
This file was deleted.

azkaban/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ func (c *Client) Running() ([]FlowExecution, error) {
203203

204204
// Azkaban serves the login page simply with a HTTP 200 so the only way to check if we're looking at the login page
205205
// is by looking for the login element.
206-
if findElementWithID(doc,"username") != nil && findElementWithID(doc, "password") != nil {
206+
if findElementWithID(doc, "username") != nil && findElementWithID(doc, "password") != nil {
207207
return nil, errors.New("credentials expired, reauthenticate")
208208
}
209209

Lines changed: 98 additions & 120 deletions
Original file line numberDiff line numberDiff line change
@@ -1,148 +1,118 @@
1-
package main
1+
package cli
22

33
import (
44
"bufio"
55
"bytes"
6-
"errors"
76
"fmt"
87
"github.com/dustin/go-humanize"
98
"github.com/ilikeorangutans/harbormaster/azkaban"
10-
"github.com/urfave/cli"
9+
"github.com/spf13/cobra"
1110
"log"
1211
"os"
1312
"strings"
1413
)
1514

16-
func SetupCheckFlowAction() cli.Command {
17-
handler := &CheckFlowHandler{}
18-
return cli.Command{
19-
Name: "check",
15+
func NewCheckFlowCmd(context Context) *cobra.Command {
16+
checkCmd := &cobra.Command{
17+
Use: "check",
2018
Aliases: []string{"c"},
21-
Usage: "check thinks on Azkaban",
22-
Subcommands: []cli.Command{
23-
{
24-
Name: "flow",
25-
Aliases: []string{"f"},
26-
Usage: "check a flow",
27-
ArgsUsage: "flowID",
28-
Flags: []cli.Flag{
29-
cli.IntFlag{
30-
Name: "n",
31-
Usage: "number of executions to include in the details",
32-
Value: 5,
33-
},
34-
},
35-
Action: handler.CheckFlow,
36-
},
37-
},
19+
Short: "checks things",
3820
}
39-
}
40-
41-
type CheckFlowHandler struct {
42-
ActionWithContext
43-
}
4421

45-
func (h *CheckFlowHandler) CheckFlow(c *cli.Context) error {
46-
if !c.Args().Present() {
47-
return errors.New("expected flowID")
48-
}
49-
50-
flowRepo := h.Context().Flows()
51-
flow, proj, err := flowRepo.Flow(azkaban.Project{Name: c.GlobalString("project")}, c.Args().First())
52-
if err != nil {
53-
return err
54-
}
55-
56-
statusChecker := FlowStatusChecker{
57-
client: h.Client(),
58-
context: h.Context(),
59-
histogramCount: uint(c.Int("n")),
60-
project: proj,
61-
flow: flow,
62-
}
22+
checkFlowCmd := &cobra.Command{
23+
Use: "flow",
24+
Aliases: []string{"f"},
25+
Short: "check a given flow",
26+
Args: cobra.ExactArgs(1),
27+
Run: func(cmd *cobra.Command, args []string) {
28+
flowRepo := context.Context().Flows()
29+
flow, proj, err := flowRepo.Flow(azkaban.Project{Name: context.Project()}, args[0])
30+
if err != nil {
31+
log.Fatal(err)
32+
}
33+
statusChecker := FlowStatusChecker{
34+
client: context.Client(),
35+
context: context.Context(),
36+
histogramCount: 6,
37+
project: proj,
38+
flow: flow,
39+
}
6340

64-
status, err := statusChecker.printFlowStatus()
65-
if err != nil {
66-
return err
67-
}
41+
status, err := statusChecker.printFlowStatus()
42+
if err != nil {
43+
log.Fatal(err)
44+
}
6845

69-
if status.Health == azkaban.Critical {
70-
fmt.Printf("Execution failed in %q, log messages of interest:\n", status.FailedJob.ID)
71-
client := h.client
72-
buffer := bytes.NewBuffer([]byte{})
73-
_, err := client.FetchLogsUntilEnd(status.LastExecution.ID, status.FailedJob.ID, 0, buffer)
74-
if err != nil {
75-
return err
76-
}
46+
if status.Health == azkaban.Critical {
47+
fmt.Printf("Execution failed in %q, log messages of interest:\n", status.FailedJob.ID)
48+
client := context.Client()
49+
buffer := bytes.NewBuffer([]byte{})
50+
_, err := client.FetchLogsUntilEnd(status.LastExecution.ID, status.FailedJob.ID, 0, buffer)
51+
if err != nil {
52+
log.Fatal(err)
53+
}
7754

78-
scanner := bufio.NewScanner(strings.NewReader(buffer.String()))
55+
scanner := bufio.NewScanner(strings.NewReader(buffer.String()))
7956

80-
patterns := []string{
81-
"err",
82-
"exception",
83-
"failed",
84-
"failure",
85-
}
86-
fmt.Println(strings.Repeat("-", 80))
87-
for scanner.Scan() {
88-
line := scanner.Text()
89-
lower := strings.ToLower(line)
90-
ofInterest := false
91-
for _, p := range patterns {
92-
if strings.Contains(lower, p) {
93-
ofInterest = true
94-
break
57+
patterns := []string{
58+
"err",
59+
"exception",
60+
"failed",
61+
"failure",
9562
}
96-
}
97-
if ofInterest {
98-
fmt.Println(line)
99-
}
100-
}
101-
fmt.Println(strings.Repeat("-", 80))
102-
fmt.Println()
103-
104-
scanner = bufio.NewScanner(os.Stdin)
105-
run := true
106-
107-
printActionTerm()
108-
for run {
109-
printActionTerm()
110-
run = run && scanner.Scan()
111-
input := strings.ToLower(strings.TrimSpace(scanner.Text()))
112-
113-
if input == "restart" {
114-
client.RestartFlowNow(proj.Name, flow.FlowID)
115-
} else if input == "logs" {
116-
// TODO this might be slow:
117-
// fmt.Println(l)
118-
_, err = client.FetchLogsUntilEnd(status.LastExecution.ID, status.FailedJob.ID, 0, os.Stdout)
119-
if err != nil {
120-
return err
63+
fmt.Println(strings.Repeat("-", 80))
64+
for scanner.Scan() {
65+
line := scanner.Text()
66+
lower := strings.ToLower(line)
67+
ofInterest := false
68+
for _, p := range patterns {
69+
if strings.Contains(lower, p) {
70+
ofInterest = true
71+
break
72+
}
73+
}
74+
if ofInterest {
75+
fmt.Println(line)
76+
}
12177
}
122-
} else if input == "status" {
123-
status, err = statusChecker.printFlowStatus()
124-
if err != nil {
125-
return err
78+
fmt.Println(strings.Repeat("-", 80))
79+
fmt.Println()
80+
81+
scanner = bufio.NewScanner(os.Stdin)
82+
run := true
83+
84+
printActionTerm()
85+
for run {
86+
printActionTerm()
87+
run = run && scanner.Scan()
88+
input := strings.ToLower(strings.TrimSpace(scanner.Text()))
89+
90+
if input == "restart" {
91+
client.RestartFlowNow(proj.Name, flow.FlowID)
92+
} else if input == "logs" {
93+
// TODO this might be slow:
94+
// fmt.Println(l)
95+
_, err = client.FetchLogsUntilEnd(status.LastExecution.ID, status.FailedJob.ID, 0, os.Stdout)
96+
if err != nil {
97+
log.Fatal(err)
98+
}
99+
} else if input == "status" {
100+
status, err = statusChecker.printFlowStatus()
101+
if err != nil {
102+
log.Fatal(err)
103+
}
104+
} else if input == "unschedule" {
105+
fmt.Println("not implemented yet")
106+
} else {
107+
run = false
108+
}
126109
}
127-
} else if input == "unschedule" {
128-
fmt.Println("not implemented yet")
129-
} else {
130-
run = false
131110
}
132-
}
133-
111+
},
134112
}
113+
checkCmd.AddCommand(checkFlowCmd)
135114

136-
return nil
137-
138-
}
139-
140-
type FlowStatus struct {
141-
Project azkaban.Project
142-
Flow azkaban.Flow
143-
Health azkaban.Health
144-
LastExecution azkaban.Execution
145-
FailedJob azkaban.JobStatus
115+
return checkCmd
146116
}
147117

148118
type FlowStatusChecker struct {
@@ -227,3 +197,11 @@ func printActionTerm() {
227197
fmt.Println("Actions: do nothing|status|restart|unschedule|logs")
228198
fmt.Print("> ")
229199
}
200+
201+
type FlowStatus struct {
202+
Project azkaban.Project
203+
Flow azkaban.Flow
204+
Health azkaban.Health
205+
LastExecution azkaban.Execution
206+
FailedJob azkaban.JobStatus
207+
}

0 commit comments

Comments
 (0)