12
12
import static org .usf .traceapi .core .State .DISPACH ;
13
13
14
14
import java .util .ArrayList ;
15
+ import java .util .Collection ;
15
16
import java .util .List ;
16
17
import java .util .concurrent .ScheduledExecutorService ;
18
+ import java .util .function .Consumer ;
17
19
import java .util .function .Function ;
18
20
import java .util .function .Predicate ;
19
21
@@ -50,12 +52,12 @@ public ScheduledSessionDispatcher(SessionDispatcherProperties properties, Predic
50
52
51
53
public boolean add (Session ... sessions ) {
52
54
if (state != DISABLE ) { // CACHE | DISPATCH
53
- safeQueue (q -> addAll (q , sessions ));
55
+ doSync (q -> addAll (q , sessions ));
54
56
log .trace ("{} sessions buffered" , queue .size ());
55
57
return true ;
56
58
}
57
59
else {
58
- log .warn ("{} sessions rejected because dispatcher.state={}" , sessions .length , state );
60
+ log .warn ("{} sessions rejected, dispatcher.state={}" , sessions .length , state );
59
61
return false ;
60
62
}
61
63
}
@@ -84,60 +86,67 @@ private void dispatch() {
84
86
}
85
87
catch (Exception e ) {// do not throw exception : retry later
86
88
log .warn ("error while dispatching {} sessions, attempts={} because : {}" , cs .size (), attempts , e .getMessage ()); //do not log exception stack trace
87
- safeQueue (q -> {
88
- q .addAll (0 , cs );
89
+ }
90
+ if (attempts > 0 ) { //exception | !dispatch
91
+ doSync (q -> {
92
+ q .addAll (0 , cs );
89
93
if (properties .getBufferMaxSize () > -1 && q .size () > properties .getBufferMaxSize ()) {
90
94
var diff = q .size () - properties .getBufferMaxSize ();
91
95
q .subList (properties .getBufferMaxSize (), cs .size ()).clear (); //remove exceeding cache sessions (LIFO)
92
96
log .warn ("{} last sessions have been removed from buffer" , diff );
93
97
}
94
- return null ;
95
98
});
96
- }
99
+ }
97
100
}
98
101
}
99
102
100
103
public List <Session > peekSessions () {
101
- return safeQueue (q -> {
104
+ return applySync (q -> {
102
105
if (q .isEmpty ()) {
103
106
return emptyList ();
104
107
}
105
- var s = queue .stream ();
108
+ var s = q .stream ();
106
109
if (nonNull (filter )) {
107
110
s = s .filter (filter );
108
111
}
109
112
return s .collect (toCollection (SessionList ::new ));
110
113
});
111
114
}
112
115
113
- public List <Session > popSessions () {
114
- return safeQueue (q -> {
116
+ List <Session > popSessions () {
117
+ return applySync (q -> {
115
118
if (q .isEmpty ()) {
116
119
return emptyList ();
117
120
}
118
121
if (isNull (filter )) {
119
- var c = new ArrayList <> (q );
122
+ var c = new SessionList (q );
120
123
q .clear ();
121
124
return c ;
122
125
}
123
126
var c = new SessionList (q .size ());
124
127
for (var it =q .iterator (); it .hasNext ();) {
125
- var s = it .next ();
126
- if (filter .test (s )) {
127
- c .add (s );
128
+ var o = it .next ();
129
+ if (filter .test (o )) {
130
+ c .add (o );
128
131
it .remove ();
129
132
}
130
133
}
131
134
return c ;
132
135
});
133
136
}
134
137
135
- private <T > T safeQueue (Function <List <Session >, T > queueFn ) {
138
+ private void doSync (Consumer <List <Session >> cons ) {
139
+ synchronized (queue ){
140
+ cons .accept (queue );
141
+ }
142
+ }
143
+
144
+ private <T > T applySync (Function <List <Session >, T > fn ) {
136
145
synchronized (queue ){
137
- return queueFn .apply (queue );
146
+ return fn .apply (queue );
138
147
}
139
148
}
140
-
149
+
141
150
public void shutdown () throws InterruptedException {
142
151
updateState (DISABLE ); //stop add Sessions
143
152
log .info ("shutting down scheduler service" );
@@ -146,7 +155,7 @@ public void shutdown() throws InterruptedException {
146
155
while (!executor .awaitTermination (5 , SECONDS )); //wait for last save complete
147
156
}
148
157
finally {
149
- dispatch ();
158
+ tryDispatch ();
150
159
}
151
160
}
152
161
@@ -161,6 +170,10 @@ public SessionList() {
161
170
public SessionList (int initialCapacity ) {
162
171
super (initialCapacity );
163
172
}
173
+
174
+ public SessionList (Collection <? extends Session > c ) {
175
+ super (c );
176
+ }
164
177
}
165
178
166
179
@ FunctionalInterface
0 commit comments