From d10d93a28fe34429d94cc8f87d1ef5f7b4e36f53 Mon Sep 17 00:00:00 2001 From: Christopher Gross cogross Date: Mon, 1 Apr 2024 12:11:05 +0000 Subject: [PATCH] fix major issues from sonarqube --- .../common/cache/SharedCacheCoordinator.java | 6 ++++ .../map/BulkResultsFileOutputMapper.java | 3 ++ .../map/BulkResultsTableOutputMapper.java | 3 ++ .../logic/composite/CompositeQueryLogic.java | 2 ++ .../CompositeQueryLogicResultsIterator.java | 1 + .../SecureEventSequenceFileInputFormat.java | 1 + .../data/tokenize/TokenizationHelper.java | 3 ++ .../reader/AbstractEventRecordReader.java | 3 ++ .../ingest/mapreduce/EventMapper.java | 9 +++++- .../handler/facet/HashTableFunction.java | 5 +++- .../ContentIndexingColumnBasedHandler.java | 5 +++- ...ndedContentIndexingColumnBasedHandler.java | 6 +++- .../job/BulkIngestMapFileLoader.java | 5 ++++ .../ingest/mapreduce/job/IngestJob.java | 1 + .../job/metrics/AggregatingMetricsStore.java | 3 ++ .../job/metrics/PassThroughMetricsStore.java | 3 ++ .../BulkIngestKeyAggregatingReducer.java | 3 ++ .../reduce/BulkIngestKeyDedupeCombiner.java | 3 ++ .../writer/AbstractChainedContextWriter.java | 3 ++ .../job/writer/TableCachingContextWriter.java | 3 ++ .../datawave/ingest/util/KillJobByRegex.java | 6 ++++ .../java/datawave/ingest/util/ThreadUtil.java | 1 + .../java/datawave/util/flag/FlagMaker.java | 6 +++- .../ingest/csv/mr/input/CSVReaderBase.java | 1 + .../wikipedia/WikipediaDataTypeHandler.java | 4 +++ .../metrics/mapreduce/MetricsIngester.java | 3 ++ .../metrics/mapreduce/util/JobSetupUtil.java | 3 ++ ...DatawaveFieldIndexCachingIteratorJexl.java | 4 +++ .../DatawaveFieldIndexRegexIteratorJexl.java | 2 -- .../mr/bulk/MultiRfileInputformat.java | 5 +++- .../java/datawave/mr/bulk/RecordIterator.java | 10 ++++++- .../java/datawave/mr/bulk/RfileScanner.java | 5 +++- .../mr/bulk/RfileSplitInputFormat.java | 1 + .../AncestorIndexBuildingVisitor.java | 2 +- .../query/ancestor/AncestorIndexIterator.java | 2 -- .../query/cardinality/CardinalityRecord.java | 10 +++++-- .../query/composite/CompositeRange.java | 28 ++++--------------- .../datawave/query/function/LogTiming.java | 4 ++- .../lookup/ConcurrentScannerInitializer.java | 6 +++- .../datawave/query/iterator/QueryOptions.java | 4 ++- .../query/iterator/SourceManager.java | 5 ++-- .../query/iterator/filter/LoadDateFilter.java | 4 +-- .../iterator/pipeline/PipelineIterator.java | 3 ++ .../query/jexl/DatawaveArithmetic.java | 4 ++- .../EvaluationPhaseFilterFunctions.java | 4 ++- .../query/jexl/lookups/AsyncIndexLookup.java | 7 ++++- .../query/jexl/lookups/RegexIndexLookup.java | 6 ++-- .../ShardIndexQueryTableStaticMethods.java | 3 -- .../CustomWildcardQueryNodeProcessor.java | 6 ++-- .../query/planner/DefaultQueryPlanner.java | 5 +++- .../query/planner/IndexQueryPlanner.java | 5 +++- .../planner/ThreadedRangeBundlerIterator.java | 6 ++++ .../ChainableEventDataQueryFilter.java | 2 +- .../query/scheduler/PushdownFunction.java | 1 + .../query/tables/BatchScannerSession.java | 1 + .../query/tables/RangeStreamScanner.java | 8 +++++- .../datawave/query/tables/ScannerSession.java | 2 ++ .../query/tables/async/SpeculativeScan.java | 1 + .../results/cached/CachedResultsBean.java | 4 +++ .../CreateQuerySessionIDFilter.java | 4 +-- .../webservice/common/health/HealthBean.java | 1 + .../common/remote/RemoteHttpService.java | 2 ++ .../query-war/src/main/webapp/index.html | 2 +- .../datawave/webservice/mr/MapReduceBean.java | 3 ++ .../metrics/ShardTableQueryMetricHandler.java | 7 ++--- .../query/metric/QueryMetricHolder.java | 2 +- .../query/metric/QueryMetricMessage.java | 2 +- .../query/metric/QueryMetricsWriter.java | 13 +++++++-- .../query/runner/QueryExecutorBean.java | 1 + .../webservice/query/runner/RunningQuery.java | 6 ++++ .../security/cache/AccumuloCacheStore.java | 13 +++++++-- .../web-root/src/main/webapp/index.html | 5 ++-- 72 files changed, 240 insertions(+), 76 deletions(-) diff --git a/core/connection-pool/src/main/java/datawave/core/common/cache/SharedCacheCoordinator.java b/core/connection-pool/src/main/java/datawave/core/common/cache/SharedCacheCoordinator.java index ac322004e26..8e3ede8c80c 100644 --- a/core/connection-pool/src/main/java/datawave/core/common/cache/SharedCacheCoordinator.java +++ b/core/connection-pool/src/main/java/datawave/core/common/cache/SharedCacheCoordinator.java @@ -726,11 +726,17 @@ protected void reapEvictions() { String recursiveDeletePath = ZKPaths.makePath(curatorClient.getNamespace(), path); ZKUtil.deleteRecursive(curatorClient.getZookeeperClient().getZooKeeper(), recursiveDeletePath); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.trace("Problem deleting " + path + " (this may be ok): " + e.getMessage(), e); } } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.warn("Error cleaning up eviction notices: " + e.getMessage(), e); } } diff --git a/core/map-reduce/src/main/java/datawave/core/mapreduce/bulkresults/map/BulkResultsFileOutputMapper.java b/core/map-reduce/src/main/java/datawave/core/mapreduce/bulkresults/map/BulkResultsFileOutputMapper.java index 62b7f2d403e..9b172440bc4 100644 --- a/core/map-reduce/src/main/java/datawave/core/mapreduce/bulkresults/map/BulkResultsFileOutputMapper.java +++ b/core/map-reduce/src/main/java/datawave/core/mapreduce/bulkresults/map/BulkResultsFileOutputMapper.java @@ -119,6 +119,9 @@ protected void map(Key key, Value value, org.apache.hadoop.mapreduce.Mapper createRecordReader(InputSplit split, T try { reader.initialize(split, context); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException("Error initializing SecureEventSequenceFileRecordReader", e); } return reader; diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java index 931b2efa40e..3ecf92f76fb 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/tokenize/TokenizationHelper.java @@ -1,6 +1,7 @@ package datawave.ingest.data.tokenize; import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.log4j.Logger; @@ -25,6 +26,7 @@ public static class HeartBeatThread extends Thread { public static final long INTERVAL = 500; // half second resolution public static volatile int counter = 0; + public static long lastRun; static { @@ -41,6 +43,7 @@ public void run() { try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java index 52e0c50e864..069fe1343f0 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/input/reader/AbstractEventRecordReader.java @@ -175,6 +175,9 @@ public RawRecordContainer getEvent() { try { event.setRawRecordNumber(getCurrentKey().get()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Unable to get current key", e); } catch (Exception e) { throw new RuntimeException("Unable to get current key", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java index edce34ef644..63baa7b8be7 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/EventMapper.java @@ -253,6 +253,9 @@ public void setup(Context context) throws IOException, InterruptedException { try { contextWriter = contextWriterClass.getDeclaredConstructor().newInstance(); contextWriter.setup(filterConf, filterConf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } @@ -480,7 +483,11 @@ public void map(K1 key, V1 value, Context context) throws IOException, Interrupt NDC.push(origFiles.iterator().next()); reprocessedNDCPush = true; } - + } catch (InterruptedException e) { + contextWriter.rollback(); + Thread.currentThread().interrupt(); + log.error("Failed to clean event from error table. Terminating map", e); + throw new IOException("Failed to clean event from error table, Terminating map", e); } catch (Exception e) { contextWriter.rollback(); log.error("Failed to clean event from error table. Terminating map", e); diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java index 14d376fc24c..e8b3f758d06 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/facet/HashTableFunction.java @@ -107,7 +107,10 @@ public Collection apply(@Nullable Collection getShardNamesAndValues(Raw if (indexField || reverseIndexField) { try { tokenizeField(analyzer, nci, indexField, reverseIndexField, reporter); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ex); } catch (Exception ex) { throw new RuntimeException(ex); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java index 8141959ae98..5ddf3cceab8 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/handler/tokenize/ExtendedContentIndexingColumnBasedHandler.java @@ -229,7 +229,10 @@ public void close(TaskAttemptContext context) { this.docWriterService.shutdown(); this.docWriterService.awaitTermination(1, TimeUnit.MINUTES); this.docWriter.close(); - } catch (InterruptedException | MutationsRejectedException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Unable to terminate document writing service!", e); + } catch (MutationsRejectedException e) { log.error("Unable to terminate document writing service!", e); } } @@ -669,6 +672,7 @@ public void run() { try { Thread.sleep(INTERVAL); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java index 45d720019a7..02e7d849981 100755 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/BulkIngestMapFileLoader.java @@ -512,6 +512,7 @@ public void run() { try { Thread.sleep(FAILURE_SLEEP_TIME); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); log.warn("Interrupted while sleeping.", ie); } } @@ -822,6 +823,7 @@ public void bringMapFilesOnline(Path mapFilesDir) throws IOException, AccumuloEx if (e == null) e = importTask.getException(); } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); // this task was interrupted, wait for the others and then terminate if (e == null) e = interrupted; @@ -844,6 +846,7 @@ public void bringMapFilesOnline(Path mapFilesDir) throws IOException, AccumuloEx if (e == null) e = importTask.getException(); } catch (InterruptedException interrupted) { + Thread.currentThread().interrupt(); // this task was interrupted, wait for the others and then terminate if (e == null) e = interrupted; @@ -1238,6 +1241,7 @@ public void markSourceFilesLoaded(Path jobDirectory) throws IOException { } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (null != e.getCause()) throw new IOException(e.getCause().getMessage()); else @@ -1320,6 +1324,7 @@ private void sleep() { System.gc(); Thread.sleep(SLEEP_TIME); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.warn("Interrupted while sleeping.", e); } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java index 86e12627ed1..2b064abcc0c 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/IngestJob.java @@ -415,6 +415,7 @@ public int run(String[] args) throws Exception { try { Thread.sleep(3000); } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); // do nothing } } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java index 6ef1b98e2fa..bafd7bb7465 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/AggregatingMetricsStore.java @@ -60,6 +60,9 @@ public void flush(ConcurrentMap counts) { byte[] countAsBytes = TextUtil.toUtf8(String.valueOf(entry.getValue().get())); Key key = KeyConverter.fromString(entry.getKey()); contextWriter.write(new BulkIngestKey(table, key), new Value(countAsBytes), context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Could not flush metrics, dropping them", e); } catch (Exception e) { logger.error("Could not flush metrics, dropping them", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java index 033c35cbf2b..514154c356e 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/metrics/PassThroughMetricsStore.java @@ -47,6 +47,9 @@ public void increase(String key, long count) { Value v = (count == 1L && ONE != null) ? ONE : new Value(String.valueOf(count).getBytes(ENCODING)); Key k = KeyConverter.fromString(key); contextWriter.write(new BulkIngestKey(table, k), v, context); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Could not write metrics to the context writer, dropping them...", e); } catch (Exception e) { logger.error("Could not write metrics to the context writer, dropping them...", e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java index 1eed6e5f53b..2a7b0ccf036 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyAggregatingReducer.java @@ -65,6 +65,9 @@ protected void setupContextWriter(Configuration conf) throws IOException { try { setContextWriter(contextWriterClass.newInstance()); contextWriter.setup(conf, conf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java index ae64849a43f..c552a7b724b 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/reduce/BulkIngestKeyDedupeCombiner.java @@ -56,6 +56,9 @@ protected void setupContextWriter(Configuration conf) throws IOException { try { setContextWriter(contextWriterClass.getDeclaredConstructor().newInstance()); contextWriter.setup(conf, conf.getBoolean(CONTEXT_WRITER_OUTPUT_TABLE_COUNTERS, false)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java index 842c028da10..09e617ed009 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/AbstractChainedContextWriter.java @@ -44,6 +44,9 @@ public void setup(Configuration conf, boolean outputTableCounters) throws IOExce try { contextWriter = contextWriterClass.newInstance(); contextWriter.setup(conf, outputTableCounters); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + getChainedContextWriterOption(), e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + getChainedContextWriterOption(), e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java index 4fdecb7b6ac..7fcb794fe24 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/mapreduce/job/writer/TableCachingContextWriter.java @@ -90,6 +90,9 @@ public void setup(Configuration conf, boolean outputTableCounters) throws IOExce try { contextWriter = contextWriterClass.getDeclaredConstructor().newInstance(); contextWriter.setup(conf, outputTableCounters); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } catch (Exception e) { throw new IOException("Failed to initialized " + contextWriterClass + " from property " + CONTEXT_WRITER_CLASS, e); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java b/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java index f48bb7eaff1..ba0cd7e1984 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/util/KillJobByRegex.java @@ -38,6 +38,11 @@ public void run() { Job job = null; try { job = cluster.getJob(js.getJobID()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + System.out.println("Error getting job: " + js.getJobID() + " Reason: " + e.getMessage()); + JOB_FAILED_COUNT.getAndIncrement(); + return; } catch (Exception e) { System.out.println("Error getting job: " + js.getJobID() + " Reason: " + e.getMessage()); JOB_FAILED_COUNT.getAndIncrement(); @@ -82,6 +87,7 @@ public static void main(String[] args) throws IOException, InterruptedException JOB_KILLER_SVC.shutdown(); // signal shutdown JOB_KILLER_SVC.awaitTermination(1, TimeUnit.MINUTES); // allow processes to stop } catch (InterruptedException e) { + Thread.currentThread().interrupt(); JOB_KILLER_SVC.shutdownNow(); } diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java b/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java index 2205ab79a29..07522a69270 100644 --- a/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java +++ b/warehouse/ingest-core/src/main/java/datawave/ingest/util/ThreadUtil.java @@ -29,6 +29,7 @@ public static boolean shutdownAndWait(ThreadPoolExecutor executor, long timeToWa executor.awaitTermination(timeToWait, unit); return true; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); logger.warn("Closed thread pool but not all threads completed successfully."); return false; } diff --git a/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java b/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java index 66926763ee0..bd28da85514 100644 --- a/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java +++ b/warehouse/ingest-core/src/main/java/datawave/util/flag/FlagMaker.java @@ -226,6 +226,7 @@ public void run() { } catch (Exception ex) { log.error("An unexpected exception occurred. Exiting", ex); running = false; + Thread.currentThread().interrupt(); } } } finally { @@ -575,7 +576,10 @@ private boolean processResults(List> futures, Collection getSplits(JobContext job) throws IOException { List inputSplits = Lists.newArrayList(); try { inputSplits = computeSplitPoints(job, tableName, ranges); - } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException | InterruptedException e) { + } catch (TableNotFoundException | AccumuloException | AccumuloSecurityException e) { + throw new IOException(e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new IOException(e); } diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java index 11315035581..c114f1da1eb 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RecordIterator.java @@ -347,6 +347,8 @@ protected synchronized void initialize(Configuration conf, boolean initRanges) t } catch (ExecutionException e) { close(); throw new RuntimeException(e.getCause()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); } catch (Exception e) { } @@ -562,6 +564,7 @@ protected synchronized void fail(Throwable t) { try { Thread.sleep(failureSleep); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); try { close(); } catch (IOException e1) { @@ -590,6 +593,11 @@ protected synchronized void fail(Throwable t) { seekLastSeen(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + if (!callClosed.get() && !Thread.interrupted()) { + fail(e); + } } catch (Exception e) { if (!callClosed.get() && !Thread.interrupted()) fail(e); @@ -853,7 +861,7 @@ public synchronized void setBlockFile(Reader reader) throws InterruptedException this.reader = reader; } - public FSDataInputStream getInputStream() { + public synchronized FSDataInputStream getInputStream() { return inputStream; } diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java index 99e95ebb95f..cf1ace1477e 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileScanner.java @@ -203,7 +203,10 @@ public Iterator> iterator() { log.trace("KV iterator is " + (kv == null ? "null" : "not null")); } } while (null == kv); - + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + IOUtils.cleanupWithLogger(null, this); + throw new RuntimeException(e); } catch (Exception e) { IOUtils.cleanupWithLogger(null, this); throw new RuntimeException(e); diff --git a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java index 637c3ea12d2..17f700ef1fe 100644 --- a/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java +++ b/warehouse/query-core/src/main/java/datawave/mr/bulk/RfileSplitInputFormat.java @@ -58,6 +58,7 @@ public List getSplits(JobContext job) throws IOException { try { split.add(frSplit); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } diff --git a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java index 23b01f6c087..8569b925e11 100644 --- a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java +++ b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexBuildingVisitor.java @@ -251,7 +251,7 @@ private String getTLDId(Key key) { String uid = getUid(key); // if the uid is not empty - if (!uid.isEmpty()) { + if (uid != null && !uid.isEmpty()) { uid = TLD.parseRootPointerFromId(uid); } diff --git a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java index 453a56b01ac..6e3f796ef83 100644 --- a/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/ancestor/AncestorIndexIterator.java @@ -33,8 +33,6 @@ protected AncestorIndexIterator(Builder builder) { @Override protected Range buildIndexRange(Range r) { Key start = r.getStartKey(); - Key end = r.getEndKey(); - String endCf = (end == null || end.getColumnFamily() == null ? "" : end.getColumnFamily().toString()); String startCf = (start == null || start.getColumnFamily() == null ? "" : start.getColumnFamily().toString()); // if the start key is inclusive, and contains a datatype/0UID, then move back to the top level ancestor diff --git a/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java b/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java index 2b9d971d84c..0e8067858a8 100644 --- a/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java +++ b/warehouse/query-core/src/main/java/datawave/query/cardinality/CardinalityRecord.java @@ -30,6 +30,8 @@ public class CardinalityRecord implements Serializable { private HashMultimap cardinalityMap = HashMultimap.create(); private static Logger log = Logger.getLogger(CardinalityRecord.class); + private static final Object LOCK = new Object(); + public enum DateType { DOCUMENT, CURRENT } @@ -236,7 +238,7 @@ public static void writeToDisk(final CardinalityRecord cardinalityRecord, final if (!cardinalityRecord.getCardinalityMap().isEmpty()) { ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(() -> { - synchronized (file) { + synchronized (LOCK) { ObjectOutputStream oos = null; try { FileOutputStream fos = new FileOutputStream(file); @@ -246,7 +248,7 @@ public static void writeToDisk(final CardinalityRecord cardinalityRecord, final log.error(e.getMessage(), e); } finally { IOUtils.closeQuietly(oos); - file.notify(); + file.notifyAll(); } } }); @@ -273,6 +275,10 @@ public void merge(File file) { CardinalityRecord cardinalityRecord = CardinalityRecord.readFromDisk(file); + if (cardinalityRecord == null) { + throw new IllegalArgumentException("ResultsCardinalityRecords have different resultCardinalityValueFields"); + } + int intersectionSize = Sets.intersection(this.resultCardinalityValueFields, cardinalityRecord.resultCardinalityValueFields).size(); if (intersectionSize != this.resultCardinalityValueFields.size() || intersectionSize != cardinalityRecord.resultCardinalityValueFields.size()) { throw new IllegalArgumentException("ResultsCardinalityRecords have different resultCardinalityValueFields"); diff --git a/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java b/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java index 27ce96858a3..ff4873513da 100644 --- a/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java +++ b/warehouse/query-core/src/main/java/datawave/query/composite/CompositeRange.java @@ -320,10 +320,10 @@ else if (node instanceof ASTEQNode) public int hashCode() { final int prime = 31; int result = super.hashCode(); - result = prime * result + ((expressionListLowerBound == null) ? 0 : expressionListLowerBound.hashCode()); - result = prime * result + ((jexlNodeListLowerBound == null) ? 0 : jexlNodeListLowerBound.hashCode()); - result = prime * result + ((expressionListUpperBound == null) ? 0 : expressionListUpperBound.hashCode()); - result = prime * result + ((jexlNodeListUpperBound == null) ? 0 : jexlNodeListUpperBound.hashCode()); + result = prime * result + expressionListLowerBound.hashCode(); + result = prime * result + jexlNodeListLowerBound.hashCode(); + result = prime * result + expressionListUpperBound.hashCode(); + result = prime * result + jexlNodeListUpperBound.hashCode(); return result; } @@ -338,25 +338,7 @@ public boolean equals(Object obj) { if (!super.equals(obj)) return false; CompositeRange other = (CompositeRange) obj; - if (expressionListLowerBound == null) { - if (other.expressionListLowerBound != null) - return false; - } else if (!expressionListLowerBound.equals(other.expressionListLowerBound)) - return false; - if (jexlNodeListLowerBound == null) { - if (other.jexlNodeListLowerBound != null) - return false; - } else if (!jexlNodeListLowerBound.equals(other.jexlNodeListLowerBound)) - return false; - if (expressionListUpperBound == null) { - if (other.expressionListUpperBound != null) - return false; - } else if (!expressionListUpperBound.equals(other.expressionListUpperBound)) - return false; - if (jexlNodeListUpperBound == null) { - if (other.jexlNodeListUpperBound != null) - return false; - } else if (!jexlNodeListUpperBound.equals(other.jexlNodeListUpperBound)) + if (!jexlNodeListUpperBound.equals(other.jexlNodeListUpperBound)) return false; return true; } diff --git a/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java b/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java index eed51322de1..a2d450cf1cb 100644 --- a/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java +++ b/warehouse/query-core/src/main/java/datawave/query/function/LogTiming.java @@ -24,6 +24,8 @@ public class LogTiming implements Function,Entry initializeScannerStreams(List initializeScannerStreams(List options; protected String scanId; @@ -2150,7 +2152,7 @@ public void setStatsdHostAndPort(String statsdHostAndPort) { public QueryStatsDClient getStatsdClient() { if (statsdHostAndPort != null && queryId != null) { if (statsdClient == null) { - synchronized (queryId) { + synchronized (LOCK) { if (statsdClient == null) { setStatsdClient(new QueryStatsDClient(queryId, getStatsdHost(statsdHostAndPort), getStatsdPort(statsdHostAndPort), getStatsdMaxQueueSize())); diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java b/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java index e1293ba5c10..cfda35fa919 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/SourceManager.java @@ -5,6 +5,7 @@ import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; @@ -27,7 +28,7 @@ public class SourceManager implements SortedKeyValueIterator { private static final Logger log = Logger.getLogger(SourceManager.class); protected volatile int sources = 0; - protected volatile int deepCopiesCalled = 0; + protected AtomicInteger deepCopiesCalled = new AtomicInteger(0); protected long initialSize = 0; protected long createdSize = 0; @@ -230,7 +231,7 @@ public void seek(Range range, Collection columnFamilies, boolean i * translate to realistic performance benefits. */ public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { - deepCopiesCalled++; + deepCopiesCalled.incrementAndGet(); if (initialSize > 0) { Queue queue = sourceQueue; SourceManager nextSource = null; diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java b/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java index 06afc382ac3..b85e16fcf56 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/filter/LoadDateFilter.java @@ -56,7 +56,7 @@ public SortedKeyValueIterator deepCopy(IteratorEnvironment env) { * @throws IOException * for issues with read/write */ - private void initOptions(Map options) throws IOException { + private void initializeOptions(Map options) throws IOException { String e = options.get(ColumnRangeIterator.RANGE_NAME); if (e == null) { @@ -68,7 +68,7 @@ private void initOptions(Map options) throws IOException { @Override public void init(SortedKeyValueIterator source, Map options, IteratorEnvironment env) throws java.io.IOException { super.init(source, options, env); - initOptions(options); + initializeOptions(options); } /** diff --git a/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java b/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java index 01214d848e4..0c20e80a5c3 100644 --- a/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/iterator/pipeline/PipelineIterator.java @@ -151,6 +151,9 @@ private Entry getNext(boolean remove) { return next; } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } // cancel out existing executions cancel(); diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java b/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java index 5c668e88816..7988fad30cf 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/DatawaveArithmetic.java @@ -26,6 +26,8 @@ public abstract class DatawaveArithmetic extends JexlArithmetic { private static final Logger log = Logger.getLogger(DatawaveArithmetic.class); + private static final Object LOCK = new Object(); + /** * Default to being lenient so we don't have to add "null" for every field in the query that doesn't exist in the document */ @@ -317,7 +319,7 @@ public static boolean matchesFst(Object object, FST fst) throws IOException { final IntsRefBuilder irBuilder = new IntsRefBuilder(); Util.toUTF16(object.toString(), irBuilder); final IntsRef ints = irBuilder.get(); - synchronized (fst) { + synchronized (LOCK) { return Util.get(fst, ints) != null; } } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java b/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java index ce38f809b38..4a4a13118d9 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/functions/EvaluationPhaseFilterFunctions.java @@ -53,6 +53,8 @@ public class EvaluationPhaseFilterFunctions { */ public static final String CASE_INSENSITIVE = ".*\\(\\?[idmsux]*-[dmsux]*i[idmsux]*\\).*"; + public static final Object LOCK = new Object(); + private static final Logger log = Logger.getLogger(EvaluationPhaseFilterFunctions.class); public static boolean occurrence(Iterable fieldValues, String operator, int count) { @@ -1549,7 +1551,7 @@ public static long getNextTime(long time, int granularity) { * if the value failed to be parsed using the supplied format */ public static long getTime(Object value, DateFormat format) throws ParseException { - synchronized (format) { + synchronized (LOCK) { return format.parse(ValueTuple.getStringValue(value)).getTime(); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java index 137d21e5c3b..808b80433ce 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/AsyncIndexLookup.java @@ -47,6 +47,7 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch try { startedLatch.await(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new UnsupportedOperationException("Interrupted while waiting for IndexLookup to start", e); } } else { @@ -79,7 +80,10 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch break; } - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } catch (TimeoutException e) { future.cancel(true); @@ -87,6 +91,7 @@ protected void timedScanWait(Future future, CountDownLatch startedLatch try { stoppedLatch.await(); } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); log.error("Interrupted waiting for canceled AsyncIndexLookup to complete."); throw new RuntimeException(ex); } diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java index 96ec7dab0d9..aaaa7cb26d5 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/RegexIndexLookup.java @@ -46,6 +46,8 @@ * An asynchronous index lookup which looks up concrete values for the specified regex term. */ public class RegexIndexLookup extends AsyncIndexLookup { + + private static final Object LOCK = new Object(); private static final Logger log = ThreadConfigurableLogger.getLogger(RegexIndexLookup.class); protected MetadataHelper helper; @@ -308,7 +310,7 @@ protected Callable createTimedCallable(final Iterator> String field = holder.toString(); // synchronize access to fieldsToValues - synchronized (indexLookupMap) { + synchronized (LOCK) { // We are only returning a mapping of field value to field name, no need to // determine cardinality and such at this point. indexLookupMap.put(field, term); @@ -331,7 +333,7 @@ protected Callable createTimedCallable(final Iterator> log.debug("Failed or Timed out " + e); } // synchronize access to fieldsToValues - synchronized (indexLookupMap) { + synchronized (LOCK) { // Only if not doing an unfielded lookup should we mark all fields as having an exceeded threshold if (!unfieldedLookup) { for (String field : fields) { diff --git a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java index 61a827704ed..dfddd6d6a0c 100644 --- a/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java +++ b/warehouse/query-core/src/main/java/datawave/query/jexl/lookups/ShardIndexQueryTableStaticMethods.java @@ -540,9 +540,6 @@ public static final void configureGlobalIndexDataTypeFilter(ShardQueryConfigurat } IteratorSetting cfg = configureGlobalIndexDataTypeFilter(config, dataTypes); - if (cfg == null) { - return; - } bs.addScanIterator(cfg); } diff --git a/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java b/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java index 91b33bd9f1b..1bdc75765a8 100644 --- a/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java +++ b/warehouse/query-core/src/main/java/datawave/query/language/processor/lucene/CustomWildcardQueryNodeProcessor.java @@ -39,7 +39,7 @@ protected QueryNode postProcessNode(QueryNode node) throws QueryNodeException { // Code below simulates the old lucene parser behavior for wildcards - if (isWildcard(text)) { + if (checkIfWildcard(text)) { if (isPrefixWildcard(text)) { return new PrefixWildcardQueryNode(fqn.getField(), text, fqn.getBegin(), fqn.getEnd()); } else { @@ -51,7 +51,7 @@ protected QueryNode postProcessNode(QueryNode node) throws QueryNodeException { return node; } - private boolean isWildcard(CharSequence text) { + private boolean checkIfWildcard(CharSequence text) { if (text == null || text.length() <= 0) return false; @@ -67,7 +67,7 @@ private boolean isWildcard(CharSequence text) { } private boolean isPrefixWildcard(CharSequence text) { - if (text == null || text.length() <= 0 || !isWildcard(text)) + if (text == null || text.length() <= 0 || !checkIfWildcard(text)) return false; // Validate last character is a '*' and was not escaped diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java index 3c7fd8ce575..0d981d81599 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/DefaultQueryPlanner.java @@ -2352,7 +2352,10 @@ protected IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQ if (settingFuture.isDone()) try { return settingFuture.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e.getCause()); + } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } else diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java index 9ead8720514..fe41051d220 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/IndexQueryPlanner.java @@ -44,7 +44,10 @@ public IteratorSetting getQueryIterator(MetadataHelper metadataHelper, ShardQuer if (null == cfg) { try { cfg = settingFuture.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } diff --git a/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java b/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java index 3252ae5afba..a4cd46475c7 100644 --- a/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java +++ b/warehouse/query-core/src/main/java/datawave/query/planner/ThreadedRangeBundlerIterator.java @@ -197,6 +197,9 @@ public boolean hasNext() { break; } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error("Exception in ThreadedRangeBundlerIterator", e); throw new RuntimeException(e); } @@ -391,6 +394,9 @@ public void run() { } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } // only propogate the exception if we weren't being shutdown. if (running) { throw new RuntimeException(e); diff --git a/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java b/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java index c6caed26a97..463eecd3ba2 100644 --- a/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java +++ b/warehouse/query-core/src/main/java/datawave/query/predicate/ChainableEventDataQueryFilter.java @@ -105,7 +105,7 @@ private Range updateRange(Range current, Range candidate) { } // test the end key - if (result == null || result.getEndKey() == null || result.getEndKey().compareTo(candidate.getEndKey()) >= 0) { + if (result.getEndKey() == null || result.getEndKey().compareTo(candidate.getEndKey()) >= 0) { result = new Range(result.getStartKey(), result.isStartKeyInclusive(), candidate.getEndKey(), result.isEndKeyInclusive() && candidate.isEndKeyInclusive()); } diff --git a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java index 9f168e63e32..974a352c875 100644 --- a/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java +++ b/warehouse/query-core/src/main/java/datawave/query/scheduler/PushdownFunction.java @@ -215,6 +215,7 @@ else if (ctx.getTableState(tableId) == TableState.OFFLINE) try { Thread.sleep(100); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } else { diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java index b38883be416..476c654e0b0 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/BatchScannerSession.java @@ -714,6 +714,7 @@ protected void shutdownServices() { log.error("Executor did not fully shutdown"); } } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // we were interrupted while waiting } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java index f6cb3d74f1c..ed1f4062c37 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/RangeStreamScanner.java @@ -378,6 +378,7 @@ public boolean hasNext() { currentEntry = resultQueue.poll(getPollTime(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error(e); throw new RuntimeException(e); } @@ -410,7 +411,10 @@ private void submitTask() { Future future = myExecutor.submit(this); try { future.get(); - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { throw new RuntimeException(e); } } @@ -449,6 +453,7 @@ protected int scannerInvariant(final Iterator iter) { } prevDay = null; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); return 0; } } @@ -544,6 +549,7 @@ protected int scannerInvariant(final Iterator iter) { prevDay = myEntry; } } catch (InterruptedException exception) { + Thread.currentThread().interrupt(); prevDay = myEntry; } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java index 6cbccce6089..558eb5cd95b 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/ScannerSession.java @@ -325,6 +325,7 @@ public boolean hasNext() { currentEntry = resultQueue.poll(getPollTime(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.trace("hasNext" + isRunning() + " interrupted"); log.error("Interrupted before finding next", e); throw new RuntimeException(e); @@ -557,6 +558,7 @@ protected int scannerInvariant(final Iterator iter) { try { accepted = resultQueue.offer(myEntry, 200, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // keep trying } } diff --git a/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java b/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java index 69ddd40b23d..c664cb2f3bd 100644 --- a/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java +++ b/warehouse/query-core/src/main/java/datawave/query/tables/async/SpeculativeScan.java @@ -202,6 +202,7 @@ public void onSuccess(Scan result) { } catch (InterruptedException e) { close(); + Thread.currentThread().interrupt(); throw new RuntimeException(e); } finally { writeControl.unlock(); diff --git a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java index f9324e1f756..633ffb7b279 100644 --- a/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java +++ b/web-services/cached-results/src/main/java/datawave/webservice/results/cached/CachedResultsBean.java @@ -629,6 +629,9 @@ else if (maxValueLength > maxLength) { response.addException(e.getBottomQueryException()); throw new NoResultsException(e, response.getResult()); } catch (QueryCanceledQueryException | InterruptedException e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.info("Query " + queryId + " canceled on request"); if (crq != null) { crq.getMetric().setLifecycle(QueryMetric.Lifecycle.CANCELLED); @@ -2359,6 +2362,7 @@ public CachedRunningQuery retrieve(String id, String owner) throws IOException { outReadThread.join(); errReadThread.join(); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); log.error("Thread interrupted waiting for import to finish, killing import process"); process.destroy(); } finally { diff --git a/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java b/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java index cf4d4d7ddfe..8e14cd7868b 100644 --- a/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java +++ b/web-services/common-util/src/main/java/datawave/resteasy/interceptor/CreateQuerySessionIDFilter.java @@ -43,7 +43,7 @@ public void filter(ContainerRequestContext request, ContainerResponseContext res // If we're sending an error response, then there's no need to set a cookie since // there's no query "session" to stick to this server. setCookie = false; - QUERY_ID.set(null); + QUERY_ID.remove(); break; default: @@ -51,7 +51,7 @@ public void filter(ContainerRequestContext request, ContainerResponseContext res log.error(method.getResourceClass() + "." + method.getMethod().getName() + " did not set QUERY_ID threadlocal."); } else { id = QUERY_ID.get(); - QUERY_ID.set(null); + QUERY_ID.remove(); } break; } diff --git a/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java b/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java index d496e870179..75596461af0 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java +++ b/web-services/common/src/main/java/datawave/webservice/common/health/HealthBean.java @@ -187,6 +187,7 @@ public Response waitForQueryCompletion(@QueryParam("timeoutMinutes") @DefaultVal try { Thread.sleep(queryCompletionWaitIntervalMillis); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); LOG.warn("Interrupted while waiting for queries to complete."); } } diff --git a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java index d95b52dc1ff..5fa4101b393 100644 --- a/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java +++ b/web-services/common/src/main/java/datawave/webservice/common/remote/RemoteHttpService.java @@ -208,6 +208,7 @@ protected void shutdown() { try { Thread.sleep(1000L); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); break; } totalWait = System.currentTimeMillis() - waitStart; @@ -579,6 +580,7 @@ public boolean retryRequest(IOException exception, int executionCount, HttpConte try { Thread.sleep(unavailableRetryDelay); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // Ignore -- we'll just end up retrying a little too fast } } diff --git a/web-services/examples/query-war/src/main/webapp/index.html b/web-services/examples/query-war/src/main/webapp/index.html index 25247f89bcb..db205c3f054 100644 --- a/web-services/examples/query-war/src/main/webapp/index.html +++ b/web-services/examples/query-war/src/main/webapp/index.html @@ -1,6 +1,6 @@ - + DataWave Web Service Examples diff --git a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java index df051b3d6a6..3cdbe0cc8de 100644 --- a/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java +++ b/web-services/map-reduce/src/main/java/datawave/webservice/mr/MapReduceBean.java @@ -508,6 +508,9 @@ public GenericResponse submit(@FormParam("jobName") String jobName, @For try { j.submit(); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } QueryException qe = new QueryException(DatawaveErrorCode.MAPREDUCE_JOB_START_ERROR, e); log.error(qe.getMessage(), qe); response.addException(qe.getBottomQueryException()); diff --git a/web-services/metrics/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java b/web-services/metrics/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java index 4ce461b03d9..80aec4604a9 100644 --- a/web-services/metrics/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java +++ b/web-services/metrics/src/main/java/datawave/query/metrics/ShardTableQueryMetricHandler.java @@ -595,8 +595,6 @@ public BaseQueryMetric toMetric(EventBase event) { m.setPlan(fieldValue); } else if (fieldName.equals("QUERY_LOGIC")) { m.setQueryLogic(fieldValue); - } else if (fieldName.equals("QUERY_ID")) { - m.setQueryId(fieldValue); } else if (fieldName.equals("BEGIN_DATE")) { try { Date d = sdf_date_time1.parse(fieldValue); @@ -697,8 +695,6 @@ public BaseQueryMetric toMetric(EventBase event) { m.addVersion(BaseQueryMetric.DATAWAVE, fieldValue); } else if (fieldName.startsWith("VERSION.")) { m.addVersion(fieldName.substring(8), fieldValue); - } else if (fieldName.equals("YIELD_COUNT")) { - m.setYieldCount(Long.parseLong(fieldValue)); } else if (fieldName.equals("LOGIN_TIME")) { m.setLoginTime(Long.parseLong(fieldValue)); } else { @@ -816,6 +812,9 @@ public void reload() { try { this.recordWriter.close(null); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error(e.getMessage(), e); } } diff --git a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricHolder.java b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricHolder.java index 51334b4b7ec..aa7a4b28246 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricHolder.java +++ b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricHolder.java @@ -56,7 +56,7 @@ public boolean equals(Object o) { @Override public String toString() { if (this.queryMetric == null || this.principal == null) { - return null; + return ""; } else { return this.principal + ":" + this.queryMetric; } diff --git a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricMessage.java b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricMessage.java index ddbcfc451dc..e31121b9ddd 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricMessage.java +++ b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricMessage.java @@ -41,7 +41,7 @@ public boolean equals(Object o) { @Override public String toString() { if (this.metricHolder == null) { - return null; + return ""; } else { return this.metricHolder.toString(); } diff --git a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricsWriter.java b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricsWriter.java index daf82f4b24e..d5c997305b2 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricsWriter.java +++ b/web-services/query/src/main/java/datawave/webservice/query/metric/QueryMetricsWriter.java @@ -115,7 +115,9 @@ public void shutdown() { try { Thread.sleep(200); } catch (Exception e) { - + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } this.shutDownMetricProcessors = true; @@ -123,7 +125,9 @@ public void shutdown() { try { f.get(Math.max(500, maxShutDownMs - (System.currentTimeMillis() - start)), TimeUnit.MILLISECONDS); } catch (Exception e) { - + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } } } log.info(String.format("shut down with %d metric updates in queue", blockingQueue.size())); @@ -199,7 +203,7 @@ private List getMetricsFromQueue(int batchSize, long maxLaten metricHolderList.add(holder); } } catch (InterruptedException e) { - + Thread.currentThread().interrupt(); } } if (metricHolderList.size() > 0 || blockingQueue.size() > 0) { @@ -313,6 +317,9 @@ public void run() { } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } log.error(e.getMessage(), e); } } diff --git a/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java b/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java index 4044facb91e..7408209b62e 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java +++ b/web-services/query/src/main/java/datawave/webservice/query/runner/QueryExecutorBean.java @@ -1288,6 +1288,7 @@ public VoidResponse reset(@Required("id") @PathParam("id") String id) { CreateQuerySessionIDFilter.QUERY_ID.set(id); return response; } catch (InterruptedException e) { + Thread.currentThread().interrupt(); if (query != null) { query.getMetric().setLifecycle(QueryMetric.Lifecycle.CANCELLED); } diff --git a/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java b/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java index 3b44d65d656..92809ea1a1e 100644 --- a/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java +++ b/web-services/query/src/main/java/datawave/webservice/query/runner/RunningQuery.java @@ -254,6 +254,7 @@ private Object getResultsThread() { try { Thread.sleep(1); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new RuntimeException(e); } } @@ -275,6 +276,9 @@ private Object getResultsThread() { } } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } if (settings.getUncaughtExceptionHandler() != null) { settings.getUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), e); } else { @@ -318,6 +322,7 @@ private boolean hasNext(long pageStartTime) throws TimeoutException { try { hasNext.wait(timeout); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // if we got interrupted, then just return false return false; } @@ -361,6 +366,7 @@ private Object getNext(long pageStartTime) throws TimeoutException { try { gotNext.wait(timeout); } catch (InterruptedException e) { + Thread.currentThread().interrupt(); // if we got interrupted, then just return null return null; } diff --git a/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java b/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java index b5a7fbd9fc3..080274816fc 100644 --- a/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java +++ b/web-services/security/src/main/java/datawave/security/cache/AccumuloCacheStore.java @@ -191,7 +191,10 @@ public boolean delete(Object key) { } catch (MutationsRejectedException e) { throw new PersistenceException("Unable to write cache value to Accumulo", e); } - } catch (IOException | InterruptedException e) { + } catch (IOException e) { + throw new PersistenceException("Unable to serialize key: " + key, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new PersistenceException("Unable to serialize key: " + key, e); } } @@ -209,7 +212,10 @@ public MarshalledEntry _load(Object key, boolean loadValue, boolean loadMet scanner.setRange(new Range(new Text(keyBytes))); } catch (TableNotFoundException e) { throw new PersistenceException(e); - } catch (IOException | InterruptedException e) { + } catch (IOException e) { + throw new PersistenceException("Unable to serialize key " + key, e); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); throw new PersistenceException("Unable to serialize key " + key, e); } @@ -296,6 +302,9 @@ public void process(KeyFilter filter, CacheLoaderTask task, Exec task.processEntry(marshalledEntry, taskContext); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new PersistenceException(e); } } diff --git a/web-services/web-root/src/main/webapp/index.html b/web-services/web-root/src/main/webapp/index.html index 8be6325e0bb..0147d934b51 100644 --- a/web-services/web-root/src/main/webapp/index.html +++ b/web-services/web-root/src/main/webapp/index.html @@ -1,8 +1,9 @@ - + + DataWave

This is DataWave.

- \ No newline at end of file +