Skip to content

Refactor AWS SDK integration and update dependencies #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
vendor
dist
saw
go.sum
179 changes: 80 additions & 99 deletions blade/blade.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package blade

import (
"context"
"encoding/json"
"fmt"
"os"
Expand All @@ -9,175 +10,155 @@ import (

"github.com/TylerBrock/colorjson"
"github.com/TylerBrock/saw/config"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/types"
"github.com/fatih/color"
)

// A Blade is a Saw execution instance
type Blade struct {
config *config.Configuration
aws *config.AWSConfiguration
output *config.OutputConfiguration
cwl *cloudwatchlogs.CloudWatchLogs
cwl *cloudwatchlogs.Client
}

// NewBlade creates a new Blade with CloudWatchLogs instance from provided config
func NewBlade(
config *config.Configuration,
awsConfig *config.AWSConfiguration,
outputConfig *config.OutputConfiguration,
) *Blade {
blade := Blade{}
awsCfg := aws.Config{}

if awsConfig.Endpoint != "" {
awsCfg.Endpoint = &awsConfig.Endpoint
}

if awsConfig.Region != "" {
awsCfg.Region = &awsConfig.Region
}

awsSessionOpts := session.Options{
Config: awsCfg,
AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
SharedConfigState: session.SharedConfigEnable,
) (*Blade, error) {
cfg, err := awsConfig.LoadConfig()
if err != nil {
return nil, fmt.Errorf("failed to load AWS config: %w", err)
}

if awsConfig.Profile != "" {
awsSessionOpts.Profile = awsConfig.Profile
blade := &Blade{
cwl: cloudwatchlogs.NewFromConfig(cfg),
config: config,
output: outputConfig,
}

sess := session.Must(session.NewSessionWithOptions(awsSessionOpts))

blade.cwl = cloudwatchlogs.New(sess)
blade.config = config
blade.output = outputConfig

return &blade
return blade, nil
}

// GetLogGroups gets the log groups from AWS given the blade configuration
func (b *Blade) GetLogGroups() []*cloudwatchlogs.LogGroup {
func (b *Blade) GetLogGroups(ctx context.Context) []types.LogGroup {
input := b.config.DescribeLogGroupsInput()
groups := make([]*cloudwatchlogs.LogGroup, 0)
b.cwl.DescribeLogGroupsPages(input, func(
out *cloudwatchlogs.DescribeLogGroupsOutput,
lastPage bool,
) bool {
for _, group := range out.LogGroups {
groups = append(groups, group)
groups := make([]types.LogGroup, 0)

paginator := cloudwatchlogs.NewDescribeLogGroupsPaginator(b.cwl, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
return !lastPage
})
groups = append(groups, output.LogGroups...)
}
return groups
}

// GetLogStreams gets the log streams from AWS given the blade configuration
func (b *Blade) GetLogStreams() []*cloudwatchlogs.LogStream {
func (b *Blade) GetLogStreams(ctx context.Context) []types.LogStream {
input := b.config.DescribeLogStreamsInput()
streams := make([]*cloudwatchlogs.LogStream, 0)
b.cwl.DescribeLogStreamsPages(input, func(
out *cloudwatchlogs.DescribeLogStreamsOutput,
lastPage bool,
) bool {
for _, stream := range out.LogStreams {
streams = append(streams, stream)
}
return !lastPage
})
streams := make([]types.LogStream, 0)

paginator := cloudwatchlogs.NewDescribeLogStreamsPaginator(b.cwl, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
streams = append(streams, output.LogStreams...)
}
return streams
}

// GetEvents gets events from AWS given the blade configuration
func (b *Blade) GetEvents() {
func (b *Blade) GetEvents(ctx context.Context) {
formatter := b.output.Formatter()
input := b.config.FilterLogEventsInput()

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range page.Events {
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}

for _, event := range output.Events {
if b.output.Pretty {
fmt.Println(formatEvent(formatter, event))
} else {
fmt.Println(*event.Message)
fmt.Println(aws.ToString(event.Message))
}
}
return !lastPage
}
err := b.cwl.FilterLogEventsPages(input, handlePage)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
}

// StreamEvents continuously prints log events to the console
func (b *Blade) StreamEvents() {
func (b *Blade) StreamEvents(ctx context.Context) {
var lastSeenTime *int64
var seenEventIDs map[string]bool
seenEventIDs := make(map[string]bool)
formatter := b.output.Formatter()
input := b.config.FilterLogEventsInput()

clearSeenEventIds := func() {
seenEventIDs = make(map[string]bool, 0)
seenEventIDs = make(map[string]bool)
}

addSeenEventIDs := func(id *string) {
seenEventIDs[*id] = true
}
for {
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(b.cwl, input)
for paginator.HasMorePages() {
output, err := paginator.NextPage(ctx)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}

updateLastSeenTime := func(ts *int64) {
if lastSeenTime == nil || *ts > *lastSeenTime {
lastSeenTime = ts
clearSeenEventIds()
}
}
for _, event := range output.Events {
timestamp := aws.ToInt64(event.Timestamp)
if lastSeenTime == nil || timestamp > *lastSeenTime {
lastSeenTime = &timestamp
clearSeenEventIds()
}

handlePage := func(page *cloudwatchlogs.FilterLogEventsOutput, lastPage bool) bool {
for _, event := range page.Events {
updateLastSeenTime(event.Timestamp)
if _, seen := seenEventIDs[*event.EventId]; !seen {
var message string
if b.output.Raw {
message = *event.Message
} else {
message = formatEvent(formatter, event)
eventID := aws.ToString(event.EventId)
if !seenEventIDs[eventID] {
var message string
if b.output.Raw {
message = aws.ToString(event.Message)
} else {
message = formatEvent(formatter, event)
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
seenEventIDs[eventID] = true
}
message = strings.TrimRight(message, "\n")
fmt.Println(message)
addSeenEventIDs(event.EventId)
}
}
return !lastPage
}

for {
err := b.cwl.FilterLogEventsPages(input, handlePage)
if err != nil {
fmt.Println("Error", err)
os.Exit(2)
}
if lastSeenTime != nil {
input.SetStartTime(*lastSeenTime)
input.StartTime = lastSeenTime
}
time.Sleep(1 * time.Second)
}
}

// formatEvent returns a CloudWatch log event as a formatted string using the provided formatter
func formatEvent(formatter *colorjson.Formatter, event *cloudwatchlogs.FilteredLogEvent) string {
func formatEvent(formatter *colorjson.Formatter, event types.FilteredLogEvent) string {
red := color.New(color.FgRed).SprintFunc()
white := color.New(color.FgWhite).SprintFunc()

str := aws.StringValue(event.Message)
str := aws.ToString(event.Message)
bytes := []byte(str)
date := aws.MillisecondsTimeValue(event.Timestamp)
date := time.Unix(0, aws.ToInt64(event.Timestamp)*int64(time.Millisecond))
dateStr := date.Format(time.RFC3339)
streamStr := aws.StringValue(event.LogStreamName)
streamStr := aws.ToString(event.LogStreamName)
jl := map[string]interface{}{}

if err := json.Unmarshal(bytes, &jl); err != nil {
Expand Down
14 changes: 11 additions & 3 deletions cmd/get.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -25,17 +26,24 @@ var getCommand = &cobra.Command{
},
Run: func(cmd *cobra.Command, args []string) {
getConfig.Group = args[0]
b := blade.NewBlade(&getConfig, &awsConfig, &getOutputConfig)
ctx := context.Background()

b, err := blade.NewBlade(&getConfig, &awsConfig, &getOutputConfig)
if err != nil {
fmt.Printf("Failed to initialize saw blade: %v\n", err)
os.Exit(1)
}

if getConfig.Prefix != "" {
streams := b.GetLogStreams()
streams := b.GetLogStreams(ctx)
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", getConfig.Group, getConfig.Prefix)
fmt.Printf("To view available streams: `saw streams %s`\n", getConfig.Group)
os.Exit(3)
}
getConfig.Streams = streams
}
b.GetEvents()
b.GetEvents(ctx)
},
}

Expand Down
12 changes: 10 additions & 2 deletions cmd/groups.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package cmd

import (
"context"
"fmt"
"os"

"github.com/TylerBrock/saw/blade"
"github.com/TylerBrock/saw/config"
Expand All @@ -16,8 +18,14 @@ var groupsCommand = &cobra.Command{
Short: "List log groups",
Long: "",
Run: func(cmd *cobra.Command, args []string) {
b := blade.NewBlade(&groupsConfig, &awsConfig, nil)
logGroups := b.GetLogGroups()
ctx := context.Background()
b, err := blade.NewBlade(&groupsConfig, &awsConfig, nil)
if err != nil {
fmt.Println("Error creating blade:", err)
os.Exit(1)
}

logGroups := b.GetLogGroups(ctx)
for _, group := range logGroups {
fmt.Println(*group.LogGroupName)
}
Expand Down
11 changes: 9 additions & 2 deletions cmd/streams.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package cmd

import (
"context"
"errors"
"fmt"
"os"

"github.com/TylerBrock/saw/blade"
"github.com/TylerBrock/saw/config"
Expand All @@ -23,9 +25,14 @@ var streamsCommand = &cobra.Command{
},
Run: func(cmd *cobra.Command, args []string) {
streamsConfig.Group = args[0]
b := blade.NewBlade(&streamsConfig, &awsConfig, nil)
ctx := context.Background()
b, err := blade.NewBlade(&streamsConfig, &awsConfig, nil)
if err != nil {
fmt.Println("Error creating blade:", err)
os.Exit(1)
}

logStreams := b.GetLogStreams()
logStreams := b.GetLogStreams(ctx)
for _, stream := range logStreams {
fmt.Println(*stream.LogStreamName)
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/watch.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"context"
"errors"
"fmt"
"os"
Expand All @@ -26,17 +27,22 @@ var watchCommand = &cobra.Command{
},
Run: func(cmd *cobra.Command, args []string) {
watchConfig.Group = args[0]
b := blade.NewBlade(&watchConfig, &awsConfig, &watchOutputConfig)
ctx := context.Background()
b, err := blade.NewBlade(&watchConfig, &awsConfig, &watchOutputConfig)
if err != nil {
fmt.Println("Error creating blade:", err)
os.Exit(1)
}
if watchConfig.Prefix != "" {
streams := b.GetLogStreams()
streams := b.GetLogStreams(ctx)
if len(streams) == 0 {
fmt.Printf("No streams found in %s with prefix %s\n", watchConfig.Group, watchConfig.Prefix)
fmt.Printf("To view available streams: `saw streams %s`\n", watchConfig.Group)
os.Exit(3)
}
watchConfig.Streams = streams
}
b.StreamEvents()
b.StreamEvents(ctx)
},
}

Expand Down
Loading