Skip to content

Commit 447dfc9

Browse files
authored
Add support for scripted hash updates (#117)
* Add support for scripted hash updates * Include a hash of the channel options
1 parent 162202f commit 447dfc9

File tree

8 files changed

+429
-17
lines changed

8 files changed

+429
-17
lines changed

src/Elastic.Ingest.Elasticsearch/Catalog/CatalogIndexChannel.cs

Lines changed: 78 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,40 @@ public class CatalogIndexChannelOptionsBase<TDocument>(ITransport transport) : I
2828
/// <inheritdoc cref="CatalogIndexChannel{TDocument}" />
2929
public class CatalogIndexChannelOptions<TDocument>(ITransport transport) : CatalogIndexChannelOptionsBase<TDocument>(transport)
3030
{
31+
private readonly Func<string>? _getMapping;
32+
3133
/// A function that returns the mapping for <typeparamref name="TDocument"/>.
32-
public Func<string>? GetMapping { get; init; }
34+
public Func<string>? GetMapping
35+
{
36+
get => _getMapping;
37+
init
38+
{
39+
_getMapping = value;
40+
_channelHash = HashedBulkUpdate.CreateHash(base.ChannelHash, ActiveSearchAlias,
41+
_getMapping?.Invoke() ?? string.Empty, _getMapping?.Invoke() ?? string.Empty
42+
);
43+
}
44+
}
45+
46+
private readonly Func<string>? _getMappingSettings;
3347

3448
/// A function that returns settings to accompany <see cref="GetMapping"/>.
35-
public Func<string>? GetMappingSettings { get; init; }
49+
public Func<string>? GetMappingSettings
50+
{
51+
get => _getMappingSettings;
52+
init
53+
{
54+
_getMappingSettings = value;
55+
_channelHash = HashedBulkUpdate.CreateHash(base.ChannelHash, ActiveSearchAlias,
56+
_getMapping?.Invoke() ?? string.Empty, _getMapping?.Invoke() ?? string.Empty
57+
);
58+
}
59+
}
60+
61+
private readonly string _channelHash = string.Empty;
62+
63+
/// <inheritdoc />
64+
public override string ChannelHash => string.IsNullOrEmpty(_channelHash) ? base.ChannelHash : _channelHash;
3665
}
3766

3867
/// <inheritdoc cref="CatalogIndexChannel{TDocument}" />
@@ -61,7 +90,7 @@ public class CatalogIndexChannel<TDocument, TChannelOptions> : IndexChannel<TDoc
6190
where TChannelOptions : CatalogIndexChannelOptionsBase<TDocument>
6291
where TDocument : class
6392
{
64-
private readonly string _url;
93+
private string _url;
6594

6695
/// <inheritdoc cref="CatalogIndexChannel{TDocument}"/>
6796
public CatalogIndexChannel(TChannelOptions options, ICollection<IChannelCallbacks<TDocument, BulkResponse>>? callbackListeners = null)
@@ -82,7 +111,7 @@ public CatalogIndexChannel(TChannelOptions options, ICollection<IChannelCallback
82111

83112
/// The index name used for indexing. This the configured <see cref="IndexChannelOptions{TDocument}.IndexFormat"/> to compute a single index name for all operations.
84113
/// <para>Since this is catalog data and not time series data, all data needs to end up in a single index</para>
85-
public string IndexName { get; }
114+
public string IndexName { get; private set; }
86115

87116
/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.CreateBulkOperationHeader"/>
88117
protected override BulkOperationHeader CreateBulkOperationHeader(TDocument @event) =>
@@ -97,6 +126,38 @@ protected override BulkOperationHeader CreateBulkOperationHeader(TDocument @even
97126
/// <inheritdoc cref="ElasticsearchChannelBase{TEvent,TChannelOptions}.AlwaysBootstrapComponentTemplates"/>
98127
protected override bool AlwaysBootstrapComponentTemplates => true;
99128

129+
/// <inheritdoc />
130+
public override async Task<bool> BootstrapElasticsearchAsync(BootstrapMethod bootstrapMethod, string? ilmPolicy = null, CancellationToken ctx = default)
131+
{
132+
if (Options.ScriptedHashBulkUpsertLookup is null)
133+
return await base.BootstrapElasticsearchAsync(bootstrapMethod, ilmPolicy, ctx).ConfigureAwait(false);
134+
var latestAlias = string.Format(Options.IndexFormat, "latest");
135+
var matchingIndices = string.Format(Options.IndexFormat, "*");
136+
var currentIndex = await ShouldRemovePreviousAliasAsync(matchingIndices, latestAlias, ctx).ConfigureAwait(false);
137+
if (string.IsNullOrEmpty(currentIndex))
138+
return await base.BootstrapElasticsearchAsync(bootstrapMethod, ilmPolicy, ctx).ConfigureAwait(false);
139+
140+
IndexName = currentIndex;
141+
_url = $"{IndexName}/{base.BulkPathAndQuery}";
142+
return await base.BootstrapElasticsearchAsync(bootstrapMethod, ilmPolicy, ctx).ConfigureAwait(false);
143+
}
144+
145+
/// <inheritdoc />
146+
public override bool BootstrapElasticsearch(BootstrapMethod bootstrapMethod, string? ilmPolicy = null)
147+
{
148+
if (Options.ScriptedHashBulkUpsertLookup is null)
149+
return base.BootstrapElasticsearch(bootstrapMethod, ilmPolicy);
150+
var latestAlias = string.Format(Options.IndexFormat, "latest");
151+
var matchingIndices = string.Format(Options.IndexFormat, "*");
152+
var currentIndex = ShouldRemovePreviousAlias(matchingIndices, latestAlias);
153+
if (string.IsNullOrEmpty(currentIndex))
154+
return base.BootstrapElasticsearch(bootstrapMethod, ilmPolicy);
155+
156+
IndexName = currentIndex;
157+
_url = $"{IndexName}/{base.BulkPathAndQuery}";
158+
return base.BootstrapElasticsearch(bootstrapMethod, ilmPolicy);
159+
}
160+
100161
/// Applies the latest alias to the index.
101162
public async Task<bool> ApplyLatestAliasAsync(CancellationToken ctx = default)
102163
{
@@ -192,6 +253,12 @@ private async Task<string> ShouldRemovePreviousAliasAsync(string matchingIndices
192253
.Trim(Environment.NewLine.ToCharArray());
193254
return hasPreviousVersions.ApiCallDetails.HttpStatusCode == 200 ? queryAliasIndex : string.Empty;
194255
}
256+
private string ShouldRemovePreviousAlias(string matchingIndices, string alias)
257+
{
258+
var hasPreviousVersions = Options.Transport.Head($"{matchingIndices}?allow_no_indices=false");
259+
var queryAliasIndex = Cat($"_cat/aliases/{alias}?h=index").Trim(Environment.NewLine.ToCharArray());
260+
return hasPreviousVersions.ApiCallDetails.HttpStatusCode == 200 ? queryAliasIndex : string.Empty;
261+
}
195262

196263
private async Task<string> CatAsync(string url, CancellationToken ctx)
197264
{
@@ -201,6 +268,13 @@ private async Task<string> CatAsync(string url, CancellationToken ctx)
201268
return catResponse.Body;
202269
}
203270

271+
private string Cat(string url)
272+
{
273+
var rq = new RequestConfiguration { Accept = "text/plain" };
274+
var catResponse = Options.Transport.Request<StringResponse>(new EndpointPath(HttpMethod.GET, url), postData: null, null, rq);
275+
return catResponse.Body;
276+
}
277+
204278

205279
/// Applies the latest and current aliases to <see cref="IndexName"/>. Use this if you want to ensure that the latest index is always the active index
206280
/// immediately after writing to the index.

src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ protected override bool Retry(BulkResponse response)
5353
/// <summary>
5454
/// The URL for the bulk request.
5555
/// </summary>
56-
protected virtual string BulkPathAndQuery => "_bulk?filter_path=error,items.*.status,items.*.error";
56+
protected virtual string BulkPathAndQuery => "_bulk?filter_path=error,items.*.status,items.*.error,items.*.result,items.*._version";
5757

5858
/// <inheritdoc cref="ResponseItemsBufferedChannelBase{TChannelOptions,TEvent,TResponse,TBulkResponseItem}.RetryAllItems"/>
5959
protected override bool RetryAllItems(BulkResponse response) => response.ApiCallDetails.HttpStatusCode == 429;

src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,17 @@ internal static class ElasticsearchChannelStatics
1717
public static readonly byte[] DocUpdateHeaderStart = "{\"doc_as_upsert\": true, \"doc\": "u8.ToArray();
1818
public static readonly byte[] DocUpdateHeaderEnd = " }"u8.ToArray();
1919

20+
public static readonly byte[] ScriptedHashUpsertStart =
21+
"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx.op != 'create') { if (ctx._source."u8.ToArray();
22+
23+
public static readonly byte[] ScriptedHashUpsertMiddle =
24+
" == params.hash ) { ctx.op = 'noop' } } ctx._source = params.doc\", \"params\": { \"hash\": "u8.ToArray();
25+
26+
public static readonly byte[] ScriptedHashUpsertDocPreamble =
27+
", \"doc\":"u8.ToArray();
28+
29+
public static readonly byte[] ScriptedHashUpsertEnd = " } } }"u8.ToArray();
30+
2031
public static readonly HashSet<int> RetryStatusCodes = [502, 503, 504, 429];
2132

2233
public static readonly JsonSerializerOptions SerializerOptions = new ()

src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs

Lines changed: 67 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,36 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44
using System;
5+
using System.Security.Cryptography;
6+
using System.Text;
57
using Elastic.Transport;
68

79
namespace Elastic.Ingest.Elasticsearch.Indices;
810

11+
/// <summary>
12+
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
13+
/// </summary>
14+
/// <param name="Field">The field to check the previous hash against</param>
15+
/// <param name="Hash">The current hash of the document</param>
16+
public record HashedBulkUpdate(string Field, string Hash)
17+
{
18+
/// <summary>
19+
/// A short SHA256 hash of the provided <paramref name="components"/>
20+
/// </summary>
21+
/// <param name="components"></param>
22+
/// <returns></returns>
23+
public static string CreateHash(params string[] components)
24+
{
25+
#if NET8_0_OR_GREATER
26+
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..8];
27+
#else
28+
var sha = SHA256.Create();
29+
var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(string.Join("", components)));
30+
return BitConverter.ToString(hash).Replace("-", "")[..8];
31+
#endif
32+
}
33+
};
34+
935
/// <summary>
1036
/// Provides options to <see cref="IndexChannel{TEvent}"/> to control how and where data gets written to Elasticsearch
1137
/// </summary>
@@ -15,27 +41,47 @@ public class IndexChannelOptions<TEvent> : ElasticsearchChannelOptionsBase<TEven
1541
/// <inheritdoc cref="IndexChannelOptions{TEvent}"/>
1642
public IndexChannelOptions(ITransport transport) : base(transport) { }
1743

44+
private string _indexFormat = $"{typeof(TEvent).Name.ToLowerInvariant()}-{{0:yyyy.MM.dd}}";
45+
1846
/// <summary>
1947
/// Gets or sets the format string for the Elastic search index. The current <c>DateTimeOffset</c> is passed as parameter
2048
/// 0.
2149
/// <para> Defaults to "<typeparamref name="TEvent"/>.Name.ToLowerInvariant()-{0:yyyy.MM.dd}"</para>
2250
/// <para> If no {0} parameter is defined the index name is effectively fixed</para>
2351
/// </summary>
24-
public virtual string IndexFormat { get; set; } = $"{typeof(TEvent).Name.ToLowerInvariant()}-{{0:yyyy.MM.dd}}";
52+
public virtual string IndexFormat
53+
{
54+
get => _indexFormat;
55+
set
56+
{
57+
_indexFormat = value;
58+
_channelHash = HashedBulkUpdate.CreateHash(_indexFormat, _indexOffset?.ToString() ?? string.Empty);
59+
}
60+
}
61+
62+
private TimeSpan? _indexOffset;
2563

2664
/// <summary>
2765
/// Gets or sets the offset to use for the index <c>DateTimeOffset</c>. The default value is null, which uses the system local
2866
/// offset. Use "00:00" for UTC.
2967
/// </summary>
30-
public TimeSpan? IndexOffset { get; set; }
68+
public TimeSpan? IndexOffset
69+
{
70+
get => _indexOffset;
71+
set
72+
{
73+
_indexOffset = value;
74+
_channelHash = HashedBulkUpdate.CreateHash(_indexFormat, _indexOffset?.ToString() ?? string.Empty);
75+
}
76+
}
3177

3278
/// <summary>
3379
/// Provide a per document <c>DateTimeOffset</c> to be used as the date passed as parameter 0 to <see cref="IndexFormat"/>
3480
/// </summary>
3581
public Func<TEvent, DateTimeOffset?>? TimestampLookup { get; set; }
3682

3783
/// <summary>
38-
/// If the document provides an I, D, this allows you to set a per document `_id`.
84+
/// If the document provides an ID, this allows you to set a per document `_id`.
3985
/// <para>If an `_id` is defined, an `_index` bulk operation will be created.</para>
4086
/// <para>Otherwise (the default) `_create` bulk operation will be issued for the document.</para>
4187
/// <para>Read more about bulk operations here:</para>
@@ -52,8 +98,26 @@ public IndexChannelOptions(ITransport transport) : base(transport) { }
5298
/// </summary>
5399
public Func<TEvent, string, bool>? BulkUpsertLookup { get; set; }
54100

101+
/// <summary>
102+
/// Uses the callback provided to <see cref="BulkOperationIdLookup"/> to determine if this is in fact an update operation
103+
/// <para>Returns the field and the hash to use for the scripted upsert </para>
104+
/// <para>The string passed to the callback is the hash of the current channel options that can be used to hash bust in case of channel option changes</para>
105+
/// <para>Otherwise (the default) `index` bulk operation will be issued for the document.</para>
106+
/// <para>Read more about bulk operations here:</para>
107+
/// <para>https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-api-request-body</para>
108+
/// </summary>
109+
public Func<TEvent, string, HashedBulkUpdate>? ScriptedHashBulkUpsertLookup { get; set; }
110+
55111
/// <summary>
56112
/// Control the operation header for each bulk operation.
57113
/// </summary>
58114
public OperationMode OperationMode { get; set; }
115+
116+
private string _channelHash = HashedBulkUpdate.CreateHash($"{typeof(TEvent).Name.ToLowerInvariant()}-{{0:yyyy.MM.dd}}");
117+
/// <summary>
118+
/// A hash of the channel options. This can be used to hash bust in case of channel option changes.
119+
/// It's also passed to <see cref="ScriptedHashBulkUpsertLookup"/>
120+
/// </summary>
121+
public virtual string ChannelHash => _channelHash;
122+
59123
}

src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,12 @@
55
using System.Collections.Generic;
66
using System.Text.Json;
77
using System.Text.Json.Serialization;
8+
using Elastic.Ingest.Elasticsearch.Indices;
89
using Elastic.Transport.Products.Elasticsearch;
910

1011
namespace Elastic.Ingest.Elasticsearch.Serialization;
1112

12-
/// <summary> Represents the _bulk operation meta header </summary>
13+
/// <summary> Represents the _bulk operation meta-header </summary>
1314
public abstract class BulkOperationHeader
1415
{
1516
/// <summary> The index or data stream to write to </summary>
@@ -25,7 +26,7 @@ public abstract class BulkOperationHeader
2526
public bool? RequireAlias { get; init; }
2627
}
2728

28-
/// <summary> Represents the _bulk create operation meta header </summary>
29+
/// <summary> Represents the _bulk create operation meta-header </summary>
2930
[JsonConverter(typeof(BulkOperationHeaderConverter<CreateOperation>))]
3031
public class CreateOperation : BulkOperationHeader
3132
{
@@ -34,7 +35,7 @@ public class CreateOperation : BulkOperationHeader
3435
public Dictionary<string, string>? DynamicTemplates { get; init; }
3536
}
3637

37-
/// <summary> Represents the _bulk index operation meta header </summary>
38+
/// <summary> Represents the _bulk index operation meta-header </summary>
3839
[JsonConverter(typeof(BulkOperationHeaderConverter<IndexOperation>))]
3940
public class IndexOperation : BulkOperationHeader
4041
{
@@ -43,16 +44,20 @@ public class IndexOperation : BulkOperationHeader
4344
public Dictionary<string, string>? DynamicTemplates { get; init; }
4445
}
4546

46-
/// <summary> Represents the _bulk delete operation meta header </summary>
47+
/// <summary> Represents the _bulk delete operation meta-header </summary>
4748
[JsonConverter(typeof(BulkOperationHeaderConverter<DeleteOperation>))]
48-
public class DeleteOperation : BulkOperationHeader
49-
{
50-
}
49+
public class DeleteOperation : BulkOperationHeader;
5150

52-
/// <summary> Represents the _bulk update operation meta header </summary>
51+
/// <summary> Represents the _bulk update operation meta-header </summary>
5352
[JsonConverter(typeof(BulkOperationHeaderConverter<UpdateOperation>))]
54-
public class UpdateOperation : BulkOperationHeader
53+
public class UpdateOperation : BulkOperationHeader;
54+
55+
/// <summary> Represents the _bulk update operation meta-header </summary>
56+
[JsonConverter(typeof(BulkOperationHeaderConverter<ScriptedHashUpdateOperation>))]
57+
public class ScriptedHashUpdateOperation : BulkOperationHeader
5558
{
59+
/// <summary> </summary>
60+
public required HashedBulkUpdate UpdateInformation { get; init; }
5661
}
5762

5863
internal class BulkOperationHeaderConverter<THeader> : JsonConverter<THeader>
@@ -69,6 +74,7 @@ public override void Write(Utf8JsonWriter writer, THeader value, JsonSerializerO
6974
DeleteOperation _ => "delete",
7075
IndexOperation _ => "index",
7176
UpdateOperation _ => "update",
77+
ScriptedHashUpdateOperation _ => "update",
7278
_ => throw new ArgumentOutOfRangeException(nameof(value), value, null)
7379
};
7480
writer.WriteStartObject();

0 commit comments

Comments
 (0)