Fix #1438 - optionally gather fields after ingest exception #1439

Open
wants to merge 26 commits into integration from task/resilient-error-handling
Commits (26)
45df65f  Fix #1438 - optionally gather fields after ingest exception (Feb 24, 2022)
8638428  Fix #1438: remove getFields method after researching extensions (Feb 28, 2022)
5c7359d  Fix #1438: consolidate FieldHarvester methods and adjust tests (Feb 28, 2022)
539b3a2  Fix #1438: preserve original exception instead of wrapping it (Feb 28, 2022)
9f09371  Fix #1438: adjust Mapper test to reflect supplemental fields addition… (Feb 28, 2022)
a4015f9  Fix #1438: initial PR feedback (Mar 14, 2022)
e17f5b3  Fix #1438: change interface as per PR feedback (Mar 14, 2022)
d0389ab  Branch was auto-updated on change of target. (github-actions[bot], Apr 22, 2022)
70d7d1d  Branch was auto-updated on change of target. (github-actions[bot], Apr 22, 2022)
342c896  Branch was auto-updated on change of target. (github-actions[bot], Apr 22, 2022)
bf987e3  Branch was auto-updated on change of target. (github-actions[bot], Apr 28, 2022)
64cd680  Branch was auto-updated on change of target. (github-actions[bot], Apr 28, 2022)
1e1fe86  Branch was auto-updated on change of target. (datawave-bot-builder, Apr 28, 2022)
ea08a5b  Branch was auto-updated on change of target. (datawave-bot-builder, May 2, 2022)
09931d7  Branch was auto-updated on change of target. (datawave-bot-builder, May 3, 2022)
d426b49  Branch was auto-updated on change of target. (datawave-bot-builder, May 3, 2022)
cabe5f3  Branch was auto-updated on change of target. (datawave-bot-builder, May 3, 2022)
d02de12  Branch was auto-updated on change of target. (datawave-bot-builder, May 10, 2022)
99d7027  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
6be4d92  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
edff00d  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
3c76aa1  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
c37c3b0  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
3a4f54b  Branch was auto-updated on change of target. (datawave-bot-builder, May 12, 2022)
94bac43  Updating for current integration (mineralntl, Nov 13, 2024)
88b6595  Merge branch 'integration' into task/resilient-error-handling (mineralntl, Nov 13, 2024)
@@ -31,6 +31,7 @@
import datawave.data.type.OneToManyNormalizerType;
import datawave.ingest.config.IngestConfiguration;
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.DataTypeHelperImpl;
@@ -764,6 +765,12 @@ protected NormalizedContentInterface normalizeFieldValue(NormalizedContentInterf
return copy;
}

@Override
public void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields) {
// default implementation calls legacy method
fields.putAll(this.getEventFields(value));
}

/**
* This is a helper routine that will create and normalize a field out of a base normalized field.
*
@@ -45,13 +45,18 @@ default boolean isShardExcluded(String fieldName) {
}

/**
* Fully parse the raw record and return a map of field names and values.
* @deprecated Use {@link #getEventFields(RawRecordContainer, Multimap)} instead.
*/
@Deprecated
Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value);

/**
* Fully parse the raw record and update the provided multimap of field names and values, with a partial update in the event of an exception.
*
* @param value
* a {@link RawRecordContainer}
* @param fields
*            the multimap of field names and normalized values to update in place
*/
Multimap<String,NormalizedContentInterface> getEventFields(RawRecordContainer value);
void getEventFields(RawRecordContainer value, Multimap<String,NormalizedContentInterface> fields);

Multimap<String,NormalizedContentInterface> normalizeMap(Multimap<String,NormalizedContentInterface> fields);

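This interface change is the core of the fix: callers now supply the multimap, so any fields parsed before an exception remain available to them. A minimal caller-side sketch of the difference, assuming a helper and record are in hand (the class name and logging here are illustrative, not code from this PR):

import org.apache.log4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;

public class PartialFieldsExample {
    private static final Logger log = Logger.getLogger(PartialFieldsExample.class);

    public static Multimap<String,NormalizedContentInterface> fieldsOrPartial(IngestHelperInterface helper, RawRecordContainer value) {
        Multimap<String,NormalizedContentInterface> fields = HashMultimap.create();
        try {
            // New two-argument form: the helper fills the caller's multimap as it parses,
            // so anything extracted before a failure remains visible afterwards.
            helper.getEventFields(value, fields);
        } catch (Exception e) {
            // The deprecated form (fields = helper.getEventFields(value)) would have
            // thrown before returning anything, losing the partial results.
            log.warn("record failed to parse fully; keeping " + fields.size() + " salvaged field values", e);
        }
        return fields;
    }
}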

Large diffs are not rendered by default.

@@ -0,0 +1,248 @@
package datawave.ingest.mapreduce;

import java.util.Date;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

import datawave.data.normalizer.DateNormalizer;
import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.NormalizedFieldAndValue;
import datawave.ingest.data.config.ingest.CompositeIngest;
import datawave.ingest.data.config.ingest.FilterIngest;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.data.config.ingest.VirtualIngest;
import datawave.ingest.time.Now;
import datawave.util.StringUtils;

/**
* Encapsulates the logic for extracting fields from a record, making use of a provided IngestHelperInterface. Generates virtual fields, composite fields, and
 * supplemental fields (like LOAD_DATE, ORIG_FILE, and RAW_FILE). Some logic for handling errors is also included here: extracting salvageable fields if an
 * exception occurs, and detecting whether there were field errors (indicating a normalization failure).
*/
public class FieldHarvester {
private static final Logger log = Logger.getLogger(FieldHarvester.class);

private static Now now = Now.getInstance();

public static final String LOAD_DATE_FIELDNAME = "LOAD_DATE";
public static final String SEQUENCE_FILE_FIELDNAME = "ORIG_FILE";
public static final String RAW_FILE_FIELDNAME = "RAW_FILE";
public static final String LOAD_SEQUENCE_FILE_NAME = "ingest.event.mapper.load.seq.filename";
public static final String TRIM_SEQUENCE_FILE_NAME = "ingest.event.mapper.trim.sequence.filename";
public static final String LOAD_RAW_FILE_NAME = "ingest.event.mapper.load.raw.filename";

private boolean createSequenceFileName;
private boolean trimSequenceFileName;
private boolean createRawFileName;
private final DateNormalizer dateNormalizer = new DateNormalizer();

private static final String SRC_FILE_DEL = "|";
private Exception originalException;

public FieldHarvester(Configuration configuration) {
this.createSequenceFileName = configuration.getBoolean(LOAD_SEQUENCE_FILE_NAME, true);
this.trimSequenceFileName = configuration.getBoolean(TRIM_SEQUENCE_FILE_NAME, true);
this.createRawFileName = configuration.getBoolean(LOAD_RAW_FILE_NAME, true);
}

/**
* Updates "fields" with extracted, derived, and automatically generated fields. Will capture exception along the way and attempt to add salvaged fields
* before rethrowing the exception.
*
* @param fields
* the Multimap to modify with extracted and generated fields
* @param ingestHelper
* interface to use for field extraction
* @param value
* the record from which the fields will be extracted
* @param offset
* record offset within the source file
* @param splitStart
* the splitStart for the record
*/
public void extractFields(Multimap<String,NormalizedContentInterface> fields, IngestHelperInterface ingestHelper, RawRecordContainer value, long offset,
String splitStart) throws Exception {
// reset exception-in-extraction tracking
this.originalException = null;

// "candidateFields" holds the fields that will eventually be added to "fields"
Multimap<String,NormalizedContentInterface> candidateFields = HashMultimap.create();

try {
// parse the record into its candidate field names and values using the IngestHelperInterface.
ingestHelper.getEventFields(value, candidateFields);
} catch (Exception exception) {
// delay throwing the exception to attempt salvaging
this.originalException = exception;
}

try {
// try adding supplemental fields to candidateFields, whether or not field extraction completed successfully
addSupplementalFields(value, offset, splitStart, ingestHelper, candidateFields);
} catch (Exception exception) {
if (null == this.originalException) {
this.originalException = exception;
} else {
// preserve original exception and log the latest exception
log.error("A secondary exception occurred while adding supplemental fields", exception);
}
}

// add candidateFields to fields, even if there was an error
// identify if any individual fields contain an error
addFieldsAndDetectFieldErrors(fields, candidateFields);

if (null != this.originalException) {
log.error("Rethrowing original exception after completing field extraction.");
throw originalException;
}
}

@VisibleForTesting
boolean hasError() {
return null != this.originalException;
}

@VisibleForTesting
Exception getOriginalException() {
return this.originalException;
}

private void addSupplementalFields(RawRecordContainer value, long offset, String splitStart, IngestHelperInterface ingestHelper,
Multimap<String,NormalizedContentInterface> fields) {
addVirtualFields(ingestHelper, fields);
addCompositeFields(ingestHelper, fields);
addLoadDateField(fields);
addFileNameFields(value, offset, splitStart, fields);
applyFieldFilters(ingestHelper, fields);
}

private void addVirtualFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the virtual fields, if applicable.
if (null != newFields && ingestHelper instanceof VirtualIngest) {
VirtualIngest vHelper = (VirtualIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> virtualFields = vHelper.getVirtualFields(newFields);
for (Map.Entry<String,NormalizedContentInterface> v : virtualFields.entries())
newFields.put(v.getKey(), v.getValue());
}
}

private void addCompositeFields(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also get the composite fields, if applicable
if (null != newFields && ingestHelper instanceof CompositeIngest) {
CompositeIngest vHelper = (CompositeIngest) ingestHelper;
Multimap<String,NormalizedContentInterface> compositeFields = vHelper.getCompositeFields(newFields);
for (String fieldName : compositeFields.keySet()) {
// if this is an overloaded composite field, we are replacing the existing field data
if (vHelper.isOverloadedCompositeField(fieldName)) {
newFields.removeAll(fieldName);
}
newFields.putAll(fieldName, compositeFields.get(fieldName));
}
}
}

private void addLoadDateField(Multimap<String,NormalizedContentInterface> newFields) {
// Create a LOAD_DATE parameter, which is the current time in milliseconds, for all datatypes
long loadDate = now.get();
NormalizedFieldAndValue loadDateValue = new NormalizedFieldAndValue(LOAD_DATE_FIELDNAME, Long.toString(loadDate));
// set an indexed field value for use by the date index data type handler
loadDateValue.setIndexedFieldValue(dateNormalizer.normalizeDelegateType(new Date(loadDate)));
newFields.put(LOAD_DATE_FIELDNAME, loadDateValue);
}

private void addRawFileField(RawRecordContainer value, Multimap<String,NormalizedContentInterface> newFields, String seqFileName) {
if (createRawFileName && !value.getRawFileName().isEmpty() && !value.getRawFileName().equals(seqFileName)) {
newFields.put(RAW_FILE_FIELDNAME, new NormalizedFieldAndValue(RAW_FILE_FIELDNAME, value.getRawFileName()));
}
}

private void addOrigFileField(Multimap<String,NormalizedContentInterface> newFields, long offset, String splitStart, String seqFileName) {
if (null != seqFileName) {
StringBuilder seqFile = new StringBuilder(seqFileName);

seqFile.append(SRC_FILE_DEL).append(offset);

if (null != splitStart) {
seqFile.append(SRC_FILE_DEL).append(splitStart);
}

newFields.put(SEQUENCE_FILE_FIELDNAME, new NormalizedFieldAndValue(SEQUENCE_FILE_FIELDNAME, seqFile.toString()));
}
}

private String getSeqFileName() {
String seqFileName;
seqFileName = NDC.peek();

if (trimSequenceFileName) {
seqFileName = StringUtils.substringAfterLast(seqFileName, "/");
}
return seqFileName;
}

private void addFileNameFields(RawRecordContainer value, long offset, String splitStart, Multimap<String,NormalizedContentInterface> newFields) {
String seqFileName = null;

if (createSequenceFileName) {
seqFileName = getSeqFileName();

// place the sequence filename into the event
addOrigFileField(newFields, offset, splitStart, seqFileName);
}

addRawFileField(value, newFields, seqFileName);
}

private void applyFieldFilters(IngestHelperInterface ingestHelper, Multimap<String,NormalizedContentInterface> newFields) {
// Also if this helper needs to filter the fields before returning, apply now
if (ingestHelper instanceof FilterIngest) {
FilterIngest fHelper = (FilterIngest) ingestHelper;
fHelper.filter(newFields);
}
}

/**
* Adds candidateFields to fields, inspecting each candidate field for errors along the way. Records a field-normalization error as the harvester's
* exception if one was found and no earlier exception is already being tracked.
*/
private void addFieldsAndDetectFieldErrors(Multimap<String,NormalizedContentInterface> fields,
Multimap<String,NormalizedContentInterface> candidateFields) {
if (null == candidateFields) {
return;
}
Throwable fieldError = null;
for (Map.Entry<String,NormalizedContentInterface> entry : candidateFields.entries()) {
// noinspection ThrowableResultOfMethodCallIgnored
if (null != entry.getValue().getError()) {
fieldError = entry.getValue().getError();
}
fields.put(entry.getKey(), entry.getValue());
}
if (null != fieldError) {
if (null == this.originalException) {
this.originalException = new FieldNormalizationError("Failed getting all fields", fieldError);
} else {
// preserve original exception
log.error("A field exception was observed while adding fields", fieldError);
}
}
}

public static class FieldNormalizationError extends Exception {

private static final long serialVersionUID = 1L;

public FieldNormalizationError(String message, Throwable cause) {
super(message, cause);
}
}
}
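EventMapper's updated use of FieldHarvester sits in the large diff that is not rendered above, so the following is only a sketch of the call pattern the class supports, with illustrative naming and error handling:

import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;

import datawave.ingest.data.RawRecordContainer;
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.mapreduce.FieldHarvester;

public class FieldHarvesterUsage {
    private static final Logger log = Logger.getLogger(FieldHarvesterUsage.class);

    public static Multimap<String,NormalizedContentInterface> harvest(Configuration conf, IngestHelperInterface helper, RawRecordContainer record,
                    long offset, String splitStart) {
        FieldHarvester harvester = new FieldHarvester(conf);
        Multimap<String,NormalizedContentInterface> fields = HashMultimap.create();
        try {
            harvester.extractFields(fields, helper, record, offset, splitStart);
        } catch (Exception e) {
            // extractFields rethrows the first failure it observed, but "fields" already
            // holds everything salvaged: fields parsed before the failure plus whichever
            // supplemental LOAD_DATE / ORIG_FILE / RAW_FILE values could still be added.
            log.error("extraction failed; continuing with " + fields.size() + " salvaged field values", e);
        }
        return fields;
    }
}

Note that ORIG_FILE is derived from NDC.peek(), so a real caller needs the sequence file name pushed onto the log4j NDC before invoking extractFields.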
@@ -56,7 +56,7 @@
import datawave.ingest.data.config.NormalizedContentInterface;
import datawave.ingest.data.config.ingest.IngestHelperInterface;
import datawave.ingest.data.normalizer.SimpleGroupFieldNameParser;
import datawave.ingest.mapreduce.EventMapper;
import datawave.ingest.mapreduce.FieldHarvester;
import datawave.ingest.mapreduce.handler.ExtendedDataTypeHandler;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDataBundle;
import datawave.ingest.mapreduce.handler.edge.define.EdgeDefinition;
@@ -808,7 +808,7 @@ private void setupPreconditionEvaluation(Multimap<String,NormalizedContentInterf
// this could be moved to a more generic class
private String getLoadDateString(Multimap<String,NormalizedContentInterface> fields) {
String loadDateStr;
Collection<NormalizedContentInterface> loadDates = fields.get(EventMapper.LOAD_DATE_FIELDNAME);
Collection<NormalizedContentInterface> loadDates = fields.get(FieldHarvester.LOAD_DATE_FIELDNAME);
if (!loadDates.isEmpty()) {
NormalizedContentInterface nci = loadDates.iterator().next();
Date date = new Date(Long.parseLong(nci.getEventFieldValue()));
@@ -22,6 +22,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.config.RawRecordContainerImpl;
import datawave.ingest.data.config.MarkingsHelper;
import datawave.util.TypeRegistryTestSetup;

public class RawRecordContainerImplTest {

@@ -53,8 +54,7 @@ public void setUp() throws Exception {
conf.set("samplecsv" + TypeRegistry.INGEST_HELPER, TestCSVIngestHelper.class.getName());
conf.set("samplecsv.reader.class", TestCSVReader.class.getName());
conf.set("samplecsv" + MarkingsHelper.DEFAULT_MARKING, "PUBLIC|PRIVATE");
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
dataType = TypeRegistry.getType("samplecsv");
}

@@ -9,8 +9,8 @@
import org.junit.Before;
import org.junit.Test;

import datawave.ingest.data.TypeRegistry;
import datawave.policy.IngestPolicyEnforcer;
import datawave.util.TypeRegistryTestSetup;

public class DataTypeHelperImplTest {

@@ -25,8 +25,7 @@ public void setup() {
@Test(expected = IllegalArgumentException.class)
public void testInvalidConfig() {
DataTypeHelperImpl helper = new DataTypeHelperImpl();
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
helper.setup(conf);
}

@@ -35,9 +34,8 @@ public void testValidConfig() throws Exception {
InputStream configStream = getClass().getResourceAsStream("/fake-datatype-config.xml");
Assert.assertNotNull(configStream);
conf.addResource(configStream);
Assert.assertThat(conf.get("data.name"), is("fake"));
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
Assert.assertEquals("fake", conf.get("data.name"));
TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

@@ -54,8 +52,7 @@ public void testDowncaseFields() throws Exception {
Assert.assertNotNull(configStream);
conf.addResource(configStream);
conf.set("fake" + DataTypeHelper.Properties.DOWNCASE_FIELDS, "one,two,three,FOUR");
TypeRegistry.reset();
TypeRegistry.getInstance(conf);
TypeRegistryTestSetup.resetTypeRegistry(conf);
DataTypeHelperImpl helper = new DataTypeHelperImpl();
helper.setup(conf);

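TypeRegistryTestSetup itself is not part of the rendered diff. Judging from the two-line pattern it replaces at each call site, its shape is presumably close to this sketch (an inference, not the actual file contents):

package datawave.util;

import org.apache.hadoop.conf.Configuration;

import datawave.ingest.data.TypeRegistry;

public final class TypeRegistryTestSetup {

    private TypeRegistryTestSetup() {
        // static utility, no instances
    }

    // Mirrors the TypeRegistry.reset() / TypeRegistry.getInstance(conf)
    // pairs the tests above previously inlined.
    public static void resetTypeRegistry(Configuration conf) {
        TypeRegistry.reset();
        TypeRegistry.getInstance(conf);
    }
}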
@@ -22,7 +22,7 @@ public static class NonGroupedInstance implements NormalizedContentInterface {
private Map<String,String> _markings;
private Throwable _error;

protected NonGroupedInstance() {
public NonGroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";
@@ -142,7 +142,7 @@ public static class GroupedInstance implements GroupedNormalizedContentInterface
private String _group;
private String _subGroup;

protected GroupedInstance() {
public GroupedInstance() {
_fieldName = "TestNonGroupedInstance";

_indexedFieldName = "TestIndexedField";