diff --git a/docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go b/docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go index 0f7ce1007..8186d6ce0 100644 --- a/docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go +++ b/docker/kubernetes-tentacle/bootstrapRunner/bootstrapRunner.go @@ -9,6 +9,8 @@ import ( "time" ) +var line = 1 + // The bootstrapRunner applet is designed to execute a script in a specific folder // and format the script's output from stdout and stderr into the following format: // ||<"stdout" or "stderr">| @@ -31,46 +33,23 @@ func main() { doneStd := make(chan bool) doneErr := make(chan bool) - //stdout log file - so, err := os.Create(workspacePath + "/stdout.log") - if err != nil { - panic(err) - } - // close fo on exit and check for its returned error - defer func() { - if err := so.Close(); err != nil { - panic(err) - } - }() - // make a write buffer - stdoutLogFile := bufio.NewWriter(so) - - //stderr log file - se, err := os.Create(workspacePath + "/stderr.log") - if err != nil { - panic(err) - } - // close fo on exit and check for its returned error - defer func() { - if err := se.Close(); err != nil { - panic(err) - } - }() - // make a write buffer - stderrLogFile := bufio.NewWriter(se) - - go reader(stdOutScanner, stdoutLogFile, &doneStd) - go reader(stdErrScanner, stderrLogFile, &doneErr) - - err = cmd.Start() - if err != nil { - panic(err) - } + go reader(stdOutScanner, "stdout", &doneStd) + go reader(stdErrScanner, "stderr", &doneErr) + + Write("stdout", "##octopus[stdout-verbose]") + Write("stdout", "Kubernetes Script Pod started") + Write("stdout", "##octopus[stdout-default]") + + err := cmd.Start() // Wait for output buffering first <-doneStd <-doneErr + if err != nil { + panic(err) + } + err = cmd.Wait() var exitErr *exec.ExitError @@ -79,28 +58,25 @@ func main() { fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to execute bootstrap script", err) } - // Perform a final flush of the file buffers, just in case they didn't get flushed before - if err := stdoutLogFile.Flush(); err != nil { - fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stdoutLogFile", err) - } - if err := stderrLogFile.Flush(); err != nil { - fmt.Fprintln(os.Stderr, "bootstrapRunner.go: Failed to perform final flush of stderrLogFile", err) - } - - os.Exit(cmd.ProcessState.ExitCode()) + exitCode := cmd.ProcessState.ExitCode() + + Write("stdout", "##octopus[stdout-verbose]") + Write("stdout", "Kubernetes Script Pod completed") + Write("stdout", "##octopus[stdout-default]") + + Write("stdout", fmt.Sprintf("EOS-075CD4F0-8C76-491D-BA76-0879D35E9CFE<<>>%d", exitCode)) + + os.Exit(exitCode) } -func reader(scanner *bufio.Scanner, writer *bufio.Writer, done *chan bool) { +func reader(scanner *bufio.Scanner, stream string, done *chan bool) { for scanner.Scan() { - message := fmt.Sprintf("%s|%s\n", time.Now().UTC().Format(time.RFC3339Nano), scanner.Text()) - fmt.Print(message) - if _, err := writer.WriteString(message); err != nil { - panic(err) - } - - if err := writer.Flush(); err != nil { - panic(err) - } + Write(stream, scanner.Text()) } *done <- true } + +func Write(stream string, text string) { + fmt.Printf("%d|%s|%s|%s\n", line, time.Now().UTC().Format(time.RFC3339Nano), stream, text) + line++ +} diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesClientExtensionMethods.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesClientExtensionMethods.cs new file mode 100644 index 000000000..00725c4fd --- /dev/null +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesClientExtensionMethods.cs @@ -0,0 +1,53 @@ +using System; +using System.IO; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using k8s.Autorest; + +namespace Octopus.Tentacle.Kubernetes +{ + public static class KubernetesClientExtensionMethods + { + public static async Task GetNamespacedPodLogsAsync(this k8s.Kubernetes client, string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default) + { + var url = $"api/v1/namespaces/{namespaceParameter}/pods/{podName}/log?container={container}"; + + if (sinceTime is not null) + { + var sinceTimeStr = sinceTime.Value.ToString("O"); + url += $"&sinceTime={Uri.EscapeDataString(sinceTimeStr)}"; + } + + url = string.Concat(client.BaseUri, url); + + var httpRequest = new HttpRequestMessage(HttpMethod.Get, url); + + if (client.Credentials is not null) + { + await client.Credentials.ProcessHttpRequestAsync(httpRequest, CancellationToken.None); + } + + var httpResponse = await client.HttpClient.SendAsync(httpRequest, HttpCompletionOption.ResponseHeadersRead, cancellationToken); + + if (httpResponse.IsSuccessStatusCode) + return await httpResponse.Content.ReadAsStreamAsync(); + + // an exception occurred, throw + var responseContent = httpResponse.Content != null + ? await httpResponse.Content.ReadAsStringAsync().ConfigureAwait(false) + : string.Empty; + + var ex = new HttpOperationException($"Operation returned an invalid status code '{httpResponse.StatusCode}', response body {responseContent}") + { + Request = new HttpRequestMessageWrapper(httpRequest, null), + Response = new HttpResponseMessageWrapper(httpResponse, responseContent) + }; + + httpRequest.Dispose(); + httpResponse.Dispose(); + + throw ex; + } + } +} \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesLogService.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesLogService.cs new file mode 100644 index 000000000..3331687b9 --- /dev/null +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesLogService.cs @@ -0,0 +1,24 @@ +using System; +using System.IO; +using System.Threading; +using System.Threading.Tasks; + +namespace Octopus.Tentacle.Kubernetes +{ + public interface IKubernetesLogService + { + Task GetLogs(string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default); + } + + public class KubernetesLogService : KubernetesService, IKubernetesLogService + { + public KubernetesLogService(IKubernetesClientConfigProvider configProvider) : base(configProvider) + { + } + + public async Task GetLogs(string podName, string namespaceParameter, string container, DateTimeOffset? sinceTime = null, CancellationToken cancellationToken = default) + { + return await Client.GetNamespacedPodLogsAsync(podName, namespaceParameter, container, sinceTime, cancellationToken); + } + } +} \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs index b74a95fdc..bf04027bd 100644 --- a/source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesModule.cs @@ -9,6 +9,7 @@ protected override void Load(ContainerBuilder builder) { builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); + builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); builder.RegisterType().As().SingleInstance(); diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesPodService.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesPodService.cs index 6a3ec31f7..f1074e241 100644 --- a/source/Octopus.Tentacle/Kubernetes/KubernetesPodService.cs +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesPodService.cs @@ -25,7 +25,7 @@ public KubernetesPodService(IKubernetesClientConfigProvider configProvider) } public async Task TryGetPod(ScriptTicket scriptTicket, CancellationToken cancellationToken) => - await TryGetAsync(() => Client.ReadNamespacedPodAsync(scriptTicket.ToKubernetesScriptPobName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken)); + await TryGetAsync(() => Client.ReadNamespacedPodAsync(scriptTicket.ToKubernetesScriptPodName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken)); public async Task ListAllPodsAsync(CancellationToken cancellationToken) { @@ -84,6 +84,6 @@ public async Task Create(V1Pod pod, CancellationToken cancellationToken) } public async Task Delete(ScriptTicket scriptTicket, CancellationToken cancellationToken) - => await Client.DeleteNamespacedPodAsync(scriptTicket.ToKubernetesScriptPobName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken); + => await Client.DeleteNamespacedPodAsync(scriptTicket.ToKubernetesScriptPodName(), KubernetesConfig.Namespace, cancellationToken: cancellationToken); } } \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesScriptLog.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesScriptLog.cs new file mode 100644 index 000000000..960055159 --- /dev/null +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesScriptLog.cs @@ -0,0 +1,132 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Net; +using System.Text; +using k8s.Autorest; +using Newtonsoft.Json; +using Octopus.Tentacle.Contracts; +using Octopus.Tentacle.Diagnostics; +using Octopus.Tentacle.Kubernetes.Scripts; +using Octopus.Tentacle.Scripts; +using Octopus.Tentacle.Util; + +namespace Octopus.Tentacle.Kubernetes +{ + public class KubernetesScriptLog : IScriptLog + { + readonly IKubernetesLogService kubernetesLogService; + readonly SensitiveValueMasker sensitiveValueMasker; + readonly ScriptTicket scriptTicket; + readonly object sync = new object(); + + readonly List inMemoryTentacleLogs = new List(); + + public KubernetesScriptLog(IKubernetesLogService kubernetesLogService, SensitiveValueMasker sensitiveValueMasker, ScriptTicket scriptTicket) + { + this.kubernetesLogService = kubernetesLogService; + this.sensitiveValueMasker = sensitiveValueMasker; + this.scriptTicket = scriptTicket; + } + + public IScriptLogWriter CreateWriter() + { + return new KubernetesWriter(sync, sensitiveValueMasker, inMemoryTentacleLogs); + } + + public List GetOutput(long afterSequenceNumber, out long nextSequenceNumber) + { + var podName = scriptTicket.ToKubernetesScriptPodName(); + + Stream logStream; + + var writer = CreateWriter(); + try + { + //TODO: Only grab recent + logStream = kubernetesLogService.GetLogs(podName, KubernetesConfig.Namespace, podName).Result; + } + catch (AggregateException ex) + { + //writer.WriteOutput(ProcessOutputSource.Debug, "ABC123: " + ex.ToString()); + + if (ex.InnerExceptions.Single() is HttpOperationException httpOperationException && + httpOperationException.Response.StatusCode is HttpStatusCode.NotFound or HttpStatusCode.BadRequest) + { + nextSequenceNumber = afterSequenceNumber; + return new List(); + + } + + throw; + } + + var results = new List(); + nextSequenceNumber = afterSequenceNumber; + lock (sync) + { + using (var reader = new StreamReader(logStream)) + { + while (true) + { + var line = reader.ReadLineAsync().Result; + if (line.IsNullOrEmpty()) + { + var output = results.Concat(inMemoryTentacleLogs).OrderBy(m => m.Occurred).ToList(); + + inMemoryTentacleLogs.Clear(); + return output; + } + + var parsedLine = PodLogParser.ParseLine(writer, line!); + if (parsedLine != null) + { + if (parsedLine.LineNumber > afterSequenceNumber) + results.Add(new ProcessOutput(parsedLine.Source, parsedLine.Message, parsedLine.Occurred)); + + nextSequenceNumber = parsedLine.LineNumber; + } + } + } + } + } + + class KubernetesWriter : IScriptLogWriter + { + readonly object sync; + readonly SensitiveValueMasker sensitiveValueMasker; + readonly List processOutputs; + + public KubernetesWriter(object sync, SensitiveValueMasker sensitiveValueMasker, List processOutputs) + { + this.sync = sync; + this.sensitiveValueMasker = sensitiveValueMasker; + this.processOutputs = processOutputs; + } + + public void WriteOutput(ProcessOutputSource source, string message) + => WriteOutput(source, message, DateTimeOffset.UtcNow); + + public void WriteOutput(ProcessOutputSource source, string message, DateTimeOffset occurred) + { + lock (sync) + { + var output = new ProcessOutput(source, MaskSensitiveValues(message), occurred); + processOutputs.Add(output); + } + } + + string MaskSensitiveValues(string rawMessage) + { + string? maskedMessage = null; + sensitiveValueMasker.SafeSanitize(rawMessage, s => maskedMessage = s); + return maskedMessage ?? rawMessage; + } + + public void Dispose() + { + } + } + } +} diff --git a/source/Octopus.Tentacle/Kubernetes/KubernetesScriptPodNameExtensions.cs b/source/Octopus.Tentacle/Kubernetes/KubernetesScriptPodNameExtensions.cs index 9383a4bdd..9dc15cb3e 100644 --- a/source/Octopus.Tentacle/Kubernetes/KubernetesScriptPodNameExtensions.cs +++ b/source/Octopus.Tentacle/Kubernetes/KubernetesScriptPodNameExtensions.cs @@ -5,6 +5,6 @@ namespace Octopus.Tentacle.Kubernetes { public static class KubernetesScriptPodNameExtensions { - public static string ToKubernetesScriptPobName(this ScriptTicket scriptTicket) => $"octopus-script-{scriptTicket.TaskId}".ToLowerInvariant().Truncate(63); + public static string ToKubernetesScriptPodName(this ScriptTicket scriptTicket) => $"octopus-script-{scriptTicket.TaskId}".ToLowerInvariant().Truncate(63); } } \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodOutputStreamWriter.cs b/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodOutputStreamWriter.cs index ba837ab4e..40d2d3d92 100644 --- a/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodOutputStreamWriter.cs +++ b/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodOutputStreamWriter.cs @@ -1,144 +1,130 @@ -using System; -using System.Collections.Generic; -using System.IO; -using System.Linq; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using Octopus.Tentacle.Contracts; -using Octopus.Tentacle.Scripts; -using Octopus.Tentacle.Util; - -namespace Octopus.Tentacle.Kubernetes.Scripts -{ - public class KubernetesPodOutputStreamWriter - { - readonly IScriptWorkspace workspace; - long lastStdOutOffset; - long lastStdErrOffset; - - public KubernetesPodOutputStreamWriter(IScriptWorkspace workspace) - { - this.workspace = workspace; - } - - public async Task StreamPodLogsToScriptLog(IScriptLogWriter writer, CancellationToken cancellationToken, bool isFinalRead = false) - { - try - { - //open the file streams for reading - using var stdOutStream = await SafelyOpenLogStreamReader("stdout.log", cancellationToken); - using var stdErrStream = await SafelyOpenLogStreamReader("stderr.log", cancellationToken); - - //if either of these is null, just return - if (stdOutStream is null || stdErrStream is null) - return; - - // This loop is exited when either the cancellation token is cancelled (which is when the pod is finished or the script is cancelled) - // or if this is the final read, at the end (so we read once and jump out) - while (true) - { - if (cancellationToken.IsCancellationRequested) - break; - - //Read both the stdout and stderr log files - var stdOutReadTask = ReadLogFileTail(writer, stdOutStream, ProcessOutputSource.StdOut, lastStdOutOffset); - var stdErrReadTask = ReadLogFileTail(writer, stdErrStream, ProcessOutputSource.StdErr, lastStdErrOffset); - - //wait for them to both complete - await Task.WhenAll(stdOutReadTask, stdErrReadTask); - - //store the final offsets - lastStdOutOffset = stdOutReadTask.Result.FinalOffset; - lastStdErrOffset = stdErrReadTask.Result.FinalOffset; - - //stitch the log lines together and order by occurred, then write to actual log - var orderedLogLines = stdOutReadTask.Result.Logs - .Concat(stdErrReadTask.Result.Logs) - .OrderBy(ll => ll.Occurred); - - //write all the read log lines to the output script log - foreach (var logLine in orderedLogLines) - { - writer.WriteOutput(logLine.Source, logLine.Message, logLine.Occurred); - } - - //wait for 250ms before reading the logs again (except on the final read) - if (!isFinalRead) - { - await Task.Delay(TimeSpan.FromMilliseconds(250), cancellationToken); - } - else - { - //if this is the last read we need to jump out (and not spin forever) - break; - } - } - } - catch (TaskCanceledException) - { - //ignore all task cancelled exceptions as they may be thrown by the pod finishing (and thus signally) - } - } - - async Task SafelyOpenLogStreamReader(string filename, CancellationToken cancellationToken) - { - while (true) - { - if (cancellationToken.IsCancellationRequested) - return null; - - try - { - //we expect that we will receive a number of FileNotFoundException's while the pod is spinning up - //Eventually the file should exist - return new StreamReader(workspace.OpenFileStreamForReading(filename), Encoding.UTF8); - } - catch (FileNotFoundException) - { - //wait for 50ms before reading the logs again - await Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken); - } - } - } - - record LogLine(ProcessOutputSource Source, string Message, DateTimeOffset Occurred); - - static async Task<(IEnumerable Logs, long FinalOffset)> ReadLogFileTail(IScriptLogWriter writer, StreamReader reader, ProcessOutputSource source, long lastOffset) - { - if (reader.BaseStream.Length == lastOffset) - return (Enumerable.Empty(), lastOffset); - - reader.BaseStream.Seek(lastOffset, SeekOrigin.Begin); - - var newLines = new List(); - string? line; - do - { - line = await reader.ReadLineAsync(); - if (line.IsNullOrEmpty()) - break; - - var logParts = line!.Split(new[] { '|' }, 2); - - if (logParts.Length != 2) - { - writer.WriteOutput(ProcessOutputSource.StdErr, $"Invalid log line detected. '{line}' is not correctly pipe-delimited."); - continue; - } - - //part 1 is the datetimeoffset - if (!DateTimeOffset.TryParse(logParts[0], out var occurred)) - { - writer.WriteOutput(ProcessOutputSource.StdErr, $"Failed to parse '{logParts[0]}' as a DateTimeOffset. Using DateTimeOffset.UtcNow."); - occurred = DateTimeOffset.UtcNow; - } - - //add the new line - newLines.Add(new LogLine(source, logParts[1], occurred)); - } while (!line.IsNullOrEmpty()); - - return (newLines, reader.BaseStream.Position); - } - } -} \ No newline at end of file +// using System; +// using System.Collections.Generic; +// using System.IO; +// using System.Linq; +// using System.Text; +// using System.Threading; +// using System.Threading.Tasks; +// using Octopus.Tentacle.Contracts; +// using Octopus.Tentacle.Scripts; +// using Octopus.Tentacle.Util; +// +// namespace Octopus.Tentacle.Kubernetes.Scripts +// { +// public class KubernetesPodOutputStreamWriter +// { +// readonly IScriptWorkspace workspace; +// long lastStdOutOffset; +// long lastStdErrOffset; +// +// public KubernetesPodOutputStreamWriter(IScriptWorkspace workspace) +// { +// this.workspace = workspace; +// } +// +// public async Task StreamPodLogsToScriptLog(IScriptLogWriter writer, CancellationToken cancellationToken, bool isFinalRead = false) +// { +// try +// { +// //open the file streams for reading +// using var stdOutStream = await SafelyOpenLogStreamReader("stdout.log", cancellationToken); +// using var stdErrStream = await SafelyOpenLogStreamReader("stderr.log", cancellationToken); +// +// //if either of these is null, just return +// if (stdOutStream is null || stdErrStream is null) +// return; +// +// // This loop is exited when either the cancellation token is cancelled (which is when the pod is finished or the script is cancelled) +// // or if this is the final read, at the end (so we read once and jump out) +// while (true) +// { +// if (cancellationToken.IsCancellationRequested) +// break; +// +// //Read both the stdout and stderr log files +// var stdOutReadTask = ReadLogFileTail(writer, stdOutStream, ProcessOutputSource.StdOut, lastStdOutOffset); +// var stdErrReadTask = ReadLogFileTail(writer, stdErrStream, ProcessOutputSource.StdErr, lastStdErrOffset); +// +// //wait for them to both complete +// await Task.WhenAll(stdOutReadTask, stdErrReadTask); +// +// //store the final offsets +// lastStdOutOffset = stdOutReadTask.Result.FinalOffset; +// lastStdErrOffset = stdErrReadTask.Result.FinalOffset; +// +// //stitch the log lines together and order by occurred, then write to actual log +// var orderedLogLines = stdOutReadTask.Result.Logs +// .Concat(stdErrReadTask.Result.Logs) +// .OrderBy(ll => ll.Occurred); +// +// //write all the read log lines to the output script log +// foreach (var logLine in orderedLogLines) +// { +// writer.WriteOutput(logLine.Source, logLine.Message, logLine.Occurred); +// } +// +// //wait for 250ms before reading the logs again (except on the final read) +// if (!isFinalRead) +// { +// await Task.Delay(TimeSpan.FromMilliseconds(250), cancellationToken); +// } +// else +// { +// //if this is the last read we need to jump out (and not spin forever) +// break; +// } +// } +// } +// catch (TaskCanceledException) +// { +// //ignore all task cancelled exceptions as they may be thrown by the pod finishing (and thus signally) +// } +// } +// +// async Task SafelyOpenLogStreamReader(string filename, CancellationToken cancellationToken) +// { +// while (true) +// { +// if (cancellationToken.IsCancellationRequested) +// return null; +// +// try +// { +// //we expect that we will receive a number of FileNotFoundException's while the pod is spinning up +// //Eventually the file should exist +// return new StreamReader(workspace.OpenFileStreamForReading(filename), Encoding.UTF8); +// } +// catch (FileNotFoundException) +// { +// //wait for 50ms before reading the logs again +// await Task.Delay(TimeSpan.FromMilliseconds(50), cancellationToken); +// } +// } +// } +// +// static async Task<(IEnumerable Logs, long FinalOffset)> ReadLogFileTail(IScriptLogWriter writer, StreamReader reader, ProcessOutputSource source, long lastOffset) +// { +// if (reader.BaseStream.Length == lastOffset) +// return (Enumerable.Empty(), lastOffset); +// +// reader.BaseStream.Seek(lastOffset, SeekOrigin.Begin); +// +// var newLines = new List(); +// string? line; +// do +// { +// line = await reader.ReadLineAsync(); +// if (line.IsNullOrEmpty()) +// break; +// +// var logLine = PodLogParser.ParseLine(writer, line!); +// if (logLine == null) +// continue; +// +// newLines.Add(logLine); +// } while (!line.IsNullOrEmpty()); +// +// return (newLines, reader.BaseStream.Position); +// } +// } +// } \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodScriptExecutor.cs b/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodScriptExecutor.cs index f70a6413c..50359797d 100644 --- a/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodScriptExecutor.cs +++ b/source/Octopus.Tentacle/Kubernetes/Scripts/KubernetesPodScriptExecutor.cs @@ -2,6 +2,7 @@ using System.Threading; using System.Threading.Tasks; using Octopus.Tentacle.Contracts.ScriptServiceV3Alpha; +using Octopus.Tentacle.Diagnostics; using Octopus.Tentacle.Scripts; namespace Octopus.Tentacle.Kubernetes.Scripts @@ -9,10 +10,14 @@ namespace Octopus.Tentacle.Kubernetes.Scripts public class KubernetesPodScriptExecutor : IScriptExecutor { readonly RunningKubernetesPod.Factory runningKubernetesPodFactory; + readonly SensitiveValueMasker sensitiveValueMasker; + readonly IKubernetesLogService kubernetesLogService; - public KubernetesPodScriptExecutor(RunningKubernetesPod.Factory runningKubernetesPodFactory) + public KubernetesPodScriptExecutor(RunningKubernetesPod.Factory runningKubernetesPodFactory, SensitiveValueMasker sensitiveValueMasker, IKubernetesLogService kubernetesLogService) { this.runningKubernetesPodFactory = runningKubernetesPodFactory; + this.sensitiveValueMasker = sensitiveValueMasker; + this.kubernetesLogService = kubernetesLogService; } public bool CanExecute(StartScriptCommandV3Alpha command) => command.ExecutionContext is KubernetesAgentScriptExecutionContext; @@ -21,7 +26,7 @@ public IRunningScript ExecuteOnBackgroundThread(StartScriptCommandV3Alpha comman { var runningScript = runningKubernetesPodFactory( workspace, - workspace.CreateLog(), + new KubernetesScriptLog( kubernetesLogService,sensitiveValueMasker, command.ScriptTicket), command.ScriptTicket, command.TaskId, scriptStateStore, diff --git a/source/Octopus.Tentacle/Kubernetes/Scripts/PodLogParser.cs b/source/Octopus.Tentacle/Kubernetes/Scripts/PodLogParser.cs new file mode 100644 index 000000000..49cf88a91 --- /dev/null +++ b/source/Octopus.Tentacle/Kubernetes/Scripts/PodLogParser.cs @@ -0,0 +1,48 @@ +using System; +using Octopus.Tentacle.Contracts; +using Octopus.Tentacle.Scripts; + +namespace Octopus.Tentacle.Kubernetes.Scripts +{ + record LogLine(int LineNumber, ProcessOutputSource Source, string Message, DateTimeOffset Occurred); + + static class PodLogParser + { + public static LogLine? ParseLine(IScriptLogWriter writer, string line) + { + var logParts = line.Split(new[] { '|' }, 4); + + if (logParts.Length != 4) + { + writer.WriteOutput(ProcessOutputSource.StdErr, $"Invalid log line detected. '{line}' is not correctly pipe-delimited."); + return null; + } + + if (!int.TryParse(logParts[0], out int lineNumber)) + { + writer.WriteOutput(ProcessOutputSource.StdErr, $"Invalid log line detected. '{logParts[0]}' is not a valid line number."); + return null; + } + + if (!DateTimeOffset.TryParse(logParts[1], out var occurred)) + { + writer.WriteOutput(ProcessOutputSource.StdErr, $"Failed to parse '{logParts[1]}' as a DateTimeOffset. Using DateTimeOffset.UtcNow."); + occurred = DateTimeOffset.UtcNow; + } + + if (!Enum.TryParse(logParts[2], true, out ProcessOutputSource source)) + { + writer.WriteOutput(ProcessOutputSource.StdErr, $"Invalid log line detected. '{logParts[2]}' is not a valid source."); + return null; + } + + //add the new line + var message = logParts[3]; + + if (message.StartsWith("EOS-075CD4F0-8C76-491D-BA76-0879D35E9CFE")) + source = ProcessOutputSource.Debug; + + return new LogLine(lineNumber, source, message, occurred); + } + } +} \ No newline at end of file diff --git a/source/Octopus.Tentacle/Kubernetes/Scripts/RunningKubernetesPod.cs b/source/Octopus.Tentacle/Kubernetes/Scripts/RunningKubernetesPod.cs index 18f8acf7c..20a5c73ba 100644 --- a/source/Octopus.Tentacle/Kubernetes/Scripts/RunningKubernetesPod.cs +++ b/source/Octopus.Tentacle/Kubernetes/Scripts/RunningKubernetesPod.cs @@ -42,7 +42,7 @@ public delegate RunningKubernetesPod Factory( readonly KubernetesAgentScriptExecutionContext executionContext; readonly CancellationToken scriptCancellationToken; readonly string? instanceName; - readonly KubernetesPodOutputStreamWriter outputStreamWriter; + //readonly KubernetesPodOutputStreamWriter outputStreamWriter; readonly string podName; public int ExitCode { get; private set; } @@ -77,12 +77,12 @@ public RunningKubernetesPod(IScriptWorkspace workspace, ScriptLog = scriptLog; instanceName = appInstanceSelector.Current.InstanceName; - outputStreamWriter = new KubernetesPodOutputStreamWriter(workspace); + //outputStreamWriter = new KubernetesPodOutputStreamWriter(workspace); State = ProcessState.Pending; // this doesn't change, so build it once - podName = scriptTicket.ToKubernetesScriptPobName(); + podName = scriptTicket.ToKubernetesScriptPodName(); } public async Task Execute() @@ -173,12 +173,12 @@ async Task MonitorPodAndLogs(IScriptLogWriter writer) var checkPodTask = CheckIfPodHasCompleted(podCompletionCancellationTokenSource); //we pass the pod completion CTS here because its used to cancel the writing of the pod stream - var monitorPodOutputTask = outputStreamWriter.StreamPodLogsToScriptLog(writer, podCompletionCancellationTokenSource.Token); + //var monitorPodOutputTask = outputStreamWriter.StreamPodLogsToScriptLog(writer, podCompletionCancellationTokenSource.Token); - await Task.WhenAll(checkPodTask, monitorPodOutputTask); + await checkPodTask; //once they have both finished, perform one last log read (and don't cancel on it) - await outputStreamWriter.StreamPodLogsToScriptLog(writer, CancellationToken.None, true); + //await outputStreamWriter.StreamPodLogsToScriptLog(writer, CancellationToken.None, true); //return the exit code of the pod return checkPodTask.Result; diff --git a/source/Octopus.Tentacle/Services/Scripts/ScriptServiceV3Alpha.cs b/source/Octopus.Tentacle/Services/Scripts/ScriptServiceV3Alpha.cs index e0e5f0638..6b1e737ac 100644 --- a/source/Octopus.Tentacle/Services/Scripts/ScriptServiceV3Alpha.cs +++ b/source/Octopus.Tentacle/Services/Scripts/ScriptServiceV3Alpha.cs @@ -1,11 +1,13 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using Octopus.Diagnostics; using Octopus.Tentacle.Contracts; using Octopus.Tentacle.Contracts.ScriptServiceV3Alpha; +using Octopus.Tentacle.Kubernetes; using Octopus.Tentacle.Scripts; using Octopus.Tentacle.Util; @@ -126,14 +128,16 @@ async Task GetResponse(ScriptTicket ticket, long la await Task.CompletedTask; var workspace = workspaceFactory.GetWorkspace(ticket); - var scriptLog = runningScript?.ScriptLog ?? workspace.CreateLog(); - var logs = scriptLog.GetOutput(lastLogSequence, out var next); if (runningScript != null) { + var scriptLog = runningScript.ScriptLog; + var logs = scriptLog.GetOutput(lastLogSequence, out var next); return new ScriptStatusResponseV3Alpha(ticket, runningScript.State, runningScript.ExitCode, logs, next); } + var emptyLogs = new List(); + // If we don't have a RunningProcess we check the ScriptStateStore to see if we have persisted a script result var scriptStateStore = scriptStateStoreFactory.Create(workspace); if (scriptStateStore.Exists()) @@ -146,10 +150,10 @@ async Task GetResponse(ScriptTicket ticket, long la scriptStateStore.Save(scriptState); } - return new ScriptStatusResponseV3Alpha(ticket, scriptState.State, scriptState.ExitCode ?? ScriptExitCodes.UnknownResultExitCode, logs, next); + return new ScriptStatusResponseV3Alpha(ticket, scriptState.State, scriptState.ExitCode ?? ScriptExitCodes.UnknownResultExitCode, emptyLogs, lastLogSequence); } - return new ScriptStatusResponseV3Alpha(ticket, ProcessState.Complete, ScriptExitCodes.UnknownScriptExitCode, logs, next); + return new ScriptStatusResponseV3Alpha(ticket, ProcessState.Complete, ScriptExitCodes.UnknownScriptExitCode, emptyLogs, lastLogSequence); } public bool IsRunningScript(ScriptTicket ticket)