Skip to content

Commit 75a1c90

Browse files
committed
[FIX_LOGGING] add some debug lines and throwable catch to avoid data loss
1 parent 492f41a commit 75a1c90

File tree

1 file changed

+49
-43
lines changed
  • jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka

1 file changed

+49
-43
lines changed

jbpm-event-emitters/jbpm-event-emitters-kafka/src/main/java/org/jbpm/event/emitters/kafka/KafkaEventEmitter.java

Lines changed: 49 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -44,16 +44,16 @@
4444
import com.fasterxml.jackson.databind.SerializationFeature;
4545

4646
/**
47-
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
47+
* Kafka implementation of EventEmitter that simply pushes out data to several Kafka topics depending on InstanceView type.
4848
*
4949
* Expects following parameters to configure itself - via system properties
5050
* <ul>
51-
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
52-
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
53-
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
54-
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
55-
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
56-
* </ul>
51+
* <li>org.kie.jbpm.event.emitters.kafka.date_format - date and time format to be sent to Kafka - default format is yyyy-MM-dd'T'HH:mm:ss.SSSZ</li>
52+
* <li>org.kie.jbpm.event.emitters.kafka.bootstrap.servers - Kafka server ip, default is localhost:9092</li>
53+
* <li>org.kie.jbpm.event.emitters.kafka.client.id - Kafka client id</li>
54+
* <li>org.kie.jbpm.event.emitters.kafka.acks - Kafka acknowledge policy, check <a href="http://kafka.apache.org/documentation.html#producerconfigs">Kafka documentation</a></li>
55+
* <li>org.kie.jbpm.event.emitters.kafka.topic.<processes|tasks|cases>. Topic name for subscribing to these events. Defaults are "jbpm-<processes|tasks|cases>-events"</li>
56+
* </ul>
5757
*/
5858
public class KafkaEventEmitter implements EventEmitter {
5959

@@ -62,7 +62,7 @@ public class KafkaEventEmitter implements EventEmitter {
6262
protected static final String KAFKA_EMITTER_PREFIX = "org.kie.jbpm.event.emitters.kafka.";
6363

6464
private ObjectMapper mapper;
65-
65+
6666
private KafkaSender sender;
6767

6868
private Producer<String, byte[]> producer;
@@ -73,10 +73,10 @@ public KafkaEventEmitter() {
7373

7474
KafkaEventEmitter(Producer<String, byte[]> producer) {
7575
this.producer = producer;
76-
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX+"sync") ? this::sendSync : this::sendAsync;
77-
mapper = new ObjectMapper()
76+
this.sender = Boolean.getBoolean(KAFKA_EMITTER_PREFIX + "sync") ? this::sendSync : this::sendAsync;
77+
this.mapper = new ObjectMapper()
7878
.setDateFormat(new SimpleDateFormat(System.getProperty(
79-
KAFKA_EMITTER_PREFIX+"date_format", System.getProperty(
79+
KAFKA_EMITTER_PREFIX + "date_format", System.getProperty(
8080
"org.kie.server.json.date_format",
8181
"yyyy-MM-dd'T'HH:mm:ss.SSSZ"))))
8282
.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)
@@ -95,38 +95,45 @@ public void apply(Collection<InstanceView<?>> data) {
9595
}
9696

9797
for (InstanceView<?> view : data) {
98-
String processId;
99-
long processInstanceId;
100-
String type;
101-
String topic;
102-
if (view instanceof ProcessInstanceView) {
103-
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
104-
topic = "processes";
105-
type = "process";
106-
processInstanceId = processInstanceView.getId();
107-
processId = processInstanceView.getProcessId();
108-
} else if (view instanceof TaskInstanceView) {
109-
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
110-
topic = "tasks";
111-
type = "task";
112-
processInstanceId = taskInstanceView.getProcessInstanceId();
113-
processId = taskInstanceView.getProcessId();
114-
} else if (view instanceof CaseInstanceView) {
115-
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
116-
topic = "cases";
117-
type = "case";
118-
processInstanceId = caseInstanceView.getId();
119-
processId = caseInstanceView.getCaseDefinitionId();
120-
} else {
121-
logger.warn("Unsupported view type {}", view.getClass());
122-
continue;
98+
try {
99+
String processId;
100+
long processInstanceId;
101+
String type;
102+
String topic;
103+
if (view instanceof ProcessInstanceView) {
104+
ProcessInstanceView processInstanceView = (ProcessInstanceView) view;
105+
topic = "processes";
106+
type = "process";
107+
processInstanceId = processInstanceView.getId();
108+
processId = processInstanceView.getProcessId();
109+
} else if (view instanceof TaskInstanceView) {
110+
TaskInstanceView taskInstanceView = (TaskInstanceView) view;
111+
topic = "tasks";
112+
type = "task";
113+
processInstanceId = taskInstanceView.getProcessInstanceId();
114+
processId = taskInstanceView.getProcessId();
115+
} else if (view instanceof CaseInstanceView) {
116+
CaseInstanceView caseInstanceView = (CaseInstanceView) view;
117+
topic = "cases";
118+
type = "case";
119+
processInstanceId = caseInstanceView.getId();
120+
processId = caseInstanceView.getCaseDefinitionId();
121+
} else {
122+
logger.warn("Unsupported view type {}", view.getClass());
123+
continue;
124+
}
125+
logger.debug("Sending view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
126+
sender.send(topic, type, processId, processInstanceId, view);
127+
logger.debug("Sucessfuly view sent view to topic {} type {} processId {} pid {} and view {}", topic, type, processId, processInstanceId, view);
128+
} catch (Throwable th) {
129+
logError(view, th);
123130
}
124-
sender.send(topic, type, processId, processInstanceId, view);
131+
125132
}
126133
}
127-
134+
128135
private interface KafkaSender {
129-
void send (String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
136+
void send(String topic, String type, String processId, long processInstanceId, InstanceView<?> view);
130137
}
131138

132139
private byte[] viewToPayload(String type, String processId, long processInstanceId, InstanceView<?> view) throws JsonProcessingException {
@@ -140,7 +147,7 @@ private void sendAsync(String topic, String type, String processId, long process
140147
logError(view, e);
141148
}
142149
});
143-
} catch (Exception e) {
150+
} catch (Throwable e) {
144151
logError(view, e);
145152
}
146153
}
@@ -162,11 +169,10 @@ private void sendSync(String topic, String type, String processId, long processI
162169
}
163170
}
164171

165-
private void logError(InstanceView<?> view, Exception e) {
172+
private void logError(InstanceView<?> view, Throwable e) {
166173
logger.error("Error publishing view {}", view, e);
167174
}
168175

169-
170176
@Override
171177
public void drop(Collection<InstanceView<?>> data) {
172178
// nothing to do
@@ -191,7 +197,7 @@ private static Producer<String, byte[]> getProducer() {
191197

192198
private static String getTopic(String eventType) {
193199
return System.getProperty("org.kie.jbpm.event.emitters.kafka.topic." + eventType, "jbpm-" + eventType +
194-
"-events");
200+
"-events");
195201
}
196202

197203
protected static Map<String, Object> getProducerProperties() {

0 commit comments

Comments
 (0)