Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

public class BaseType<T extends Comparable<T> & Serializable> implements Serializable, Type<T>, ObjectSizeOf {

private static final long serialVersionUID = 5354270429891763693L;
private static final long serialVersionUID = -3747720721391071135L;
private static final long STATIC_SIZE = PrecomputedSizes.STRING_STATIC_REF + Sizer.REFERENCE + Sizer.REFERENCE;

protected T delegate;
Expand All @@ -34,7 +34,8 @@ public T getDelegate() {
}

/**
 * Converts the given string into a delegate value with {@code normalizer.denormalize} and stores the result through {@link #setDelegate}.
 *
 * @param in
 *            the string form to denormalize
 */
public void setDelegateFromString(String in) {
// pre-change line of this diff hunk: denormalize and set in a single expression
setDelegate(normalizer.denormalize(in));
// post-change lines of this diff hunk: same calls, with the denormalized value held in a typed local first
T denormalized = normalizer.denormalize(in);
setDelegate(denormalized);
}

public void setDelegate(T delegate) {
Expand Down Expand Up @@ -91,7 +92,8 @@ public boolean normalizedRegexIsLossy(String in) {

/**
 * Normalizes the given delegate-typed value with {@code normalizer.normalizeDelegateType} and passes the resulting string to {@code setNormalizedValue}.
 *
 * @param valueToNormalize
 *            the value to normalize
 */
@Override
public void normalizeAndSetNormalizedValue(T valueToNormalize) {
// pre-change line of this diff hunk: normalize and set in a single expression
setNormalizedValue(normalizer.normalizeDelegateType(valueToNormalize));
// post-change lines of this diff hunk: same calls, with the normalized string held in a local first
String normalized = normalizer.normalizeDelegateType(valueToNormalize);
setNormalizedValue(normalized);
}

public void validate() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@

import java.util.concurrent.TimeUnit;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;

/**
* TypeFactory that uses an internal loading cache to limit new Type objects
Expand Down Expand Up @@ -33,15 +32,12 @@ public TypeFactory() {
*/
public TypeFactory(int size, int timeout) {
// @formatter:off
// pre-change line of this diff hunk: cache built with com.google.common.cache.CacheBuilder
typeCache = CacheBuilder.newBuilder()
// post-change line of this diff hunk: cache built with com.github.benmanes.caffeine.cache.Caffeine
typeCache = Caffeine.newBuilder()
// 'size' is the maximum number of entries kept in the cache
.maximumSize(size)
// 'timeout' is in minutes and is measured from the time an entry is written
.expireAfterWrite(timeout, TimeUnit.MINUTES)
// pre-change loader: anonymous CacheLoader that loads the class by name and instantiates it
// through its no-argument declared constructor
.build(new CacheLoader<>() {
@Override
public Type<?> load(String className) throws Exception {
Class<?> clazz = Class.forName(className);
return (Type<?>) clazz.getDeclaredConstructor().newInstance();
}
// post-change loader: lambda with the same two statements as the anonymous class above
.build(className-> {
Class<?> clazz = Class.forName(className);
return (Type<?>) clazz.getDeclaredConstructor().newInstance();
});
// @formatter:on
}
Expand All @@ -67,6 +63,6 @@ public Type<?> createType(String className) {
* @return the current cache size
*/
public long getCacheSize() {
// pre-change line of this diff hunk: size() on the previous cache type
return typeCache.size();
// post-change line of this diff hunk: estimatedSize() on the Caffeine cache
// NOTE(review): the name suggests the value is an estimate rather than an exact count — confirm callers do not rely on exactness
return typeCache.estimatedSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

import datawave.core.cache.CaffeineClassCache;
import datawave.core.cache.ClassCache;
import datawave.query.Constants;
import datawave.query.jexl.DatawaveJexlContext;

Expand All @@ -41,6 +43,9 @@ public abstract class Attribute<T extends Comparable<T>> implements WritableComp
protected int hashcode = Integer.MIN_VALUE;
protected long sizeInBytes = Long.MIN_VALUE;

// used by Document, Attributes, Content and TypeAttribute
protected static final ThreadLocal<ClassCache> classCache = ThreadLocal.withInitial(CaffeineClassCache::new);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious why it's thread-local. Off the top of my head, it sounds like it'd be better to share across all threads, since we're still going to end up doing reflection on each thread. Is it to remove contention?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Even though the Caffeine cache won out on performance, there was still a lot of contention in an environment with lots of threads. Making this ThreadLocal improved the time to parse each document and reduced the overall wall clock time.


public Attribute() {}

public Attribute(Key metadata, boolean toKeep) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,16 @@
import datawave.marking.MarkingFunctions;
import datawave.query.collections.FunctionalSet;
import datawave.query.jexl.DatawaveJexlContext;
import datawave.query.util.cache.ClassCache;

public class Attributes extends AttributeBag<Attributes> implements Serializable {

private static final long serialVersionUID = 4677957768640489928L;
private static final long serialVersionUID = 6225336487950799972L;
private static final Logger log = Logger.getLogger(Attributes.class);
private Set<Attribute<? extends Comparable<?>>> attributes;
private int _count = 0;
// cache the size in bytes as it can be expensive to compute on the fly if we have many attributes
private long _bytes = super.sizeInBytes(16) + 16 + 48;

private static final ClassCache classCache = new ClassCache();

/**
* Should sizes of documents be tracked
*/
Expand Down Expand Up @@ -149,7 +146,7 @@ public void readFields(DataInput in) throws IOException {
String attrClassName = WritableUtils.readString(in);
Class<?> clz = null;
try {
clz = classCache.get(attrClassName);
clz = classCache.get().get(attrClassName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down Expand Up @@ -350,7 +347,7 @@ private Attribute<?> createAttributeFromClassName(String clazzName) {
Class<?> clz;
try {
// Get the Class for the name of the class of the concrete Attribute
clz = classCache.get(clazzName);
clz = classCache.get().get(clazzName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;

import org.apache.accumulo.core.data.Key;
Expand All @@ -20,7 +21,7 @@
import datawave.query.jexl.DatawaveJexlContext;

public class Content extends Attribute<Content> implements Serializable {
private static final long serialVersionUID = -642410227862723970L;
private static final long serialVersionUID = -7916992260001007223L;

private static final Type<?> normalizer = new LcNoDiacriticsType();

Expand Down Expand Up @@ -157,13 +158,17 @@ public void read(Kryo kryo, Input input) {
this.toKeep = input.readBoolean();
boolean hasSource = input.readBoolean();
if (hasSource) {
String clazz = input.readString();
Class sourceClass;
String className = null;
try {
sourceClass = Class.forName(clazz);
source = (Attribute<?>) sourceClass.newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
throw new RuntimeException("could not parse source", e);
className = input.readString();
Class<?> clazz = classCache.get().get(className);
if(Attribute.class.isAssignableFrom(clazz)) {
source = (Attribute<?>) clazz.getDeclaredConstructor().newInstance();
} else {
throw new RuntimeException(className + " does not extend " + Attribute.class.getSimpleName());
}
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new RuntimeException("Could not instantiate " + className, e);
}

source.read(kryo, input);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,15 @@
import datawave.query.predicate.EventDataQueryFilter;
import datawave.query.predicate.ValueToAttributes;
import datawave.query.util.TypeMetadata;
import datawave.query.util.cache.ClassCache;
import datawave.util.time.DateHelper;

public class Document extends AttributeBag<Document> implements Serializable {
private static final long serialVersionUID = -377226620954754934L;
private static final long serialVersionUID = -7939658996525050446L;

private static final Logger log = Logger.getLogger(Document.class);

public static final String DOCKEY_FIELD_NAME = "RECORD_ID";

private static final ClassCache classCache = new ClassCache();

// @formatter:off
private static final LoadingCache<Text, Long> timestampCache = CacheBuilder.newBuilder()
.maximumSize(128)
Expand Down Expand Up @@ -864,7 +861,7 @@ private Attribute<?> createAttributeFromClassName(String clazzName) {
Class<?> clz;
try {
// Get the Class for the name of the class of the concrete Attribute
clz = classCache.get(clazzName);
clz = classCache.get().get(clazzName);
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,16 @@
import datawave.data.type.Type;
import datawave.query.collections.FunctionalSet;
import datawave.query.jexl.DatawaveJexlContext;
import datawave.query.util.cache.ClassCache;
import datawave.webservice.query.data.ObjectSizeOf;

public class TypeAttribute<T extends Comparable<T>> extends Attribute<TypeAttribute<T>> implements Serializable {

private static final long serialVersionUID = 7264249641813898860L;
private static final long serialVersionUID = -6108667228858778287L;

private static final Logger log = Logger.getLogger(TypeAttribute.class);

private static final ClassCache classCache = new ClassCache();

private Type<T> datawaveType;

private int hashCode = Integer.MIN_VALUE;
private String delegateString = null;

protected TypeAttribute() {
Expand Down Expand Up @@ -100,7 +96,7 @@ public void readFields(DataInput in) throws IOException {
}
readMetadata(in);
if (datawaveType == null) {
datawaveType = (Type) new NoOpType();
datawaveType = (Type<T>) new NoOpType();
}
this.datawaveType.setDelegateFromString(WritableUtils.readString(in));
this.toKeep = WritableUtils.readVInt(in) != 0;
Expand All @@ -125,7 +121,7 @@ public boolean equals(Object o) {
}

if (o instanceof TypeAttribute) {
TypeAttribute other = (TypeAttribute) o;
TypeAttribute<T> other = (TypeAttribute<T>) o;
return this.getType().equals(other.getType()) && (0 == this.compareMetadata(other));
}

Expand Down Expand Up @@ -189,17 +185,18 @@ public void read(Kryo kryo, Input input) {
}
super.readMetadata(kryo, input);
if (datawaveType == null) {
datawaveType = (Type) new NoOpType();
datawaveType = (Type<T>) new NoOpType();
}
this.datawaveType.read(kryo, input);
this.toKeep = input.readBoolean();
this.hashCode = input.readInt(true);
this.hashcode = input.readInt(true);
}

/**
 * Resolves the class for the given type name through the class cache, obtains its no-argument declared constructor, and assigns a new instance to
 * {@code datawaveType}.
 *
 * @param datawaveTypeString
 *            name of the class to instantiate
 * @throws InstantiationException
 *             declared by this method; may be raised while creating the instance
 * @throws IllegalAccessException
 *             declared by this method; may be raised while creating the instance
 * @throws ClassNotFoundException
 *             declared by this method; may be raised by the class lookup
 * @throws NoSuchMethodException
 *             declared by this method; may be raised when looking up the no-argument constructor
 * @throws InvocationTargetException
 *             declared by this method; may be raised while creating the instance
 */
@SuppressWarnings("unchecked")
private void setDatawaveType(String datawaveTypeString)
throws InstantiationException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException, InvocationTargetException {
// pre-change lines of this diff hunk: lookup on a directly held cache, raw Type constructor
Class<?> clazz = classCache.get(datawaveTypeString);
Constructor<Type> constructor = (Constructor<Type>) clazz.getDeclaredConstructor();
// post-change lines of this diff hunk: classCache.get() first yields the cache object, then get(name) resolves the class;
// the constructor is typed as Type<T> instead of the raw Type
Class<?> clazz = classCache.get().get(datawaveTypeString);
Constructor<Type<T>> constructor = (Constructor<Type<T>>) clazz.getDeclaredConstructor();
this.datawaveType = constructor.newInstance();
}

Expand All @@ -209,8 +206,8 @@ private void setDatawaveType(String datawaveTypeString)
* @see Attribute#deepCopy()
*/
@Override
// pre-change lines of this diff hunk: raw TypeAttribute return type and raw constructor call
public TypeAttribute copy() {
return new TypeAttribute(this.getType(), this.getMetadata(), this.isToKeep());
// post-change lines of this diff hunk: parameterized return type and diamond constructor call;
// the new instance is built from this attribute's type, metadata and toKeep flag
public TypeAttribute<T> copy() {
return new TypeAttribute<>(this.getType(), this.getMetadata(), this.isToKeep());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@
* Cache for Class instances.
* <p>
* Used by {@link TypeAttribute} and {@link Attributes}
* <p>
* Replaced by {@link datawave.core.cache.CaffeineClassCache}
*/
@Deprecated(forRemoval = true)
public class ClassCache {

// @formatter:off
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.ByteArrayInputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -28,6 +29,7 @@
import datawave.data.type.NumberType;
import datawave.query.attributes.Attribute;
import datawave.query.attributes.AttributeFactory;
import datawave.query.attributes.Content;
import datawave.query.attributes.Document;
import datawave.query.attributes.DocumentKey;
import datawave.query.function.deserializer.KryoDocumentDeserializer;
Expand Down Expand Up @@ -56,6 +58,8 @@ public class DocumentSerializationIT {
public void setup() {
TypeMetadata metadata = new TypeMetadata();
metadata.put("FIELD_A", datatype, LcNoDiacriticsType.class.getTypeName());
metadata.put("FIELD_B", datatype, LcNoDiacriticsType.class.getTypeName());
metadata.put("FIELD_C", datatype, LcNoDiacriticsType.class.getTypeName());
metadata.put("NUM", datatype, NumberType.class.getTypeName());
metadata.put("DATE", datatype, DateType.class.getTypeName());

Expand Down Expand Up @@ -143,9 +147,16 @@ private List<byte[]> createSerializedDocuments(int n) {

private Document createDocument() {
Document doc = new Document();
doc.put("FIELD_A", createAttribute("FIELD_A", "some text"));
doc.put("FIELD_A", createAttribute("FIELD_A", "more text"));
doc.put("FIELD_A", createAttribute("FIELD_A", "less text"));
Set<String> fields = Set.of("FIELD_A", "FIELD_B", "FIELD_C");
for (String field : fields) {
for (int i = 0; i < 6; i++) {
doc.put(field, createAttribute(field, "some random text: " + i));
}
}
for (int i = 0; i < 10; i++) {
Content content = new Content("token " + i, documentKey, true);
doc.put("CONTENT", content);
}
doc.put("NUM", createAttribute("NUM", "23"));
doc.put("DATE", createAttribute("DATE", DateHelper.format(System.currentTimeMillis())));

Expand Down
Loading