Skip to content

Client support for lightweight consumer #88

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

Mrunal98
Copy link
Member

@Mrunal98 Mrunal98 commented May 6, 2025

added createLightWeightSub(), commit() methods and seek() methods for kafka lightweight subscriber

@oracle-contributor-agreement oracle-contributor-agreement bot added the OCA Verified All contributors have signed the Oracle Contributor Agreement. label May 6, 2025
commitSyncAll(node, topic, partitions, priorities, subshards, sequences);
}

public void createLightWeightSub(String topic, Node node) throws SQLException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

createLightWeightSub where do we invoke this from ?
Also, if we are creating a lightweight sub, can we set a flag in TopicConsumers to indicate that this is a lightweight sub.


Connection con = ((AQjmsSession)topicConsumersMap.get(node).getSession()).getDBConnection();
String user = con.getMetaData().getUserName();
cStmt = con.prepareCall("{call dbms_teqk.AQ$_COMMITSYNC(?, ?, ?, ?, ?, ?, ?)}");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to cache the callable statements and create it once only. We can close them from consumer.close()

}

private String getCurrentUser(Node node) throws SQLException, JMSException {
Connection con = ((AQjmsSession) topicConsumersMap.get(node).getSession()).getDBConnection();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See if getCurrentUser() can use getConnection() ?

}
}

public void lightWeightSeektoBeginning(Node node, String topic, int[] partition_id, int[] priority) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to modify the seek() method to use lightweight seek.
We need to invoke lightWeightSeektoBeginning when OffsetRessetStrategy is set to SEEK_TO_BEGINNING

@@ -58,6 +58,9 @@ public class CommonClientConfigs extends org.apache.kafka.clients.CommonClientCo

public static final String ORACLE_TRANSACTIONAL_PRODUCER ="oracle.transactional.producer";

public static final String ORACLE_CONSUMER_LIGHTWEIGHT = "oracle.consumer.lightweight";
public static final String ORACLE_CONSUMER_LIGHTWEIGHT_DOC = "Creates a light weight subscriber";
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we want that in CommonClientConfigs and ConsumerConfig ?

}
}

public void closeCallableStmt(Node node) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invoke this when we close the consumer

private void commitOffsetsLightWeightSub(Node node, String topic, Map<TopicPartition, OffsetAndMetadata> offsets) {

final int OFFSET_DIVISOR = 20000;
int size = offsets.size();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check for an existing variable for SUBSHARDSIZE

}
else {
long subshard = offset / 20000;
long sequence = offset % 20000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace 20000 with appropriate variable

cStmt.execute();
log.debug("lightWeightSeektoEnd executed for topic: {}, partitions: {}", topic, partition_id.length);
} catch (Exception ex) {
log.error("Error in lightWeightSeektoEnd for topic: " + topic + ", node: " + node, ex);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

throw the exception to caller as well. That will be used to set in the response.

@@ -1586,9 +1846,11 @@ public TopicConsumers(Node node,int mode) throws JMSException {

try {
this.dbVersion = ConnectionUtils.getDBVersion(oConn);
this.lightWeightSub = configs.getBoolean(ConsumerConfig.ORACLE_CONSUMER_LIGHTWEIGHT);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get DBMajorVersion and if >= 26 and if LIGHTWEIGHT property is set then only allow to crate LTWT subscriber

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
OCA Verified All contributors have signed the Oracle Contributor Agreement.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants