Skip to content

Commit c7a0bee

Browse files
committed
Update impact area testing
1 parent 5f5b67d commit c7a0bee

File tree

9 files changed

+572
-109
lines changed

9 files changed

+572
-109
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import ast
2+
import argparse
3+
import os
4+
import sys
5+
import pathlib
6+
import logging
7+
import json
8+
9+
logger = logging.getLogger()
10+
handler = logging.StreamHandler(sys.stdout)
11+
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
12+
handler.setFormatter(formatter)
13+
14+
15+
def find_python_files(directory):
16+
"""
17+
Recursively find all python files in a given directory.
18+
Args:
19+
directory (str): The path to the directory.
20+
Returns:
21+
list: A list of file paths.
22+
"""
23+
python_files = []
24+
for root, _, files in os.walk(directory):
25+
for script in files:
26+
if script.endswith(".py"):
27+
python_files.append(os.path.join(root, script))
28+
return python_files
29+
30+
31+
def get_called_methods_or_functions(node):
32+
"""
33+
Returns a set of functions or methods that this function or method calls.
34+
"""
35+
calls = set()
36+
for sub_node in ast.walk(node):
37+
if isinstance(sub_node, ast.Call):
38+
if isinstance(sub_node.func, ast.Name): # Function call
39+
calls.add(sub_node.func.id)
40+
elif isinstance(sub_node.func, ast.Attribute): # Method call
41+
calls.add(sub_node.func.attr)
42+
return calls
43+
44+
45+
def find_function_and_method_calls(filepath):
46+
"""
47+
Find all function and method calls in a given file.
48+
Returns a dictionary like this:
49+
{
50+
'func_name_1': {'func_a', 'func_b', 'func_c'},
51+
'ClassName.method_name': {'func_d', 'func_e'}
52+
}
53+
"""
54+
function_calls = {}
55+
ast_tree = None
56+
with open(filepath, 'r') as py_file:
57+
try:
58+
ast_tree = ast.parse(py_file.read())
59+
except SyntaxError as e:
60+
logger.error(f'Error parsing file {filepath}: {e}')
61+
return function_calls
62+
63+
for ast_node in ast.walk(ast_tree):
64+
if isinstance(ast_node, ast.FunctionDef): # Standalone function
65+
function_calls[ast_node.name] = get_called_methods_or_functions(ast_node)
66+
elif isinstance(ast_node, ast.ClassDef): # Class with methods
67+
for class_node in ast_node.body:
68+
if isinstance(class_node, ast.FunctionDef): # Method in class
69+
method_name = f"{ast_node.name}.{class_node.name}"
70+
function_calls[method_name] = get_called_methods_or_functions(class_node)
71+
72+
return function_calls
73+
74+
75+
def find_dependent_functions_and_methods(function_name, calls):
76+
"""
77+
Find all functions and methods that directly or indirectly call the given function_name.
78+
"""
79+
dependent_items = set()
80+
visited_items = set()
81+
82+
def helper(item_name):
83+
if item_name in visited_items:
84+
return
85+
visited_items.add(item_name)
86+
for caller, callee_set in calls.items():
87+
if item_name in callee_set:
88+
dependent_items.add(caller)
89+
helper(caller)
90+
91+
helper(function_name)
92+
return dependent_items
93+
94+
95+
def find_tests_using_fixture(fixture_name, python_files):
96+
"""
97+
Find all test files that use the given pytest fixture.
98+
"""
99+
affected_test_files = set()
100+
for py_file in python_files:
101+
with open(py_file, 'r') as f:
102+
try:
103+
tree = ast.parse(f.read())
104+
except SyntaxError as e:
105+
logger.error(f'Error parsing file {py_file}: {e}')
106+
continue
107+
108+
for node in tree.body:
109+
if isinstance(node, ast.FunctionDef) and node.name.startswith('test'):
110+
for arg in node.args.args:
111+
if arg.arg == fixture_name:
112+
affected_test_files.add(py_file)
113+
break
114+
return affected_test_files
115+
116+
117+
if __name__ == '__main__':
118+
description = """Given a function name and directory containing python files,
119+
find all functions that directly or indirectly call the given function or use the given fixture."""
120+
parser = argparse.ArgumentParser(description=description)
121+
122+
parser.add_argument('--function_name',
123+
type=str,
124+
required=True, help='The name of the function or fixture to check dependencies for.')
125+
parser.add_argument('--directory',
126+
type=str,
127+
required=True,
128+
help='The path to the directory containing python files.')
129+
parser.add_argument('--trace', action='store_true', help='Enable trace logging.')
130+
131+
args = parser.parse_args()
132+
if args.trace:
133+
logger.setLevel(logging.DEBUG)
134+
else:
135+
logger.setLevel(logging.INFO)
136+
137+
logger.debug('Function Name:', args.function_name)
138+
logger.debug('Directory:', args.directory)
139+
140+
# find all python files
141+
python_files = find_python_files(args.directory)
142+
logger.debug(f'Scanning {len(python_files)} python files in {args.directory}')
143+
144+
# Update to handle both functions and methods
145+
function_calls = {}
146+
function_calls_per_file = {}
147+
for py_file in python_files:
148+
file_function_calls = find_function_and_method_calls(py_file)
149+
for func_name, called_items in file_function_calls.items():
150+
if func_name in function_calls:
151+
function_calls[func_name].update(called_items)
152+
else:
153+
function_calls[func_name] = called_items.copy()
154+
function_calls_per_file[py_file] = file_function_calls.keys()
155+
156+
dependent_items = find_dependent_functions_and_methods(args.function_name, function_calls)
157+
158+
affected_files = set()
159+
for item in dependent_items:
160+
for file_path, function_items in function_calls_per_file.items():
161+
if item in function_items:
162+
affected_files.add(file_path)
163+
break
164+
165+
affected_test_files = set()
166+
for file_path in affected_files:
167+
p = pathlib.Path(file_path)
168+
if p.name.startswith('test'):
169+
affected_test_files.add(file_path)
170+
171+
# Check if the function_name is a fixture
172+
fixture_test_files = find_tests_using_fixture(args.function_name, python_files)
173+
affected_test_files.update(fixture_test_files)
174+
175+
# Check if the function_name is a test function
176+
if args.function_name.startswith('test'):
177+
for file_path, function_items in function_calls_per_file.items():
178+
if args.function_name in function_items:
179+
affected_test_files.add(file_path)
180+
181+
impacted_files = {
182+
'total_scanned': len(python_files),
183+
'number_of_impacted_tests': len(affected_test_files),
184+
'tests': list(affected_test_files)
185+
}
186+
print(json.dumps(impacted_files, indent=4))
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#!/usr/bin/env python3
2+
3+
"""
4+
Scripts for getting test scripts in impacted area
5+
Example:
6+
python impacted_area_testing/get_test_scripts.py --files file1.py file2.py
7+
8+
It will get all test scripts in specific impacted area.
9+
"""
10+
import re
11+
import logging
12+
import json
13+
import argparse
14+
from constant import PR_TOPOLOGY_TYPE, EXCLUDE_TEST_SCRIPTS
15+
from pathlib import Path
16+
17+
18+
def topo_name_to_topo_checker(topo_name):
19+
pattern = re.compile(r'^(ciscovs-7nodes|ciscovs-5nodes|wan|wan-pub-isis|wan-com|wan-pub|wan-pub-cisco|wan-3link-tg|'
20+
r't0|t0-52|t0-mclag|mgmttor|m0|mc0|mx|'
21+
r't1|t1-lag|t1-56-lag|t1-64-lag|'
22+
r'ptf|fullmesh|dualtor|t2|tgen|multidut-tgen|dpu|any|snappi|util|'
23+
r't0-2vlans|t0-sonic|t1-multi-asic)$')
24+
match = pattern.match(topo_name)
25+
if match is None:
26+
logging.warning("Unsupported testbed type - {}".format(topo_name))
27+
return topo_name
28+
29+
topo_type = match.group()
30+
if topo_type in ['mgmttor', 'm0', 'mc0', 'mx', 't0-52', 't0-mclag']:
31+
# certain testbed types are in 't0' category with different names.
32+
topo_type = 't0'
33+
elif topo_type in ['t1-lag', 't1-56-lag', 't1-64-lag']:
34+
topo_type = 't1'
35+
elif 't2' in topo_type:
36+
topo_type = 't2'
37+
38+
topology_checker = topo_type + "_checker"
39+
40+
return topology_checker
41+
42+
43+
def distribute_scripts_to_PR_checkers(match, script_name, test_scripts_per_topology_checker):
44+
for topology in match.group(1).split(","):
45+
topology_mark = topology.strip().strip('"').strip("'")
46+
if topology_mark == "any":
47+
for key in ["t0_checker", "t1_checker", "t2_checker"]:
48+
if script_name not in test_scripts_per_topology_checker[key]:
49+
test_scripts_per_topology_checker[key].append(script_name)
50+
else:
51+
topology_checker = topo_name_to_topo_checker(topology_mark)
52+
if topology_checker in test_scripts_per_topology_checker \
53+
and script_name not in test_scripts_per_topology_checker[topology_checker]:
54+
test_scripts_per_topology_checker[topology_checker].append(script_name)
55+
56+
57+
def collect_scripts_by_topology_type_from_files(files: list) -> dict:
58+
"""
59+
This function collects test scripts from the provided list of files and categorizes them by topology type.
60+
61+
Args:
62+
files: List of file paths to analyze.
63+
64+
Returns:
65+
Dict: A dict of test scripts categorized by topology type.
66+
"""
67+
# Regex pattern to find pytest topology markers
68+
pattern = re.compile(r"[^@]pytest\.mark\.topology\(([^\)]*)\)")
69+
70+
# Init the dict to record the mapping of topology type and test scripts
71+
test_scripts_per_topology_checker = {}
72+
for topology_type in PR_TOPOLOGY_TYPE:
73+
test_scripts_per_topology_checker[topology_type] = []
74+
75+
for file_path in files:
76+
# Remove the top-level 'tests' directory from the file path
77+
script_name = str(Path(file_path).relative_to("tests"))
78+
if script_name in EXCLUDE_TEST_SCRIPTS:
79+
continue
80+
81+
try:
82+
with open(file_path, 'r') as script:
83+
for line in script:
84+
# Get topology type of script from mark `pytest.mark.topology`
85+
match = pattern.search(line)
86+
if match:
87+
distribute_scripts_to_PR_checkers(match, script_name, test_scripts_per_topology_checker)
88+
break
89+
except Exception as e:
90+
raise Exception(f'Exception occurred while trying to get topology in {file_path}, error {e}')
91+
92+
return {k: v for k, v in test_scripts_per_topology_checker.items() if v}
93+
94+
95+
def main(files):
96+
scripts_list = collect_scripts_by_topology_type_from_files(files)
97+
print(json.dumps(scripts_list))
98+
99+
100+
if __name__ == '__main__':
101+
parser = argparse.ArgumentParser()
102+
parser.add_argument("--files", help="List of files to analyze", nargs='+', type=str, required=True)
103+
args = parser.parse_args()
104+
105+
files = args.files
106+
main(files)

0 commit comments

Comments
 (0)