4 changes: 4 additions & 0 deletions core/utils/type-utils/pom.xml
@@ -68,6 +68,10 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.locationtech.geowave</groupId>
<artifactId>geowave-core-geotime</artifactId>
@@ -2,6 +2,7 @@

import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.locationtech.geowave.core.geotime.index.dimension.LatitudeDefinition;
import org.locationtech.geowave.core.geotime.index.dimension.LongitudeDefinition;
import org.locationtech.geowave.core.index.NumericIndexStrategy;
@@ -13,6 +14,7 @@

import com.google.common.collect.Lists;

import datawave.data.type.Type;
import datawave.data.type.util.Geometry;

/**
@@ -65,18 +67,21 @@ public static Index getGeometryIndex() {
}

@Override
public List<String> normalizeToMany(String geoString) throws IllegalArgumentException {
public List<Pair<String,Type.Category>> normalizeToMany(String geoString) throws IllegalArgumentException {
List<Pair<String,Type.Category>> list = Lists.newArrayList();
if (validHash(geoString)) {
return Lists.newArrayList(geoString);
for (String s : Lists.newArrayList(geoString)) {
list.add(Pair.of(s, Type.Category.GEOHASH));
}
return list;
}
return normalizeDelegateTypeToMany(createDatawaveGeometry(parseGeometry(geoString)));
}

@Override
public List<String> normalizeDelegateTypeToMany(Geometry geometry) {
List<String> list = Lists.newArrayList();
public List<Pair<String,Type.Category>> normalizeDelegateTypeToMany(Geometry geometry) {
List<Pair<String,Type.Category>> list = Lists.newArrayList();
for (byte[] one : getIndicesFromGeometry(geometry)) {
list.add(getEncodedStringFromIndexBytes(one));
list.add(Pair.of(getEncodedStringFromIndexBytes(one), Type.Category.GEOHASH));
}
return list;
}
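
Since GeometryNormalizer now tags every index string it emits with Type.Category.GEOHASH, a caller can read the category straight off each pair. A minimal usage sketch, assuming the type-utils module is on the classpath and a WKT literal the normalizer accepts; the demo class and the sample point are illustrative only, not part of this PR:

import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

import datawave.data.type.GeometryType;
import datawave.data.type.Type;

// Hypothetical demo class; the WKT value is only an example input.
public class GeometryCategoryDemo {
    public static void main(String[] args) {
        GeometryType geometryType = new GeometryType();
        List<Pair<String,Type.Category>> pairs = geometryType.normalizeToMany("POINT (10 20)");
        for (Pair<String,Type.Category> pair : pairs) {
            // Left: encoded geohash index string; right: Type.Category.GEOHASH for every entry.
            System.out.println(pair.getLeft() + " -> " + pair.getRight());
        }
    }
}

GeometryType.normalizeToMany(String) simply delegates to this normalizer, so the pairs printed above are exactly what that type returns.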
@@ -2,9 +2,14 @@

import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

import datawave.data.type.Type;

public interface OneToManyNormalizer<T> extends Normalizer<T> {

List<String> normalizeToMany(String in);
List<Pair<String,Type.Category>> normalizeToMany(String in);

List<Pair<String,Type.Category>> normalizeDelegateTypeToMany(T foo);

List<String> normalizeDelegateTypeToMany(T foo);
}
@@ -1,6 +1,9 @@
package datawave.data.type;

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import datawave.data.normalizer.Normalizer;
import datawave.data.normalizer.OneToManyNormalizer;
@@ -18,7 +21,8 @@ public GeometryType() {
super(Normalizer.GEOMETRY_NORMALIZER);
}

public List<String> normalizeToMany(String in) {
public List<Pair<String,Category>> normalizeToMany(String in) {

return ((OneToManyNormalizer<Geometry>) normalizer).normalizeToMany(in);
}

@@ -29,7 +33,8 @@ public void setNormalizedValues(List<String> normalizedValues) {

@Override
public void normalizeAndSetNormalizedValue(Geometry valueToNormalize) {
setNormalizedValues(((OneToManyNormalizer<Geometry>) normalizer).normalizeDelegateTypeToMany(valueToNormalize));
setNormalizedValues(((OneToManyNormalizer<Geometry>) normalizer).normalizeDelegateTypeToMany(valueToNormalize).stream().map(Pair::getLeft)
.collect(Collectors.toList()));
}

public List<String> getNormalizedValues() {
@@ -2,6 +2,9 @@

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import datawave.data.normalizer.Normalizer;
import datawave.util.StringUtils;
@@ -19,20 +22,21 @@ public ListType(String delegateString, Normalizer normalizer) {
}

@Override
public List<String> normalizeToMany(String in) {
public List<Pair<String,Category>> normalizeToMany(String in) {
String[] splits = StringUtils.split(in, delimiter);
List<String> strings = new ArrayList<>(splits.length);
List<Pair<String,Category>> strings = new ArrayList<>(splits.length);
for (String s : splits) {
String normalized = normalizer.normalize(s);
strings.add(normalized);

String str = normalizer.normalize(s);
strings.add(Pair.of(str, Category.LIST_ELEMENT));
}

return strings;
}

@Override
public void setDelegateFromString(String in) {
this.normalizedValues = normalizeToMany(in);
this.normalizedValues = normalizeToMany(in).stream().map(Pair::getLeft).collect(Collectors.toList());
this.delegate = in;
setNormalizedValue(in);
}
@@ -2,9 +2,11 @@

import java.util.List;

import org.apache.commons.lang3.tuple.Pair;

public interface OneToManyNormalizerType<T extends Comparable<T>> extends Type<T> {

List<String> normalizeToMany(String in);
List<Pair<String,Category>> normalizeToMany(String in);

List<String> getNormalizedValues();

@@ -58,4 +58,8 @@ public static Type<?> createType(String datawaveTypeClassName) {
}
}
}

enum Category {
FULL, PART, GEOHASH, LIST_ELEMENT
}
}
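
The enum gives downstream code a cheap way to tell which kind of normalized value it is holding. A short sketch of filtering on the tag; the demo class is hypothetical, and the input string and encoded values mirror the NumberListType expectations in the ListTypeTest diff that follows:

import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang3.tuple.Pair;

import datawave.data.type.NumberListType;
import datawave.data.type.Type;

// Hypothetical demo class, not part of this PR.
public class CategoryFilterDemo {
    public static void main(String[] args) {
        NumberListType numbers = new NumberListType();
        List<Pair<String,Type.Category>> pairs = numbers.normalizeToMany("1,2,3,5.5");
        // Per ListTypeTest, each pair is (encoded value, LIST_ELEMENT): "+aE1", "+aE2", "+aE3", "+aE5.5".
        List<String> listElements = pairs.stream()
                        .filter(pair -> pair.getRight() == Type.Category.LIST_ELEMENT)
                        .map(Pair::getLeft)
                        .collect(Collectors.toList());
        System.out.println(listElements);
    }
}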
@@ -1,12 +1,14 @@
package datawave.data.type;

import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.locationtech.jts.util.Assert;

import com.google.common.collect.Lists;

public class ListTypeTest {

@Test
@@ -15,7 +17,14 @@ public void test() {

LcNoDiacriticsListType t = new LcNoDiacriticsListType(str);
Assert.equals(6, t.normalizeToMany(str).size());
List<String> expected = Arrays.asList("1", "2", "3", "a", "b", "c");
List<Pair<String,Type.Category>> expected = Lists.newArrayList();
expected.add(Pair.of("1", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("2", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("3", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("a", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("b", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("c", Type.Category.LIST_ELEMENT));

Assert.equals(expected, t.normalizeToMany(str));
}

@@ -25,14 +34,25 @@ public void testLcNDList() {

LcNoDiacriticsListType t = new LcNoDiacriticsListType();
Assert.equals(6, t.normalizeToMany(str).size());
List<String> expected = Arrays.asList("01", "02", "03", "a", "b", "c");
List<Pair<String,Type.Category>> expected = Lists.newArrayList();
expected.add(Pair.of("01", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("02", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("03", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("a", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("b", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("c", Type.Category.LIST_ELEMENT));

Assert.equals(expected, t.normalizeToMany(str));
}

@Test
public void testNumberList() {
String str = "1,2,3,5.5";
List<String> expected = Arrays.asList("+aE1", "+aE2", "+aE3", "+aE5.5");
List<Pair<String,Type.Category>> expected = Lists.newArrayList();
expected.add(Pair.of("+aE1", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("+aE2", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("+aE3", Type.Category.LIST_ELEMENT));
expected.add(Pair.of("+aE5.5", Type.Category.LIST_ELEMENT));

NumberListType nt = new NumberListType();
Assert.equals(4, nt.normalizeToMany(str).size());
@@ -35,6 +35,11 @@ http://www.springframework.org/schema/util/spring-util-4.0.xsd">
<property name="edgeAttribute2" value="NAME"/>
<property name="edgeAttribute3" value="ID"/>
<property name="activityDateField" value = "PREMIERED"/>
<property name="allowedTypeCategories">
Collaborator: Do we want separate allowed type categories for the source and the sink?

Collaborator (Author): I would let the bidirectional config dictate that.

<list value-type="datawave.data.type.Type.Category">
<value>FULL</value>
</list>
</property>
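
To make the review question above concrete: if source and sink ever did need independent filtering, one hypothetical shape (not in this PR; every name below is invented for illustration) is a pair of allow-lists checked per side rather than the single allowedTypeCategories list:

import java.util.List;

import com.google.common.collect.Lists;

import datawave.data.type.Type;

// Hypothetical sketch only; the edge handler in this PR does not define these fields or methods.
public class DirectionalCategoryFilter {

    private List<Type.Category> allowedSourceCategories = Lists.newArrayList();
    private List<Type.Category> allowedSinkCategories = Lists.newArrayList();

    public void setAllowedSourceCategories(List<Type.Category> categories) {
        this.allowedSourceCategories = categories;
    }

    public void setAllowedSinkCategories(List<Type.Category> categories) {
        this.allowedSinkCategories = categories;
    }

    // Each side is checked against its own list; an empty list or a null (untagged) category passes,
    // which is a simplification of the both-null rule used by isAllowedCategory in this PR.
    public boolean isAllowed(Type.Category source, Type.Category sink) {
        boolean sourceOk = allowedSourceCategories.isEmpty() || source == null || allowedSourceCategories.contains(source);
        boolean sinkOk = allowedSinkCategories.isEmpty() || sink == null || allowedSinkCategories.contains(sink);
        return sourceOk && sinkOk;
    }
}

The Spring wiring would then split the single list above into two properties; as the reply suggests, whether that is worth doing likely depends on whether bidirectional edge definitions already cover the asymmetric cases.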
<property name="edges">
<list>

@@ -24,6 +24,9 @@ public class BaseNormalizedContent implements NormalizedContentInterface, Cloneable
/** The security markings for the field value pair. */
protected Map<String,String> _markings = null;

/** The type category from normalization. */
protected Type.Category _typeCategory = null;

/** The field processing error if any. */
protected Throwable error = null;

@@ -209,4 +212,15 @@ public void normalize(Type<?> datawaveType) {
this.setError(e);
}
}

@Override
public void setTypeCategory(Type.Category t) {
this._typeCategory = t;

}

@Override
public Type.Category getTypeCategory() {
return _typeCategory;
}
}
@@ -99,4 +99,8 @@ public interface NormalizedContentInterface extends Cloneable {
Object clone();

void normalize(Type<?> datawaveType);

void setTypeCategory(Type.Category t);

Type.Category getTypeCategory();
}
@@ -14,6 +14,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -729,9 +730,10 @@ protected List<NormalizedContentInterface> normalize(NormalizedContentInterface
List<NormalizedContentInterface> list = Lists.newArrayList();
// copy it
NormalizedContentInterface copy = new NormalizedFieldAndValue(normalizedContent);
for (String one : datawaveType.normalizeToMany(copy.getIndexedFieldValue())) {
for (Pair<String,datawave.data.type.Type.Category> one : datawaveType.normalizeToMany(copy.getIndexedFieldValue())) {
try {
copy.setIndexedFieldValue(one);
copy.setIndexedFieldValue(one.getLeft());
copy.setTypeCategory(one.getRight());
list.add(copy);
copy = new NormalizedFieldAndValue(normalizedContent);
} catch (Exception ex) {
@@ -29,6 +29,7 @@
import org.apache.hadoop.mapreduce.StatusReporter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.util.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
@@ -156,6 +157,8 @@ public class ProtobufEdgeDataTypeHandler<KEYIN,KEYOUT,VALUEOUT> implements Exten
long futureDelta, pastDelta;
long newFormatStartDate;

protected List<datawave.data.type.Type.Category> allowedCategories = Lists.newArrayList();
Collaborator: I would prefer to see a static import for Type.Category.

SimpleGroupFieldNameParser fieldParser = new SimpleGroupFieldNameParser();

public enum FailurePolicy {
@@ -274,6 +277,9 @@ public void setup(Configuration conf) {
if (thing.getEnrichmentTypeMappings() != null) {
edgeTypeLookup.put(entry.getKey(), thing.getEnrichmentTypeMappings());
}
if (thing.getAllowedTypeCategories() != null) {
allowedCategories.addAll(thing.getAllowedTypeCategories());
}
}

if (ctx.containsBean(entry.getKey() + EDGE_TABLE_DISALLOWLIST_VALUES)) {
@@ -673,6 +679,9 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
if (ifaceSource == ifaceSink) {
continue;
}
if (!isAllowedCategory(ifaceSource, ifaceSink)) {
continue;
}
EdgeDataBundle edgeValue = createEdge(edgeDef, event, ifaceSource, sourceGroup, subGroup, ifaceSink, sinkGroup, subGroup,
edgeAttribute2, edgeAttribute3, normalizedFields, depthFirstList, loadDateStr, activityDate, validActivityDate);
if (edgeValue != null) {
@@ -696,6 +705,9 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
for (NormalizedContentInterface ifaceSource : mSource.get(sourceSubGroup)) {
for (String sinkSubGroup : mSink.keySet()) {
for (NormalizedContentInterface ifaceSink : mSink.get(sinkSubGroup)) {
if (!isAllowedCategory(ifaceSource, ifaceSink)) {
continue;
}
EdgeDataBundle edgeValue = createEdge(edgeDef, event, ifaceSource, sourceGroup, sourceSubGroup, ifaceSink, sinkGroup,
sinkSubGroup, edgeAttribute2, edgeAttribute3, normalizedFields, depthFirstList, loadDateStr, activityDate,
validActivityDate);
@@ -724,6 +736,9 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
if (ifaceSource == ifaceSink) {
continue;
}
if (!isAllowedCategory(ifaceSource, ifaceSink)) {
continue;
}
EdgeDataBundle edgeValue = createEdge(edgeDef, event, ifaceSource, sourceGroup, subGroup, ifaceSink, sinkGroup, subGroup,
edgeAttribute2, edgeAttribute3, normalizedFields, depthFirstList, loadDateStr, activityDate, validActivityDate);
if (edgeValue != null) {
@@ -749,6 +764,9 @@ public long process(KEYIN key, RawRecordContainer event, Multimap<String,Normali
for (NormalizedContentInterface ifaceSource : mSource.get(sourceSubGroup)) {
for (String sinkSubGroup : sinkSubGroups) {
for (NormalizedContentInterface ifaceSink : mSink.get(sinkSubGroup)) {
if (!isAllowedCategory(ifaceSource, ifaceSink)) {
continue;
}
EdgeDataBundle edgeValue = createEdge(edgeDef, event, ifaceSource, sourceGroup, sourceSubGroup, ifaceSink, sinkGroup,
sinkSubGroup, edgeAttribute2, edgeAttribute3, normalizedFields, depthFirstList, loadDateStr, activityDate,
validActivityDate);
@@ -1442,4 +1460,10 @@ public RawRecordMetadata getMetadata() {
public void setVersioningCache(EdgeKeyVersioningCache versioningCache) {
this.versioningCache = versioningCache;
}

public boolean isAllowedCategory(NormalizedContentInterface source, NormalizedContentInterface sink) {
// normalizers that do not specify a category do not have special subtypes, so allow null
return (allowedCategories.isEmpty() || (allowedCategories.contains(source.getTypeCategory()) && allowedCategories.contains(sink.getTypeCategory())
|| (null == source.getTypeCategory() && null == sink.getTypeCategory())));
}
}
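
A standalone restatement of the isAllowedCategory(...) rules, kept separate from the PR code: an empty allow-list disables filtering, otherwise both endpoints must carry an allowed category, and a pair of null categories is also accepted for normalizers that never tag one. The import at the top shows the nested-enum import the review comment asks for (the static-import form works too, since Category is a static member of Type); the demo class itself is hypothetical.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import datawave.data.type.Type.Category;   // nested-enum import, per the review suggestion
// import static datawave.data.type.Type.Category;   // the static-import form also resolves it

// Hypothetical demo; isAllowed(...) restates the boolean in isAllowedCategory(...) using
// categories directly instead of NormalizedContentInterface instances.
public class AllowedCategoryDemo {

    static boolean isAllowed(List<Category> allowed, Category source, Category sink) {
        return allowed.isEmpty()
                        || (allowed.contains(source) && allowed.contains(sink))
                        || (source == null && sink == null);
    }

    public static void main(String[] args) {
        List<Category> unrestricted = Collections.emptyList();
        List<Category> fullOnly = Arrays.asList(Category.FULL);

        System.out.println(isAllowed(unrestricted, Category.GEOHASH, Category.LIST_ELEMENT)); // true: empty list allows everything
        System.out.println(isAllowed(fullOnly, Category.FULL, Category.FULL));                // true: both sides allowed
        System.out.println(isAllowed(fullOnly, Category.FULL, Category.GEOHASH));             // false: sink category not in the list
        System.out.println(isAllowed(fullOnly, null, null));                                  // true: untagged on both sides passes
        System.out.println(isAllowed(fullOnly, Category.FULL, null));                         // false: only one side is untagged
    }
}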
@@ -8,6 +8,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import datawave.data.type.Type;

public class EdgeDefinitionConfigurationHelper {

private static final Logger log = LoggerFactory.getLogger(EdgeDefinitionConfigurationHelper.class);
@@ -24,6 +26,8 @@ public class EdgeDefinitionConfigurationHelper {

private boolean initialized = false;

private List<Type.Category> allowedTypeCategories;

public List<EdgeDefinition> getEdges() {
if (initialized) {
return edges;
@@ -185,4 +189,12 @@ public Map<String,String> getEnrichmentTypeMappings() {
public void setEnrichmentTypeMappings(Map<String,String> enrichmentTypeMappings) {
this.enrichmentTypeMappings = enrichmentTypeMappings;
}

public List<Type.Category> getAllowedTypeCategories() {
return allowedTypeCategories;
}

public void setAllowedTypeCategories(List<Type.Category> categories) {
this.allowedTypeCategories = categories;
}
}