|
15 | 15 | import sys |
16 | 16 | from functools import partial |
17 | 17 | from operator import attrgetter |
18 | | -from typing import Dict, List, Optional, Set, TextIO, Type |
| 18 | +from typing import Dict, Iterator, List, Optional, Set, TextIO, Type |
19 | 19 |
|
20 | 20 | from pydantic.json import custom_pydantic_encoder # pylint: disable=no-name-in-module |
21 | 21 |
|
@@ -220,56 +220,67 @@ def print_json(self, out: TextIO) -> None: |
220 | 220 | } |
221 | 221 | json.dump(json_dict, out, indent=2, default=encoder) |
222 | 222 |
|
223 | | - def print_human_readable(self, out: TextIO, details: bool = True) -> None: |
| 223 | + def print_human_readable(self, out: TextIO, detailed: bool = True) -> None: |
224 | 224 | """Print a human-readable rendering of this analysis to 'out'.""" |
225 | | - if self.is_enabled(Action.LIST_SOURCES): |
226 | | - if details: |
| 225 | + |
| 226 | + def render_sources() -> Iterator[str]: |
| 227 | + if detailed: |
227 | 228 | # Sort sources by type, then by path |
228 | 229 | source_types = [ |
229 | 230 | (CodeSource, "Sources of Python code:"), |
230 | 231 | (DepsSource, "Sources of declared dependencies:"), |
231 | 232 | (PyEnvSource, "Python environments:"), |
232 | 233 | ] |
233 | 234 | for source_type, heading in source_types: |
234 | | - sources = {s for s in self.sources if s.source_type is source_type} |
235 | | - if sources: |
236 | | - print("\n" + heading, file=out) |
237 | | - lines = [f" {src.render(details)}" for src in sources] |
238 | | - print("\n".join(sorted(lines)), file=out) |
| 235 | + filtered = {s for s in self.sources if s.source_type is source_type} |
| 236 | + if filtered: |
| 237 | + yield "\n" + heading |
| 238 | + yield from sorted([f" {src.render(True)}" for src in filtered]) |
239 | 239 | else: |
240 | | - unique_paths = {src.render(details) for src in self.sources} |
241 | | - print("\n".join(unique_paths), file=out) |
| 240 | + yield from sorted({src.render(False) for src in self.sources}) |
242 | 241 |
|
243 | | - if self.is_enabled(Action.LIST_IMPORTS): |
244 | | - if details: |
| 242 | + def render_imports() -> Iterator[str]: |
| 243 | + if detailed: |
245 | 244 | # Sort imports by source, then by name |
246 | 245 | for imp in sorted(self.imports, key=attrgetter("source", "name")): |
247 | | - print(f"{imp.source}: {imp.name}", file=out) |
| 246 | + yield f"{imp.source}: {imp.name}" |
248 | 247 | else: |
249 | 248 | unique_imports = {i.name for i in self.imports} |
250 | | - print("\n".join(sorted(unique_imports)), file=out) |
251 | | - |
252 | | - if self.is_enabled(Action.LIST_DEPS): |
253 | | - if details: |
254 | | - # Sort dependencies by location, then by name |
255 | | - for dep in sorted( |
256 | | - set(self.declared_deps), key=attrgetter("source", "name") |
257 | | - ): |
258 | | - print(f"{dep.source}: {dep.name}", file=out) |
| 249 | + yield from sorted(unique_imports) |
| 250 | + |
| 251 | + def render_declared_deps() -> Iterator[str]: |
| 252 | + if detailed: |
| 253 | + # Sort dependencies by source, then by name |
| 254 | + unique_deps = set(self.declared_deps) |
| 255 | + for dep in sorted(unique_deps, key=attrgetter("source", "name")): |
| 256 | + yield f"{dep.source}: {dep.name}" |
259 | 257 | else: |
260 | | - print( |
261 | | - "\n".join(sorted(set(d.name for d in self.declared_deps))), file=out |
262 | | - ) |
| 258 | + yield from sorted(set(d.name for d in self.declared_deps)) |
263 | 259 |
|
264 | | - if self.is_enabled(Action.REPORT_UNDECLARED) and self.undeclared_deps: |
265 | | - print("\nThese imports appear to be undeclared dependencies:", file=out) |
| 260 | + def render_undeclared() -> Iterator[str]: |
| 261 | + yield "\nThese imports appear to be undeclared dependencies:" |
266 | 262 | for undeclared in self.undeclared_deps: |
267 | | - print(f"- {undeclared.render(details)}", file=out) |
| 263 | + yield f"- {undeclared.render(detailed)}" |
268 | 264 |
|
269 | | - if self.is_enabled(Action.REPORT_UNUSED) and self.unused_deps: |
270 | | - print(f"\n{UNUSED_DEPS_OUTPUT_PREFIX}:", file=out) |
| 265 | + def render_unused() -> Iterator[str]: |
| 266 | + yield f"\n{UNUSED_DEPS_OUTPUT_PREFIX}:" |
271 | 267 | for unused in sorted(self.unused_deps, key=lambda d: d.name): |
272 | | - print(f"- {unused.render(details)}", file=out) |
| 268 | + yield f"- {unused.render(detailed)}" |
| 269 | + |
| 270 | + def output(lines: Iterator[str]) -> None: |
| 271 | + for line in lines: |
| 272 | + print(line, file=out) |
| 273 | + |
| 274 | + if self.is_enabled(Action.LIST_SOURCES): |
| 275 | + output(render_sources()) |
| 276 | + if self.is_enabled(Action.LIST_IMPORTS): |
| 277 | + output(render_imports()) |
| 278 | + if self.is_enabled(Action.LIST_DEPS): |
| 279 | + output(render_declared_deps()) |
| 280 | + if self.is_enabled(Action.REPORT_UNDECLARED) and self.undeclared_deps: |
| 281 | + output(render_undeclared()) |
| 282 | + if self.is_enabled(Action.REPORT_UNUSED) and self.unused_deps: |
| 283 | + output(render_unused()) |
273 | 284 |
|
274 | 285 | @staticmethod |
275 | 286 | def success_message(check_undeclared: bool, check_unused: bool) -> Optional[str]: |
@@ -318,11 +329,11 @@ def print_output( |
318 | 329 | if analysis.settings.output_format == OutputFormat.JSON: |
319 | 330 | analysis.print_json(stdout) |
320 | 331 | elif analysis.settings.output_format == OutputFormat.HUMAN_DETAILED: |
321 | | - analysis.print_human_readable(stdout, details=True) |
| 332 | + analysis.print_human_readable(stdout, detailed=True) |
322 | 333 | if exit_code == 0 and success_message: |
323 | 334 | print(f"\n{success_message}", file=stdout) |
324 | 335 | elif analysis.settings.output_format == OutputFormat.HUMAN_SUMMARY: |
325 | | - analysis.print_human_readable(stdout, details=False) |
| 336 | + analysis.print_human_readable(stdout, detailed=False) |
326 | 337 | if exit_code == 0 and success_message: |
327 | 338 | print(f"\n{success_message}", file=stdout) |
328 | 339 | else: |
|
0 commit comments