diff --git a/examples/benchmarks/nvbandwidth.py b/examples/benchmarks/nvbandwidth.py
index 45b836734..afdb46ddf 100644
--- a/examples/benchmarks/nvbandwidth.py
+++ b/examples/benchmarks/nvbandwidth.py
@@ -13,10 +13,10 @@
 if __name__ == '__main__':
     context = BenchmarkRegistry.create_benchmark_context(
         'nvbandwidth',
-        platform=Platform.CPU,
+        platform=Platform.CUDA,
         parameters=(
             '--buffer_size 128 '
-            '--test_cases 0,1,19,20 '
+            '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce '
             '--skip_verification '
             '--disable_affinity '
             '--use_mean '
diff --git a/superbench/benchmarks/micro_benchmarks/nvbandwidth.py b/superbench/benchmarks/micro_benchmarks/nvbandwidth.py
index 81a032195..2f6a9c3c0 100644
--- a/superbench/benchmarks/micro_benchmarks/nvbandwidth.py
+++ b/superbench/benchmarks/micro_benchmarks/nvbandwidth.py
@@ -4,15 +4,23 @@
 """Module of the NV Bandwidth Test."""

 import os
+import subprocess
 import re

 from superbench.common.utils import logger
-from superbench.benchmarks import BenchmarkRegistry, Platform
+from superbench.benchmarks import BenchmarkRegistry, Platform, ReturnCode
 from superbench.benchmarks.micro_benchmarks import MicroBenchmarkWithInvoke


 class NvBandwidthBenchmark(MicroBenchmarkWithInvoke):
     """The NV Bandwidth Test benchmark class."""
+
+    # Regular expressions for summary line and matrix header detection
+    re_block_start_pattern = re.compile(r'^Running\s+(.+)$')
+    re_matrix_header_line = re.compile(r'^(memcpy|memory latency)')
+    re_matrix_row_pattern = re.compile(r'^\s*\d')
+    re_summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')
+    re_unsupported_pattern = re.compile(r'ERROR: Testcase (\S+) not found!')
+
     def __init__(self, name, parameters=''):
         """Constructor.
@@ -38,12 +46,14 @@ def add_parser_arguments(self):
         self._parser.add_argument(
             '--test_cases',
+            nargs='+',
             type=str,
-            default='',
+            default=[],
             required=False,
             help=(
-                'Specify the test case(s) to run, either by name or index. By default, all test cases are executed. '
-                'Example: --test_cases 0,1,2,19,20'
+                'Specify the test case(s) to execute by name only. '
+                'To view the available test case names, run the command "nvbandwidth -l" on the host. '
+                'If no specific test case is specified, all test cases will be executed by default.'
             ),
         )
@@ -92,7 +102,9 @@ def _preprocess(self):
         command += f' --bufferSize {self._args.buffer_size}'

         if self._args.test_cases:
-            command += ' --testcase ' + ' '.join([testcase.strip() for testcase in self._args.test_cases.split(',')])
+            command += ' --testcase ' + ' '.join(self._args.test_cases)
+        else:
+            self._args.test_cases = self._get_all_test_cases()

         if self._args.skip_verification:
             command += ' --skipVerification'
@@ -111,72 +123,79 @@
         return True

     def _process_raw_line(self, line, parse_status):
-        """Process a single line of raw output from the nvbandwidth benchmark.
-
-        This function updates the `parse_status` dictionary with parsed results from the given `line`.
-        It detects the start of a test, parses matrix headers and rows, and extracts summary results.
+        """Process a raw line of text and update the parse status accordingly.

         Args:
-            line (str): A single line of raw output from the benchmark.
-            parse_status (dict): A dictionary to maintain the current parsing state and results. It should contain:
-                - 'test_name' (str): The name of the current test being parsed.
-                - 'benchmark_type' (str): 'bw' or 'lat'. It also indicating if matrix data is being parsed.
-                - 'matrix_header' (list): The header of the matrix being parsed.
-                - 'results' (dict): A dictionary to store the parsed results.
+            line (str): The raw line of text to be processed.
+            parse_status (dict): A dictionary containing the current parsing status,
+                which will be updated based on the content of the line.

-        Return:
+        Returns:
             None
         """
-        # Regular expressions for summary line and matrix header detection
-        block_start_pattern = re.compile(r'^Running\s+(.+)$')
-        summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')
-        matrix_header_line = re.compile(r'^(memcpy|memory latency)')
-        matrix_row_pattern = re.compile(r'^\s*\d')
-
         line = line.strip()

+        # Detect unsupported test cases
+        if self.re_unsupported_pattern.match(line):
+            parse_status['unsupported_testcases'].add(self.re_unsupported_pattern.match(line).group(1).lower())
+            return
+
         # Detect the start of a test
-        if block_start_pattern.match(line):
-            parse_status['test_name'] = block_start_pattern.match(line).group(1).lower()[:-1]
+        if self.re_block_start_pattern.match(line):
+            parse_status['test_name'] = self.re_block_start_pattern.match(line).group(1).lower()[:-1]
+            parse_status['executed_testcases'].add(parse_status['test_name'])
             return

         # Detect the start of matrix data
-        if parse_status['test_name'] and matrix_header_line.match(line):
+        if parse_status['test_name'] and self.re_matrix_header_line.match(line):
             parse_status['benchmark_type'] = 'bw' if 'bandwidth' in line else 'lat'
+            # Parse the row and column name
+            tmp_idx = line.find('(row)')
+            parse_status['matrix_row'] = line[tmp_idx - 3:tmp_idx].lower()
+            tmp_idx = line.find('(column)')
+            parse_status['matrix_col'] = line[tmp_idx - 3:tmp_idx].lower()
             return

         # Parse the matrix header
         if (
             parse_status['test_name'] and parse_status['benchmark_type'] and not parse_status['matrix_header']
-            and matrix_row_pattern.match(line)
+            and self.re_matrix_row_pattern.match(line)
         ):
             parse_status['matrix_header'] = line.split()
             return

         # Parse matrix rows
-        if parse_status['test_name'] and parse_status['benchmark_type'] and matrix_row_pattern.match(line):
+        if parse_status['test_name'] and parse_status['benchmark_type'] and self.re_matrix_row_pattern.match(line):
             row_data = line.split()
             row_index = row_data[0]
             for col_index, value in enumerate(row_data[1:], start=1):
+                # Skip 'N/A' values; 'N/A' indicates the path is from a device to itself.
+                if value == 'N/A':
+                    continue
+
                 col_header = parse_status['matrix_header'][col_index - 1]
                 test_name = parse_status['test_name']
                 benchmark_type = parse_status['benchmark_type']
-                metric_name = f'{test_name}_cpu{row_index}_gpu{col_header}_{benchmark_type}'
+                row_name = parse_status['matrix_row']
+                col_name = parse_status['matrix_col']
+                metric_name = f'{test_name}_{row_name}{row_index}_{col_name}{col_header}_{benchmark_type}'
                 parse_status['results'][metric_name] = float(value)
             return

         # Parse summary results
-        summary_match = summary_pattern.search(line)
-        if summary_match:
-            value = float(summary_match.group(2))
+        if self.re_summary_pattern.match(line):
+            value = self.re_summary_pattern.match(line).group(2)
             test_name = parse_status['test_name']
             benchmark_type = parse_status['benchmark_type']
-            parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = value
+            parse_status['results'][f'{test_name}_sum_{benchmark_type}'] = float(value)

             # Reset parsing state for next test
             parse_status['test_name'] = ''
             parse_status['benchmark_type'] = None
             parse_status['matrix_header'].clear()
+            parse_status['matrix_row'] = ''
+            parse_status['matrix_col'] = ''
+        return

     def _process_raw_result(self, cmd_idx, raw_output):
         """Function to parse raw results and save the summarized results.
@@ -195,22 +214,45 @@
             content = raw_output.splitlines()
             parsing_status = {
                 'results': {},
+                'executed_testcases': set(),
+                'unsupported_testcases': set(),
                 'benchmark_type': None,
                 'matrix_header': [],
                 'test_name': '',
+                'matrix_row': '',
+                'matrix_col': '',
             }

             for line in content:
                 self._process_raw_line(line, parsing_status)

+            return_code = ReturnCode.SUCCESS
+            # Log unsupported test cases
+            for testcase in parsing_status['unsupported_testcases']:
+                logger.warning(f'Test case {testcase} is not supported.')
+                return_code = ReturnCode.INVALID_ARGUMENT
+                self._result.add_raw_data(testcase, 'Not supported', self._args.log_raw_data)
+
+            # Check if the test case was waived
+            for testcase in self._args.test_cases:
+                if (
+                    testcase not in parsing_status['unsupported_testcases']
+                    and testcase not in parsing_status['executed_testcases']
+                ):
+                    logger.warning(f'Test case {testcase} was waived.')
+                    self._result.add_raw_data(testcase, 'waived', self._args.log_raw_data)
+                    return_code = ReturnCode.INVALID_ARGUMENT
+
             if not parsing_status['results']:
                 self._result.add_raw_data('nvbandwidth', 'No valid results found', self._args.log_raw_data)
+                return_code = ReturnCode.MICROBENCHMARK_RESULT_PARSING_FAILURE
                 return False

             # Store parsed results
             for metric, value in parsing_status['results'].items():
                 self._result.add_result(metric, value)

+            self._result.set_return_code(return_code)
             return True
         except Exception as e:
             logger.error(
@@ -221,5 +263,29 @@
             self._result.add_result('abort', 1)
             return False

+    @staticmethod
+    def _get_all_test_cases():
+        command = 'nvbandwidth -l'
+        test_case_pattern = re.compile(r'(\d+),\s+([\w_]+):')
+
+        try:
+            # Execute the command and capture output
+            result = subprocess.run(command, shell=True, text=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+
+            # Check the return code
+            if result.returncode != 0:
+                logger.error(f'{command} failed with return code {result.returncode}')
+                return []
+
+            if result.stderr:
+                logger.error(f'{command} failed with {result.stderr}')
+                return []
+
+            # Parse the output
+            return [name for _, name in test_case_pattern.findall(result.stdout)]
+        except Exception as e:
+            logger.error(f'Failed to get all test case names: {e}')
+            return []
+

 BenchmarkRegistry.register_benchmark('nvbandwidth', NvBandwidthBenchmark, platform=Platform.CUDA)
diff --git a/superbench/config/default.yaml b/superbench/config/default.yaml
index 601136e9f..fdf758632 100644
--- a/superbench/config/default.yaml
+++ b/superbench/config/default.yaml
@@ -134,6 +134,22 @@ superbench:
         copy_type:
           - sm
           - dma
+    nvbandwidth:
+      enable: true
+      modes:
+        - name: local
+          parallel: no
+      parameters:
+        buffer_size: 128
+        test_cases:
+          - host_to_device_memcpy_ce
+          - device_to_host_memcpy_ce
+          - host_to_device_memcpy_sm
+          - device_to_host_memcpy_sm
+        num_loops: 6
+        skip_verification: false
+        disable_affinity: false
+        use_mean: false
     kernel-launch:
       <<: *default_local_mode
     gemm-flops:
diff --git a/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py b/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py
index f6c82a030..1e6f8e93c 100644
--- a/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py
+++ b/tests/benchmarks/micro_benchmarks/test_nvbandwidth.py
@@ -23,7 +23,7 @@ def test_nvbandwidth_preprocess(self):
         """Test NV Bandwidth benchmark preprocess."""
         benchmark_name = 'nvbandwidth'
         (benchmark_class,
-         predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+         _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
         assert (benchmark_class)

         # Test preprocess with default parameters
@@ -34,7 +34,7 @@ def test_nvbandwidth_preprocess(self):
         # Test preprocess with specified parameters
         parameters = (
             '--buffer_size 256 '
-            '--test_cases 0,1,2,19,20 '
+            '--test_cases host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce '
             '--skip_verification '
             '--disable_affinity '
             '--use_mean '
@@ -47,7 +47,7 @@ def test_nvbandwidth_preprocess(self):
         # Check command
         assert (1 == len(benchmark._commands))
         assert ('--bufferSize 256' in benchmark._commands[0])
-        assert ('--testcase 0 1 2 19 20' in benchmark._commands[0])
+        assert ('--testcase host_to_device_memcpy_ce device_to_host_bidirectional_memcpy_ce' in benchmark._commands[0])
         assert ('--skipVerification' in benchmark._commands[0])
         assert ('--disableAffinity' in benchmark._commands[0])
         assert ('--useMean' in benchmark._commands[0])
@@ -58,7 +58,7 @@ def test_nvbandwidth_result_parsing_real_output(self, results):
         """Test NV Bandwidth benchmark result parsing."""
         benchmark_name = 'nvbandwidth'
         (benchmark_class,
-         predefine_params) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
+         _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(benchmark_name, Platform.CUDA)
         assert (benchmark_class)

         benchmark = benchmark_class(benchmark_name, parameters='')
@@ -78,3 +78,100 @@ def test_nvbandwidth_result_parsing_real_output(self, results):
         assert benchmark.result['device_to_host_memcpy_ce_sum_bw'][0] == 607.26
         assert benchmark.result['host_device_latency_sm_cpu0_gpu0_lat'][0] == 772.58
         assert benchmark.result['host_device_latency_sm_sum_lat'][0] == 772.58
+
+    def test_nvbandwidth_process_raw_result_unsupported_testcases(self):
+        """Test NV Bandwidth benchmark result parsing with unsupported test cases."""
+        benchmark_name = 'nvbandwidth'
+        (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(
+            benchmark_name, Platform.CUDA
+        )
+        assert (benchmark_class)
+
+        benchmark = benchmark_class(benchmark_name, parameters='')
+
+        # Preprocess and validate command
+        assert benchmark._preprocess()
+
+        # Mock raw output with unsupported test cases
+        raw_output = """
+        ERROR: Testcase unsupported_testcase_1 not found!
+        ERROR: Testcase unsupported_testcase_2 not found!
+        """
+
+        # Parse the provided raw output
+        assert not benchmark._process_raw_result(0, raw_output)
+
+        # Validate unsupported test cases
+        assert 'unsupported_testcase_1' in benchmark._result.raw_data
+        assert benchmark._result.raw_data['unsupported_testcase_1'][0] == 'Not supported'
+        assert 'unsupported_testcase_2' in benchmark._result.raw_data
+        assert benchmark._result.raw_data['unsupported_testcase_2'][0] == 'Not supported'
+
+    def test_nvbandwidth_process_raw_result_waived_testcases(self):
+        """Test NV Bandwidth benchmark result parsing with waived test cases."""
+        benchmark_name = 'nvbandwidth'
+        (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(
+            benchmark_name, Platform.CUDA
+        )
+        assert (benchmark_class)
+
+        benchmark = benchmark_class(benchmark_name, parameters='')
+
+        # Preprocess and validate command
+        assert benchmark._preprocess()
+
+        # Mock raw output with no executed test cases
+        raw_output = """
+        """
+
+        # Set test cases to include some that will be waived
+        benchmark._args.test_cases = ['waived_testcase_1', 'waived_testcase_2']
+
+        # Parse the provided raw output
+        assert not benchmark._process_raw_result(0, raw_output)
+
+        # Validate waived test cases
+        assert 'waived_testcase_1' in benchmark._result.raw_data
+        assert benchmark._result.raw_data['waived_testcase_1'][0] == 'waived'
+        assert 'waived_testcase_2' in benchmark._result.raw_data
+        assert benchmark._result.raw_data['waived_testcase_2'][0] == 'waived'
+
+    def test_get_all_test_cases(self):
+        """Test _get_all_test_cases method."""
+        benchmark_name = 'nvbandwidth'
+        (benchmark_class, _) = BenchmarkRegistry._BenchmarkRegistry__select_benchmark(
+            benchmark_name, Platform.CUDA
+        )
+        assert (benchmark_class)
+
+        benchmark = benchmark_class(benchmark_name, parameters='')
+
+        # Mock subprocess.run for successful execution with valid output
+        with unittest.mock.patch('subprocess.run') as mock_run:
+            mock_run.return_value.returncode = 0
+            mock_run.return_value.stdout = (
+                '1, host_to_device_memcpy_ce:\n'
+                '2, device_to_host_bidirectional_memcpy_ce:'
+            )
+            mock_run.return_value.stderr = ''
+            test_cases = benchmark._get_all_test_cases()
+            assert test_cases == [
+                'host_to_device_memcpy_ce',
+                'device_to_host_bidirectional_memcpy_ce'
+            ]
+
+        # Mock subprocess.run for execution with non-zero return code
+        with unittest.mock.patch('subprocess.run') as mock_run:
+            mock_run.return_value.returncode = 1
+            mock_run.return_value.stdout = ''
+            mock_run.return_value.stderr = 'Error'
+            test_cases = benchmark._get_all_test_cases()
+            assert test_cases == []
+
+        # Mock subprocess.run for execution with error message in stderr
+        with unittest.mock.patch('subprocess.run') as mock_run:
+            mock_run.return_value.returncode = 0
+            mock_run.return_value.stdout = ''
+            mock_run.return_value.stderr = 'Error'
+            test_cases = benchmark._get_all_test_cases()
+            assert test_cases == []
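
Note for reviewers: the subtlest part of this change is how the metric name is now derived from the '(row)' and '(column)' markers in the matrix header instead of hard-coding cpu/gpu. The sketch below is a minimal, standalone mirror of that naming logic, not the benchmark class itself; the sample output block is hand-written to match the layout the parser expects and is not captured from a real nvbandwidth run.

```python
import re

# Same patterns as the new NvBandwidthBenchmark class attributes.
re_block_start_pattern = re.compile(r'^Running\s+(.+)$')
re_matrix_header_line = re.compile(r'^(memcpy|memory latency)')
re_matrix_row_pattern = re.compile(r'^\s*\d')
re_summary_pattern = re.compile(r'SUM (\S+) (\d+\.\d+)')

# Illustrative output block (assumed format, for demonstration only).
sample = """
Running host_to_device_memcpy_ce.
memcpy CE CPU(row) -> GPU(column) bandwidth (GB/s)
           0         1
 0     54.50     54.21
SUM host_to_device_memcpy_ce 108.71
"""

test_name, benchmark_type, header, row_name, col_name = '', None, [], '', ''
results = {}
for line in sample.splitlines():
    line = line.strip()
    if re_block_start_pattern.match(line):
        # 'Running <test>.' -> test name without the trailing period.
        test_name = re_block_start_pattern.match(line).group(1).lower()[:-1]
    elif test_name and re_matrix_header_line.match(line):
        # 'bw' vs 'lat', plus the row/column labels taken from '(row)'/'(column)'.
        benchmark_type = 'bw' if 'bandwidth' in line else 'lat'
        row_name = line[line.find('(row)') - 3:line.find('(row)')].lower()
        col_name = line[line.find('(column)') - 3:line.find('(column)')].lower()
    elif test_name and benchmark_type and not header and re_matrix_row_pattern.match(line):
        header = line.split()
    elif test_name and benchmark_type and re_matrix_row_pattern.match(line):
        row = line.split()
        for i, value in enumerate(row[1:], start=1):
            if value != 'N/A':
                results[f'{test_name}_{row_name}{row[0]}_{col_name}{header[i - 1]}_{benchmark_type}'] = float(value)
    elif re_summary_pattern.match(line):
        results[f'{test_name}_sum_{benchmark_type}'] = float(re_summary_pattern.match(line).group(2))

print(results)
# {'host_to_device_memcpy_ce_cpu0_gpu0_bw': 54.5,
#  'host_to_device_memcpy_ce_cpu0_gpu1_bw': 54.21,
#  'host_to_device_memcpy_ce_sum_bw': 108.71}
```

This is why the existing latency assertion still sees a `..._cpu0_gpu0_lat` metric: for host/device tests the three characters before '(row)' and '(column)' resolve to 'cpu' and 'gpu', while device-to-device tests would resolve to whatever labels nvbandwidth prints in that header.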