From 4ce66391ea9e71e29bffaa71839b32c93613d446 Mon Sep 17 00:00:00 2001 From: rdhayes68 Date: Thu, 25 Sep 2025 18:38:46 +0000 Subject: [PATCH 1/4] Correct edge dictionary to correctly handle unidirectional edge definitions. --- .../edge/ProtobufEdgeDataTypeHandler.java | 53 ++- .../handler/edge/EdgeHandlerTestUtil.java | 5 + .../edge/ProtobufEdgeDirectionTest.java | 333 ++++++++++++++++++ .../config/EdgeSpringConfigDirection.xml | 221 ++++++++++++ .../test/resources/config/metadata-config.xml | 2 +- 5 files changed, 586 insertions(+), 28 deletions(-) create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDirectionTest.java create mode 100644 warehouse/ingest-core/src/test/resources/config/EdgeSpringConfigDirection.xml diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 090e43f407e..3df3d0d30c5 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -74,6 +74,7 @@ import datawave.ingest.table.config.LoadDateTableConfigHelper; import datawave.ingest.time.Now; import datawave.marking.MarkingFunctions; +import datawave.metadata.protobuf.EdgeMetadata; import datawave.metadata.protobuf.EdgeMetadata.MetadataValue; import datawave.metadata.protobuf.EdgeMetadata.MetadataValue.Metadata; import datawave.util.StringUtils; @@ -936,37 +937,15 @@ protected void registerEventMetadata(Map> eventMetadataRegistr // add to the eventMetadataRegistry map Key baseKey = createMetadataEdgeKey(edgeValue, edgeValue.getSource(), edgeValue.getSource().getIndexedFieldValue(), edgeValue.getSink(), edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); - Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); - Key revMetaKey = EdgeKey.getMetadataKey(EdgeKey.swapSourceSink(EdgeKey.decode(baseKey)).encode()); - Set fwdMetaSet = eventMetadataRegistry.get(fwdMetaKey); - if (null == fwdMetaSet) { - fwdMetaSet = new HashSet<>(); - eventMetadataRegistry.put(fwdMetaKey, fwdMetaSet); - } - Set revMetaSet = eventMetadataRegistry.get(revMetaKey); - if (null == revMetaSet) { - revMetaSet = new HashSet<>(); - eventMetadataRegistry.put(revMetaKey, revMetaSet); - } - - // Build the Protobuf for the value - Metadata.Builder forwardBuilder = Metadata.newBuilder().setSource(edgeValue.getSource().getFieldName()).setSink(edgeValue.getSink().getFieldName()) - .setDate(DateHelper.format(new Date(edgeValue.getEventDate()))); - Metadata.Builder reverseBuilder = Metadata.newBuilder().setDate(DateHelper.format(new Date(edgeValue.getEventDate()))) - .setSource(edgeValue.getSink().getFieldName()).setSink(edgeValue.getSource().getFieldName()); - if (enrichmentFieldName != null) { - forwardBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); - reverseBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); - } + Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); + addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, fwdMetaKey); - if (jexlPrecondition != null) { - forwardBuilder.setJexlPrecondition(jexlPrecondition); - reverseBuilder.setJexlPrecondition(jexlPrecondition); + if (edgeValue.getEdgeDirection().equals(EdgeDirection.BIDIRECTIONAL)) { + Key revMetaKey = EdgeKey.getMetadataKey(EdgeKey.swapSourceSink(EdgeKey.decode(baseKey)).encode()); + addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, revMetaKey); } - fwdMetaSet.add(forwardBuilder.build()); - revMetaSet.add(reverseBuilder.build()); } protected String getEnrichmentFieldName(EdgeDefinition edgeDef) { @@ -1184,6 +1163,26 @@ private Key createEdgeKey(EdgeDataBundle edgeValue, VertexValue source, String s return builder.build().encode(); } + private Set addMetadata(Map> metadataRegistry, String enrichmentFieldName, + EdgeDataBundle edgeDataBundle, String jexlPrecondition, Key key) { + Set metadata = metadataRegistry.computeIfAbsent(key, k -> new HashSet<>()); + + // Build the Protobuf for the value + Metadata.Builder builder = Metadata.newBuilder().setSource(edgeDataBundle.getSource().getFieldName()).setSink(edgeDataBundle.getSink().getFieldName()) + .setDate(DateHelper.format(new Date(edgeDataBundle.getEventDate()))); + + if (enrichmentFieldName != null) { + builder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeDataBundle.getEnrichedIndex()); + } + if (jexlPrecondition != null) { + builder.setJexlPrecondition(jexlPrecondition); + } + + metadata.add(builder.build()); + + return metadata; + } + protected Key createStatsKey(STATS_TYPE statsType, EdgeDataBundle edgeValue, VertexValue vertex, String value, Text visibility, EdgeKey.DATE_TYPE date_type) { String typeName = edgeValue.getDataTypeName(); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/EdgeHandlerTestUtil.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/EdgeHandlerTestUtil.java index cb11e7362d7..31273c01749 100644 --- a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/EdgeHandlerTestUtil.java +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/EdgeHandlerTestUtil.java @@ -33,10 +33,12 @@ public class EdgeHandlerTestUtil { public static final Text edgeTableName = new Text(TableName.EDGE); + public static final Text metaDataTableName = new Text(TableName.METADATA); public static final String NB = "\u0000"; public static ListMultimap edgeKeyResults = ArrayListMultimap.create(); public static ListMultimap edgeValueResults = ArrayListMultimap.create(); + public static ListMultimap metaData = ArrayListMultimap.create(); private static Logger log = Logger.getLogger(EdgeHandlerTestUtil.class); @@ -71,6 +73,9 @@ public static void processEvent(Multimap even edgeKeys.add(entry.getKey().getKey()); edgeValueResults.put(entry.getKey().getKey().getRow().toString().replaceAll(NB, "%00;"), EdgeValue.decode(entry.getValue()).toString()); } + if (entry.getKey().getTableName().equals(metaDataTableName)) { + metaData.put(entry.getKey().getKey(), entry.getValue()); + } if (!entry.getKey().getTableName().equals(edgeTableName) || entry.getKey().getKey().isDeleted() == edgeDeleteMode) { if (countMap.containsKey(entry.getKey().getTableName())) { countMap.put(entry.getKey().getTableName(), countMap.get(entry.getKey().getTableName()) + 1); diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDirectionTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDirectionTest.java new file mode 100644 index 00000000000..90fbb1a720a --- /dev/null +++ b/warehouse/ingest-core/src/test/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDirectionTest.java @@ -0,0 +1,333 @@ +package datawave.ingest.mapreduce.handler.edge; + +import static datawave.ingest.mapreduce.handler.edge.EdgeKeyVersioningCache.KEY_VERSION_CACHE_DIR; +import static datawave.ingest.mapreduce.handler.edge.EdgeKeyVersioningCache.KEY_VERSION_DIST_CACHE_DIR; + +import java.time.Instant; +import java.time.format.DateTimeFormatter; +import java.util.Collections; +import java.util.HashSet; +import java.util.Objects; +import java.util.Set; + +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Multimap; + +import datawave.data.hash.UID; +import datawave.ingest.config.RawRecordContainerImpl; +import datawave.ingest.data.RawRecordContainer; +import datawave.ingest.data.Type; +import datawave.ingest.data.TypeRegistry; +import datawave.ingest.data.config.BaseNormalizedContent; +import datawave.ingest.data.config.NormalizedContentInterface; +import datawave.ingest.data.config.NormalizedFieldAndValue; +import datawave.ingest.data.config.ingest.FakeIngestHelper; +import datawave.ingest.mapreduce.SimpleDataTypeHandler; +import datawave.ingest.mapreduce.job.BulkIngestKey; + +public class ProtobufEdgeDirectionTest { + + private static final Multimap fields = HashMultimap.create(); + // bidirectional edge definition + private static final Type typeBi = new Type("bidirectional", FakeIngestHelper.class, null, new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); + // unidirectional edge definition + private static final Type typeUni = new Type("unidirectional", FakeIngestHelper.class, null, new String[] {SimpleDataTypeHandler.class.getName()}, 10, + null); + // bidirectional edge definition with jexl conditionl + private static final Type typeBiJexl = new Type("bidirectional_jexlCondition", FakeIngestHelper.class, null, + new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); + // unidirectional edge definition + private static final Type typeUniJexl = new Type("unidirectional_jexlCondition", FakeIngestHelper.class, null, + new String[] {SimpleDataTypeHandler.class.getName()}, 10, null); + + private static final String jexlConditon = "FISH == 'guppy'"; + + private Configuration conf; + + private ProtobufEdgeDataTypeHandler edgeHandler; + + @Before + public void setup() { + TypeRegistry.reset(); + conf = new Configuration(); + conf.addResource(ClassLoader.getSystemResource("config/all-config.xml")); + conf.addResource(ClassLoader.getSystemResource("config/edge-ingest-config.xml")); + conf.addResource(ClassLoader.getSystemResource("config/metadata-config.xml")); + conf.setBoolean(ProtobufEdgeDataTypeHandler.EVALUATE_PRECONDITIONS, true); + conf.set(ProtobufEdgeDataTypeHandler.EDGE_SPRING_CONFIG, "config/EdgeSpringConfigDirection.xml"); + conf.set(KEY_VERSION_CACHE_DIR, ClassLoader.getSystemResource("config").getPath()); + conf.set(KEY_VERSION_DIST_CACHE_DIR, ClassLoader.getSystemResource("config").getPath()); + + TypeRegistry registry = TypeRegistry.getInstance(conf); + registry.put(typeBi.typeName(), typeBi); + registry.put(typeUni.typeName(), typeUni); + registry.put(typeBiJexl.typeName(), typeBiJexl); + registry.put(typeUniJexl.typeName(), typeUniJexl); + + fields.clear(); + EdgeHandlerTestUtil.edgeKeyResults.clear(); + EdgeHandlerTestUtil.edgeValueResults.clear(); + EdgeHandlerTestUtil.metaData.clear(); + + fields.put("EVENT_DATE", new BaseNormalizedContent("EVENT_DATE", "2022-10-26T01:31:53Z")); + fields.put("UUID", new BaseNormalizedContent("UUID", "0016dd72-0000-827d-dd4d-001b2163ba09")); + fields.put("FELINE", new NormalizedFieldAndValue("FELINE", "tabby", "PET", "0")); + fields.put("FELINE", new NormalizedFieldAndValue("FELINE", "siamese", "PET", "1")); + fields.put("FISH", new NormalizedFieldAndValue("FISH", "salmon", "WILD", "0")); + fields.put("FISH", new NormalizedFieldAndValue("FISH", "guppy", "WILD", "1")); + fields.put("ACTIVITY", new NormalizedFieldAndValue("ACTIVITY", "fetch", "THING", "0")); + + edgeHandler = new ProtobufEdgeDataTypeHandler<>(); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + edgeHandler.setup(context); + } + + /** + * Test Edge Definition with direction set to BIDIRECTIONAL + */ + @Test + public void testBiDirection() { + ProtobufEdgeDataTypeHandler edgeHandler = new ProtobufEdgeDataTypeHandler<>(); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + edgeHandler.setup(context); + + Set expectedKeys = new HashSet<>(); + expectedKeys.add("guppy"); + expectedKeys.add("guppy%00;siamese"); + expectedKeys.add("guppy%00;tabby"); + + expectedKeys.add("salmon"); + expectedKeys.add("salmon%00;tabby"); + expectedKeys.add("salmon%00;siamese"); + + expectedKeys.add("siamese"); + expectedKeys.add("siamese%00;guppy"); + expectedKeys.add("siamese%00;salmon"); + + expectedKeys.add("tabby"); + expectedKeys.add("tabby%00;salmon"); + expectedKeys.add("tabby%00;guppy"); + + RawRecordContainer myEvent = getEvent(typeBi); + + EdgeHandlerTestUtil.processEvent(fields, edgeHandler, myEvent, expectedKeys.size(), true, false); + + // check edge dictionary + // forward direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/FROM-TO"); + }).count()); + + // reverse direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/TO-FROM"); + }).count()); + + // check edge key results + Assert.assertEquals(expectedKeys, EdgeHandlerTestUtil.edgeKeyResults.keySet()); + + // check forward direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;salmon").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;salmon").get(0))[0]); + + // check reverse direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy%00;siamese").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy%00;tabby").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon%00;siamese").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon%00;tabby").get(0))[0]); + } + + /** + * Test Edge Definition with direction set to UNIDIRECTIONAL + */ + @Test + public void testUniDirection() { + Set expectedKeys = new HashSet<>(); + expectedKeys.add("siamese"); + expectedKeys.add("siamese%00;guppy"); + expectedKeys.add("siamese%00;salmon"); + + expectedKeys.add("tabby"); + expectedKeys.add("tabby%00;salmon"); + expectedKeys.add("tabby%00;guppy"); + + RawRecordContainer myEvent = getEvent(typeUni); + + EdgeHandlerTestUtil.processEvent(fields, edgeHandler, myEvent, expectedKeys.size(), true, false); + Assert.assertEquals(expectedKeys, EdgeHandlerTestUtil.edgeKeyResults.keySet()); + + // check edge dictionary + // forward direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/FROM-TO"); + }).count()); + + // NO reverse direction + Assert.assertEquals(0, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/TO-FROM"); + }).count()); + + // check forward direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;salmon").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;salmon").get(0))[0]); + + // NO reverse direction keys + for (String reverseKey : new String[] {"guppy", "salmon", "guppy%00;siamese", "guppy%00;tabby", "salmon%00;siamese", "salmon%00;tabby"}) { + Assert.assertFalse(EdgeHandlerTestUtil.edgeKeyResults.containsKey(reverseKey)); + } + } + + /** + * Test Edge Definition with direction set to BIDIRECTIONAL and Jexl Condition => FISH==guppy + */ + @Test + public void testBiDirectionWithJexlCondition() { + ProtobufEdgeDataTypeHandler edgeHandler = new ProtobufEdgeDataTypeHandler<>(); + TaskAttemptContext context = new TaskAttemptContextImpl(conf, new TaskAttemptID()); + edgeHandler.setup(context); + + Set expectedKeys = new HashSet<>(); + expectedKeys.add("guppy"); + expectedKeys.add("guppy%00;siamese"); + expectedKeys.add("guppy%00;tabby"); + + expectedKeys.add("salmon"); + expectedKeys.add("salmon%00;tabby"); + expectedKeys.add("salmon%00;siamese"); + + expectedKeys.add("siamese"); + expectedKeys.add("siamese%00;guppy"); + expectedKeys.add("siamese%00;salmon"); + + expectedKeys.add("tabby"); + expectedKeys.add("tabby%00;salmon"); + expectedKeys.add("tabby%00;guppy"); + + RawRecordContainer myEvent = getEvent(typeBiJexl); + + EdgeHandlerTestUtil.processEvent(fields, edgeHandler, myEvent, expectedKeys.size(), true, false); + + // check edge dictionary + // forward direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/FROM-TO"); + }).count()); + + // reverse direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/TO-FROM"); + }).count()); + + // jexl condition + Assert.assertEquals(2, EdgeHandlerTestUtil.metaData.values().stream().filter(Objects::nonNull).filter(value -> { + return value.toString().contains(jexlConditon); + }).count()); + + // check edge key results + Assert.assertEquals(expectedKeys, EdgeHandlerTestUtil.edgeKeyResults.keySet()); + + // check forward direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;salmon").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;salmon").get(0))[0]); + + // check reverse direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy%00;siamese").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("guppy%00;tabby").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon%00;siamese").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/TO-FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("salmon%00;tabby").get(0))[0]); + } + + /** + * Test Edge Definition with direction set to UNIDIRECTIONAL and Jexl Condition => FISH==guppy + */ + @Test + public void testUniDirectionWithJexlCondition() { + Set expectedKeys = new HashSet<>(); + expectedKeys.add("siamese"); + expectedKeys.add("siamese%00;guppy"); + expectedKeys.add("siamese%00;salmon"); + + expectedKeys.add("tabby"); + expectedKeys.add("tabby%00;salmon"); + expectedKeys.add("tabby%00;guppy"); + + RawRecordContainer myEvent = getEvent(typeUniJexl); + + EdgeHandlerTestUtil.processEvent(fields, edgeHandler, myEvent, expectedKeys.size(), true, false); + Assert.assertEquals(expectedKeys, EdgeHandlerTestUtil.edgeKeyResults.keySet()); + + // check edge dictionary + // forward direction + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/FROM-TO"); + }).count()); + + // NO reverse direction + Assert.assertEquals(0, EdgeHandlerTestUtil.metaData.keySet().stream().filter(Objects::nonNull).filter(key -> { + return key.getColumnFamily().toString().equals("edge") && key.getRow().toString().equals("MY_EDGE_TYPE/TO-FROM"); + }).count()); + // jexl condition + Assert.assertEquals(1, EdgeHandlerTestUtil.metaData.values().stream().filter(Objects::nonNull).filter(value -> { + return value.toString().contains(jexlConditon); + }).count()); + + // check forward direction keys + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese").get(0))[0]); + Assert.assertEquals("STATS/ACTIVITY/MY_EDGE_TYPE/FROM", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby").get(0))[0]); + + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("siamese%00;salmon").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;guppy").get(0))[0]); + Assert.assertEquals("MY_EDGE_TYPE/FROM-TO", Objects.requireNonNull(EdgeHandlerTestUtil.edgeKeyResults.get("tabby%00;salmon").get(0))[0]); + + // NO reverse direction keys + for (String reverseKey : new String[] {"guppy", "salmon", "guppy%00;siamese", "guppy%00;tabby", "salmon%00;siamese", "salmon%00;tabby"}) { + Assert.assertFalse(EdgeHandlerTestUtil.edgeKeyResults.containsKey(reverseKey)); + } + } + + private RawRecordContainer getEvent(Type type) { + RawRecordContainerImpl myEvent = new RawRecordContainerImpl(); + myEvent.addSecurityMarking("columnVisibility", "PRIVATE"); + myEvent.setDataType(type); + myEvent.setId(UID.builder().newId()); + myEvent.setAltIds(Collections.singleton("0016dd72-0000-827d-dd4d-001b2163ba09")); + myEvent.setConf(conf); + + Instant i = Instant.from(DateTimeFormatter.ISO_INSTANT.parse("2022-10-26T01:31:53Z")); + myEvent.setTimestamp(i.toEpochMilli()); + + return myEvent; + } +} diff --git a/warehouse/ingest-core/src/test/resources/config/EdgeSpringConfigDirection.xml b/warehouse/ingest-core/src/test/resources/config/EdgeSpringConfigDirection.xml new file mode 100644 index 00000000000..807093803e2 --- /dev/null +++ b/warehouse/ingest-core/src/test/resources/config/EdgeSpringConfigDirection.xml @@ -0,0 +1,221 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + TO + FROM + + + + + + + + + + MY_CSV_DATA + UNKNOWN + + + + + diff --git a/warehouse/ingest-core/src/test/resources/config/metadata-config.xml b/warehouse/ingest-core/src/test/resources/config/metadata-config.xml index 24dd87487b6..54e98f94b44 100644 --- a/warehouse/ingest-core/src/test/resources/config/metadata-config.xml +++ b/warehouse/ingest-core/src/test/resources/config/metadata-config.xml @@ -18,7 +18,7 @@ metadata.table.name - ${table.name.metadata} + DatawaveMetadata From b7d45d3aab2472d78ee4d4e1c144bdff60731239 Mon Sep 17 00:00:00 2001 From: rdhayes68 Date: Fri, 26 Sep 2025 18:28:16 +0000 Subject: [PATCH 2/4] Correct edge dictionary to correctly handle unidirectional edge definitions. --- .../edge/ProtobufEdgeDataTypeHandler.java | 51 +++++++++++++++++-- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 3df3d0d30c5..7c03ed4e1b0 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -936,18 +936,59 @@ protected void registerEventMetadata(Map> eventMetadataRegistr String jexlPrecondition) { // add to the eventMetadataRegistry map Key baseKey = createMetadataEdgeKey(edgeValue, edgeValue.getSource(), edgeValue.getSource().getIndexedFieldValue(), edgeValue.getSink(), - edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); - + edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); - addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, fwdMetaKey); - if (edgeValue.getEdgeDirection().equals(EdgeDirection.BIDIRECTIONAL)) { + Set fwdMetaSet = eventMetadataRegistry.get(fwdMetaKey); + if (null == fwdMetaSet) { + fwdMetaSet = new HashSet<>(); + eventMetadataRegistry.put(fwdMetaKey, fwdMetaSet); + } + + // Build the Protobuf for the value + Metadata.Builder forwardBuilder = Metadata.newBuilder().setSource(edgeValue.getSource().getFieldName()).setSink(edgeValue.getSink().getFieldName()) + .setDate(DateHelper.format(new Date(edgeValue.getEventDate()))); + + if (enrichmentFieldName != null) { + forwardBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); + } + + if (jexlPrecondition != null) { + forwardBuilder.setJexlPrecondition(jexlPrecondition); + } + + fwdMetaSet.add(forwardBuilder.build()); + + if (isNullOrBidirectional(edgeValue.getEdgeDirection())) { Key revMetaKey = EdgeKey.getMetadataKey(EdgeKey.swapSourceSink(EdgeKey.decode(baseKey)).encode()); - addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, revMetaKey); + + Set revMetaSet = eventMetadataRegistry.get(revMetaKey); + if (null == revMetaSet) { + revMetaSet = new HashSet<>(); + eventMetadataRegistry.put(revMetaKey, revMetaSet); + } + + // Build the Protobuf for the value + Metadata.Builder reverseBuilder = Metadata.newBuilder().setDate(DateHelper.format(new Date(edgeValue.getEventDate()))) + .setSource(edgeValue.getSink().getFieldName()).setSink(edgeValue.getSource().getFieldName()); + + if (enrichmentFieldName != null) { + reverseBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); + } + + if (jexlPrecondition != null) { + reverseBuilder.setJexlPrecondition(jexlPrecondition); + } + + revMetaSet.add(reverseBuilder.build()); } + } + private boolean isNullOrBidirectional(EdgeDirection direction) { + return direction == null || direction.equals(EdgeDirection.BIDIRECTIONAL); } + protected String getEnrichmentFieldName(EdgeDefinition edgeDef) { return (edgeDef.isEnrichmentEdge() ? edgeDef.getEnrichmentField() : null); } From cd132cf5eb149271225db2a0f3c61de598826c2d Mon Sep 17 00:00:00 2001 From: rdhayes68 Date: Fri, 26 Sep 2025 18:32:45 +0000 Subject: [PATCH 3/4] Correct edge dictionary to correctly handle unidirectional edge definitions. --- .../handler/edge/ProtobufEdgeDataTypeHandler.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 7c03ed4e1b0..9a13219bbae 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -936,7 +936,7 @@ protected void registerEventMetadata(Map> eventMetadataRegistr String jexlPrecondition) { // add to the eventMetadataRegistry map Key baseKey = createMetadataEdgeKey(edgeValue, edgeValue.getSource(), edgeValue.getSource().getIndexedFieldValue(), edgeValue.getSink(), - edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); + edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); Set fwdMetaSet = eventMetadataRegistry.get(fwdMetaKey); @@ -947,7 +947,7 @@ protected void registerEventMetadata(Map> eventMetadataRegistr // Build the Protobuf for the value Metadata.Builder forwardBuilder = Metadata.newBuilder().setSource(edgeValue.getSource().getFieldName()).setSink(edgeValue.getSink().getFieldName()) - .setDate(DateHelper.format(new Date(edgeValue.getEventDate()))); + .setDate(DateHelper.format(new Date(edgeValue.getEventDate()))); if (enrichmentFieldName != null) { forwardBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); @@ -970,7 +970,7 @@ protected void registerEventMetadata(Map> eventMetadataRegistr // Build the Protobuf for the value Metadata.Builder reverseBuilder = Metadata.newBuilder().setDate(DateHelper.format(new Date(edgeValue.getEventDate()))) - .setSource(edgeValue.getSink().getFieldName()).setSink(edgeValue.getSource().getFieldName()); + .setSource(edgeValue.getSink().getFieldName()).setSink(edgeValue.getSource().getFieldName()); if (enrichmentFieldName != null) { reverseBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); @@ -988,7 +988,6 @@ private boolean isNullOrBidirectional(EdgeDirection direction) { return direction == null || direction.equals(EdgeDirection.BIDIRECTIONAL); } - protected String getEnrichmentFieldName(EdgeDefinition edgeDef) { return (edgeDef.isEnrichmentEdge() ? edgeDef.getEnrichmentField() : null); } From a023b36a1625d30d6eea629226e8722484f55fe9 Mon Sep 17 00:00:00 2001 From: rdhayes68 Date: Mon, 29 Sep 2025 18:30:07 +0000 Subject: [PATCH 4/4] Correct edge dictionary to correctly handle unidirectional edge definitions. --- .../edge/ProtobufEdgeDataTypeHandler.java | 43 ++----------------- 1 file changed, 3 insertions(+), 40 deletions(-) diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java index 9a13219bbae..1246637210b 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/edge/ProtobufEdgeDataTypeHandler.java @@ -937,50 +937,13 @@ protected void registerEventMetadata(Map> eventMetadataRegistr // add to the eventMetadataRegistry map Key baseKey = createMetadataEdgeKey(edgeValue, edgeValue.getSource(), edgeValue.getSource().getIndexedFieldValue(), edgeValue.getSink(), edgeValue.getSink().getIndexedFieldValue(), this.getVisibility(edgeValue)); - Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); - - Set fwdMetaSet = eventMetadataRegistry.get(fwdMetaKey); - if (null == fwdMetaSet) { - fwdMetaSet = new HashSet<>(); - eventMetadataRegistry.put(fwdMetaKey, fwdMetaSet); - } - - // Build the Protobuf for the value - Metadata.Builder forwardBuilder = Metadata.newBuilder().setSource(edgeValue.getSource().getFieldName()).setSink(edgeValue.getSink().getFieldName()) - .setDate(DateHelper.format(new Date(edgeValue.getEventDate()))); - - if (enrichmentFieldName != null) { - forwardBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); - } - - if (jexlPrecondition != null) { - forwardBuilder.setJexlPrecondition(jexlPrecondition); - } - fwdMetaSet.add(forwardBuilder.build()); + Key fwdMetaKey = EdgeKey.getMetadataKey(baseKey); + addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, fwdMetaKey); if (isNullOrBidirectional(edgeValue.getEdgeDirection())) { Key revMetaKey = EdgeKey.getMetadataKey(EdgeKey.swapSourceSink(EdgeKey.decode(baseKey)).encode()); - - Set revMetaSet = eventMetadataRegistry.get(revMetaKey); - if (null == revMetaSet) { - revMetaSet = new HashSet<>(); - eventMetadataRegistry.put(revMetaKey, revMetaSet); - } - - // Build the Protobuf for the value - Metadata.Builder reverseBuilder = Metadata.newBuilder().setDate(DateHelper.format(new Date(edgeValue.getEventDate()))) - .setSource(edgeValue.getSink().getFieldName()).setSink(edgeValue.getSource().getFieldName()); - - if (enrichmentFieldName != null) { - reverseBuilder.setEnrichment(enrichmentFieldName).setEnrichmentIndex(edgeValue.getEnrichedIndex()); - } - - if (jexlPrecondition != null) { - reverseBuilder.setJexlPrecondition(jexlPrecondition); - } - - revMetaSet.add(reverseBuilder.build()); + addMetadata(eventMetadataRegistry, enrichmentFieldName, edgeValue, jexlPrecondition, revMetaKey); } }