|
8 | 8 | from __future__ import annotations
|
9 | 9 |
|
10 | 10 | import os
|
| 11 | +import pathlib |
| 12 | +import signal |
11 | 13 | import subprocess
|
12 | 14 | import sys
|
13 | 15 | import tempfile
|
| 16 | +import threading |
| 17 | +import time |
14 | 18 | import unittest
|
15 | 19 |
|
16 | 20 | from mypy.dmypy_server import filter_out_missing_top_level_packages
|
@@ -130,3 +134,86 @@ def make_file(self, base: str, path: str) -> None:
|
130 | 134 | if not path.endswith("/"):
|
131 | 135 | with open(fullpath, "w") as f:
|
132 | 136 | f.write("# test file")
|
| 137 | + |
| 138 | + |
| 139 | +class DaemonWatchSuite(unittest.TestCase): |
| 140 | + def setUp(self): |
| 141 | + self.temp_dir = tempfile.TemporaryDirectory() |
| 142 | + self.temp_path = pathlib.Path(self.temp_dir.name) |
| 143 | + self.output_lines = [] |
| 144 | + self.stop_reader = False |
| 145 | + |
| 146 | + def _read_output(self): |
| 147 | + for line in self.process.stdout: |
| 148 | + self.output_lines.append(line.strip()) |
| 149 | + if self.stop_reader: |
| 150 | + break |
| 151 | + |
| 152 | + def _start_watching(self, args: str, start_daemon: bool = True): |
| 153 | + if start_daemon: |
| 154 | + subprocess.run( |
| 155 | + [sys.executable, "-m", "mypy.dmypy", "start"], |
| 156 | + stdout=subprocess.PIPE, |
| 157 | + stderr=subprocess.STDOUT, |
| 158 | + cwd=self.temp_path, |
| 159 | + check=True, |
| 160 | + ) |
| 161 | + |
| 162 | + self.process = subprocess.Popen( |
| 163 | + [sys.executable, "-m", "mypy.dmypy", "watch", args], |
| 164 | + stdout=subprocess.PIPE, |
| 165 | + stderr=subprocess.STDOUT, |
| 166 | + cwd=self.temp_path, |
| 167 | + text=True, |
| 168 | + universal_newlines=True, |
| 169 | + bufsize=1, |
| 170 | + ) |
| 171 | + |
| 172 | + self.reader_thread = threading.Thread(target=self._read_output, daemon=True) |
| 173 | + self.reader_thread.start() |
| 174 | + |
| 175 | + def _wait_for_output(self, text: str, timeout=5): |
| 176 | + """Wait for text to appear in output within timeout seconds.""" |
| 177 | + start_time = time.time() |
| 178 | + while time.time() - start_time < timeout: |
| 179 | + if any(text in line for line in self.output_lines): |
| 180 | + return True |
| 181 | + time.sleep(0.1) |
| 182 | + return False |
| 183 | + |
| 184 | + def test_watcher_reacts_to_file_changes(self): |
| 185 | + (self.temp_path / "valid.py").write_text( |
| 186 | + "def hello_world() -> str:\n return 'Hello World!'" |
| 187 | + ) |
| 188 | + |
| 189 | + self._start_watching(".") |
| 190 | + |
| 191 | + # The initial run can take a bit longer, therefore the 10s timeout |
| 192 | + self.assertTrue(self._wait_for_output("Success: no issues found in 1 source file", 10)) |
| 193 | + |
| 194 | + (self.temp_path / "invalid.py").write_text( |
| 195 | + "def hello_world() -> int:\n return 'Hello World!'" |
| 196 | + ) |
| 197 | + |
| 198 | + self.assertTrue(self._wait_for_output("Incompatible return value type")) |
| 199 | + self.assertTrue(self._wait_for_output("Found 1 error in 1 file")) |
| 200 | + |
| 201 | + def tearDown(self): |
| 202 | + print(self.output_lines) |
| 203 | + subprocess.run([sys.executable, "-m", "mypy.dmypy", "stop"], cwd=self.temp_path) |
| 204 | + |
| 205 | + # Send SIGINT to terminate the watcher process |
| 206 | + if self.process.poll() is None: |
| 207 | + self.process.send_signal(signal.SIGINT) |
| 208 | + try: |
| 209 | + self.process.wait(timeout=5) |
| 210 | + except subprocess.TimeoutExpired: |
| 211 | + self.process.kill() |
| 212 | + |
| 213 | + # Stop the output reader thread |
| 214 | + self.stop_reader = True |
| 215 | + if self.reader_thread.is_alive(): |
| 216 | + self.reader_thread.join(timeout=5) |
| 217 | + |
| 218 | + # Clean up temp directory |
| 219 | + self.temp_dir.cleanup() |
0 commit comments