Skip to content

Commit

Permalink
Read Script Pod logs from K8s API Server (#867)
Browse files Browse the repository at this point in the history
  • Loading branch information
APErebus authored Apr 4, 2024
1 parent 4516360 commit 6805b21
Show file tree
Hide file tree
Showing 17 changed files with 813 additions and 833 deletions.
98 changes: 44 additions & 54 deletions docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,15 @@ import (
"fmt"
"os"
"os/exec"
"sync"
"time"
)

type SafeCounter struct {
Mutex sync.Mutex
Value int
}

// The bootstrapRunner applet is designed to execute a script in a specific folder
// and format the script's output from stdout and stderr into the following format:
// <line number>|<RFC3339Nano date time>|<"stdout" or "stderr">|<original string>
Expand All @@ -18,6 +24,9 @@ import (
//
// Note: all arguments given after the <script> argument are passed directly to the script as arguments.
func main() {

lineCounter := SafeCounter{Value: 1, Mutex: sync.Mutex{}}

workspacePath := os.Args[1]
args := os.Args[2:]
cmd := exec.Command("bash", args[0:]...)
Expand All @@ -31,46 +40,23 @@ func main() {
doneStd := make(chan bool)
doneErr := make(chan bool)

//stdout log file
so, err := os.Create(workspacePath + "/stdout.log")
if err != nil {
panic(err)
}
// close fo on exit and check for its returned error
defer func() {
if err := so.Close(); err != nil {
panic(err)
}
}()
// make a write buffer
stdoutLogFile := bufio.NewWriter(so)

//stderr log file
se, err := os.Create(workspacePath + "/stderr.log")
if err != nil {
panic(err)
}
// close fo on exit and check for its returned error
defer func() {
if err := se.Close(); err != nil {
panic(err)
}
}()
// make a write buffer
stderrLogFile := bufio.NewWriter(se)

go reader(stdOutScanner, stdoutLogFile, &doneStd)
go reader(stdErrScanner, stderrLogFile, &doneErr)

err = cmd.Start()
if err != nil {
panic(err)
}
go reader(stdOutScanner, "stdout", &doneStd, &lineCounter)
go reader(stdErrScanner, "stderr", &doneErr, &lineCounter)

Write("stdout", "##octopus[stdout-verbose]", &lineCounter)
Write("stdout", "Kubernetes Script Pod started", &lineCounter)
Write("stdout", "##octopus[stdout-default]", &lineCounter)

err := cmd.Start()

// Wait for output buffering first
<-doneStd
<-doneErr

if err != nil {
panic(err)
}

err = cmd.Wait()

var exitErr *exec.ExitError
Expand All @@ -79,28 +65,32 @@ func main() {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to execute bootstrap script", err)
}

// Perform a final flush of the file buffers, just in case they didn't get flushed before
if err := stdoutLogFile.Flush(); err != nil {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stdoutLogFile", err)
}
if err := stderrLogFile.Flush(); err != nil {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stderrLogFile", err)
}

os.Exit(cmd.ProcessState.ExitCode())
exitCode := cmd.ProcessState.ExitCode()

Write("stdout", "##octopus[stdout-verbose]", &lineCounter)
Write("stdout", "Kubernetes Script Pod completed", &lineCounter)
Write("stdout", "##octopus[stdout-default]", &lineCounter)

//TODO: Add this back to speed things up
//Write("stdout", fmt.Sprintf("EOS-075CD4F0-8C76-491D-BA76-0879D35E9CFE<<>>%d", exitCode))

os.Exit(exitCode)
}

func reader(scanner *bufio.Scanner, writer *bufio.Writer, done *chan bool) {
func reader(scanner *bufio.Scanner, stream string, done *chan bool, counter *SafeCounter) {
for scanner.Scan() {
message := fmt.Sprintf("%s|%s\n", time.Now().UTC().Format(time.RFC3339Nano), scanner.Text())
fmt.Print(message)
if _, err := writer.WriteString(message); err != nil {
panic(err)
}

if err := writer.Flush(); err != nil {
panic(err)
}
Write(stream, scanner.Text(), counter)
}
*done <- true
}

func Write(stream string, text string, counter *SafeCounter) {
//Use a mutex to prevent race conditions updating the line number
//https://go.dev/tour/concurrency/9
counter.Mutex.Lock()

fmt.Printf("%d|%s|%s|%s\n", counter.Value, time.Now().UTC().Format(time.RFC3339Nano), stream, text)
counter.Value++

counter.Mutex.Unlock()
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task ExistingPodIsUpdatedWhenCompleted()
}
};

podService.ListAllPodsAsync(Arg.Any<CancellationToken>())
podService.ListAllPods(Arg.Any<CancellationToken>())
.Returns(new V1PodList
{
Items = new List<V1Pod>
Expand Down Expand Up @@ -137,7 +137,7 @@ public async Task ExistingPodIsUpdatedWhenFailed()
}
};

podService.ListAllPodsAsync(Arg.Any<CancellationToken>())
podService.ListAllPods(Arg.Any<CancellationToken>())
.Returns(new V1PodList
{
Items = new List<V1Pod>
Expand Down Expand Up @@ -199,7 +199,7 @@ public async Task StopTrackingPodWhenDeleted()
}
};

podService.ListAllPodsAsync(Arg.Any<CancellationToken>())
podService.ListAllPods(Arg.Any<CancellationToken>())
.Returns(new V1PodList
{
Items = new List<V1Pod>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using System;
using System.Linq;
using FluentAssertions;
using NUnit.Framework;
using Octopus.Tentacle.Contracts;
using Octopus.Tentacle.Kubernetes;

namespace Octopus.Tentacle.Tests.Kubernetes
{
[TestFixture]
public class PodLogLineParserFixture
{
[TestCase("a|b|c", Reason = "Doesn't have 4 parts")]
public void NotCorrectlyPipeDelimited(string line)
{
var result = PodLogLineParser.ParseLine(line);
result.Error.Should().Contain("delimited");
}

[TestCase("a|b|c|d", Reason = "Not a line number")]
public void FirstPartIsNotALineNumber(string line)
{
var result = PodLogLineParser.ParseLine(line);
result.Error.Should().Contain("line number");
}

[TestCase("1|b|c|d", Reason = "Not a date")]
public void SecondPartIsNotALineDate(string line)
{
var result = PodLogLineParser.ParseLine(line);
result.Error.Should().Contain("DateTimeOffset");
}

[TestCase("1|2024-04-03T06:03:10.501025551Z|c|d", Reason = "Not a valid source")]
public void ThirdPartIsNotAValidSource(string line)
{
var result = PodLogLineParser.ParseLine(line);
result.Error.Should().Contain("source");
}

[Test]
public void SimpleMessage()
{
var logLine = PodLogLineParser.ParseLine("123|2024-04-03T06:03:10.501025551Z|stdout|This is the message").LogLine;
logLine.Should().NotBeNull();

logLine.LineNumber.Should().Be(123);
logLine.Source.Should().Be(ProcessOutputSource.StdOut);
logLine.Message.Should().Be("This is the message");
logLine.Occurred.Should().BeCloseTo(new DateTimeOffset(2024, 4, 3, 6, 3, 10, 501, TimeSpan.Zero), TimeSpan.FromMilliseconds(1));
}

[Test]
public void ServiceMessage()
{
var logLine = PodLogLineParser.ParseLine("123|2024-04-03T06:03:10.501025551Z|stdout|##octopus[stdout-verbose]").LogLine;
logLine.Should().NotBeNull();

logLine.LineNumber.Should().Be(123);
logLine.Source.Should().Be(ProcessOutputSource.StdOut);
logLine.Message.Should().Be("##octopus[stdout-verbose]");
logLine.Occurred.Should().BeCloseTo(new DateTimeOffset(2024, 4, 3, 6, 3, 10, 501, TimeSpan.Zero), TimeSpan.FromMilliseconds(1));
}

[Test]
public void ErrorMessage()
{
var logLine = PodLogLineParser.ParseLine("123|2024-04-03T06:03:10.501025551Z|stderr|Error!").LogLine;
logLine.Should().NotBeNull();

logLine.LineNumber.Should().Be(123);
logLine.Source.Should().Be(ProcessOutputSource.StdErr);
logLine.Message.Should().Be("Error!");
logLine.Occurred.Should().BeCloseTo(new DateTimeOffset(2024, 4, 3, 6, 3, 10, 501, TimeSpan.Zero), TimeSpan.FromMilliseconds(1));
}

[Test]
public void MessageHasPipeInIt()
{
var logLine = PodLogLineParser.ParseLine("123|2024-04-03T06:03:10.501025551Z|stdout|This is the me|ss|age").LogLine;
logLine.Should().NotBeNull();

logLine.LineNumber.Should().Be(123);
logLine.Source.Should().Be(ProcessOutputSource.StdOut);
logLine.Message.Should().Be("This is the me|ss|age");
logLine.Occurred.Should().BeCloseTo(new DateTimeOffset(2024, 4, 3, 6, 3, 10, 501, TimeSpan.Zero), TimeSpan.FromMilliseconds(1));
}
}
}
111 changes: 111 additions & 0 deletions source/Octopus.Tentacle.Tests/Kubernetes/PodLogReaderFixture.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using FluentAssertions;
using NUnit.Framework;
using Octopus.Tentacle.Contracts;
using Octopus.Tentacle.Kubernetes;

namespace Octopus.Tentacle.Tests.Kubernetes
{
[TestFixture]
public class PodLogReaderFixture
{
[TestCase(0, Reason = "Initial position")]
[TestCase(4, Reason = "Subsequent position")]
[TestCase(12387126, Reason = "Large position")]
public async Task NoLines_SameSequenceNumber(long lastLogSequence)
{
string[] podLines = Array.Empty<string>();

var reader = SetupReader(podLines);
var result = await PodLogReader.ReadPodLogs(lastLogSequence, reader);
result.NextSequenceNumber.Should().Be(lastLogSequence);
result.Lines.Should().BeEmpty();
}

[Test]
public async Task FirstLine_SequenceNumberIncreasesByOne()
{
string[] podLines = {
"1|2024-04-03T06:03:10.517865655Z|stdout|Kubernetes Script Pod completed",
};

var reader = SetupReader(podLines);
var result = await PodLogReader.ReadPodLogs(0, reader);
result.NextSequenceNumber.Should().Be(1);
result.Lines.Should().BeEquivalentTo(new[]
{
new ProcessOutput(ProcessOutputSource.StdOut, "Kubernetes Script Pod completed", DateTimeOffset.Parse("2024-04-03T06:03:10.517865655Z"))
});
}

[Test]
public async Task ThreeSubsequentLines_SequenceNumberIncreasesByThree()
{
string[] podLines = {
"5|2024-04-03T06:03:10.517857755Z|stdout|##octopus[stdout-verbose]",
"6|2024-04-03T06:03:10.517865655Z|stderr|Kubernetes Script Pod completed",
"7|2024-04-03T06:03:10.517867355Z|stdout|##octopus[stdout-default]"
};

var reader = SetupReader(podLines);
var result = await PodLogReader.ReadPodLogs(4, reader);
result.NextSequenceNumber.Should().Be(7);
result.Lines.Should().BeEquivalentTo(new[]
{
new ProcessOutput(ProcessOutputSource.StdOut, "##octopus[stdout-verbose]", DateTimeOffset.Parse("2024-04-03T06:03:10.517857755Z")),
new ProcessOutput(ProcessOutputSource.StdErr, "Kubernetes Script Pod completed", DateTimeOffset.Parse("2024-04-03T06:03:10.517865655Z")),
new ProcessOutput(ProcessOutputSource.StdOut, "##octopus[stdout-default]", DateTimeOffset.Parse("2024-04-03T06:03:10.517867355Z")),
});
}

[Test]
public async Task StreamContainsPreviousLines_Deduplicates()
{
string[] podLines = {
"5|2024-04-03T06:03:10.517857755Z|stdout|##octopus[stdout-verbose]",
"6|2024-04-03T06:03:10.517865655Z|stderr|Kubernetes Script Pod completed",
"7|2024-04-03T06:03:10.517867355Z|stdout|##octopus[stdout-default]"
};

var allTaskLogs = new List<ProcessOutput>();
var reader = SetupReader(podLines.Take(1).ToArray());
var result = await PodLogReader.ReadPodLogs(4, reader);
result.NextSequenceNumber.Should().Be(5);
allTaskLogs.AddRange(result.Lines);

reader = SetupReader(podLines.ToArray());
result = await PodLogReader.ReadPodLogs(5, reader);
result.NextSequenceNumber.Should().Be(7);
allTaskLogs.AddRange(result.Lines);

allTaskLogs.Should().BeEquivalentTo(new[]
{
new ProcessOutput(ProcessOutputSource.StdOut, "##octopus[stdout-verbose]", DateTimeOffset.Parse("2024-04-03T06:03:10.517857755Z")),
new ProcessOutput(ProcessOutputSource.StdErr, "Kubernetes Script Pod completed", DateTimeOffset.Parse("2024-04-03T06:03:10.517865655Z")),
new ProcessOutput(ProcessOutputSource.StdOut, "##octopus[stdout-default]", DateTimeOffset.Parse("2024-04-03T06:03:10.517867355Z")),
});
}

[Test]
public async Task MissingLine_Throws()
{
string[] podLines = {
"100|2024-04-03T06:03:10.517865655Z|stdout|Kubernetes Script Pod completed",
};

var reader = SetupReader(podLines);
Func<Task> action = async () => await PodLogReader.ReadPodLogs(50, reader);
await action.Should().ThrowAsync<MissingPodLogException>();
}

static StreamReader SetupReader(params string[] lines)
{
return new StreamReader(new MemoryStream(Encoding.Default.GetBytes(string.Join("\n", lines))));
}
}
}
Loading

0 comments on commit 6805b21

Please sign in to comment.