99package com .refinitiv .ema .access ;
1010
1111import java .nio .ByteBuffer ;
12+
1213import java .util .ArrayList ;
14+ import java .util .ArrayDeque ;
1315import java .util .Deque ;
1416import java .util .List ;
17+
1518import java .util .concurrent .ConcurrentLinkedDeque ;
16- import java .util .ArrayDeque ;
19+ import java .util .concurrent .locks .Lock ;
20+ import java .util .concurrent .locks .ReentrantLock ;
1721
1822import com .refinitiv .eta .valueadd .common .VaPool ;
1923
@@ -25,6 +29,7 @@ class EmaObjectManager
2529 private final static int MAX_BYTE_BUFFER_CAPABILITY = 2000 ;
2630 private final static int DEFAULT_ETA_CONTAINER_SIZE = 10 ;
2731
32+ private Lock _byteBufferLock = null ;
2833 private List <ByteBuffer >[] _byteBufferList ;
2934 private boolean _intialized ;
3035
@@ -113,6 +118,11 @@ class EmaObjectManager
113118
114119 EmaObjectManager (boolean globalLock )
115120 {
121+ if (globalLock )
122+ {
123+ _byteBufferLock = new ReentrantLock ();
124+ }
125+
116126 _ommIntPool = new VaPool (globalLock );
117127 _ommUIntPool = new VaPool (globalLock );
118128 _ommFloatPool = new VaPool (globalLock );
@@ -375,33 +385,44 @@ ByteBuffer acquireByteBuffer(int length)
375385 int pos = length / DEFAULT_BYTE_BUFFER_SIZE ;
376386 ByteBuffer retVal ;
377387
378- if (pos < MAX_NUM_BYTE_BUFFER )
388+ if (_byteBufferLock != null )
389+ _byteBufferLock .lock ();
390+
391+ try
379392 {
380- if (! _byteBufferList [ pos ]. isEmpty () )
393+ if (pos < MAX_NUM_BYTE_BUFFER )
381394 {
382- retVal = _byteBufferList [pos ].remove (_byteBufferList [pos ].size () - 1 );
383- retVal .clear ();
384- return retVal ;
385- }
395+ if (!_byteBufferList [pos ].isEmpty ())
396+ {
397+ retVal = _byteBufferList [pos ].remove (_byteBufferList [pos ].size () - 1 );
398+ retVal .clear ();
399+ return retVal ;
400+ }
386401
387- return ByteBuffer .allocate ((pos + 1 ) * DEFAULT_BYTE_BUFFER_SIZE );
388- } else
389- {
390- if (!_byteBufferList [MAX_NUM_BYTE_BUFFER ].isEmpty ())
402+ return ByteBuffer .allocate ((pos + 1 ) * DEFAULT_BYTE_BUFFER_SIZE );
403+ } else
391404 {
392- int size = _byteBufferList [MAX_NUM_BYTE_BUFFER ].size () - 1 ;
393- for (int index = size ; index >= 0 ; --index )
405+ if (!_byteBufferList [MAX_NUM_BYTE_BUFFER ].isEmpty ())
394406 {
395- if (length < _byteBufferList [MAX_NUM_BYTE_BUFFER ].get (index ).capacity ())
407+ int size = _byteBufferList [MAX_NUM_BYTE_BUFFER ].size () - 1 ;
408+ for (int index = size ; index >= 0 ; --index )
396409 {
397- retVal = _byteBufferList [MAX_NUM_BYTE_BUFFER ].remove (index );
398- retVal .clear ();
399- return retVal ;
410+ if (length < _byteBufferList [MAX_NUM_BYTE_BUFFER ].get (index ).capacity ())
411+ {
412+ retVal = _byteBufferList [MAX_NUM_BYTE_BUFFER ].remove (index );
413+ retVal .clear ();
414+ return retVal ;
415+ }
400416 }
401417 }
402- }
403418
404- return ByteBuffer .allocate (length );
419+ return ByteBuffer .allocate (length );
420+ }
421+ }
422+ finally
423+ {
424+ if (_byteBufferLock != null )
425+ _byteBufferLock .unlock ();
405426 }
406427 }
407428
@@ -410,12 +431,23 @@ void releaseByteBuffer(ByteBuffer buffer)
410431 if (buffer == null )
411432 return ;
412433
413- int pos = buffer .capacity () / DEFAULT_BYTE_BUFFER_SIZE - 1 ;
434+ if (_byteBufferLock != null )
435+ _byteBufferLock .lock ();
414436
415- if (pos < MAX_NUM_BYTE_BUFFER )
416- _byteBufferList [pos ].add (buffer );
417- else
418- _byteBufferList [MAX_NUM_BYTE_BUFFER ].add (buffer );
437+ try
438+ {
439+ int pos = buffer .capacity () / DEFAULT_BYTE_BUFFER_SIZE - 1 ;
440+
441+ if (pos < MAX_NUM_BYTE_BUFFER )
442+ _byteBufferList [pos ].add (buffer );
443+ else
444+ _byteBufferList [MAX_NUM_BYTE_BUFFER ].add (buffer );
445+ }
446+ finally
447+ {
448+ if (_byteBufferLock != null )
449+ _byteBufferLock .unlock ();
450+ }
419451 }
420452
421453 @ SuppressWarnings ("unchecked" )
0 commit comments