Skip to content

Client support for lightweight consumer #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class CommonClientConfigs extends org.apache.kafka.clients.CommonClientCo

public static final String ORACLE_TRANSACTIONAL_PRODUCER ="oracle.transactional.producer";

public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we want that in CommonClientConfigs and ConsumerConfig ?


/*
public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
package org.oracle.okafka.clients.consumer;

import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -42,6 +41,7 @@
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.oracle.okafka.common.config.SslConfigs;
import org.oracle.okafka.clients.CommonClientConfigs;

import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -338,15 +338,20 @@ public class ConsumerConfig extends AbstractConfig {
" broker allows for it using `auto.create.topics.enable` broker configuration. This configuration must" +
" be set to `false` when using brokers older than 0.11.0";
public static final boolean DEFAULT_ALLOW_AUTO_CREATE_TOPICS = true;

/**
* <code>security.providers</code>
*/
public static final String SECURITY_PROVIDERS_CONFIG = SecurityConfig.SECURITY_PROVIDERS_CONFIG;
private static final String SECURITY_PROVIDERS_DOC = SecurityConfig.SECURITY_PROVIDERS_DOC;

private static final AtomicInteger CONSUMER_CLIENT_ID_SEQUENCE = new AtomicInteger(1);


/** <code>oracle.consumer.lightweight</code> */
public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";


static {
CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
Type.LIST,
Expand Down Expand Up @@ -606,7 +611,13 @@ public class ConsumerConfig extends AbstractConfig {
.define(org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN,
ConfigDef.Type.STRING,
Importance.MEDIUM,
org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC);
org.oracle.okafka.clients.CommonClientConfigs.ORACLE_NET_TNS_ADMIN_DOC)
.define(CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT,
ConfigDef.Type.BOOLEAN,
false,
Importance.LOW,
CommonClientConfigs.ORACLE_CONSUMER_LIGHTWEIGHT_DOC)
;

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import javax.jms.JMSException;
import javax.jms.Message;
Expand All @@ -33,6 +34,7 @@

import oracle.jdbc.OracleData;
import oracle.jdbc.OracleTypes;
import oracle.jdbc.OracleArray;
import oracle.jms.AQjmsBytesMessage;
import oracle.jms.AQjmsConnection;
import oracle.jms.AQjmsConsumer;
Expand Down Expand Up @@ -87,6 +89,7 @@
import org.oracle.okafka.common.utils.MessageIdConverter;
import org.oracle.okafka.common.utils.MessageIdConverter.OKafkaOffset;
import org.apache.kafka.common.utils.Time;
import oracle.jdbc.OracleConnection;

/**
* This class consumes messages from AQ
Expand All @@ -104,6 +107,15 @@ public final class AQKafkaConsumer extends AQClient{

private boolean skipConnectMe = false;
private boolean externalConn = false;

private static final String LTWT_COMMIT_SYNC = "{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}";
private static final String LTWT_COMMIT_SYNC_ALL = "{call dbms_teqk.AQ$_COMMITSYNC_ALL(?, ?, ?, ?, ?, ?, ?)}";
private static final String LTWT_SEEK = "{call dbms_teqk.AQ$_SEEK(?, ?, ?, ?, ?, ?, ?)}";
private static final String LTWT_SEEK_TO_BEGINNING = "{call dbms_teqk.AQ$_SEEKTOBEGINNING(?, ?, ?, ?, ?)}";
private static final String LTWT_SEEK_TO_END = "{call dbms_teqk.AQ$_SEEKTOEND(?, ?, ?, ?, ?)}";
private static final String LTWT_SUB = "{call sys.dbms_aqadm.add_ltwt_subscriber(?, sys.aq$_agent(?,null,null))}";

private final Map<Node, Map<String, CallableStatement>> callableCacheMap = new ConcurrentHashMap<>();

public AQKafkaConsumer(LogContext logContext, ConsumerConfig configs, Time time, Metadata metadata,Metrics metrics)

Expand All @@ -123,6 +135,36 @@ public void setAssignors(List<ConsumerPartitionAssignor> _assignores )
{
assignors = _assignores;
}

private CallableStatement getOrCreateCallable(Node node, String key, String sql) {
Map<String, CallableStatement> nodeMap = callableCacheMap.computeIfAbsent(node, n -> new ConcurrentHashMap<>());
return nodeMap.computeIfAbsent(key, k -> {
try {
Connection con = getConnection(node);
return con.prepareCall(sql);
} catch (SQLException | JMSException e) {
throw new RuntimeException("Failed to prepare statement for " + key, e);
}
});
}

public void closeCallableStmt(Node node) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invoke this when we close the consumer

Map<String, CallableStatement> stmts = callableCacheMap.remove(node);
if (stmts != null) {
for (CallableStatement stmt : stmts.values()) {
try { stmt.close(); } catch (Exception e) {}
}
}
}

private String getCurrentUser(Node node) throws SQLException, JMSException {
Connection con = ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if getCurrentUser() can use getConnection() ?

return con.getMetaData().getUserName();
}

private Connection getConnection(Node node) throws SQLException, JMSException {
return ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection();
}

public ClientResponse send(ClientRequest request) {
this.selectorMetrics.requestCompletedSend(request.destination());
Expand All @@ -132,7 +174,7 @@ public ClientResponse send(ClientRequest request) {
}
return cr;
}

/**
* Determines the type of request and calls appropriate method for handling request
* @param request request to be sent
Expand Down Expand Up @@ -275,17 +317,26 @@ public ClientResponse commit(ClientRequest request) {
log.debug("Commit Nodes. " + nodes.size());
for(Map.Entry<Node, List<TopicPartition>> node : nodes.entrySet()) {
if(node.getValue().size() > 0) {
String topic = node.getValue().get(0).topic();
TopicConsumers consumers = topicConsumersMap.get(node.getKey());
try {
log.debug("Committing now for node " + node.toString());
TopicSession jmsSession =consumers.getSession();
if(jmsSession != null)
{
Boolean ltwtSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT);

if(!ltwtSub.equals(true)) {
log.debug("Committing now for node " + node.toString());
jmsSession.commit();
log.debug("Commit done");
}else {
log.info("No valid session to commit for node " + node);
TopicSession jmsSession =consumers.getSession();
if(jmsSession != null)
{
log.debug("Committing now for node " + node.toString());
jmsSession.commit();
log.debug("Commit done");
}else {
log.info("No valid session to commit for node " + node);
}
}
else{
log.debug("Performing lightweight commit for node " + node);
commitOffsetsLightWeightSub(node.getKey(), topic, offsets);
}
result.put(node.getKey(), null);

Expand All @@ -307,6 +358,140 @@ public ClientResponse commit(ClientRequest request) {
return createCommitResponse(request, nodes, offsets, result, error);
}

private void commitOffsetsLightWeightSub(Node node, String topic, Map<TopicPartition, OffsetAndMetadata> offsets) {

final int OFFSET_DIVISOR = 20000;
int size = offsets.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for an existing variable for SUBSHARDSIZE

int[] partitions = new int[size];
int[] priorities = new int[size];
long[] subshards = new long[size];
long[] sequences = new long[size];

int index = 0;
for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetEntry : offsets.entrySet()) {
TopicPartition tp = offsetEntry.getKey();
OffsetAndMetadata metadata = offsetEntry.getValue();
partitions[index] = tp.partition() * 2;
priorities[index] = 0;
subshards[index] = metadata.offset() / OFFSET_DIVISOR;
sequences[index] = metadata.offset() % OFFSET_DIVISOR;
index++;
}

commitSyncAll(node, topic, partitions, priorities, subshards, sequences);
}

public void CommitSync(Node node, String topic, int partition_id, int priority,
long subshard_id, long seq_num ) {

try {
String user = getCurrentUser(node);
CallableStatement cStmt = getOrCreateCallable(node, "COMMIT_SYNC", LTWT_COMMIT_SYNC);
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setInt(4, partition_id);
cStmt.setInt(5, priority);
cStmt.setLong(6, subshard_id);
cStmt.setLong(7, seq_num);
cStmt.execute();
log.debug("Light weight CommitSync executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
topic, partition_id, subshard_id, seq_num);
} catch(Exception ex) {
log.error("Error during light weight CommitSync for node: " + node + ", topic: " + topic, ex);
}
}

public void commitSyncAll(Node node, String topic, int[] partition_id, int[] priority,
long[] subshard_id, long[] seq_num ) {

try {
OracleConnection oracleCon = (OracleConnection) getConnection(node);
String user = getCurrentUser(node);

Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", priority);
Array subshardArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", subshard_id);
Array sequenceArray = oracleCon.createOracleArray("DBMS_TEQK.INPUT_ARRAY_T", seq_num);

CallableStatement cStmt = getOrCreateCallable(node, "COMMIT_SYNC_ALL", LTWT_COMMIT_SYNC_ALL);
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.setArray(6, subshardArray);
cStmt.setArray(7, sequenceArray);
cStmt.execute();
log.debug("Light weight CommitSyncAll executed for topic: {}, partitions: {}", topic, partition_id.length);
} catch(Exception ex) {
log.error("Error in light weight commitSyncAll for topic: " + topic + ", node: " + node, ex);
}
}

public void lightWeightSeek(Node node, String topic, int partition_id, int priority,
long subshard_id, long seq_num ) {

try {
String user = getCurrentUser(node);
CallableStatement cStmt = getOrCreateCallable(node, "SEEK", LTWT_SEEK);
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setInt(4, partition_id);
cStmt.setInt(5, priority);
cStmt.setLong(6, subshard_id);
cStmt.setLong(7, seq_num);
cStmt.execute();
log.debug("Light weight seek executed successfully for topic: {}, partition: {}, subshard: {}, seq: {}",
topic, partition_id, subshard_id, seq_num);
} catch(Exception ex) {
log.error("Error in lightWeightseek for topic: " + topic + ", node: " + node, ex);
}
}

public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_id, int[] priority) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to modify the seek() method to use lightweight seek.
We need to invoke lightWeightSeektoBeginning when OffsetRessetStrategy is set to SEEK_TO_BEGINNING


try {
OracleConnection oracleCon = (OracleConnection) getConnection(node);
String user = getCurrentUser(node);
Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);
CallableStatement cStmt = getOrCreateCallable(node, "SEEK_TO_BEGINNING", LTWT_SEEK_TO_BEGINNING);
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.execute();
log.debug("lightWeightSeektoBeginning executed for topic: {}, partitions: {}", topic, partition_id.length);
} catch(Exception ex) {
log.error("Error in lightWeightSeektoBeginning for topic: " + topic + ", node: " + node, ex);
}
}

public void lightWeightSeektoEnd(Node node, String topic, int[] partition_id, int[] priority) {

try {
OracleConnection oracleCon = (OracleConnection) getConnection(node);
String user = getCurrentUser(node);
Array partitionArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", partition_id);
Array priorityArray = oracleCon.createOracleArray("DBMS_TEQK.SEEK_INPUT_ARRAY_T", priority);

CallableStatement cStmt = getOrCreateCallable(node, "SEEK_TO_END", LTWT_SEEK_TO_END);
cStmt.setString(1, user);
cStmt.setString(2, topic);
cStmt.setString(3, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.setArray(4, partitionArray);
cStmt.setArray(5, priorityArray);
cStmt.execute();
log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length);
} catch(Exception ex) {
log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw the exception to caller as well. That will be used to set in the response.

}
}


private ClientResponse createCommitResponse(ClientRequest request, Map<Node, List<TopicPartition>> nodes,
Map<TopicPartition, OffsetAndMetadata> offsets, Map<Node, Exception> result, boolean error) {
return new ClientResponse(request.makeHeader((short)1), request.callback(), request.destination(),
Expand Down Expand Up @@ -1449,7 +1634,13 @@ public ClientResponse subscribe(ClientRequest request) {
topicConsumersMap.put(node, new TopicConsumers(node));
}
TopicConsumers consumers = topicConsumersMap.get(node);
consumers.getTopicSubscriber(topic);

if(consumers.getlightWeightSub()) {
consumers.createLightWeightSub(topic, node);
}
else {
consumers.getTopicSubscriber(topic);
}
metadata.setDBVersion(consumers.getDBVersion());
} catch(JMSException exception) {
log.error("Exception during Subscribe request " + exception, exception);
Expand Down Expand Up @@ -1563,10 +1754,12 @@ private final class TopicConsumers {
private Map<String, TopicSubscriber> topicSubscribers = null;
private final Node node;
private String dbVersion;
private Boolean lightWeightSub;
public TopicConsumers(Node node) throws JMSException {
this(node, TopicSession.AUTO_ACKNOWLEDGE);
}
public TopicConsumers(Node node,int mode) throws JMSException {

this.node = node;
conn = createTopicConnection(node);

Expand All @@ -1586,9 +1779,11 @@ public TopicConsumers(Node node,int mode) throws JMSException {

try {
this.dbVersion = ConnectionUtils.getDBVersion(oConn);
this.lightWeightSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get DBMajorVersion and if >= 26 and if LIGHTWEIGHT property is set then only allow to crate LTWT subscriber


}catch(Exception e)
{
log.error("Exception whle fetching DB Version " + e);
log.error("Exception whle fetching DB Version and lightweight consumer config" + e);
}

}catch(Exception e)
Expand Down Expand Up @@ -1652,6 +1847,19 @@ private TopicSubscriber createTopicSubscriber(String topic) throws JMSException
topicSubscribers.put(topic, subscriber);
return subscriber;
}

private void createLightWeightSub(String topic, Node node) {
try {
CallableStatement cStmt = getOrCreateCallable(node, "CREATE_LTWT_SUB", LTWT_SUB);
cStmt.setString(1, ConnectionUtils.enquote(topic));
cStmt.setString(2, configs.getString(ConsumerConfig.GROUP_ID_CONFIG));
cStmt.execute();
log.debug("Lightweight subscriber created for topic: " + topic + ", node: " + node);
}
catch(Exception ex) {
log.error("Error creating lightweight subscriber for topic: " + topic + ", node: " + node, ex);
}
}

private void refresh(Node node) throws JMSException {
conn = createTopicConnection(node);
Expand Down Expand Up @@ -1693,6 +1901,10 @@ public String getDBVersion()
{
return dbVersion;
}

public boolean getlightWeightSub() {
return lightWeightSub;
}

}

Expand Down