Skip to content

Commit c78af12

Browse files
committed
Brightness Library limit open files #223
1 parent 7c258ae commit c78af12

File tree

1 file changed

+74
-51
lines changed

1 file changed

+74
-51
lines changed

lnxlink/modules/scripts/monitor_brightness.py

Lines changed: 74 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
import struct
1212
import functools
1313
import operator
14-
from typing import Optional, List, Set
14+
from typing import Optional, List, Set, BinaryIO
1515

1616

1717
class MonitorBrightness:
@@ -40,7 +40,7 @@ def list_displays() -> tuple[List["MonitorBrightness"], Set[str]]:
4040

4141
# 1. Internal Laptop Displays (Sysfs)
4242
for path in glob.glob("/sys/class/backlight/*"):
43-
found_displays.append(SysfsMonitor(path))
43+
# Check permissions
4444
if not os.access(os.path.join(path, "brightness"), os.W_OK):
4545
issues.add(
4646
"Backlight Permission Error: Create a udev rule.\n"
@@ -54,21 +54,23 @@ def list_displays() -> tuple[List["MonitorBrightness"], Set[str]]:
5454
# 2. External Monitors (I2C/DDC)
5555
for i2c_path in sorted(glob.glob("/dev/i2c-*")):
5656
try:
57-
fd = os.open(i2c_path, os.O_RDWR)
58-
# Use EDID address to identify the monitor
59-
fcntl.ioctl(fd, 0x0703, 0x50)
60-
data = os.read(fd, 256)
61-
os.close(fd)
57+
# Use 'rb+' for binary read/write, buffering=0 for unbuffered I/O
58+
with open(i2c_path, "r+b", buffering=0) as f:
59+
# Use EDID address to identify the monitor
60+
# 0x0703 is I2C_SLAVE
61+
fcntl.ioctl(f.fileno(), 0x0703, 0x50)
62+
data = f.read(256)
6263

6364
if data.startswith(bytes.fromhex("00 FF FF FF FF FF FF 00")):
6465
mfg, name, _ = DDCIPMonitor.parse_edid(data)
6566
found_displays.append(DDCIPMonitor(i2c_path, mfg, name))
67+
6668
except PermissionError:
6769
issues.add(
6870
f"I2C Permission Error: User '{os.getlogin()}' needs 'i2c' group access.\n"
6971
f"Run: sudo usermod -aG i2c {os.getlogin()} && reboot"
7072
)
71-
except Exception:
73+
except (OSError, IOError, Exception):
7274
continue
7375

7476
return found_displays, issues
@@ -85,34 +87,36 @@ class SysfsMonitor(MonitorBrightness):
8587
def __init__(self, path: str):
8688
name = os.path.basename(path)
8789
super().__init__(path, "Internal", name)
90+
# Cache max_brightness on init so we don't read it every time
91+
self.max_brightness = self._read_value("max_brightness")
8892

89-
def get_brightness(self, timeout: float = 1.0) -> Optional[int]:
93+
def _read_value(self, filename: str) -> int:
94+
"""Helper to read a single integer from a sysfs file."""
9095
try:
9196
with open(
92-
os.path.join(self.identifier, "brightness"), "r", encoding="UTF-8"
97+
os.path.join(self.identifier, filename), "r", encoding="UTF-8"
9398
) as f:
94-
cur_val = int(f.read().strip())
95-
with open(
96-
os.path.join(self.identifier, "max_brightness"), "r", encoding="UTF-8"
97-
) as f:
98-
max_val = int(f.read().strip())
99-
100-
if max_val == 0:
101-
return None
102-
self.last_successful_read = int((cur_val / max_val) * 100)
103-
return self.last_successful_read
99+
return int(f.read().strip())
104100
except (OSError, ValueError):
101+
return 0
102+
103+
def get_brightness(self, timeout: float = 1.0) -> Optional[int]:
104+
# If max is 0, device is invalid or unreadable
105+
if not self.max_brightness:
105106
return None
106107

108+
cur_val = self._read_value("brightness")
109+
self.last_successful_read = int((cur_val / self.max_brightness) * 100)
110+
return self.last_successful_read
111+
107112
def set_brightness(self, value: int) -> None:
113+
if not self.max_brightness:
114+
return
115+
108116
try:
109117
target_pct = max(0, min(100, int(value)))
110-
with open(
111-
os.path.join(self.identifier, "max_brightness"), "r", encoding="UTF-8"
112-
) as f:
113-
max_val = int(f.read().strip())
118+
actual_value = int((target_pct / 100) * self.max_brightness)
114119

115-
actual_value = int((target_pct / 100) * max_val)
116120
with open(
117121
os.path.join(self.identifier, "brightness"), "w", encoding="UTF-8"
118122
) as f:
@@ -167,57 +171,76 @@ def parse_edid(data: bytes):
167171
except Exception:
168172
return "Unknown", "Unknown", "Unknown"
169173

170-
def _ddc_command(self, payload: list, read_length: int = 0) -> Optional[bytes]:
171-
"""Low-level I2C write/read transaction."""
172-
fd = None
174+
def _ddc_command(
175+
self, file_obj: BinaryIO, payload: list, read_length: int = 0
176+
) -> Optional[bytes]:
177+
"""
178+
Low-level I2C write/read transaction.
179+
Expects an already open binary file object.
180+
"""
173181
try:
174-
fd = os.open(self.identifier, os.O_RDWR)
175-
fcntl.ioctl(fd, self.I2C_SLAVE, self.DDC_ADDR)
176-
177182
# Construct DDC packet with checksum
178183
packet = bytearray([self.HOST_ADDR_W, len(payload) | 0x80] + payload)
179184
checksum = functools.reduce(operator.xor, packet, self.DEST_ADDR_W)
180185
packet.append(checksum)
181186

182-
os.write(fd, packet)
187+
file_obj.write(packet)
183188
time.sleep(0.05)
184189

185190
if read_length > 0:
186-
data = os.read(fd, read_length)
191+
data = file_obj.read(read_length)
187192
checksum = functools.reduce(operator.xor, data[:-1], self.HOST_ADDR_R)
188193
if checksum == data[-1]:
189194
return data
190195
return None
191196
except (OSError, IOError):
192197
return None
193-
finally:
194-
if fd is not None:
195-
os.close(fd)
196198

197199
def get_brightness(self, timeout: float = 1.0) -> Optional[int]:
198200
"""Polls for external brightness via VCP."""
199201
start_time = time.monotonic()
200-
while (time.monotonic() - start_time) < timeout:
201-
data = self._ddc_command([self.GET_VCP_CMD, self.VCP_BRIGHTNESS], 11)
202-
# DDC reply: [source, length, result, code, max_h, max_l, cur_h, cur_l, checksum]
203-
if data and len(data) >= 10:
204-
max_val = int.from_bytes(data[6:8], "big")
205-
cur_val = int.from_bytes(data[8:10], "big")
206-
207-
if max_val != 0:
208-
self.last_successful_read = int((cur_val / max_val) * 100)
209-
return self.last_successful_read
210-
211-
time.sleep(0.1) # Retry interval
202+
203+
try:
204+
# Open the I2C file ONCE here to avoid overhead in the loop
205+
with open(self.identifier, "r+b", buffering=0) as f:
206+
fcntl.ioctl(f.fileno(), self.I2C_SLAVE, self.DDC_ADDR)
207+
208+
while (time.monotonic() - start_time) < timeout:
209+
# Pass the open file handle to the command
210+
data = self._ddc_command(
211+
f, [self.GET_VCP_CMD, self.VCP_BRIGHTNESS], 11
212+
)
213+
214+
if data and len(data) >= 10:
215+
max_val = int.from_bytes(data[6:8], "big")
216+
cur_val = int.from_bytes(data[8:10], "big")
217+
218+
if max_val != 0:
219+
self.last_successful_read = int((cur_val / max_val) * 100)
220+
return self.last_successful_read
221+
222+
time.sleep(0.1) # Retry interval
223+
except (OSError, IOError):
224+
pass
225+
212226
return None
213227

214228
def set_brightness(self, value: int, retries: int = 10) -> None:
215229
"""Sets brightness and verifies the hardware change."""
216230
target = max(0, min(100, int(value)))
217231
self.last_successful_read = target
218-
# Send set command
219-
for _attempt in range(retries):
220-
self._ddc_command([self.SET_VCP_CMD, self.VCP_BRIGHTNESS, 0x00, target])
232+
233+
try:
234+
with open(self.identifier, "r+b", buffering=0) as f:
235+
fcntl.ioctl(f.fileno(), self.I2C_SLAVE, self.DDC_ADDR)
236+
237+
for _attempt in range(retries):
238+
self._ddc_command(
239+
f, [self.SET_VCP_CMD, self.VCP_BRIGHTNESS, 0x00, target]
240+
)
241+
time.sleep(0.02) # Slight delay between retries if needed
242+
except (OSError, IOError):
243+
pass
221244

222245

223246
if __name__ == "__main__":

0 commit comments

Comments
 (0)