Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Frank/podlogs #837

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 30 additions & 54 deletions docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"time"
)

var line = 1

// The bootstrapRunner applet is designed to execute a script in a specific folder
// and format the script's output from stdout and stderr into the following format:
// <line number>|<RFC3339Nano date time>|<"stdout" or "stderr">|<original string>
Expand All @@ -31,46 +33,23 @@ func main() {
doneStd := make(chan bool)
doneErr := make(chan bool)

//stdout log file
so, err := os.Create(workspacePath + "/stdout.log")
if err != nil {
panic(err)
}
// close fo on exit and check for its returned error
defer func() {
if err := so.Close(); err != nil {
panic(err)
}
}()
// make a write buffer
stdoutLogFile := bufio.NewWriter(so)

//stderr log file
se, err := os.Create(workspacePath + "/stderr.log")
if err != nil {
panic(err)
}
// close fo on exit and check for its returned error
defer func() {
if err := se.Close(); err != nil {
panic(err)
}
}()
// make a write buffer
stderrLogFile := bufio.NewWriter(se)

go reader(stdOutScanner, stdoutLogFile, &doneStd)
go reader(stdErrScanner, stderrLogFile, &doneErr)

err = cmd.Start()
if err != nil {
panic(err)
}
go reader(stdOutScanner, "stdout", &doneStd)
go reader(stdErrScanner, "stderr", &doneErr)

Write("stdout", "##octopus[stdout-verbose]")
Write("stdout", "Kubernetes Script Pod started")
Write("stdout", "##octopus[stdout-default]")

err := cmd.Start()

// Wait for output buffering first
<-doneStd
<-doneErr

if err != nil {
panic(err)
}

err = cmd.Wait()

var exitErr *exec.ExitError
Expand All @@ -79,28 +58,25 @@ func main() {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to execute bootstrap script", err)
}

// Perform a final flush of the file buffers, just in case they didn't get flushed before
if err := stdoutLogFile.Flush(); err != nil {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stdoutLogFile", err)
}
if err := stderrLogFile.Flush(); err != nil {
fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stderrLogFile", err)
}
os.Exit(cmd.ProcessState.ExitCode())
exitCode := cmd.ProcessState.ExitCode()

Write("stdout", "##octopus[stdout-verbose]")
Write("stdout", "Kubernetes Script Pod completed")
Write("stdout", "##octopus[stdout-default]")

Write("stdout", fmt.Sprintf("EOS-075CD4F0-8C76-491D-BA76-0879D35E9CFE<<>>%d", exitCode))

os.Exit(exitCode)
}

func reader(scanner *bufio.Scanner, writer *bufio.Writer, done *chan bool) {
func reader(scanner *bufio.Scanner, stream string, done *chan bool) {
for scanner.Scan() {
message := fmt.Sprintf("%s|%s\n", time.Now().UTC().Format(time.RFC3339Nano), scanner.Text())
fmt.Print(message)
if _, err := writer.WriteString(message); err != nil {
panic(err)
}

if err := writer.Flush(); err != nil {
panic(err)
}
Write(stream, scanner.Text())
}
*done <- true
}

func Write(stream string, text string) {
fmt.Printf("%d|%s|%s|%s\n", line, time.Now().UTC().Format(time.RFC3339Nano), stream, text)
line++
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
using System;
using System.IO;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using k8s.Autorest;

namespace Octopus.Tentacle.Kubernetes
{
public static class KubernetesClientExtensionMethods
{
public static async Task<Stream> GetNamespacedPodLogsAsync(this k8s.Kubernetes client, string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default)
{
var url = $"api/v1/namespaces/{namespaceParameter}/pods/{podName}/log?container={container}";

if (sinceTime is not null)
{
var sinceTimeStr = sinceTime.Value.ToString("O");
url += $"&sinceTime={Uri.EscapeDataString(sinceTimeStr)}";
}

url = string.Concat(client.BaseUri, url);

var httpRequest = new HttpRequestMessage(HttpMethod.Get, url);

if (client.Credentials is not null)
{
await client.Credentials.ProcessHttpRequestAsync(httpRequest, CancellationToken.None);
}

var httpResponse = await client.HttpClient.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken);

if (httpResponse.IsSuccessStatusCode)
return await httpResponse.Content.ReadAsStreamAsync();

// an exception occurred, throw
var responseContent = httpResponse.Content != null
? await httpResponse.Content.ReadAsStringAsync().ConfigureAwait(false)
: string.Empty;

var ex = new HttpOperationException($"Operation returned an invalid status code '{httpResponse.StatusCode}', response body {responseContent}")
{
Request = new HttpRequestMessageWrapper(httpRequest, null),
Response = new HttpResponseMessageWrapper(httpResponse, responseContent)
};

httpRequest.Dispose();
httpResponse.Dispose();

throw ex;
}
}
}
24 changes: 24 additions & 0 deletions source/Octopus.Tentacle/Kubernetes/KubernetesLogService.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

namespace Octopus.Tentacle.Kubernetes
{
public interface IKubernetesLogService
{
Task<Stream> GetLogs(string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default);
}

public class KubernetesLogService : KubernetesService, IKubernetesLogService
{
public KubernetesLogService(IKubernetesClientConfigProvider configProvider) : base(configProvider)
{
}

public async Task<Stream> GetLogs(string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default)
{
return await Client.GetNamespacedPodLogsAsync(podName, namespaceParameter, container, sinceTime, cancellationToken);
}
}
}
1 change: 1 addition & 0 deletions source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ protected override void Load(ContainerBuilder builder)
{
builder.RegisterType<KubernetesPodService>().As<IKubernetesPodService>().SingleInstance();
builder.RegisterType<KubernetesClusterService>().As<IKubernetesClusterService>().SingleInstance();
builder.RegisterType<KubernetesLogService>().As<IKubernetesLogService>().SingleInstance();
builder.RegisterType<KubernetesPodContainerResolver>().As<IKubernetesPodContainerResolver>().SingleInstance();
builder.RegisterType<KubernetesConfigMapService>().As<IKubernetesConfigMapService>().SingleInstance();
builder.RegisterType<KubernetesSecretService>().As<IKubernetesSecretService>().SingleInstance();
Expand Down
4 changes: 2 additions & 2 deletions source/Octopus.Tentacle/Kubernetes/KubernetesPodService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public KubernetesPodService(IKubernetesClientConfigProvider configProvider)
}

public async Task<V1Pod?> TryGetPod(ScriptTicket scriptTicket, CancellationToken cancellationToken) =>
await TryGetAsync(() => Client.ReadNamespacedPodAsync(scriptTicket.ToKubernetesScriptPobName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken));
await TryGetAsync(() => Client.ReadNamespacedPodAsync(scriptTicket.ToKubernetesScriptPodName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken));

public async Task<V1PodList> ListAllPodsAsync(CancellationToken cancellationToken)
{
Expand Down Expand Up @@ -84,6 +84,6 @@ public async Task Create(V1Pod pod, CancellationToken cancellationToken)
}

public async Task Delete(ScriptTicket scriptTicket, CancellationToken cancellationToken)
=> await Client.DeleteNamespacedPodAsync(scriptTicket.ToKubernetesScriptPobName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken);
=> await Client.DeleteNamespacedPodAsync(scriptTicket.ToKubernetesScriptPodName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken);
}
}
132 changes: 132 additions & 0 deletions source/Octopus.Tentacle/Kubernetes/KubernetesScriptLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using k8s.Autorest;
using Newtonsoft.Json;
using Octopus.Tentacle.Contracts;
using Octopus.Tentacle.Diagnostics;
using Octopus.Tentacle.Kubernetes.Scripts;
using Octopus.Tentacle.Scripts;
using Octopus.Tentacle.Util;

namespace Octopus.Tentacle.Kubernetes
{
public class KubernetesScriptLog : IScriptLog
{
readonly IKubernetesLogService kubernetesLogService;
readonly SensitiveValueMasker sensitiveValueMasker;
readonly ScriptTicket scriptTicket;
readonly object sync = new object();

readonly List<ProcessOutput> inMemoryTentacleLogs = new List<ProcessOutput>();

public KubernetesScriptLog(IKubernetesLogService kubernetesLogService, SensitiveValueMasker sensitiveValueMasker, ScriptTicket scriptTicket)
{
this.kubernetesLogService = kubernetesLogService;
this.sensitiveValueMasker = sensitiveValueMasker;
this.scriptTicket = scriptTicket;
}

public IScriptLogWriter CreateWriter()
{
return new KubernetesWriter(sync, sensitiveValueMasker, inMemoryTentacleLogs);
}

public List<ProcessOutput> GetOutput(long afterSequenceNumber, out long nextSequenceNumber)
{
var podName = scriptTicket.ToKubernetesScriptPodName();

Stream logStream;

var writer = CreateWriter();
try
{
//TODO: Only grab recent
logStream = kubernetesLogService.GetLogs(podName, KubernetesConfig.Namespace, podName).Result;
}
catch (AggregateException ex)
{
//writer.WriteOutput(ProcessOutputSource.Debug, "ABC123: " + ex.ToString());

if (ex.InnerExceptions.Single() is HttpOperationException httpOperationException &&
httpOperationException.Response.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.BadRequest)
{
nextSequenceNumber = afterSequenceNumber;
return new List<ProcessOutput>();

}

throw;
}

var results = new List<ProcessOutput>();
nextSequenceNumber = afterSequenceNumber;
lock (sync)
{
using (var reader = new StreamReader(logStream))
{
while (true)
{
var line = reader.ReadLineAsync().Result;
if (line.IsNullOrEmpty())
{
var output = results.Concat(inMemoryTentacleLogs).OrderBy(m => m.Occurred).ToList();

inMemoryTentacleLogs.Clear();
return output;
}

var parsedLine = PodLogParser.ParseLine(writer, line!);
if (parsedLine != null)
{
if (parsedLine.LineNumber > afterSequenceNumber)
results.Add(new ProcessOutput(parsedLine.Source, parsedLine.Message, parsedLine.Occurred));

nextSequenceNumber = parsedLine.LineNumber;
}
}
}
}
}

class KubernetesWriter : IScriptLogWriter
{
readonly object sync;
readonly SensitiveValueMasker sensitiveValueMasker;
readonly List<ProcessOutput> processOutputs;

public KubernetesWriter(object sync, SensitiveValueMasker sensitiveValueMasker, List<ProcessOutput> processOutputs)
{
this.sync = sync;
this.sensitiveValueMasker = sensitiveValueMasker;
this.processOutputs = processOutputs;
}

public void WriteOutput(ProcessOutputSource source, string message)
=> WriteOutput(source, message, DateTimeOffset.UtcNow);

public void WriteOutput(ProcessOutputSource source, string message, DateTimeOffset occurred)
{
lock (sync)
{
var output = new ProcessOutput(source, MaskSensitiveValues(message), occurred);
processOutputs.Add(output);
}
}

string MaskSensitiveValues(string rawMessage)
{
string? maskedMessage = null;
sensitiveValueMasker.SafeSanitize(rawMessage, s => maskedMessage = s);
return maskedMessage ?? rawMessage;
}

public void Dispose()
{
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Octopus.Tentacle.Kubernetes
{
public static class KubernetesScriptPodNameExtensions
{
public static string ToKubernetesScriptPobName(this ScriptTicket scriptTicket) => $"octopus-script-{scriptTicket.TaskId}".ToLowerInvariant().Truncate(63);
public static string ToKubernetesScriptPodName(this ScriptTicket scriptTicket) => $"octopus-script-{scriptTicket.TaskId}".ToLowerInvariant().Truncate(63);
}
}
Loading