Skip to content

Commit e58c1fc

Browse files
committed
IT: improve verbosity of test failures
Signed-off-by: Evgeny Malygin <[email protected]>
1 parent c933b70 commit e58c1fc

File tree

9 files changed

+139
-46
lines changed

9 files changed

+139
-46
lines changed

src/integration-tests/test_alarms.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ def test_no_alarms_if_disabled(cluster: Cluster, domain_urls: tc.DomainUrls):
5555

5656

5757
@tweak.domain.max_idle_time(1)
58-
def test_broadcast_no_alarms(cluster: Cluster, domain_urls: tc.DomainUrls): # pylint: disable=unused-argument
58+
def test_broadcast_no_alarms(
59+
cluster: Cluster, domain_urls: tc.DomainUrls
60+
): # pylint: disable=unused-argument
5961
"""
6062
Test no broker ALARMS in broadcast mode.
6163
"""
@@ -162,7 +164,7 @@ def test_priority_alarm_when_consumer_dropped(
162164
cluster: Cluster, domain_urls: tc.DomainUrls
163165
):
164166
"""
165-
Test that alarm is triggered when consumer droppped the connection in priority mode.
167+
Test that alarm is triggered when consumer dropped the connection in priority mode.
166168
"""
167169
uri_priority = domain_urls.uri_priority
168170
leader = cluster.last_known_leader

src/integration-tests/test_app_subscriptions.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
1515

16+
from typing import Any, List, Optional
17+
1618
import blazingmq.dev.it.testconstants as tc
1719

1820
from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import
@@ -25,6 +27,7 @@
2527
tweak,
2628
virtual_cluster_config,
2729
)
30+
from blazingmq.dev.it.process.broker import Broker
2831
from blazingmq.dev.it.process.client import Client
2932

3033

@@ -34,7 +37,10 @@ class TestAppSubscriptions:
3437
(apps)
3538
"""
3639

37-
def _start_client(self, broker, uri, name, subscriptions=[]):
40+
@staticmethod
41+
def _start_client(
42+
broker: Broker, uri: str, name: str, subscriptions: Optional[List[Any]] = None
43+
) -> Client:
3844
consumer = broker.create_client(name)
3945
assert (
4046
consumer.open(

src/integration-tests/test_cluster_node_shutdown.py

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,13 @@
2929
order,
3030
multi_node,
3131
)
32+
from blazingmq.dev.it.common import BMQTestError, BMQTST_ASSERT
3233
from blazingmq.dev.it.process.client import Client
3334
from blazingmq.dev.it.util import wait_until
3435

3536
pytestmark = order(6)
3637

3738

38-
class BMQITError(RuntimeError):
39-
"""
40-
BMQ IT error.
41-
"""
42-
43-
4439
class TestClusterNodeShutdown:
4540
"""
4641
This suite of test cases exercises node shutdown and subsequent failover
@@ -50,10 +45,12 @@ class TestClusterNodeShutdown:
5045
@staticmethod
5146
def open_or_raise(client: Client, uri: str, flags: List[str]):
5247
rc = client.open(uri, flags=flags, succeed=True)
53-
if rc != Client.e_SUCCESS:
54-
raise BMQITError(
55-
f"Failed to open a queue: client = {client}, uri = {uri}, rc = {rc}"
56-
)
48+
BMQTST_ASSERT(rc == Client.e_SUCCESS,
49+
"Failed to open a queue",
50+
client=client,
51+
uri=uri,
52+
flags=flags,
53+
rc=rc)
5754

5855
def setup_cluster(self, cluster: Cluster, domain_urls: tc.DomainUrls):
5956
du = domain_urls
@@ -128,7 +125,7 @@ def release_recovery_if_state_restored(line: str) -> None:
128125
_ = recovery.get(timeout=120)
129126
except queue.Empty as ex:
130127
# No recovery log observed
131-
raise BMQITError("State is not restored") from ex
128+
raise BMQTestError("State is not restored") from ex
132129
sleep(5)
133130

134131
self._verify_all_queues_operational(domain_urls) # After recovery
@@ -149,14 +146,20 @@ def check_received_one_of(consumer: Client, uri_group: str, *expected):
149146
)
150147
msgs = consumer.list(uri_group, block=True)
151148
self.history.append((str(consumer.name), uri_group, msgs))
152-
if len(msgs) != 1:
153-
raise BMQITError(
154-
f"Expected 1 message, got: {len(msgs)} msgs.\nMessages history:\n{self.history}"
155-
)
156-
if msgs[0].payload not in expected:
157-
raise BMQITError(
158-
f"Unexpected message payload: {msgs[0].payload} (expected: {expected})"
159-
)
149+
BMQTST_ASSERT(len(msgs) == 1,
150+
"Expected exactly 1 message",
151+
client=consumer,
152+
uri=uri_group,
153+
observed_messages=msgs,
154+
history=self.history)
155+
BMQTST_ASSERT(msgs[0].payload in expected,
156+
"Unexpected message payload",
157+
client=consumer,
158+
uri=uri_group,
159+
observed_messages=msgs,
160+
history=self.history,
161+
expected_any_of=expected
162+
)
160163
consumer.confirm(uri_group, "*", succeed=True)
161164

162165
def check_both_received_one_of(*expected):

src/integration-tests/test_startup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@
2525
def test_early_assign(multi_node: Cluster, domain_urls: tc.DomainUrls):
2626
"""
2727
Early open a queue on a soon-to-be leader. Legacy leader when it becomes
28-
ACTIVE, starts assigning queues _before_ it assignes partitions. Then,
29-
the leader observes `onQueueAssigned` event _before_ it becomes ACTVIE
28+
ACTIVE, starts assigning queues _before_ it assigns partitions. Then,
29+
the leader observes `onQueueAssigned` event _before_ it becomes ACTIVE
3030
primary. If that event logic erroneously decides that the self is replica,
3131
the soon-to-be primary does not write QueueCreationRecord. The primary
3232
still writes any posted message record though. That leads to either assert
@@ -70,7 +70,7 @@ def test_early_assign(multi_node: Cluster, domain_urls: tc.DomainUrls):
7070
def test_replica_late_join(multi_node: Cluster, domain_urls: tc.DomainUrls):
7171
"""
7272
In a steady-state cluster where only one replica node is down, with live
73-
messages flowing, the replica node rejoins and attemps to heal itself via
73+
messages flowing, the replica node rejoins and attempts to heal itself via
7474
the primary. The concern is that the primary could send live data to the
7575
replica, and then re-send that data as recovery data chunks; we would like
7676
to make sure our deduplication logic is working correctly to handle this

src/integration-tests/test_strong_consistency.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
multi_node,
2424
tweak,
2525
)
26+
from blazingmq.dev.it.common import BMQTestError, BMQTST_ASSERT
2627
from blazingmq.dev.it.util import wait_until
2728
from blazingmq.schemas import mqbconf
2829

@@ -105,7 +106,7 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
105106
lambda: len(self.consumer.list(uri, block=True)) == 1, 2
106107
)
107108
# the WC queue was last sending data and the SC one was first which
108-
# means it is sufficent to wait on WC before checking SC
109+
# means it is sufficient to wait on WC before checking SC
109110

110111
if has_timeout:
111112
# expect NACK
@@ -115,7 +116,7 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
115116
else:
116117
# make sure there are neither SC ACK(s) nor message(s)
117118
assert not self.producer.outputs_regex(
118-
f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", 1
119+
f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", timeout=1
119120
)
120121

121122
for uri in [
@@ -128,15 +129,18 @@ def _break_post_unbreak(self, multi_node, breaker, has_timeout):
128129
if not has_timeout:
129130
self.consumer.wait_push_event(timeout=120)
130131
# make sure there are SC ACK(s) and message(s)
131-
assert self.producer.outputs_regex(f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", 25)
132+
BMQTST_ASSERT(self.producer.outputs_regex(f"MESSAGE.*ACK.*{tc.URI_FANOUT_SC}", timeout=25),
133+
client=self.producer, uri=tc.URI_FANOUT_SC)
132134
for uri in [
133135
tc.URI_FANOUT_SC_FOO,
134136
tc.URI_FANOUT_SC_BAR,
135137
tc.URI_FANOUT_SC_BAZ,
136138
]:
137-
assert wait_until(
138-
lambda: len(self.consumer.list(uri, block=True)) == 1, 2
139-
)
139+
BMQTST_ASSERT(wait_until(lambda: len(self.consumer.list(uri, block=True)) == 1, timeout=2),
140+
"Consumer expected exactly 1 message",
141+
client=self.consumer,
142+
uri=uri
143+
)
140144

141145
def test_suspend_post_resume(
142146
self,

src/python/blazingmq/core.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,4 @@
1515

1616

1717
class BMQError(RuntimeError):
18-
"Base class for all exceptions raised by BMQ."
18+
"""Base class for all exceptions raised by BlazingMQ."""
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# Copyright 2025 Bloomberg Finance L.P.
2+
# SPDX-License-Identifier: Apache-2.0
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
blazingmq.dev.it.common
18+
19+
PURPOSE: Provide common types and functions for ITs.
20+
"""
21+
22+
import traceback
23+
from typing import Any, Dict, Optional
24+
25+
26+
from blazingmq.core import BMQError
27+
28+
29+
class BMQTestError(BMQError):
30+
"""BlazingMQ test failure exception."""
31+
32+
33+
def _context_to_str(context: Dict[str, Any]) -> str:
34+
context_str = "\n".join(f" {key} = {val}" for key, val in context.items())
35+
if len(context_str) == 0:
36+
context_str = "EMPTY"
37+
return context_str
38+
39+
40+
def BMQTST_ASSERT(condition, message: Optional[str] = None, **kwargs) -> None:
41+
if condition:
42+
return
43+
44+
tb = traceback.StackSummary.extract(
45+
frame_gen=traceback.walk_stack(None), capture_locals=False
46+
)
47+
if isinstance(tb, list) and 2 <= len(tb):
48+
# [0] -> this function
49+
# [1] -> caller function
50+
fail_line = f"{tb[1].line} ({tb[1].filename}:{tb[1].lineno})"
51+
else:
52+
# Should never happen, don't want to use a nested assert to check it
53+
fail_line = "UNDEFINED"
54+
55+
context_str = _context_to_str(kwargs)
56+
57+
raise BMQTestError(
58+
f"{message or 'Failed condition'}\nFailure at '{fail_line}', context:\n{context_str}"
59+
)

src/python/blazingmq/dev/it/process/broker.py

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
if TYPE_CHECKING:
3535
from blazingmq.dev.it.cluster import Cluster
3636

37+
from blazingmq.dev.it.common import BMQTST_ASSERT
3738
from blazingmq.dev.it.process import proc
3839
import blazingmq.dev.it.process.bmqproc
3940
import blazingmq.dev.it.testconstants as tc
@@ -103,7 +104,7 @@ def pid(self):
103104
IMPLEMENTATION NOTE: The broker is started via a script that sets up
104105
the environment and creates directories for storage and logs, then runs
105106
the broker proper via the 'exec' shell command. Depending on the shell
106-
in use, a new process is created, or not. Thus 'pid' is overridden to
107+
in use, a new process is created, or not. Thus, 'pid' is overridden to
107108
make methods like 'kill' and 'stack_trace' use the correct process id.
108109
"""
109110
return self._pid
@@ -113,10 +114,14 @@ def wait_until_started(self):
113114
Wait until the broker has started.
114115
"""
115116
with internal_use(self):
116-
if not self.outputs_substr(
117-
"BMQbrkr started successfully", timeout=START_TIMEOUT
118-
):
119-
raise RuntimeError(f"Failed to start broker on {self.name}: timeout")
117+
BMQTST_ASSERT(
118+
self.outputs_substr(
119+
"BMQbrkr started successfully", timeout=START_TIMEOUT
120+
),
121+
"Failed to start broker: timeout",
122+
name=self.name,
123+
cwd=self._cwd,
124+
)
120125

121126
with (self._cwd / "bmqbrkr.pid").open("r") as file:
122127
self._pid = int(file.read())
@@ -210,17 +215,25 @@ def wait_status(
210215

211216
if wait_leader:
212217
leader_name = matches.pop(0)
213-
if leader_name is None:
214-
error = f"[broker {self.name}]: no active leader"
215-
self._logger.error(error)
216-
raise RuntimeError(error)
218+
BMQTST_ASSERT(
219+
leader_name is not None,
220+
"No active leader",
221+
name=self.name,
222+
cwd=self._cwd,
223+
)
224+
217225
self.last_known_leader = self.cluster.process(leader_name[1])
218226
self._logger.log(self._log_level, "leader is %s", self.last_known_leader)
219227

220-
if wait_ready and matches.pop(0) is None:
221-
error = f"[broker {self.name}]: cluster not ready"
222-
self._logger.error(error)
223-
raise RuntimeError(error)
228+
if wait_ready:
229+
cluster_name = matches.pop(0)
230+
BMQTST_ASSERT(
231+
cluster_name is not None,
232+
"Cluster is not ready",
233+
cluster=cluster,
234+
name=self.name,
235+
cwd=self._cwd,
236+
)
224237

225238
def dump_queue_internals(self, domain, queue):
226239
"""

src/python/blazingmq/dev/it/process/client.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,12 +112,14 @@ def __init__(
112112
if dump_messages:
113113
options.append("-d")
114114

115+
self._endpoint: str = f"tcp://{broker[0]}:{broker[1]}"
116+
115117
super().__init__(
116118
name,
117119
[
118120
str(tool_path),
119121
"-b",
120-
f"tcp://{broker[0]}:{broker[1]}",
122+
self._endpoint,
121123
f'--logFormat="{bmqproc.PROC_LOG_FORMAT}"',
122124
]
123125
+ options,
@@ -126,6 +128,10 @@ def __init__(
126128
**kwargs,
127129
)
128130

131+
def __repr__(self):
132+
"""Provide a string representation of this object for debug."""
133+
return f"Client(name='{self.name}', endpoint='{self._endpoint}')"
134+
129135
###########################################################################
130136
# Public API
131137

0 commit comments

Comments
 (0)