Skip to content

Commit 1c6b407

Browse files
authored
add dropoff event to ObjectTracing (#1060)
1 parent 4dc26b2 commit 1c6b407

File tree

5 files changed

+49
-3
lines changed

5 files changed

+49
-3
lines changed

src/main/java/emissary/output/filter/AbstractRollableFilter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import emissary.pool.AgentPool;
1111
import emissary.roll.RollManager;
1212
import emissary.roll.Roller;
13+
import emissary.spi.ObjectTracing;
14+
import emissary.spi.ObjectTracingService;
1315
import emissary.util.io.FileNameGenerator;
1416

1517
import org.apache.commons.lang3.StringUtils;
@@ -35,6 +37,7 @@ public abstract class AbstractRollableFilter extends AbstractFilter {
3537
public static final String MAX_ROLL_FILE_SIZE = "MAX_FILE_SIZE";
3638
public static final String MAX_OUTPUT_APPENDERS = "MAX_OUTPUT_APPENDERS";
3739
public static final String ROLL_INTERVAL_UNIT = "ROLL_INTERVAL_UNIT";
40+
public static final String ENABLE_OBJECT_TRACE = "ENABLE_OBJECT_TRACE";
3841

3942
protected String defaultOutputPath = "./out";
4043
protected Path outputPath;
@@ -46,6 +49,7 @@ public abstract class AbstractRollableFilter extends AbstractFilter {
4649
protected IJournaler rollable;
4750
protected FileNameGenerator fileNameGenerator;
4851
protected boolean appendNewLine = true;
52+
protected boolean enableObjectTrace = false;
4953

5054
/**
5155
* Method to convert payload(s) to an output type
@@ -98,6 +102,7 @@ protected void initRollConfig() {
98102
this.maxOutputAppenders = this.filterConfig.findIntEntry(MAX_OUTPUT_APPENDERS, AgentPool.computePoolSize());
99103
this.rollInterval = this.filterConfig.findLongEntry(CFG_ROLL_INTERVAL, rollInterval);
100104
this.rollIntervalUnits = TimeUnit.valueOf(this.filterConfig.findStringEntry(ROLL_INTERVAL_UNIT, rollIntervalUnits.toString()));
105+
this.enableObjectTrace = this.filterConfig.findBooleanEntry(ENABLE_OBJECT_TRACE, enableObjectTrace);
101106
}
102107

103108
/**
@@ -183,6 +188,14 @@ public int filter(final List<IBaseDataObject> payloadList, final Map<String, Obj
183188
if (code == STATUS_SUCCESS) {
184189
ko.commit();
185190
}
191+
192+
// Emit dropoff object tracing events
193+
if (enableObjectTrace) {
194+
for (IBaseDataObject d : payloadList) {
195+
ObjectTracingService.emitLifecycleEvent(d, d.getFilename(), ObjectTracing.Stage.DROP_OFF, true, this.filterName,
196+
String.valueOf(ko.getFinalDestination().getFileName()));
197+
}
198+
}
186199
} catch (IOException e) {
187200
logger.error("IOException during dropoff.", e);
188201
code = STATUS_FAILURE;

src/main/java/emissary/spi/EmissaryObjectTracingProvider.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,21 @@
77
public class EmissaryObjectTracingProvider implements ObjectTracing {
88

99
@Override
10-
public void getObjectTraceFields(IBaseDataObject d, String filename, Stage stage, Map<String, String> fieldMap) {
10+
public void getObjectTracePickUpFields(IBaseDataObject d, String filename, Stage stage, Map<String, String> fieldMap) {
1111
fieldMap.put("inputFileName", filename);
1212
fieldMap.put("stage", String.valueOf(stage));
1313
}
1414

15+
@Override
16+
public void getObjectTraceDropOffFields(IBaseDataObject d, String filename, Stage stage, String filterName, String outputFileName,
17+
Map<String, String> fieldMap) {
18+
fieldMap.put("inputFileName", filename);
19+
fieldMap.put("stage", String.valueOf(stage));
20+
fieldMap.put("outputFileName", outputFileName);
21+
fieldMap.put("uuid", String.valueOf(d.getInternalId()));
22+
fieldMap.put("outputType", filterName);
23+
}
24+
1525
@Override
1626
public void mapFieldNames(Map<String, String> fieldMap) {
1727
// no actions to perform

src/main/java/emissary/spi/ObjectTracing.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,20 @@ enum Stage {
1818
* @param stage The stage
1919
* @param fieldMap The map of fields we are adding to
2020
*/
21-
void getObjectTraceFields(IBaseDataObject d, String filename, ObjectTracing.Stage stage, Map<String, String> fieldMap);
21+
void getObjectTracePickUpFields(IBaseDataObject d, String filename, ObjectTracing.Stage stage, Map<String, String> fieldMap);
22+
23+
/**
24+
* With this provider, add the appropriate fields/values to the fieldMap
25+
*
26+
* @param d The IBDO
27+
* @param filename The filename of the object
28+
* @param stage The stage
29+
* @param filterName The name of the output filter (json, xml, etc.)
30+
* @param outputFileName The output filename that is generated
31+
* @param fieldMap The map of fields we are adding to
32+
*/
33+
void getObjectTraceDropOffFields(IBaseDataObject d, String filename, Stage stage, String filterName, String outputFileName,
34+
Map<String, String> fieldMap);
2235

2336
/**
2437
* Remaps field names if needed

src/main/java/emissary/spi/ObjectTracingService.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@ public class ObjectTracingService {
2222
private ObjectTracingService() {}
2323

2424
public static synchronized void emitLifecycleEvent(IBaseDataObject d, String filename, ObjectTracing.Stage stage, boolean useObjectTracing) {
25+
emitLifecycleEvent(d, filename, stage, useObjectTracing, null, null);
26+
}
27+
28+
public static synchronized void emitLifecycleEvent(IBaseDataObject d, String filename, ObjectTracing.Stage stage, boolean useObjectTracing,
29+
@Nullable String filterName, @Nullable String outputFileName) {
2530
if (useObjectTracing) {
2631
if (loader == null) {
2732
loader = ServiceLoader.load(ObjectTracing.class);
@@ -30,7 +35,11 @@ public static synchronized void emitLifecycleEvent(IBaseDataObject d, String fil
3035
// have the appropriate providers add fields
3136
Map<String, String> jsonFieldMap = new HashMap<>();
3237
for (ObjectTracing tracing : loader) {
33-
tracing.getObjectTraceFields(d, filename, stage, jsonFieldMap);
38+
if (stage.equals(ObjectTracing.Stage.PICK_UP)) {
39+
tracing.getObjectTracePickUpFields(d, filename, stage, jsonFieldMap);
40+
} else {
41+
tracing.getObjectTraceDropOffFields(d, filename, stage, filterName, outputFileName, jsonFieldMap);
42+
}
3443
}
3544
// once we have added fields from all providers, perform remapping of field names as appropriate
3645
for (ObjectTracing tracing : loader) {

src/main/resources/emissary/output/filter/JsonOutputFilter.cfg

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@ EXTRA_PARAM = "*"
88
#DENYLIST_FIELD =
99
#DENYLIST_PREFIX =
1010
#EMIT_PAYLOAD = "false"
11+
ENABLE_OBJECT_TRACE = "true"

0 commit comments

Comments
 (0)