Skip to content

Client support for lightweight consumer #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,10 @@ public class ConsumerConfig extends AbstractConfig {
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;

private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);


/** <code>oracle.consumer.lightweight</code> */
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG = "oracle.consumer.lightweight";

static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import oracle.jdbc.OracleData;
import oracle.jdbc.OracleTypes;
import oracle.jdbc.OracleArray;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConsumer;
Expand Down Expand Up @@ -87,6 +88,7 @@
import org.oracle.okafka.common.utils.MessageIdConverter;
import org.oracle.okafka.common.utils.MessageIdConverter.OKafkaOffset;
import org.apache.kafka.common.utils.Time;
import oracle.jdbc.OracleConnection;

/**
* This class consumes messages from AQ
Expand Down Expand Up @@ -275,17 +277,26 @@ public ClientResponse commit(ClientRequest request) {
log.debug("Commit Nodes. " + nodes.size());
for(Map.Entry<Node, List<TopicPartition>> node : nodes.entrySet()) {
if(node.getValue().size() > 0) {
String topic = node.getValue().get(0).topic();
TopicConsumers consumers = topicConsumersMap.get(node.getKey());
try {
log.debug("Committing now for node " + node.toString());
TopicSession jmsSession =consumers.getSession();
if(jmsSession != null)
{
String ltwtSub = configs.getString(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT_CONFIG);

if(!ltwtSub.equals("true")) {
log.debug("Committing now for node " + node.toString());
jmsSession.commit();
log.debug("Commit done");
}else {
log.info("No valid session to commit for node " + node);
TopicSession jmsSession =consumers.getSession();
if(jmsSession != null)
{
log.debug("Committing now for node " + node.toString());
jmsSession.commit();
log.debug("Commit done");
}else {
log.info("No valid session to commit for node " + node);
}
}
else{
log.debug("Performing lightweight commit for node " + node);
commitOffsetsLightWeightSub(node.getKey(), topic, offsets);
}
result.put(node.getKey(), null);

Expand All @@ -307,6 +318,236 @@ public ClientResponse commit(ClientRequest request) {
return createCommitResponse(request, nodes, offsets, result, error);
}

private void commitOffsetsLightWeightSub(Node node, String topic, Map<TopicPartition, OffsetAndMetadata> offsets) {

final int OFFSET_DIVISOR = 20000;
int size = offsets.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for an existing variable for SUBSHARDSIZE

int[] partitions = new int[size];
int[] priorities = new int[size];
long[] subshards = new long[size];
long[] sequences = new long[size];

int index = 0;
for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
TopicPartition tp = offsetEntry.getKey();
OffsetAndMetadata metadata = offsetEntry.getValue();
partitions[index] = tp.partition() * 2;
priorities[index] = 0;
subshards[index] = metadata.offset() / OFFSET_DIVISOR;
sequences[index] = metadata.offset() % OFFSET_DIVISOR;
index++;
}

commitSyncAll(node, topic, partitions, priorities, subshards, sequences);
}

public void createLightWeightSub(String topic, Node node) throws SQLException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createLightWeightSub where do we invoke this from ?
Also, if we are creating a lightweight sub, can we set a flag in TopicConsumers to indicate that this is a lightweight sub.


CallableStatement cStmt = null;
try {
Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
cStmt = con.prepareCall("{call sys.dbms_aqadm.add_ltwt_subscriber(?, sys.aq$_agent(?,null,null))}");
cStmt.setString(1, ConnectionUtils.enquote(topic));
cStmt.setString(2, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.execute();
log.debug("Lightweight subscriber created for topic: " + topic + ", node: " + node);
}

catch(Exception ex) {
log.error("Error creating lightweight subscriber for topic: " + topic + ", node: " + node, ex);
throw new SQLException("Failed to create lightweight subscriber", ex);
}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}
}

public void CommitSync(Node node, String topic, int partition_id, int priority,
long subshard_id, long seq_num ) {

CallableStatement cStmt = null;
try {

Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
String user = con.getMetaData().getUserName();
cStmt = con.prepareCall("{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to cache the callable statements and create it once only. We can close them from consumer.close()

cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setInt(4, partition_id);
cStmt.setInt(5, priority);
cStmt.setLong(6, subshard_id);
cStmt.setLong(7, seq_num);
cStmt.execute();
log.debug("CommitSync executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
topic, partition_id, subshard_id, seq_num);
}

catch(Exception ex) {
log.error("Error during CommitSync for node: " + node + ", topic: " + topic, ex);
}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}

}

public void commitSyncAll(Node node, String topic, int[] partition_id, int[] priority,
long[] subshard_id, long[] seq_num ) {

CallableStatement cStmt = null;
try {

Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();

OracleConnection oracleCon = (OracleConnection) con;
String user = con.getMetaData().getUserName();

Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", priority);
Array subshardArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", subshard_id);
Array sequenceArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", seq_num);

cStmt = con.prepareCall("{call dbms_teqk.AQ$_COMMITSYNC_ALL(?, ?, ?, ?, ?, ?, ?)}");
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.setArray(6, subshardArray);
cStmt.setArray(7, sequenceArray);
cStmt.execute();
log.debug("CommitSyncAll executed for topic: {}, partitions: {}", topic, partition_id.length);
}

catch(Exception ex) {
log.error("Error in commitSyncAll for topic: " + topic + ", node: " + node, ex);
}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}

}

public void lightWeightSeek(Node node, String topic, int partition_id, int priority,
long subshard_id, long seq_num ) {

CallableStatement cStmt = null;

try {
Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
String user = con.getMetaData().getUserName();
cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEK(?, ?, ?, ?, ?, ?, ?)}");
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setInt(4, partition_id);
cStmt.setInt(5, priority);
cStmt.setLong(6, subshard_id);
cStmt.setLong(7, seq_num);
cStmt.execute();
log.debug("Light weight seek executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
topic, partition_id, subshard_id, seq_num);
}

catch(Exception ex) {
log.error("Error in lightWeightseek for topic: " + topic + ", node: " + node, ex);
}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}

}

public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_id, int[] priority) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to modify the seek() method to use lightweight seek.
We need to invoke lightWeightSeektoBeginning when OffsetRessetStrategy is set to SEEK_TO_BEGINNING


CallableStatement cStmt = null;

try {
Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
OracleConnection oracleCon = (OracleConnection) con;
String user = con.getMetaData().getUserName();
Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);
cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEKTOBEGINNING(?, ?, ?, ?, ?)}");
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.execute();
log.debug("lightWeightSeektoBeginning executed for topic: {}, partitions: {}", topic, partition_id.length);
}

catch(Exception ex) {
log.error("Error in lightWeightSeektoBeginning for topic: " + topic + ", node: " + node, ex);
}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}

}

public void lightWeightSeektoEnd(Node node, String topic, int[] partition_id, int[] priority) {

CallableStatement cStmt = null;

try {
Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
OracleConnection oracleCon = (OracleConnection) con;
String user = con.getMetaData().getUserName();
Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);

cStmt = con.prepareCall("{call dbms_teqk.AQ$_SEEKTOEND(?, ?, ?, ?, ?)}");
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.execute();
log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length);
}

catch(Exception ex) {
log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw the exception to caller as well. That will be used to set in the response.

}
finally {
try {
if(cStmt != null)
cStmt.close();
} catch(Exception e) {
//do nothing
}
}

}

private ClientResponse createCommitResponse(ClientRequest request, Map<Node, List<TopicPartition>> nodes,
Map<TopicPartition, OffsetAndMetadata> offsets, Map<Node, Exception> result, boolean error) {
return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(),
Expand Down