ConfigurableAgeOffFilter revised caching implementation (#2532)
* ConfigurableAgeOffFilter revised caching implementation
Implements a cache loader that determines when age-off rules have expired. The cache value
used by the loader carries state for 1) the base ruleset and 2) the timestamp/version of the rules.

Additionally, sets a volatile modifier on the rule cache field used in the age-off filter (a wiring
sketch appears after the changed-file summary below).

* Update log message interval for age-off filter

Co-authored-by: Moriarty <[email protected]>

* Fixed import formatting

* Age-off rule load log message update for errors

* Age-off rule load exceptions updated to be more specific

---------

Co-authored-by: Moriarty <[email protected]>
Co-authored-by: hgklohr <[email protected]>
3 people authored Aug 30, 2024
1 parent 8b84fa7 commit 3dd29d0
Showing 16 changed files with 553 additions and 121 deletions.
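The commit description above refers to a cache loader plus a volatile rule cache field. A minimal sketch, assuming Guava's CacheBuilder/LoadingCache, of how the pieces introduced in this commit might be wired together; the holder class, refresh interval, and file path are hypothetical, and the actual ConfigurableAgeOffFilter wiring is not shown in this excerpt:

import java.util.Collection;
import java.util.concurrent.TimeUnit;

import org.apache.accumulo.core.iterators.IteratorEnvironment;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.LoadingCache;

import datawave.ingest.util.cache.watch.FileRuleCacheLoader;
import datawave.ingest.util.cache.watch.FileRuleCacheValue;
import datawave.iterators.filter.ageoff.AppliedRule;

// Hypothetical holder illustrating the volatile-cache pattern mentioned in the commit message.
class AgeOffRuleCacheSketch {
    // volatile so a replaced cache reference is immediately visible to all scan threads
    private volatile LoadingCache<String,FileRuleCacheValue> ruleCache;

    void init() {
        ruleCache = CacheBuilder.newBuilder()
                        // hypothetical interval; reload() only rebuilds a value when the rule file changed
                        .refreshAfterWrite(5, TimeUnit.MINUTES)
                        .build(new FileRuleCacheLoader());
    }

    Collection<AppliedRule> rulesFor(String ruleFile, long scanStart, IteratorEnvironment env) throws Exception {
        // get() loads the FileRuleCacheValue on first access; newRulesetView() returns per-scan rule copies
        return ruleCache.get(ruleFile).newRulesetView(scanStart, env);
    }
}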
5 changes: 5 additions & 0 deletions warehouse/age-off/pom.xml
@@ -63,6 +63,11 @@
<version>${version.hamcrest}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<!-- Needed for org.slf4j imports -->
<dependency>
<groupId>org.slf4j</groupId>
AgeOffRuleLoader.java
@@ -2,6 +2,7 @@

import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -12,6 +13,7 @@

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;

import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.hadoop.io.IOUtils;
@@ -21,6 +23,7 @@
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import datawave.iterators.filter.AgeOffConfigParams;
import datawave.iterators.filter.ageoff.FilterOptions;
@@ -34,14 +37,18 @@ public AgeOffRuleLoader(AgeOffFileLoaderDependencyProvider loaderConfig) {
this.loaderConfig = loaderConfig;
}

public List<FilterRule> load(InputStream in) throws IOException, java.lang.reflect.InvocationTargetException, NoSuchMethodException {
public List<FilterRule> load(InputStream in) throws IOException {
List<RuleConfig> mergedRuleConfigs = loadRuleConfigs(in);
List<FilterRule> filterRules = new ArrayList<>();
/**
* This has been changed to support extended options.
*/
for (RuleConfig ruleConfig : mergedRuleConfigs) {
try {
if (ruleConfig.filterClassName == null) {
throw new IllegalArgumentException("The filter class must not be null");
}

FilterRule filter = (FilterRule) Class.forName(ruleConfig.filterClassName).getDeclaredConstructor().newInstance();

FilterOptions option = new FilterOptions();
@@ -69,8 +76,9 @@ public List<FilterRule> load(InputStream in) throws IOException, java.lang.refle

filterRules.add(filter);

} catch (InstantiationException | ClassNotFoundException | IllegalAccessException e) {
log.error(e);
} catch (IllegalArgumentException | InstantiationException | ClassNotFoundException | IllegalAccessException | InvocationTargetException
| NoSuchMethodException e) {
log.trace("An error occurred while loading age-off rules, the exception will be rethrown", e);
throw new IOException(e);
}
}
@@ -133,8 +141,8 @@ protected List<RuleConfig> loadRuleConfigs(InputStream in) throws IOException {
ruleConfigs.addAll(childRules);

// @formatter:on
} catch (Exception ex) {
log.error("uh oh: " + ex);
} catch (ParserConfigurationException | SAXException ex) {
log.trace("An error occurred while loading age-off rules, the exception will be rethrown", ex);
throw new IOException(ex);
} finally {
IOUtils.closeStream(in);
FileLoaderDependencyProvider.java
@@ -0,0 +1,42 @@
package datawave.ingest.util.cache.watch;

import java.io.IOException;
import java.io.InputStream;

import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.w3c.dom.Node;

class FileLoaderDependencyProvider implements AgeOffRuleLoader.AgeOffFileLoaderDependencyProvider {
private final FileSystem fs;
private final Path filePath;
private final IteratorEnvironment iterEnv;

FileLoaderDependencyProvider(FileSystem fs, Path filePath, IteratorEnvironment iterEnv) {
this.fs = fs;
this.filePath = filePath;
this.iterEnv = iterEnv;
}

@Override
public IteratorEnvironment getIterEnv() {
return iterEnv;
}

@Override
public InputStream getParentStream(Node parent) throws IOException {

String parentPathStr = parent.getTextContent();

if (null == parentPathStr || parentPathStr.isEmpty()) {
throw new IllegalArgumentException("Invalid parent config path, none specified!");
}
// loading parent relative to dir that child is in.
Path parentPath = new Path(filePath.getParent(), parentPathStr);
if (!fs.exists(parentPath)) {
throw new IllegalArgumentException("Invalid parent config path specified, " + parentPathStr + " does not exist!");
}
return fs.open(parentPath);
}
}
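getParentStream resolves the parent rule file relative to the directory that contains the child file. A small sketch of that Path arithmetic, with hypothetical paths:

import org.apache.hadoop.fs.Path;

public class ParentPathResolutionSketch {
    public static void main(String[] args) {
        // child rule file and the relative parent reference taken from its configuration (hypothetical values)
        Path childPath = new Path("/datawave/age-off/child-rules.xml");
        String parentPathStr = "base-rules.xml";

        // same resolution used by getParentStream: the parent is looked up in the child's directory
        Path parentPath = new Path(childPath.getParent(), parentPathStr);
        System.out.println(parentPath); // prints /datawave/age-off/base-rules.xml
    }
}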
FileRuleCacheLoader.java
@@ -0,0 +1,48 @@
package datawave.ingest.util.cache.watch;

import java.io.IOException;

import org.apache.hadoop.fs.Path;

import com.google.common.cache.CacheLoader;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/**
* Cache loader implementation for loading {@link FileRuleCacheValue} referencing {@link Path} keys.
*/
public class FileRuleCacheLoader extends CacheLoader<String,FileRuleCacheValue> {
private final static int CONFIGURED_DIFF = 1;

/**
* Reloads a new {@link FileRuleCacheValue} if the cached value has changes; otherwise returns the existing {@code oldValue}.
*
* @param key
* the key to reload for
* @param oldValue
* the existing value
* @return a new value if there are changes, otherwise {@code oldValue} is returned
* @throws IOException
* if any errors occur when loading a new instance of the cache value
*/
@Override
public ListenableFuture<FileRuleCacheValue> reload(String key, FileRuleCacheValue oldValue) throws IOException {
// checks here are performed on the caller thread
FileRuleCacheValue resultValue = oldValue.hasChanges() ? load(key) : oldValue;
return Futures.immediateFuture(resultValue);
}

/**
* Loads a new rule cache value instance
*
* @param key
* the non-null key whose value should be loaded
* @return a new rule cache value instance
* @throws IOException
* if any errors occur when loading a new instance of the cache value
*/
@Override
public FileRuleCacheValue load(String key) throws IOException {
return FileRuleCacheValue.newCacheValue(key, CONFIGURED_DIFF);
}
}
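Because this commit adds mockito-core to the test scope, the reload contract can be exercised with a mocked cache value. A minimal test sketch, assuming JUnit 4 and Mockito on the classpath, covering only the unchanged branch where reload must return the existing value:

import static org.junit.Assert.assertSame;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.junit.Test;

import datawave.ingest.util.cache.watch.FileRuleCacheLoader;
import datawave.ingest.util.cache.watch.FileRuleCacheValue;

public class FileRuleCacheLoaderReloadSketchTest {
    @Test
    public void reloadReturnsSameValueWhenFileIsUnchanged() throws Exception {
        FileRuleCacheValue unchanged = mock(FileRuleCacheValue.class);
        when(unchanged.hasChanges()).thenReturn(false);

        FileRuleCacheLoader loader = new FileRuleCacheLoader();
        // hasChanges() is false, so reload() must not rebuild and should hand back the old value
        assertSame(unchanged, loader.reload("/hypothetical/age-off/rules.xml", unchanged).get());
    }
}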
FileRuleCacheValue.java
@@ -0,0 +1,126 @@
package datawave.ingest.util.cache.watch;

import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;

import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import com.google.common.annotations.VisibleForTesting;

import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterRule;

/**
* Rule cache value implementation for use with age-off rule loading. The implementation is thread-safe and supports concurrent access for all methods.
*/
public class FileRuleCacheValue {
private final static Logger log = Logger.getLogger(FileRuleCacheValue.class);

private final Path filePath;
private final long configuredDiff;
private final FileSystem fs;

private volatile FileRuleReference ruleRef;

FileRuleCacheValue(FileSystem fs, Path filePath, long configuredDiff) {
this.filePath = filePath;
this.configuredDiff = configuredDiff;
this.fs = fs;
}

/**
* Creates a new instance of this class for the specified {@code filePath}. Actual evaluation of the {@code filePath} is deferred until calls to
* {@link #newRulesetView(long, IteratorEnvironment)}.
*
* @param filePath
* the file path to prepare a cached representation on
* @param configuredDiff
* the threshold time (in milliseconds) for when timestamp differences are considered changes
* @return a new cache value instance
* @throws IOException
* if the cache value instance cannot be created
*/
public static FileRuleCacheValue newCacheValue(String filePath, long configuredDiff) throws IOException {
Path filePathObj = new Path(filePath);
FileSystem fs = filePathObj.getFileSystem(new Configuration());
return new FileRuleCacheValue(fs, filePathObj, configuredDiff);
}

/**
* Gets the file path of this instance.
*
* @return path for the instance
*/
public Path getFilePath() {
return filePath;
}

/**
* Check if the cached representation has changes. Changes are determined by checking the baseline modification time when the cached representation was
* discovered against the current modification time of the file.
*
* @return true if there are changes, otherwise false
*/
public boolean hasChanges() {
if (ruleRef == null) {
return true;
}
long currentTime;
try {
currentTime = fs.getFileStatus(filePath).getModificationTime();
} catch (IOException e) {
log.debug("Error getting file status for: " + filePath, e);
return true;
}
long previousTime = ruleRef.getTimestamp();
boolean changed = (currentTime - previousTime) > configuredDiff;
if (log.isTraceEnabled()) {
log.trace("Changes result: " + changed + ", current time: " + currentTime);
}
return changed;
}

/**
* Creates a new ruleset view of the file. The initial call to this method lazily creates the base rules and returns a view of the baseline rules.
* Subsequent calls create new view copies derived from the baseline rules.
*
* @param scanStart
* the start of a scan operation to use for the ruleset
* @param iterEnv
* the iterator environment for the scan
* @return a deep copy of the cached {@link AppliedRule} baseline rules
* @throws IOException
* if there are errors during the cache value creation, on initial call
*/
public Collection<AppliedRule> newRulesetView(long scanStart, IteratorEnvironment iterEnv) throws IOException {
// rule initialization/copies are performed on the calling thread
// the base iterator rules will use an iterator environment from the caller (and keep in the AppliedRule)
// the deep copy always creates new views of the rules with the caller's iterator environment
if (ruleRef == null) {
long ts = fs.getFileStatus(filePath).getModificationTime();
Collection<FilterRule> rulesBase = loadFilterRules(iterEnv);
ruleRef = new FileRuleReference(ts, rulesBase);
}
return ruleRef.deepCopy(scanStart, iterEnv);
}

@VisibleForTesting
Collection<FilterRule> loadFilterRules(IteratorEnvironment iterEnv) throws IOException {
AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileLoaderDependencyProvider(fs, filePath, iterEnv));
Collection<FilterRule> rulesBase;
try (InputStream in = fs.open(filePath)) {
rulesBase = ruleLoader.load(in);
}
return rulesBase;
}

@VisibleForTesting
FileRuleReference getRuleRef() {
return ruleRef;
}
}
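A usage sketch for the cache value on its own, assuming an age-off rule XML file exists at the (hypothetical) path and that iterEnv is supplied by the surrounding iterator: the first newRulesetView call loads and caches the base rules, and hasChanges later reports whether the file's modification time has moved past the configured threshold.

import java.util.Collection;

import org.apache.accumulo.core.iterators.IteratorEnvironment;

import datawave.ingest.util.cache.watch.FileRuleCacheValue;
import datawave.iterators.filter.ageoff.AppliedRule;

class FileRuleCacheValueUsageSketch {
    Collection<AppliedRule> loadRules(IteratorEnvironment iterEnv) throws Exception {
        // hypothetical rule file; a 1 ms threshold means any newer modification time counts as a change
        FileRuleCacheValue value = FileRuleCacheValue.newCacheValue("hdfs:///datawave/age-off/rules.xml", 1L);

        // first call loads the base rules from the file and returns a per-scan deep copy of them
        Collection<AppliedRule> rules = value.newRulesetView(System.currentTimeMillis(), iterEnv);

        // later, a cheap modification-time check tells callers (e.g. the cache loader) whether to rebuild
        boolean stale = value.hasChanges();
        return rules;
    }
}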
FileRuleReference.java
@@ -0,0 +1,27 @@
package datawave.ingest.util.cache.watch;

import java.util.Collection;
import java.util.stream.Collectors;

import org.apache.accumulo.core.iterators.IteratorEnvironment;

import datawave.iterators.filter.ageoff.AppliedRule;
import datawave.iterators.filter.ageoff.FilterRule;

class FileRuleReference {
private final long ts;
private final Collection<FilterRule> rulesBase;

FileRuleReference(long ts, Collection<FilterRule> rulesBase) {
this.ts = ts;
this.rulesBase = rulesBase;
}

public long getTimestamp() {
return ts;
}

public Collection<AppliedRule> deepCopy(long scanStart, IteratorEnvironment iterEnv) {
return rulesBase.stream().map(rule -> (AppliedRule) rule.deepCopy(scanStart, iterEnv)).collect(Collectors.toList());
}
}
FileRuleWatcher.java
@@ -10,7 +10,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Logger;
import org.w3c.dom.Node;

import datawave.iterators.filter.ageoff.FilterRule;

@@ -89,37 +88,10 @@ public FileRuleWatcher(Path filePath, long configuredDiff, IteratorEnvironment i
@Override
protected Collection<FilterRule> loadContents(InputStream in) throws IOException {
try {
AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileWatcherDependencyProvider());
AgeOffRuleLoader ruleLoader = new AgeOffRuleLoader(new FileLoaderDependencyProvider(fs, filePath, iterEnv));
return ruleLoader.load(in);
} catch (Exception ex) {
log.error("uh oh: " + ex);
throw new IOException(ex);
} finally {
IOUtils.closeStream(in);
}
}

private class FileWatcherDependencyProvider implements AgeOffRuleLoader.AgeOffFileLoaderDependencyProvider {
@Override
public IteratorEnvironment getIterEnv() {
return iterEnv;
}

@Override
public InputStream getParentStream(Node parent) throws IOException {

String parentPathStr = parent.getTextContent();

if (null == parentPathStr || parentPathStr.isEmpty()) {
throw new IllegalArgumentException("Invalid parent config path, none specified!");
}
// loading parent relative to dir that child is in.
Path parentPath = new Path(filePath.getParent(), parentPathStr);
if (!fs.exists(parentPath)) {
throw new IllegalArgumentException("Invalid parent config path specified, " + parentPathStr + " does not exist!");
}
return fs.open(parentPath);
}
}

}
(Diffs for the remaining changed files are not displayed.)
