Skip to content

Commit

Permalink
Fix erroneous LastUpdated timestamps appearing in data dictionary (#2489)
Browse files Browse the repository at this point in the history

* Remove code that makes incorrect timestamps appear in the Data Dictionary

* Add unit test to validate incorrect timestamps do not appear.

---------

Co-authored-by: palindrome <[email protected]>
  • Loading branch information
foster33 and hlgp authored Nov 14, 2024
1 parent cd604f3 commit 9556319
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,6 @@ public void doReduce(BulkIngestKey key, Iterable<Value> values, TaskInputOutputC
}
ctx.getCounter(IngestOutput.TIMESTAMP_DUPLICATE).increment(duplicates);
} else {
/**
* Aggregator values if ts < 0, it is a by product of the ts deduper (combiner)
*
*/
ts = outKey.getKey().getTimestamp();

if (usingCombiner && (ts < 0)) {
outKey.getKey().setTimestamp(-1 * ts * MILLISPERDAY);
}

Iterator<Value> valueItr = values.iterator();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,6 @@
package datawave.ingest.mapreduce.job.reduce;

import static datawave.ingest.config.TableConfigCache.ACCUMULO_CONFIG_CACHE_PATH_PROPERTY;
import static datawave.ingest.config.TableConfigCache.DEFAULT_ACCUMULO_CONFIG_CACHE_PATH;
import static datawave.ingest.data.config.ingest.AccumuloHelper.INSTANCE_NAME;
import static datawave.ingest.data.config.ingest.AccumuloHelper.PASSWORD;
import static datawave.ingest.data.config.ingest.AccumuloHelper.USERNAME;
import static datawave.ingest.data.config.ingest.AccumuloHelper.ZOOKEEPERS;
import static datawave.ingest.mapreduce.job.TableConfigurationUtil.ITERATOR_CLASS_MARKER;
import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.INGEST_VALUE_DEDUP_AGGREGATION_KEY;
import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.MILLISPERDAY;
import static datawave.ingest.mapreduce.job.reduce.AggregatingReducer.USE_AGGREGATOR_PROPERTY;
import static datawave.ingest.mapreduce.job.reduce.BulkIngestKeyAggregatingReducer.CONTEXT_WRITER_CLASS;
Expand All @@ -19,7 +12,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
Expand All @@ -32,7 +24,6 @@
import org.apache.accumulo.core.iterators.Combiner;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.commons.math3.analysis.function.Pow;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
Expand All @@ -41,13 +32,10 @@
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.yarn.webapp.hamlet2.Hamlet;
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.easymock.PowerMock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
Expand All @@ -56,9 +44,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;

import datawave.ingest.config.TableConfigCache;
import datawave.ingest.data.config.ConfigurationHelper;
import datawave.ingest.data.config.ingest.AccumuloHelper;
import datawave.ingest.mapreduce.job.BulkIngestKey;
import datawave.ingest.mapreduce.job.TableConfigurationUtil;
import datawave.ingest.mapreduce.job.writer.BulkContextWriter;
Expand Down Expand Up @@ -88,6 +73,7 @@ public class BulkIngestKeyAggregatingReducerTest {
private Counter tab2Counter;
private Counter tab3Counter;
private Counter combinerCounter;
private Counter negativeTimestampCounter;
private Counter dupCounter;

private int expectedDuplicateKey;
Expand All @@ -98,6 +84,7 @@ public class BulkIngestKeyAggregatingReducerTest {
private int expectedTab2Counter;
private int expectedTab3Counter;
private int expectedCombinerCounter;
private int expectedNegativeTimestampCounter;
private int expectedDupCounter;

private TaskID taskID;
Expand All @@ -121,6 +108,7 @@ public void setup() throws Exception {
tab2Counter = (Counter) new GenericCounter();
tab3Counter = (Counter) new GenericCounter();
combinerCounter = (Counter) new GenericCounter();
negativeTimestampCounter = (Counter) new GenericCounter();
dupCounter = (Counter) new GenericCounter();

expectedDuplicateKey = 0;
Expand All @@ -131,6 +119,7 @@ public void setup() throws Exception {
expectedTab2Counter = 0;
expectedTab3Counter = 0;
expectedCombinerCounter = 0;
expectedNegativeTimestampCounter = 0;
expectedDupCounter = 0;

conf = (Configuration) PowerMockito.mock(Configuration.class);
Expand Down Expand Up @@ -255,6 +244,7 @@ private void checkCounterValues() {
assertEquals(expectedTab2Counter, tab2Counter.getValue());
assertEquals(expectedTab3Counter, tab3Counter.getValue());
assertEquals(expectedCombinerCounter, combinerCounter.getValue());
assertEquals(expectedNegativeTimestampCounter, negativeTimestampCounter.getValue());
}

@Test
Expand Down Expand Up @@ -554,6 +544,29 @@ public void testUsingCombinerWithVerbosePartitioningCounters() throws Exception
assertEquals(expected, output);
}

@Test
public void testUsingCombinerWithNegativeTimestamps() throws Exception {
// Verifies that negative timestamps (a by-product of the timestamp-dedup
// combiner) pass through doReduce unchanged instead of being rewritten into
// bogus positive "LastUpdated" dates. performDoReduce increments
// negativeTimestampCounter for every key whose timestamp is still negative
// AFTER the reduce, so the expected count below asserts no conversion occurred.
setupUsingCombiner();
reducer.setup(conf);

// 10 reduces total; 7 of them use a negative timestamp (MILLISPERDAY-scaled).
// Call order matters: the repeated (table1, r1) and (table1, r2) keys at the
// end are what drive expectedDuplicateKey.
performDoReduce("table1", "r1", 4, -3 * MILLISPERDAY + MILLISPERDAY / 2, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table1", "r2", 3, 3 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table1", "r3", 1, -3 * MILLISPERDAY, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table2", "r1", 2, -2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.FIRST_VALUE);
performDoReduce("table2", "r2", 0, -2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.ALL_VALUES);
performDoReduce("table2", "r3", 3, -2 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.FIRST_VALUE);
performDoReduce("table3", "r1", 3, -4 * MILLISPERDAY + MILLISPERDAY / 3, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table3", "r2", 0, -4 * MILLISPERDAY, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table1", "r1", 4, 4 * MILLISPERDAY + MILLISPERDAY / 2, ExpectedValueType.COMBINED_VALUES);
performDoReduce("table1", "r2", 3, 2 * MILLISPERDAY + MILLISPERDAY, ExpectedValueType.COMBINED_VALUES);

// 2 re-used keys, 7 combined reduces, and all 7 negative timestamps preserved.
expectedDuplicateKey = 2;
expectedCombinerCounter = 7;
expectedNegativeTimestampCounter = 7;
checkCounterValues();
// 'expected' is accumulated by performDoReduce; 'output' by the reducer's
// context writer — they must match exactly.
assertEquals(expected, output);
}

/**
 * Convenience overload of {@code performDoReduce} that uses a default
 * timestamp of {@code 1L} and expects only the first value to be emitted
 * ({@code ExpectedValueType.FIRST_VALUE}).
 */
private void performDoReduce(String table, String row, int numberOfValues) throws Exception {
performDoReduce(table, row, numberOfValues, 1L, ExpectedValueType.FIRST_VALUE);
}
Expand Down Expand Up @@ -586,6 +599,10 @@ private void performDoReduce(String table, String row, int numberOfValues, long
}

reducer.doReduce(bulkIngestKey, values, context);

if (bulkIngestKey.getKey().getTimestamp() < 0) {
negativeTimestampCounter.increment(1);
}
}

public static Value combineValues(Iterator<Value> iter) {
Expand Down

0 comments on commit 9556319

Please sign in to comment.