Skip to content

Commit 28be59c

Browse files
authored
Allow passing a custom script and params if hash matches to scripted updates (#118)
1 parent 447dfc9 commit 28be59c

File tree

8 files changed

+334
-16
lines changed

8 files changed

+334
-16
lines changed

src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelStatics.cs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,19 @@ internal static class ElasticsearchChannelStatics
1818
public static readonly byte[] DocUpdateHeaderEnd = " }"u8.ToArray();
1919

2020
public static readonly byte[] ScriptedHashUpsertStart =
21-
"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx.op != 'create') { if (ctx._source."u8.ToArray();
21+
"{ \"scripted_upsert\": true, \"upsert\": {}, \"script\": { \"source\": \"if (ctx._source."u8.ToArray();
2222

23-
public static readonly byte[] ScriptedHashUpsertMiddle =
24-
" == params.hash ) { ctx.op = 'noop' } } ctx._source = params.doc\", \"params\": { \"hash\": "u8.ToArray();
23+
public static readonly byte[] ScriptedHashUpsertAfterIfCheck = " == params.hash ) { "u8.ToArray();
2524

26-
public static readonly byte[] ScriptedHashUpsertDocPreamble =
25+
public static readonly byte[] ScriptedHashUpdateScript = "ctx.op = 'noop'"u8.ToArray();
26+
27+
public static readonly byte[] ScriptedHashParamComma = ", "u8.ToArray();
28+
public static readonly byte[] ScriptedHashKeySeparator = ": "u8.ToArray();
29+
30+
public static readonly byte[] ScriptedHashAfterIfCheckOp =
31+
" } else { ctx._source = params.doc } \", \"params\": { \"hash\": "u8.ToArray();
32+
33+
public static readonly byte[] ScriptHashDocAsParameter =
2734
", \"doc\":"u8.ToArray();
2835

2936
public static readonly byte[] ScriptedHashUpsertEnd = " } } }"u8.ToArray();

src/Elastic.Ingest.Elasticsearch/Indices/IndexChannelOptions.cs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
33
// See the LICENSE file in the project root for more information
44
using System;
5+
using System.Collections;
6+
using System.Collections.Generic;
57
using System.Security.Cryptography;
68
using System.Text;
79
using Elastic.Transport;
@@ -11,10 +13,33 @@ namespace Elastic.Ingest.Elasticsearch.Indices;
1113
/// <summary>
1214
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
1315
/// </summary>
14-
/// <param name="Field">The field to check the previous hash against</param>
15-
/// <param name="Hash">The current hash of the document</param>
16-
public record HashedBulkUpdate(string Field, string Hash)
16+
public record HashedBulkUpdate
1717
{
18+
/// <summary>
19+
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
20+
/// </summary>
21+
/// <param name="field">The field to check the previous hash against</param>
22+
/// <param name="hash">The current hash of the document</param>
23+
/// <param name="updateScript">The update script</param>
24+
/// <param name="parameters"></param>
25+
public HashedBulkUpdate(string field, string hash, string? updateScript, IDictionary<string, string>? parameters)
26+
: this(field, hash)
27+
{
28+
UpdateScript = updateScript;
29+
Parameters = parameters;
30+
}
31+
32+
/// <summary>
33+
/// For scripted hash bulk upserts returns per operation the field and the hash to use for the scripted upsert
34+
/// </summary>
35+
/// <param name="field">The field to check the previous hash against</param>
36+
/// <param name="hash">The current hash of the document</param>
37+
public HashedBulkUpdate(string field, string hash)
38+
{
39+
Field = field;
40+
Hash = hash;
41+
}
42+
1843
/// <summary>
1944
/// A short SHA256 hash of the provided <paramref name="components"/>
2045
/// </summary>
@@ -23,13 +48,25 @@ public record HashedBulkUpdate(string Field, string Hash)
2348
public static string CreateHash(params string[] components)
2449
{
2550
#if NET8_0_OR_GREATER
26-
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..8];
51+
return Convert.ToHexString(SHA256.HashData(Encoding.UTF8.GetBytes(string.Join("", components))))[..16].ToLowerInvariant();
2752
#else
2853
var sha = SHA256.Create();
2954
var hash = sha.ComputeHash(Encoding.UTF8.GetBytes(string.Join("", components)));
30-
return BitConverter.ToString(hash).Replace("-", "")[..8];
55+
return BitConverter.ToString(hash).Replace("-", "")[..16].ToLowerInvariant();
3156
#endif
3257
}
58+
59+
/// <summary>The field to check the previous hash against</summary>
60+
public string Field { get; init; }
61+
62+
/// <summary>The current hash of the document</summary>
63+
public string Hash { get; init; }
64+
65+
/// <summary>Optional update script if hashes match defaults to ''</summary>
66+
public string? UpdateScript { get; init; }
67+
68+
/// <summary> Optional additional parameters for <see cref="UpdateScript"/> </summary>
69+
public IDictionary<string, string>? Parameters { get; init; }
3370
};
3471

3572
/// <summary>

src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,36 @@ public static ReadOnlyMemory<byte> GetBytes<TEvent>(ArraySegment<TEvent> page,
6262
var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
6363
bufferWriter.Write(field);
6464
writer.Reset();
65-
bufferWriter.Write(ScriptedHashUpsertMiddle);
65+
bufferWriter.Write(ScriptedHashUpsertAfterIfCheck);
6666
writer.Reset();
67+
68+
bufferWriter.Write(ScriptedHashUpdateScript);
69+
writer.Reset();
70+
if (hashUpdate.UpdateInformation.UpdateScript is not null)
71+
{
72+
bufferWriter.Write(Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.UpdateScript));
73+
writer.Reset();
74+
}
75+
else
76+
{
77+
bufferWriter.Write(ScriptedHashAfterIfCheckOp);
78+
writer.Reset();
79+
}
6780
var hash = hashUpdate.UpdateInformation.Hash;
6881
JsonSerializer.Serialize(writer, hash, options.SerializerOptions);
69-
bufferWriter.Write(ScriptedHashUpsertDocPreamble);
82+
83+
if (hashUpdate.UpdateInformation.Parameters is not null)
84+
foreach (var (key, value) in hashUpdate.UpdateInformation.Parameters)
85+
{
86+
bufferWriter.Write(ScriptedHashParamComma);
87+
writer.Reset();
88+
JsonSerializer.Serialize(writer, key, options.SerializerOptions);
89+
bufferWriter.Write(ScriptedHashKeySeparator);
90+
writer.Reset();
91+
JsonSerializer.Serialize(writer, value, options.SerializerOptions);
92+
}
93+
94+
bufferWriter.Write(ScriptHashDocAsParameter);
7095
writer.Reset();
7196
}
7297

@@ -137,10 +162,31 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(),
137162
await stream.WriteAsync(ScriptedHashUpsertStart, 0, ScriptedHashUpsertStart.Length, ctx).ConfigureAwait(false);
138163
var field = Encoding.UTF8.GetBytes(hashUpdate.UpdateInformation.Field);
139164
await stream.WriteAsync(field, 0, field.Length, ctx).ConfigureAwait(false);
140-
await stream.WriteAsync(ScriptedHashUpsertMiddle, 0, ScriptedHashUpsertMiddle.Length, ctx).ConfigureAwait(false);
165+
await stream.WriteAsync(ScriptedHashUpsertAfterIfCheck, 0, ScriptedHashUpsertAfterIfCheck.Length, ctx).ConfigureAwait(false);
166+
167+
if (hashUpdate.UpdateInformation.UpdateScript is { } script && !string.IsNullOrWhiteSpace(script))
168+
{
169+
var bytes = Encoding.UTF8.GetBytes(script);
170+
await stream.WriteAsync(bytes, 0, bytes.Length, ctx).ConfigureAwait(false);
171+
}
172+
else
173+
await stream.WriteAsync(ScriptedHashUpdateScript, 0, ScriptedHashUpdateScript.Length, ctx).ConfigureAwait(false);
174+
175+
await stream.WriteAsync(ScriptedHashAfterIfCheckOp, 0, ScriptedHashAfterIfCheckOp.Length, ctx).ConfigureAwait(false);
176+
141177
var hash = hashUpdate.UpdateInformation.Hash;
142178
await JsonSerializer.SerializeAsync(stream, hash, options.SerializerOptions, ctx).ConfigureAwait(false);
143-
await stream.WriteAsync(ScriptedHashUpsertDocPreamble, 0, ScriptedHashUpsertDocPreamble.Length, ctx).ConfigureAwait(false);
179+
180+
if (hashUpdate.UpdateInformation.Parameters is { } parameters)
181+
foreach (var kv in parameters)
182+
{
183+
await stream.WriteAsync(ScriptedHashParamComma, 0, ScriptedHashParamComma.Length, ctx ).ConfigureAwait(false);
184+
await JsonSerializer.SerializeAsync(stream, kv.Key, options.SerializerOptions, ctx).ConfigureAwait(false);
185+
await stream.WriteAsync(ScriptedHashKeySeparator, 0, ScriptedHashKeySeparator.Length, ctx ).ConfigureAwait(false);
186+
await JsonSerializer.SerializeAsync(stream, kv.Value, options.SerializerOptions, ctx).ConfigureAwait(false);
187+
}
188+
189+
await stream.WriteAsync(ScriptHashDocAsParameter, 0, ScriptHashDocAsParameter.Length, ctx).ConfigureAwait(false);
144190
}
145191

146192
if (options.EventWriter?.WriteToStreamAsync != null)

src/Elastic.Ingest.Transport/Elastic.Ingest.Transport.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515

1616
<ItemGroup>
1717
<ProjectReference Include="..\Elastic.Channels\Elastic.Channels.csproj" />
18-
<PackageReference Include="Elastic.Transport" Version="0.10.0" />
18+
<PackageReference Include="Elastic.Transport" Version="0.10.1" />
1919
<PackageReference Include="System.Threading.Channels" Version="8.0.0" />
2020
</ItemGroup>
2121

0 commit comments

Comments
 (0)