-
Notifications
You must be signed in to change notification settings - Fork 36
[WIP] Add data publishing capabilities to WSO2 OB IS Accelerator modules #832
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
| buildFSEventExecutors(); | ||
| } catch (IOException | XMLStreamException | OMException e) { | ||
| throw new FinancialServicesRuntimeException("Error occurred while building configuration from " + | ||
| "financial-services.xml", e); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 1
| buildFSEventExecutors(); | |
| } catch (IOException | XMLStreamException | OMException e) { | |
| throw new FinancialServicesRuntimeException("Error occurred while building configuration from " + | |
| "financial-services.xml", e); | |
| } catch (IOException | XMLStreamException | OMException e) { | |
| log.error("Error occurred while building configuration from financial-services.xml", e.getMessage()); | |
| throw new FinancialServicesRuntimeException("Error occurred while building configuration from " + | |
| "financial-services.xml", e); |
| private void buildFSEventExecutors() { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 2
| private void buildFSEventExecutors() { | |
| private void buildFSEventExecutors() { | |
| log.debug("Building Financial Services Event Executors configuration"); |
|
|
||
| if (StringUtils.isEmpty(obExecutorClass)) { | ||
| //Throwing exceptions since we cannot proceed without invalid executor names | ||
| throw new FinancialServicesRuntimeException("Event Executor class is not defined " + | ||
| "correctly in open-banking.xml"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 3
| if (StringUtils.isEmpty(obExecutorClass)) { | |
| //Throwing exceptions since we cannot proceed without invalid executor names | |
| throw new FinancialServicesRuntimeException("Event Executor class is not defined " + | |
| "correctly in open-banking.xml"); | |
| if (StringUtils.isEmpty(obExecutorClass)) { | |
| log.error("Event Executor class is not defined correctly in open-banking.xml"); | |
| //Throwing exceptions since we cannot proceed without invalid executor names | |
| throw new FinancialServicesRuntimeException("Event Executor class is not defined " + | |
| "correctly in open-banking.xml"); |
| } | ||
| fsEventExecutors.put(priority, obExecutorClass); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 4
| } | |
| fsEventExecutors.put(priority, obExecutorClass); | |
| } | |
| fsEventExecutors.put(priority, obExecutorClass); | |
| log.debug("Added event executor: {} with priority: {}", obExecutorClass, priority); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| protected void buildDataPublishingStreams() { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 5
| protected void buildDataPublishingStreams() { | |
| protected void buildDataPublishingStreams() { | |
| log.debug("Building Data Publishing Streams configuration"); |
|
|
||
| if (StringUtils.isEmpty(attributeName)) { | ||
| //Throwing exceptions since we cannot proceed without valid attribute names | ||
| throw new FinancialServicesRuntimeException( | ||
| "Data publishing attribute name is not defined " + | ||
| "correctly in financial-services.xml"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 6
| if (StringUtils.isEmpty(attributeName)) { | |
| //Throwing exceptions since we cannot proceed without valid attribute names | |
| throw new FinancialServicesRuntimeException( | |
| "Data publishing attribute name is not defined " + | |
| "correctly in financial-services.xml"); | |
| if (StringUtils.isEmpty(attributeName)) { | |
| log.error("Data publishing attribute name is not defined correctly in financial-services.xml"); | |
| //Throwing exceptions since we cannot proceed without valid attribute names | |
| throw new FinancialServicesRuntimeException( | |
| "Data publishing attribute name is not defined " + | |
| "correctly in financial-services.xml"); |
| String attributeKey = dataStreamName + "_" + attributeName; | ||
| dataPublishingValidationMap.put(attributeKey, metadata); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 7
| String attributeKey = dataStreamName + "_" + attributeName; | |
| dataPublishingValidationMap.put(attributeKey, metadata); | |
| } | |
| } | |
| if (log.isDebugEnabled()) { | |
| log.debug("Added data publishing stream: {} with {} attributes", dataStreamName, attributes.size()); | |
| } |
|
|
||
| @Override | ||
| public void processEvent(FSEvent fsEvent) { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 8
| @Override | |
| public void processEvent(FSEvent fsEvent) { | |
| @Override | |
| public void processEvent(FSEvent fsEvent) { | |
| log.debug("Processing FSEvent: {}", fsEvent); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
|
|
||
| public FSEventQueue(int queueSize, int workerThreadCount) { | ||
|
|
||
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | ||
| executorService = Executors.newFixedThreadPool(workerThreadCount); | ||
| eventQueue = new ArrayBlockingQueue<>(queueSize); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 9
| public FSEventQueue(int queueSize, int workerThreadCount) { | |
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | |
| executorService = Executors.newFixedThreadPool(workerThreadCount); | |
| eventQueue = new ArrayBlockingQueue<>(queueSize); | |
| public FSEventQueue(int queueSize, int workerThreadCount) { | |
| log.info("Initializing FSEventQueue with queue size: " + queueSize + " and worker threads: " + workerThreadCount); | |
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | |
| executorService = Executors.newFixedThreadPool(workerThreadCount); | |
| eventQueue = new ArrayBlockingQueue<>(queueSize); |
|
|
||
| public void put(FSEvent obEvent) { | ||
|
|
||
| try { | ||
| if (eventQueue.offer(obEvent)) { | ||
| executorService.submit(new FSQueueWorker(eventQueue, executorService)); | ||
| } else { | ||
| log.error("Event queue is full. Starting to drop events."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 10
| public void put(FSEvent obEvent) { | |
| try { | |
| if (eventQueue.offer(obEvent)) { | |
| executorService.submit(new FSQueueWorker(eventQueue, executorService)); | |
| } else { | |
| log.error("Event queue is full. Starting to drop events."); | |
| public void put(FSEvent obEvent) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Attempting to add event to queue"); | |
| } | |
| try { | |
| if (eventQueue.offer(obEvent)) { | |
| executorService.submit(new FSQueueWorker(eventQueue, executorService)); | |
| } else { | |
| log.error("Event queue is full. Starting to drop events."); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| } catch (RejectedExecutionException e) { | ||
| log.warn("Task submission failed. Task queue might be full", e); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| protected void finalize() throws Throwable { | ||
| executorService.shutdown(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 11
| } catch (RejectedExecutionException e) { | |
| log.warn("Task submission failed. Task queue might be full", e); | |
| } | |
| } | |
| @Override | |
| protected void finalize() throws Throwable { | |
| executorService.shutdown(); | |
| log.warn("Task submission failed. Task queue might be full", e); | |
| } | |
| } | |
| @Override | |
| protected void finalize() throws Throwable { | |
| log.info("Shutting down FSEventQueue executor service"); | |
| executorService.shutdown(); | |
| super.finalize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| this.executorService = executorService; | ||
| } | ||
|
|
||
| @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 12
| @Override | |
| @Override | |
| public void run() { | |
| log.info("FSQueueWorker started processing events"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| do { | ||
| FSEvent event = eventQueue.poll(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 13
| do { | |
| FSEvent event = eventQueue.poll(); | |
| FSEvent event = eventQueue.poll(); | |
| if (event != null) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Processing FSEvent from queue"); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
| .getClassInstanceFromFQN(fsEventExecutors.get(integer))).collect(Collectors.toList()); | ||
| for (FSEventExecutor obEventExecutor : executorList) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 14
| .getClassInstanceFromFQN(fsEventExecutors.get(integer))).collect(Collectors.toList()); | |
| for (FSEventExecutor obEventExecutor : executorList) { | |
| for (FSEventExecutor obEventExecutor : executorList) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Executing event with executor: " + obEventExecutor.getClass().getSimpleName()); | |
| } | |
| obEventExecutor.processEvent(event); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| } | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 15
| } | |
| } else { | |
| } else { | |
| log.warn("Polled null event from queue, skipping processing"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Already there's an error log
|
|
||
| public FSEvent(String eventType, Map<String, Object> eventData) { | ||
|
|
||
| this.eventType = eventType; | ||
| this.eventData = eventData; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 16
| public FSEvent(String eventType, Map<String, Object> eventData) { | |
| this.eventType = eventType; | |
| this.eventData = eventData; | |
| public FSEvent(String eventType, Map<String, Object> eventData) { | |
| log.debug("Creating FSEvent with type: {}", eventType); | |
| this.eventType = eventType; | |
| this.eventData = eventData; | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| private PoolingHttpClientConnectionManager connectionManager; | ||
| private FSEventQueue fsEventQueue; | ||
| private Map<Integer, String> fsEventExecutors; | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 17
| private FinancialServicesCommonDataHolder() { | |
| log.info("Initializing FinancialServicesCommonDataHolder"); |
| int workerThreadCount = | ||
| Integer.parseInt((String) FinancialServicesConfigParser.getInstance().getConfiguration() | ||
| .get(FinancialServicesConstants.EVENT_WORKER_THREAD_COUNT)); | ||
| fsEventQueue = new FSEventQueue(queueSize, workerThreadCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 18
| fsEventQueue = new FSEventQueue(queueSize, workerThreadCount); | |
| fsEventQueue = new FSEventQueue(queueSize, workerThreadCount); | |
| log.info("Initialized FSEventQueue with queue size: " + queueSize + " and worker thread count: " + workerThreadCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| Integer.parseInt((String) FinancialServicesConfigParser.getInstance().getConfiguration() | ||
| .get(FinancialServicesConstants.EVENT_WORKER_THREAD_COUNT)); | ||
| fsEventQueue = new FSEventQueue(queueSize, workerThreadCount); | ||
| fsEventExecutors = FinancialServicesConfigParser.getInstance().getFinancialServicesEventExecutors(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 19
| fsEventExecutors = FinancialServicesConfigParser.getInstance().getFinancialServicesEventExecutors(); | |
| fsEventExecutors = FinancialServicesConfigParser.getInstance().getFinancialServicesEventExecutors(); | |
| if (log.isDebugEnabled()) { | |
| log.debug("Loaded " + fsEventExecutors.size() + " event executors"); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| consentPersist(consentPersistData, consentResource); | ||
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 20
| consentPersist(consentPersistData, consentResource); | |
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); | |
| consentPersist(consentPersistData, consentResource); | |
| log.info("Consent persisted successfully for consent ID: " + consentResource.getConsentID()); | |
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| persistConsent(responseConsentResource, consentData); | ||
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 21
| persistConsent(responseConsentResource, consentData); | |
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); | |
| ExternalAPIConsentResourceResponseDTO responseConsentResource = responseDTO.getConsentResource(); | |
| persistConsent(responseConsentResource, consentData); | |
| log.info("Consent persisted successfully for consent ID: " + consentData.getConsentId()); | |
| ConsentAuthorizeUtil.publishConsentApprovalStatus(consentPersistData); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log.
|
|
||
| } catch (FinancialServicesException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 22
| } catch (FinancialServicesException e) { | |
| } catch (FinancialServicesException e) { | |
| log.error("Failed to persist consent: " + e.getMessage()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| public static void publishConsentApprovalStatus(ConsentPersistData consentPersistData) { | ||
| if (Boolean.parseBoolean((String) FinancialServicesConfigParser.getInstance().getConfiguration() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 23
| public static void publishConsentApprovalStatus(ConsentPersistData consentPersistData) { | |
| if (Boolean.parseBoolean((String) FinancialServicesConfigParser.getInstance().getConfiguration() | |
| public static void publishConsentApprovalStatus(ConsentPersistData consentPersistData) { | |
| log.info("Publishing consent approval status for consent: " + consentPersistData.getConsentData().getConsentId()); | |
| if (Boolean.parseBoolean((String) FinancialServicesConfigParser.getInstance().getConfiguration() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| FSDataPublisherUtil.publishData("", "", consentAuthorizationData); | ||
| } else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 24
| FSDataPublisherUtil.publishData("", "", consentAuthorizationData); | |
| } else { | |
| FSDataPublisherUtil.publishData("", "", consentAuthorizationData); | |
| log.debug("Successfully published consent authorization data for consent: " + consentData.getConsentId()); | |
| } else { |
|
|
||
| if (StringUtils.isBlank(requestUri)) { | ||
| log.error("Request URI not found."); | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 25
| if (StringUtils.isBlank(requestUri)) { | |
| log.error("Request URI not found."); | |
| return null; | |
| } | |
| if (StringUtils.isBlank(requestUri)) { | |
| log.error("Request URI not found."); | |
| return null; | |
| } | |
| if (log.isDebugEnabled()) { | |
| log.debug("Extracting request URI key from request URI"); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| String requestUriKey = uriParts[uriParts.length - 1]; | ||
|
|
||
| return StringUtils.isBlank(requestUriKey) ? null : requestUriKey; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 26
| String requestUriKey = uriParts[uriParts.length - 1]; | |
| return StringUtils.isBlank(requestUriKey) ? null : requestUriKey; | |
| String requestUriKey = uriParts[uriParts.length - 1]; | |
| if (StringUtils.isBlank(requestUriKey)) { | |
| log.warn("Request URI key is blank after extraction"); | |
| } | |
| return StringUtils.isBlank(requestUriKey) ? null : requestUriKey; |
| @Override | ||
| public void processEvent(FSEvent fsEvent) { | ||
|
|
||
| Map<String, Object> eventData = fsEvent.getEventData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 27
| @Override | |
| public void processEvent(FSEvent fsEvent) { | |
| Map<String, Object> eventData = fsEvent.getEventData(); | |
| @Override | |
| public void processEvent(FSEvent fsEvent) { | |
| log.info("Processing consent lifecycle event"); | |
| Map<String, Object> eventData = fsEvent.getEventData(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log.
| log.debug("Publishing consent data for metrics."); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 28
| log.debug("Publishing consent data for metrics."); | |
| String consentId = (String) eventData.get(CONSENT_ID); | |
| log.info("Processing consent lifecycle event for consent ID: " + consentId); | |
| String primaryUserId; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log.
| try { | ||
| primaryUserId = getPrimaryUserForConsent(detailedConsentResource, consentId); | ||
| } catch (ConsentManagementException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 29
| try { | |
| primaryUserId = getPrimaryUserForConsent(detailedConsentResource, consentId); | |
| } catch (ConsentManagementException e) { | |
| } catch (ConsentManagementException e) { | |
| log.error("Error while trying to retrieve consent data: " + e.getMessage()); | |
| return; |
| } | ||
|
|
||
| if (StringUtils.isBlank(primaryUserId)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 30
| } | |
| if (StringUtils.isBlank(primaryUserId)) { | |
| if (StringUtils.isBlank(primaryUserId)) { | |
| log.warn("Primary user ID is blank for consent ID: " + consentId); | |
| return; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| } | ||
| publishedEventIdentifierCache.put(eventIdentifier, Boolean.TRUE); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 31
| } | |
| publishedEventIdentifierCache.put(eventIdentifier, Boolean.TRUE); | |
| } | |
| } | |
| } | |
| log.info("Publishing consent lifecycle data for consent ID: " + consentId); | |
| //publish consent lifecycle data |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
| } | ||
| } | ||
|
|
||
| private String getPrimaryUserForConsent(DetailedConsentResource detailedConsentResource, String consentId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 32
| } | |
| } | |
| private String getPrimaryUserForConsent(DetailedConsentResource detailedConsentResource, String consentId) | |
| private String getPrimaryUserForConsent(DetailedConsentResource detailedConsentResource, String consentId) | |
| throws ConsentManagementException { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Retrieving primary user for consent ID: " + consentId); | |
| } | |
| String primaryUser = null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
| @Override | ||
| public FinancialServicesDataPublisher create() { | ||
|
|
||
| return (FinancialServicesDataPublisher) FSAnalyticsDataHolder.getInstance().getFinancialServicesDataPublisher(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 33
| @Override | |
| public FinancialServicesDataPublisher create() { | |
| return (FinancialServicesDataPublisher) FSAnalyticsDataHolder.getInstance().getFinancialServicesDataPublisher(); | |
| } | |
| @Override | |
| public FinancialServicesDataPublisher create() { | |
| log.debug("Creating new FinancialServicesDataPublisher instance"); | |
| return (FinancialServicesDataPublisher) FSAnalyticsDataHolder.getInstance().getFinancialServicesDataPublisher(); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| @Override | ||
| public PooledObject<FinancialServicesDataPublisher> wrap(FinancialServicesDataPublisher | ||
| financialServicesDataPublisher) { | ||
|
|
||
| return new DefaultPooledObject<FinancialServicesDataPublisher>(financialServicesDataPublisher); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 34
| @Override | |
| public PooledObject<FinancialServicesDataPublisher> wrap(FinancialServicesDataPublisher | |
| financialServicesDataPublisher) { | |
| return new DefaultPooledObject<FinancialServicesDataPublisher>(financialServicesDataPublisher); | |
| } | |
| @Override | |
| public PooledObject<FinancialServicesDataPublisher> wrap(FinancialServicesDataPublisher | |
| financialServicesDataPublisher) { | |
| log.debug("Wrapping FinancialServicesDataPublisher instance in pooled object"); | |
| return new DefaultPooledObject<FinancialServicesDataPublisher>(financialServicesDataPublisher); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added
| */ | ||
| public class DataPublisherPool<FinancialServicesDataPublisher> extends | ||
| GenericObjectPool<FinancialServicesDataPublisher> { | ||
|
|
||
| public DataPublisherPool(PooledObjectFactory<FinancialServicesDataPublisher> factory, | ||
| GenericObjectPoolConfig<FinancialServicesDataPublisher> config) { | ||
|
|
||
| super(factory, config); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 35
| */ | |
| public class DataPublisherPool<FinancialServicesDataPublisher> extends | |
| GenericObjectPool<FinancialServicesDataPublisher> { | |
| public DataPublisherPool(PooledObjectFactory<FinancialServicesDataPublisher> factory, | |
| GenericObjectPoolConfig<FinancialServicesDataPublisher> config) { | |
| super(factory, config); | |
| } | |
| public class DataPublisherPool<FinancialServicesDataPublisher> extends | |
| GenericObjectPool<FinancialServicesDataPublisher> { | |
| private static final Log log = LogFactory.getLog(DataPublisherPool.class); | |
| public DataPublisherPool(PooledObjectFactory<FinancialServicesDataPublisher> factory, | |
| GenericObjectPoolConfig<FinancialServicesDataPublisher> config) { | |
| super(factory, config); | |
| log.info("DataPublisherPool initialized with max total: " + config.getMaxTotal() + | |
| ", max idle: " + config.getMaxIdle()); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to a debug log
| public EventQueue(int queueSize, int workerThreadCount) { | ||
|
|
||
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | ||
| publisherExecutorService = Executors.newFixedThreadPool(workerThreadCount); | ||
| eventQueue = new ArrayBlockingQueue<>(queueSize); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 36
| public EventQueue(int queueSize, int workerThreadCount) { | |
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | |
| publisherExecutorService = Executors.newFixedThreadPool(workerThreadCount); | |
| eventQueue = new ArrayBlockingQueue<>(queueSize); | |
| } | |
| public EventQueue(int queueSize, int workerThreadCount) { | |
| log.info("Initializing EventQueue with queueSize: " + queueSize + " and workerThreadCount: " + workerThreadCount); | |
| // Note : Using a fixed worker thread pool and a bounded queue to control the load on the server | |
| publisherExecutorService = Executors.newFixedThreadPool(workerThreadCount); | |
| eventQueue = new ArrayBlockingQueue<>(queueSize); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to a debug log
| public void put(FSAnalyticsEvent fsAnalyticsEvent) { | ||
|
|
||
| try { | ||
| if (eventQueue.offer(fsAnalyticsEvent)) { | ||
| publisherExecutorService.submit(new QueueWorker(eventQueue, publisherExecutorService)); | ||
| } else { | ||
| log.error("Event queue is full. Starting to drop OB analytics events."); | ||
| } | ||
| } catch (RejectedExecutionException e) { | ||
| log.warn("Task submission failed. Task queue might be full", e); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 37
| public void put(FSAnalyticsEvent fsAnalyticsEvent) { | |
| try { | |
| if (eventQueue.offer(fsAnalyticsEvent)) { | |
| publisherExecutorService.submit(new QueueWorker(eventQueue, publisherExecutorService)); | |
| } else { | |
| log.error("Event queue is full. Starting to drop OB analytics events."); | |
| } | |
| } catch (RejectedExecutionException e) { | |
| log.warn("Task submission failed. Task queue might be full", e); | |
| } | |
| public void put(FSAnalyticsEvent fsAnalyticsEvent) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Attempting to add analytics event to queue"); | |
| } | |
| try { | |
| if (eventQueue.offer(fsAnalyticsEvent)) { | |
| publisherExecutorService.submit(new QueueWorker(eventQueue, publisherExecutorService)); | |
| } else { | |
| log.error("Event queue is full. Starting to drop OB analytics events."); | |
| } | |
| } catch (RejectedExecutionException e) { | |
| log.warn("Task submission failed. Task queue might be full", e); | |
| } | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added,without isDebugEnabled check
|
|
||
| @Override | ||
| protected void finalize() throws Throwable { | ||
| publisherExecutorService.shutdown(); | ||
| super.finalize(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 38
| @Override | |
| protected void finalize() throws Throwable { | |
| publisherExecutorService.shutdown(); | |
| super.finalize(); | |
| @Override | |
| protected void finalize() throws Throwable { | |
| log.info("Shutting down publisher executor service"); | |
| publisherExecutorService.shutdown(); | |
| super.finalize(); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
|
|
||
| public FinancialServicesThriftDataPublisher() { | ||
| this.init(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 39
| public FinancialServicesThriftDataPublisher() { | |
| this.init(); | |
| public FinancialServicesThriftDataPublisher() { | |
| log.info("Initializing FinancialServicesThriftDataPublisher"); | |
| this.init(); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added as a debug log
|
|
||
| @Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Log Improvement Suggestion No: 40
| @Override | |
| @Override | |
| public void publish(String streamName, String streamVersion, Map<String, Object> analyticsData) { | |
| if (log.isDebugEnabled()) { | |
| log.debug("Publishing data for stream: " + streamName + " with version: " + streamVersion); | |
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AI Agent Log Improvement Checklist
- The log-related comments and suggestions in this review were generated by an AI tool to assist with identifying potential improvements. Purpose of reviewing the code for log improvements is to improve the troubleshooting capabilities of our products.
- Please make sure to manually review and validate all suggestions before applying any changes. Not every code suggestion would make sense or add value to our purpose. Therefore, you have the freedom to decide which of the suggestions are helpful.
✅ Before merging this pull request:
- Review all AI-generated comments for accuracy and relevance.
- Complete and verify the table below. We need your feedback to measure the accuracy of these suggestions and the value they add. If you are rejecting a certain code suggestion, please mention the reason briefly in the suggestion for us to capture it.
Pull Request Title
Issue link: required
Doc Issue: Optional, link issue from documentation repository
Applicable Labels: Spec, product, version, type (specify requested labels)
Development Checklist
Testing Checklist