Skip to content

Commit

Permalink
Stop serialization if queue is full
Browse files Browse the repository at this point in the history
Fixes the excessive number of exceptions from being thrown when Bosun is down
  • Loading branch information
bretcope committed Jun 15, 2016
1 parent e09106c commit 6808bce
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 37 deletions.
9 changes: 5 additions & 4 deletions BosunReporter/Exceptions.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
using System;
using System.Net;
using BosunReporter.Infrastructure;

namespace BosunReporter
{
public class BosunPostException : Exception
{
public BosunPostException(HttpStatusCode statusCode, string responseBody, Exception innerException)
internal BosunPostException(HttpStatusCode statusCode, string responseBody, Exception innerException)
: base("Posting to the Bosun API failed with status code " + statusCode, innerException)
{
Data["ResponseBody"] = responseBody;
}

public BosunPostException(Exception innerException)
internal BosunPostException(Exception innerException)
: base("Posting to the Bosun API failed. Bosun did not respond.", innerException)
{
}
Expand All @@ -22,8 +23,8 @@ public class BosunQueueFullException : Exception
public int MetricsCount { get; }
public int Bytes { get; }

public BosunQueueFullException(int metricsCount, int bytes)
: base("Bosun metric queue is full. Metric data is likely being lost due to repeated failures in posting to the Bosun API.")
internal BosunQueueFullException(QueueType queueType, int metricsCount, int bytes)
: base($"Bosun {queueType} metric queue is full. Metric data is likely being lost due to repeated failures in posting to the Bosun API.")
{
MetricsCount = metricsCount;
Bytes = bytes;
Expand Down
16 changes: 13 additions & 3 deletions BosunReporter/Infrastructure/PayloadQueue.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading;

namespace BosunReporter.Infrastructure
{
internal enum QueueType
{
Local,
ExternalCounters,
}

internal class PayloadQueue
{
private int _cacheLimit = 1;
Expand All @@ -17,12 +21,18 @@ internal class PayloadQueue
internal int MaxPendingPayloads { get; set; }
internal int LastBatchPayloadCount { get; private set; }
internal int DroppedPayloads { get; private set; }
internal QueueType Type { get; }

internal int PendingPayloadsCount => _pendingPayloads.Count;
internal bool IsFull => PendingPayloadsCount >= MaxPendingPayloads;

internal event Action<BosunQueueFullException> PayloadDropped;

internal PayloadQueue(QueueType type)
{
Type = type;
}

internal MetricWriter GetWriter()
{
// sure, we could keep a cache of writers, but seriously, it's one object per serialization interval
Expand Down Expand Up @@ -64,7 +74,7 @@ internal void AddPendingPayload(Payload payload)
}
else
{
ex = new BosunQueueFullException(payload.MetricsCount, payload.Used);
ex = new BosunQueueFullException(Type, payload.MetricsCount, payload.Used);
DroppedPayloads++;
ReleasePayload(payload);
}
Expand Down
55 changes: 25 additions & 30 deletions BosunReporter/MetricsCollector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ public int MaxPayloadSize

public MetricsCollector(BosunOptions options)
{
_localMetricsQueue = new PayloadQueue();
_externalCounterQueue = new PayloadQueue();
_localMetricsQueue = new PayloadQueue(QueueType.Local);
_externalCounterQueue = new PayloadQueue(QueueType.ExternalCounters);

_localMetricsQueue.PayloadDropped += OnPayloadDropped;
_externalCounterQueue.PayloadDropped += OnPayloadDropped;
Expand Down Expand Up @@ -729,39 +729,34 @@ private void SerializeMetrics(out int metricsCount, out int bytesWritten)

metricsCount = 0;
bytesWritten = 0;
MetricWriter localWriter = null;
MetricWriter externalCounterWriter = null;
try
{
if (_localMetrics.Count > 0)
{
localWriter = _localMetricsQueue.GetWriter();
SerializeMetricListToWriter(localWriter, _localMetrics, timestamp);
metricsCount += localWriter.MetricsCount;
bytesWritten += localWriter.TotalBytesWritten;
}

if (_externalCounterMetrics.Count > 0)
{
externalCounterWriter = _externalCounterQueue.GetWriter();
SerializeMetricListToWriter(externalCounterWriter, _externalCounterMetrics, timestamp);
metricsCount += externalCounterWriter.MetricsCount;
bytesWritten += externalCounterWriter.TotalBytesWritten;
}
}
finally
{
localWriter?.EndBatch();
externalCounterWriter?.EndBatch();
}
SerializeMetricListToWriter(_localMetricsQueue, _localMetrics, timestamp, ref metricsCount, ref bytesWritten);
SerializeMetricListToWriter(_externalCounterQueue, _externalCounterMetrics, timestamp, ref metricsCount, ref bytesWritten);
}
}

private static void SerializeMetricListToWriter(MetricWriter writer, List<BosunMetric> metrics, DateTime timestamp)
private static void SerializeMetricListToWriter(PayloadQueue queue, List<BosunMetric> metrics, DateTime timestamp, ref int metricsCount, ref int bytesWritten)
{
foreach (var m in metrics)
if (metrics.Count == 0)
return;

var writer = queue.GetWriter();

try
{
foreach (var m in metrics)
{
m.SerializeInternal(writer, timestamp);

if (queue.IsFull)
break;
}

metricsCount += writer.MetricsCount;
bytesWritten += writer.TotalBytesWritten;
}
finally
{
m.SerializeInternal(writer, timestamp);
writer.EndBatch();
}
}

Expand Down

0 comments on commit 6808bce

Please sign in to comment.