From 9b2dc283ebe4939e6f8e37e5bfefd2c0e9bacacf Mon Sep 17 00:00:00 2001 From: Jason Wells Date: Thu, 24 Jun 2021 16:36:40 -0700 Subject: [PATCH] add parellelism to bulk redactor --- .../annotations/AnnotationEvent.java | 2 +- .../annotations/AnnotationRepository.java | 2 +- .../annotations/BulkRedactor.java | 133 +++++++++++------- .../superutilities/export/PdfWorkCache.java | 4 +- 4 files changed, 86 insertions(+), 55 deletions(-) diff --git a/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationEvent.java b/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationEvent.java index e605f85..cd235bc 100644 --- a/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationEvent.java +++ b/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationEvent.java @@ -2,9 +2,9 @@ import java.util.Collection; +import org.apache.log4j.Logger; import org.joda.time.DateTime; -import jxl.common.Logger; import nuix.Case; import nuix.Item; diff --git a/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationRepository.java b/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationRepository.java index 2f01536..0886f25 100644 --- a/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationRepository.java +++ b/Java/src/main/java/com/nuix/superutilities/annotations/AnnotationRepository.java @@ -23,7 +23,7 @@ import com.nuix.superutilities.misc.SQLiteBacked; import com.nuix.superutilities.query.QueryHelper; -import jxl.common.Logger; +import org.apache.log4j.Logger; import nuix.BulkAnnotater; import nuix.Case; import nuix.Item; diff --git a/Java/src/main/java/com/nuix/superutilities/annotations/BulkRedactor.java b/Java/src/main/java/com/nuix/superutilities/annotations/BulkRedactor.java index 5b142f0..b0097c9 100644 --- a/Java/src/main/java/com/nuix/superutilities/annotations/BulkRedactor.java +++ b/Java/src/main/java/com/nuix/superutilities/annotations/BulkRedactor.java @@ -5,10 +5,12 @@ import java.util.Collection; import java.util.Comparator; import java.util.HashSet; -import java.util.TreeMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -69,7 +71,7 @@ public void whenProgressUpdated(Consumer callback) { progressUpdatedCallback = callback; } - private void fireProgressUpdated(BulkRedactorProgressInfo info) { + private synchronized void fireProgressUpdated(BulkRedactorProgressInfo info) { if(progressUpdatedCallback != null) { progressUpdatedCallback.accept(info); } @@ -197,13 +199,17 @@ public List findExpressionsInPdfFile(File file, Colle * @param nuixCase The source Nuix case. Needed to obtain items (if none were given) and/or obtain the appropriate markup set. * @param settings The settings used to find and generate the redactions. * @param scopeItems Items to find and redact. + * @param concurrency How many threads to put in ForkJoinPool * @throws Exception If something goes wrong * @return Returns a list of all match region objects (so they can be reported, inspected, etc) */ - public List findAndMarkup(Case nuixCase, BulkRedactorSettings settings, Collection scopeItems) throws Exception { + public List findAndMarkup(Case nuixCase, BulkRedactorSettings settings, Collection scopeItems, int concurrency) throws Exception { + Collection itemsToProcess; if(scopeItems == null || scopeItems.size() < 1) { logger.info("No scopeItems were provided, using all items in case"); - scopeItems = nuixCase.search(""); + itemsToProcess = nuixCase.search(""); + } else { + itemsToProcess = scopeItems; } List allFoundRegions = new ArrayList(); @@ -213,12 +219,13 @@ public List findAndMarkup(Case nuixCase, BulkRedactor com.nuix.data.util.aspose.AsposePdf.ensureInitialised(); PdfWorkCache pdfCache = new PdfWorkCache(settings.getTempDirectory()); - MarkupSet markupSet = null; - if (settings.getApplyRedactions() || settings.getApplyRedactions()) { + MarkupSet markupSet; + if (settings.getApplyRedactions() || settings.getApplyRedactions()) { markupSet = settings.getMarkupSet(nuixCase); + } else { + markupSet = null; } - int currentIteration = 0; - int matches = 0; + logMessage("Regular Expressions:"); for(String expression : settings.getExpressions()) { @@ -230,53 +237,75 @@ public List findAndMarkup(Case nuixCase, BulkRedactor logMessage(namedEntity); } - for(Item item : scopeItems) { - currentIteration += 1; - File tempPdf = pdfCache.getPdfPath(item); - - List regions = findExpressionsInPdfFile(tempPdf, settings.getExpressions()); - if(regions.size() > 0) { - for(NuixImageAnnotationRegion region : regions) { - region.setItem(item); - } - allFoundRegions.addAll(regions); - logMessage("Item with GUID %s had %s matches",item.getGuid(),regions.size()); - for(NuixImageAnnotationRegion region : regions) { - matches++; - if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); } - if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); } - } - } - - //Named entities require that we get matched values, convert those to expressions and then do another pass - if (settings.getNamedEntityTypes().size() > 0) { - Set entityValues = new HashSet(); - for(String entityType : settings.getNamedEntityTypes()) { - entityValues.addAll(item.getEntities(entityType)); - } - - Set entityExpressions = entityValues.stream().map(v -> BulkRedactorSettings.phraseToExpression(v)).collect(Collectors.toSet()); - List entityRegions = findExpressionsInPdfFile(tempPdf, entityExpressions); - if(entityRegions.size() > 0) { - for(NuixImageAnnotationRegion region : entityRegions) { - region.setItem(item); + int scopeItemsSize = scopeItems.size(); + AtomicInteger currentIteration = new AtomicInteger(0); + AtomicInteger matches = new AtomicInteger(0); + + Consumer workHorse = new Consumer() { + @Override + public void accept(Item item) { + try { + currentIteration.addAndGet(1); + File tempPdf = pdfCache.getPdfPath(item); + + Set allExpressions = new HashSet(); + allExpressions.addAll(settings.getExpressions()); + if (settings.getNamedEntityTypes().size() > 0) { + Set entityValues = new HashSet(); + for(String entityType : settings.getNamedEntityTypes()) { + entityValues.addAll(item.getEntities(entityType)); + } + entityValues.stream().map(v -> BulkRedactorSettings.phraseToExpression(v)).forEach(new Consumer() { + @Override + public void accept(String exp) { + allExpressions.add(exp); + } + }); } - allFoundRegions.addAll(regions); - logMessage("Item with GUID %s had %s named entity matches",item.getGuid(),entityRegions.size()); - for(NuixImageAnnotationRegion region : entityRegions) { - matches++; - if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); } - if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); } - } + + List regions = findExpressionsInPdfFile(tempPdf, allExpressions); + if(regions.size() > 0) { + for(NuixImageAnnotationRegion region : regions) { + region.setItem(item); + } + allFoundRegions.addAll(regions); + logMessage("Item with GUID %s had %s matches",item.getGuid(),regions.size()); + for(NuixImageAnnotationRegion region : regions) { + if(settings.getApplyRedactions()) { region.applyRedaction(markupSet); } + if(settings.getApplyHighLights()) { region.applyHighlight(markupSet); } + } + matches.addAndGet(regions.size()); + } + + pdfCache.forgetItem(item); + + // Report progress + synchronized(this) { + BulkRedactorProgressInfo progressInfo = new BulkRedactorProgressInfo(); + progressInfo.setCurrent(currentIteration.get()); + progressInfo.setTotal(scopeItemsSize); + progressInfo.setMatches(matches.get()); + fireProgressUpdated(progressInfo); + } + } catch (Exception e) { + logMessage("Exception processing item with GUID %s, %s (See Nuix logs for more detail)", item.getGuid(), e.getMessage()); + logger.error(String.format("Error while processing item with GUID %s", item.getGuid()),e); } } - - // Report progress - BulkRedactorProgressInfo progressInfo = new BulkRedactorProgressInfo(); - progressInfo.setCurrent(currentIteration); - progressInfo.setTotal(scopeItems.size()); - progressInfo.setMatches(matches); - fireProgressUpdated(progressInfo); + }; + + ForkJoinPool pool = null; + try { + pool = new ForkJoinPool(concurrency); + pool.submit(()->{ + itemsToProcess.parallelStream().forEach(workHorse); + }).get(); + } catch (Exception e) { + logger.error("Error while scanning",e); + throw e; + } finally { + if(pool != null) + pool.shutdown(); } logMessage("Cleaning up temp directory %s",settings.getTempDirectory()); diff --git a/Java/src/main/java/com/nuix/superutilities/export/PdfWorkCache.java b/Java/src/main/java/com/nuix/superutilities/export/PdfWorkCache.java index 4a5cf35..8dc09a9 100644 --- a/Java/src/main/java/com/nuix/superutilities/export/PdfWorkCache.java +++ b/Java/src/main/java/com/nuix/superutilities/export/PdfWorkCache.java @@ -53,7 +53,9 @@ public synchronized File getPdfPath(Item item) throws Exception { tempPdf = new File(tempPdf,guid.substring(3, 6)); tempPdf.mkdirs(); tempPdf = new File(tempPdf,item.getGuid()+".pdf"); - item.getPrintedImage().generate(printSettings); // Make sure PDF is generated or export can have issues + if(!item.getPrintedImage().isStored()) { + item.getPrintedImage().generate(printSettings); // Make sure PDF is generated or export can have issues + } pdfExporter.exportItem(item, tempPdf); pdfCache.put(item.getGuid(), tempPdf); }