2525import static org .apache .ignite .internal .TestWrappers .unwrapIgniteImpl ;
2626import static org .apache .ignite .internal .lang .IgniteStringFormatter .format ;
2727import static org .apache .ignite .internal .testframework .IgniteTestUtils .await ;
28+ import static org .apache .ignite .internal .tx .TransactionIds .beginTimestamp ;
2829import static org .hamcrest .MatcherAssert .assertThat ;
2930import static org .hamcrest .Matchers .greaterThan ;
30- import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
3131import static org .hamcrest .Matchers .is ;
32- import static org .hamcrest .Matchers .lessThanOrEqualTo ;
3332import static org .junit .jupiter .api .Assertions .assertEquals ;
3433import static org .junit .jupiter .api .Assertions .assertTrue ;
3534
3635import java .util .ArrayList ;
3736import java .util .Collection ;
37+ import java .util .Comparator ;
38+ import java .util .HashMap ;
3839import java .util .List ;
40+ import java .util .Map ;
3941import java .util .Objects ;
42+ import java .util .UUID ;
4043import java .util .concurrent .TimeUnit ;
4144import java .util .stream .Collectors ;
4245import java .util .stream .Stream ;
4548import org .apache .ignite .internal .ClusterPerClassIntegrationTest ;
4649import org .apache .ignite .internal .app .IgniteImpl ;
4750import org .apache .ignite .internal .catalog .Catalog ;
51+ import org .apache .ignite .internal .catalog .CatalogManager ;
4852import org .apache .ignite .internal .catalog .CatalogManagerImpl ;
4953import org .apache .ignite .internal .catalog .compaction .CatalogCompactionRunner .TimeHolder ;
54+ import org .apache .ignite .internal .lang .IgniteStringBuilder ;
5055import org .apache .ignite .internal .network .ClusterNodeImpl ;
5156import org .apache .ignite .internal .network .InternalClusterNode ;
57+ import org .apache .ignite .internal .tx .ActiveLocalTxMinimumRequiredTimeProvider ;
5258import org .apache .ignite .internal .tx .InternalTransaction ;
5359import org .apache .ignite .tx .Transaction ;
5460import org .apache .ignite .tx .TransactionOptions ;
@@ -138,63 +144,82 @@ void testGlobalMinimumTxRequiredTime() {
138144 IgniteImpl node1 = unwrapIgniteImpl (CLUSTER .node (1 ));
139145 IgniteImpl node2 = unwrapIgniteImpl (CLUSTER .node (2 ));
140146
147+ DebugInfoCollector debug = new DebugInfoCollector (List .of (node0 , node1 , node2 ));
148+
141149 List <CatalogCompactionRunner > compactors = List .of (
142150 node0 .catalogCompactionRunner (),
143151 node1 .catalogCompactionRunner (),
144152 node2 .catalogCompactionRunner ()
145153 );
146154
147- Catalog catalog1 = getLatestCatalog (node2 );
155+ debug .recordCatalogState ("init" );
156+ debug .recordMinTxTimesState ("init" );
148157
149- Transaction tx1 = beginTx (node0 , false );
158+ Catalog catalog1 = getLatestCatalog (node2 );
159+ InternalTransaction tx1 = beginTx (node0 , false );
160+ debug .recordTx (tx1 );
150161
151162 // Changing the catalog and starting transaction.
152163 sql ("create table a(a int primary key)" );
153164 Catalog catalog2 = getLatestCatalog (node0 );
154165 assertThat (catalog2 .version (), is (catalog1 .version () + 1 ));
155- List <Transaction > txs2 = Stream .of (node1 , node2 ).map (node -> beginTx (node , false )).collect (Collectors .toList ());
166+ List <InternalTransaction > txs2 = Stream .of (node1 , node2 ).map (node -> beginTx (node , false )).collect (Collectors .toList ());
156167 List <InternalTransaction > ignoredReadonlyTxs = Stream .of (node0 , node1 , node2 )
157168 .map (node -> beginTx (node , true ))
158169 .collect (Collectors .toList ());
159170
171+ debug .recordTx (txs2 );
172+ debug .recordTx (ignoredReadonlyTxs );
173+
160174 // Changing the catalog again and starting transaction.
161175 sql ("alter table a add column (b int)" );
162176
163- Awaitility .await ().untilAsserted (() -> assertThat ( getLatestCatalogVersion (node1 ), is (catalog2 .version () + 1 ) ));
177+ Awaitility .await ().until (() -> getLatestCatalogVersion (node1 ), is (catalog2 .version () + 1 ));
164178 Catalog catalog3 = getLatestCatalog (node1 );
165179
166- List <Transaction > txs3 = Stream .of (node0 , node2 ).map (node -> beginTx (node , false )).collect (Collectors .toList ());
180+ List <InternalTransaction > txs3 = Stream .of (node0 , node2 ).map (node -> beginTx (node , false )).collect (Collectors .toList ());
181+
182+ debug .recordTx (txs3 );
167183
168184 Collection <InternalClusterNode > topologyNodes = node0 .cluster ().nodes ().stream ()
169185 .map (ClusterNodeImpl ::fromPublicClusterNode )
170186 .collect (toUnmodifiableList ());
171187
172- compactors .forEach (compactor -> {
173- TimeHolder timeHolder = await (compactor .determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
174- assertThat (timeHolder .txMinRequiredTime , is (catalog1 .time ()));
175- });
188+ for (int i = 0 ; i < compactors .size (); i ++) {
189+ TimeHolder timeHolder = await (compactors .get (i ).determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
190+
191+ String failureMessage = "Initial condition failed on node #" + i ;
192+
193+ assertEquals (catalog1 .time (), timeHolder .txMinRequiredTime , () -> debug .dumpDebugInfo (failureMessage ));
194+ }
176195
177196 tx1 .rollback ();
178197
179- compactors .forEach (compactor -> {
180- TimeHolder timeHolder = await (compactor .determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
181- assertThat (timeHolder .txMinRequiredTime , is (catalog2 .time ()));
182- });
198+ for (int i = 0 ; i < compactors .size (); i ++) {
199+ TimeHolder timeHolder = await (compactors .get (i ).determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
200+
201+ String failureMessage = "Condition failed after first tx rollback on node #" + i ;
202+
203+ assertEquals (catalog2 .time (), timeHolder .txMinRequiredTime , () -> debug .dumpDebugInfo (failureMessage ));
204+ }
183205
184206 txs2 .forEach (Transaction ::commit );
185207
186- compactors .forEach (compactor -> {
187- TimeHolder timeHolder = await (compactor .determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
188- assertThat (timeHolder .txMinRequiredTime , is (catalog3 .time ()));
189- });
208+ for (int i = 0 ; i < compactors .size (); i ++) {
209+ TimeHolder timeHolder = await (compactors .get (i ).determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
210+
211+ String failureMessage = "Condition failed after transactions commit on node #" + i ;
212+
213+ assertEquals (catalog3 .time (), timeHolder .txMinRequiredTime , () -> debug .dumpDebugInfo (failureMessage ));
214+ }
190215
191216 txs3 .forEach (Transaction ::rollback );
192217
193218 // Since there are no active RW transactions in the cluster, the minimum time will be min(now()) across all nodes.
194- compactors .forEach ( compactor -> {
219+ for ( int i = 0 ; i < compactors .size (); i ++) {
195220 long minTime = Stream .of (node0 , node1 , node2 ).map (node -> node .clockService ().nowLong ()).min (Long ::compareTo ).orElseThrow ();
196221
197- TimeHolder timeHolder = await (compactor .determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
222+ TimeHolder timeHolder = await (compactors . get ( i ) .determineGlobalMinimumRequiredTime (topologyNodes , 0L ));
198223
199224 long maxTime = Stream .of (node0 , node1 , node2 ).map (node -> node .clockService ().nowLong ()).min (Long ::compareTo ).orElseThrow ();
200225
@@ -203,9 +228,12 @@ void testGlobalMinimumTxRequiredTime() {
203228 assertThat (timeHolder .txMinRequiredTime , greaterThan (tx .schemaTimestamp ().longValue ()));
204229 });
205230
206- assertThat (timeHolder .txMinRequiredTime , greaterThanOrEqualTo (minTime ));
207- assertThat (timeHolder .txMinRequiredTime , lessThanOrEqualTo (maxTime ));
208- });
231+ long actual = timeHolder .txMinRequiredTime ;
232+
233+ String failureMessage = "node #" + i + ": " + minTime + " <= " + actual + " <= " + maxTime ;
234+
235+ assertTrue (minTime <= actual && actual <= maxTime , () -> debug .dumpDebugInfo (failureMessage ));
236+ }
209237
210238 ignoredReadonlyTxs .forEach (Transaction ::rollback );
211239 }
@@ -305,4 +333,109 @@ private static void expectEarliestCatalogVersion(int expectedVersion) {
305333 }
306334 });
307335 }
336+
337+ private class DebugInfoCollector {
338+ private final List <IgniteImpl > nodes ;
339+ private final Map <UUID , String > nodesById ;
340+ private final IgniteStringBuilder buffer = new IgniteStringBuilder ("Test debug info" ).nl ();
341+ private final List <InternalTransaction > transactions = new ArrayList <>();
342+
343+ DebugInfoCollector (List <IgniteImpl > nodes ) {
344+ this .nodes = nodes ;
345+ Map <UUID , String > nodesById = new HashMap <>();
346+
347+ for (IgniteImpl node : nodes ) {
348+ nodesById .put (node .id (), node .name ());
349+ }
350+
351+ this .nodesById = nodesById ;
352+ }
353+
354+ void recordCatalogState (String contextMessage ) {
355+ buffer .nl ();
356+
357+ for (IgniteImpl node : nodes ) {
358+ buffer .app ("Catalog state(" ).app (contextMessage )
359+ .app (") on node " ).app (node .name ()).nl ();
360+
361+ CatalogManager mgr = node .catalogManager ();
362+
363+ for (int ver = mgr .earliestCatalogVersion (); ver <= mgr .latestCatalogVersion (); ver ++) {
364+ Catalog catalog = mgr .catalog (ver );
365+
366+ buffer .app (" " ).app (ver ).app (" | " ).app (catalog == null ? -1 : catalog .time ()).nl ();
367+ }
368+ }
369+ }
370+
371+ void recordMinTxTimesState (String contextMessage ) {
372+ buffer .nl ();
373+
374+ buffer .app ("Minimum RW tx times (" ).app (contextMessage ).app (')' ).nl ();
375+
376+ for (IgniteImpl node : nodes ) {
377+ ActiveLocalTxMinimumRequiredTimeProvider timeProvider = node .catalogCompactionRunner ()
378+ .activeLocalTxMinimumRequiredTimeProvider ();
379+
380+ buffer .app (" " ).app (node .name ()).app (": " ).app (timeProvider .minimumRequiredTime ()).nl ();
381+ }
382+ }
383+
384+ void recordTransactionsState () {
385+ // Sort by start time.
386+ transactions .sort (Comparator .comparing (t -> beginTimestamp (t .id ())));
387+
388+ List <InternalTransaction > roTransactions = new ArrayList <>();
389+ List <InternalTransaction > rwTransactions = new ArrayList <>();
390+
391+ for (InternalTransaction tx : transactions ) {
392+ if (tx .isReadOnly ()) {
393+ roTransactions .add (tx );
394+ } else {
395+ rwTransactions .add (tx );
396+ }
397+ }
398+
399+ buffer .nl ();
400+ buffer .app ("RW transactions state" ).nl ();
401+
402+ for (InternalTransaction tx : rwTransactions ) {
403+ buffer .app (" " )
404+ .app (tx .isFinishingOrFinished () ? "finished" : "active " ).app (" | " )
405+ .app (nodesById .get (tx .coordinatorId ())).app (" | " )
406+ .app (beginTimestamp (tx .id ()))
407+ .nl ();
408+ }
409+
410+ buffer .nl ();
411+ buffer .app ("RO transactions state" ).nl ();
412+
413+ for (InternalTransaction tx : roTransactions ) {
414+ buffer .app (" " )
415+ .app (tx .isFinishingOrFinished () ? "finished" : "active " ).app (" | " )
416+ .app (beginTimestamp (tx .id ()))
417+ .nl ();
418+ }
419+ }
420+
421+ void recordTx (InternalTransaction tx ) {
422+ transactions .add (tx );
423+ }
424+
425+ void recordTx (List <InternalTransaction > txs ) {
426+ transactions .addAll (txs );
427+ }
428+
429+ String dumpDebugInfo (String messageHeader ) {
430+ recordCatalogState ("onFailure" );
431+ recordMinTxTimesState ("onFailure" );
432+ recordTransactionsState ();
433+
434+ String debugInfo = messageHeader + System .lineSeparator () + System .lineSeparator () + buffer .toString ();
435+
436+ log .info (debugInfo );
437+
438+ return debugInfo ;
439+ }
440+ }
308441}
0 commit comments