Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
Expand Down Expand Up @@ -40,6 +41,7 @@
import org.apache.accumulo.core.data.PartialKey;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.FirstEntryInRowIterator;
import org.apache.accumulo.core.iterators.ValueFormatException;
import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
Expand Down Expand Up @@ -1692,6 +1694,105 @@ public Map<String,Long> getCountsForFieldsInDateRange(Set<String> fields, Set<St
return fieldCounts;
}

/**
 * Get fields that have not been ingested within the date range (start and end dates are inclusive).
 *
 * @param fields
 *            the fields
 * @param datatypes
 *            the datatypes to restrict the search to; an empty set means all datatypes
 * @param beginDate
 *            the start date (inclusive), in yyyyMMdd format
 * @param endDate
 *            the end date (inclusive), in yyyyMMdd format
 * @param specialFields
 *            special fields to exclude from search
 * @return a set of missing fields from the given date range
 */
public Set<String> getMissingFieldsInDateRange(Set<String> fields, Set<String> datatypes, String beginDate, String endDate, Set<String> specialFields) {
    SortedSet<String> sortedDatatypes = new TreeSet<>(datatypes);
    Set<String> foundFields = new HashSet<>();
    fields = Sets.difference(fields, specialFields);
    Set<Range> ranges = createExactFieldCountRanges(fields);

    if (ranges.isEmpty()) {
        return Collections.emptySet();
    }

    // Build a regex matching any requested datatype followed by a null byte and the rest of the
    // column qualifier, e.g. "csv\u0000.*|wiki\u0000.*".
    StringBuilder dataTypeRegex = new StringBuilder();
    for (String dataType : sortedDatatypes) {
        if (dataTypeRegex.length() > 0) {
            dataTypeRegex.append("|");
        }
        dataTypeRegex.append(dataType).append("\u0000.*");
    }

    AccumuloClient client = accumuloClient;
    if (client instanceof WrappedAccumuloClient) {
        client = ((WrappedAccumuloClient) client).getReal();
    }

    // Parse the date boundaries once rather than once per scanned entry.
    Date begin = DateHelper.parse(beginDate);
    Date end = DateHelper.parse(endDate);

    try (BatchScanner bs = ScannerHelper.createBatchScanner(client, getMetadataTableName(), getAuths(), fields.size())) {
        if (!datatypes.isEmpty()) {
            IteratorSetting regexFilter = new IteratorSetting(50, "regexFilter", RegExFilter.class);
            regexFilter.addOption(RegExFilter.COLQ_REGEX, dataTypeRegex.toString());
            bs.addScanIterator(regexFilter);
        }

        // Intentionally NOT using FirstEntryInRowIterator here: each field (row) may have many "f"
        // entries, one per datatype/date. Examining only the first entry per row would incorrectly
        // report a field as missing whenever its first entry falls outside the requested date range
        // while a later entry falls inside it.
        bs.fetchColumnFamily(ColumnFamilyConstants.COLF_F);
        bs.setRanges(ranges);

        for (Entry<Key,Value> entry : bs) {
            String field = entry.getKey().getRow().toString();
            // Skip further entries for a field already confirmed as present in the range.
            if (foundFields.contains(field)) {
                continue;
            }

            Text colq = entry.getKey().getColumnQualifier();
            int colqIndex = colq.find(NULL_BYTE);

            // The column qualifier has the form datatype\0<yyyyMMdd date or AGGREGATED marker>.
            String remainder;
            try {
                remainder = Text.decode(colq.getBytes(), colqIndex + 1, colq.getLength() - (colqIndex + 1));
            } catch (CharacterCodingException e) {
                log.warn("Could not deserialize colqual: {} ", entry.getKey());
                continue;
            }
            if (remainder.equals(FrequencyMetadataAggregator.AGGREGATED)) {
                // This is an aggregated entry holding counts for multiple dates.
                try {
                    DateFrequencyMap map = new DateFrequencyMap(entry.getValue().get());
                    if (!map.subMap(beginDate, endDate).isEmpty()) {
                        foundFields.add(field);
                    }
                } catch (IOException e) {
                    log.error("Failed to convert Value to DateFrequencyMap", e);
                }
            } else {
                // This is an entry with a count for a single date.
                try {
                    Date date = DateHelper.parse(remainder);
                    // Add the field if we fall within beginDate and endDate, inclusively.
                    if (date.compareTo(begin) >= 0 && date.compareTo(end) <= 0) {
                        foundFields.add(field);
                    }
                } catch (ValueFormatException e) {
                    log.warn("Could not convert the Value to a long: {}", entry.getValue());
                } catch (DateTimeParseException e) {
                    log.warn("Could not convert date string: {}", remainder);
                }
            }
        }
    } catch (TableNotFoundException e) {
        throw new RuntimeException(e);
    }
    return Sets.difference(fields, foundFields);
}

/**
* Build ranges for the {@link #getCountsForFieldsInDateRange(Set, Set, String, String)} method.
* <p>
Expand Down Expand Up @@ -1724,6 +1825,21 @@ private Set<Range> createFieldCountRanges(Set<String> fields, SortedSet<String>
return ranges;
}

/**
 * Build ranges for the {@link #getMissingFieldsInDateRange(Set, Set, String, String, Set)} method.
 *
 * @param fields
 *            the fields
 * @return a set of exact ranges for the provided fields.
 */
private Set<Range> createExactFieldCountRanges(Set<String> fields) {
    // One exact range per field, restricted to the "f" (frequency) column family.
    Set<Range> exactRanges = new HashSet<>(fields.size());
    fields.forEach(fieldName -> exactRanges.add(Range.exact(fieldName, "f")));
    return exactRanges;
}

/**
* Deserialize a Value that contains a Long
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,100 @@ void testMixedEntryFormats() {
Assertions.assertEquals(DateHelper.parse("20200103"), helper.getEarliestOccurrenceOfFieldWithType("NAME", "maze", accumuloClient, null));
}
}

/**
 * Tests for {@link MetadataHelper#getMissingFieldsInDateRange(Set, Set, String, String, Set)}.
 */
@Nested
public class GetMissingFieldsInDateRangeTest {

    /**
     * Runs the four assertions shared by every test in this class: no datatype restriction,
     * a single-datatype restriction, a fictitious field, and a date range with no data.
     */
    private void assertCommonExpectations() throws TableNotFoundException {
        // No DataTypes
        Assertions.assertEquals(Collections.emptySet(), helper.getMissingFieldsInDateRange(Set.of("NAME", "EVENT_DATE"), Collections.emptySet(), "20200101",
                        "20200120", Collections.emptySet()));
        // Using DataTypes
        Assertions.assertEquals(Set.of("EVENT_DATE"),
                        helper.getMissingFieldsInDateRange(Set.of("NAME", "EVENT_DATE"), Set.of("data"), "20200101", "20200120", Collections.emptySet()));
        // Fictitious field
        Assertions.assertEquals(Set.of("FOO"), helper.getMissingFieldsInDateRange(Set.of("NAME", "EVENT_DATE", "FOO"),
                        Set.of("wiki", "data", "csv", "maze"), "20200101", "20200120", Collections.emptySet()));
        // Missing because of date range
        Assertions.assertEquals(Set.of("NAME", "EVENT_DATE"), helper.getMissingFieldsInDateRange(Set.of("NAME", "EVENT_DATE"), Set.of("wiki", "data"),
                        "20190101", "20191231", Collections.emptySet()));
    }

    /**
     * Test against a table that has only non-aggregated entries as matches.
     */
    @Test
    void testNonAggregatedEntriesOnly() throws TableNotFoundException {
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "csv", "20200103", "20200120", 1L);
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "wiki", "20200101", "20200120", 2L);
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "maze", "20200105", "20200120", 3L);
        // NOTE(review): begin "20200107" is after end "20200102" — confirm this reversed range is intended.
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "data", "20200107", "20200102", 3L);
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "csv", "20200101", "20200120", 4L);
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "wiki", "20200101", "20200120", 5L);
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "maze", "20200101", "20200120", 6L);
        writeMutations();

        assertCommonExpectations();
    }

    /**
     * Test against a table that has only aggregated entries as matches.
     */
    @Test
    void testAggregatedEntriesOnly() throws TableNotFoundException {
        givenAggregatedFrequencyRow("NAME", COLF_F, "csv", createDateFrequencyMap("20200113", 1L, "20200115", 5L, "20200116", 3L));
        givenAggregatedFrequencyRow("NAME", COLF_F, "wiki", createDateFrequencyMap("20200111", 1L, "20200112", 15L, "20200113", 3L));
        givenAggregatedFrequencyRow("NAME", COLF_F, "maze", createDateFrequencyMap("20200102", 1L, "20200104", 55L, "20200105", 3L));
        givenAggregatedFrequencyRow("NAME", COLF_F, "data", createDateFrequencyMap("20200101", 1L, "20200103", 3L));
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "csv", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "wiki", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "maze", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        writeMutations();

        assertCommonExpectations();
    }

    /**
     * Test against a table that has both aggregated and non-aggregated entries as matches.
     */
    @Test
    void testMixedEntryFormats() throws TableNotFoundException {
        givenAggregatedFrequencyRow("NAME", COLF_F, "csv", createDateFrequencyMap("20200111", 1L, "20200112", 5L, "20200113", 3L));
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "csv", "20200111", "20200120", 1L);
        givenAggregatedFrequencyRow("NAME", COLF_F, "wiki", createDateFrequencyMap("20200111", 1L, "20200112", 15L, "20200113", 3L));
        givenAggregatedFrequencyRow("NAME", COLF_F, "maze", createDateFrequencyMap("20200111", 1L, "20200112", 55L, "20200113", 3L));
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "maze", "20200103", "20200120", 3L);
        givenAggregatedFrequencyRow("NAME", COLF_F, "data", createDateFrequencyMap("20200111", 1L, "20200113", 3L));
        givenNonAggregatedFrequencyRows("NAME", COLF_F, "data", "20200101", "20200115", 3L);
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "csv", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "wiki", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        givenAggregatedFrequencyRow("EVENT_DATE", COLF_F, "maze", createDateFrequencyMap("20200101", 2L, "20200102", 3L, "20200103", 4L));
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "csv", "20200101", "20200120", 4L);
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "wiki", "20200101", "20200120", 5L);
        givenNonAggregatedFrequencyRows("EVENT_DATE", COLF_F, "maze", "20200101", "20200120", 6L);
        writeMutations();

        assertCommonExpectations();
    }
}
}
Loading
Loading