Commit e19fb3b

add level_specs filter
1 parent e09f30d commit e19fb3b

File tree

1 file changed: +87 -43 lines


morebuiltins/cmd/log_server.py

Lines changed: 87 additions & 43 deletions
@@ -5,6 +5,7 @@
 import logging.handlers
 import os
 import pickle
+import re
 import shutil
 import signal
 import sys
@@ -74,14 +75,26 @@ def datefmt(self) -> str:
         return getattr(self.formatter, "datefmt", "")

     def __post_init__(self):
+        formatter = self.formatter
+        if isinstance(formatter, str):
+            # base64 formatter
+            self.formatter = self.pickle_from_base64(formatter)
+        elif isinstance(formatter, logging.Formatter):
+            pass
+        else:
+            self.formatter = DefaultLogSetting.formatter
         for index, level in enumerate(self.level_specs):
             if isinstance(level, int):
                 continue
             level = str(level).upper()
             if level not in logging._nameToLevel:
-                raise ValueError(
-                    f"level_specs[{index}] invalid log level name: {level}"
-                )
+                if re.match(r"^Level \d+$", level):
+                    level = level.split()[-1]
+                    return int(level)
+                else:
+                    raise ValueError(
+                        f"level_specs[{index}] invalid log level name: {level}"
+                    )
             self.level_specs[index] = logging._nameToLevel[level]
         super().__post_init__()
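Background for the new "Level \d+" branch: the server serializes level_specs with logging.getLevelName() (see to_dict_with_meta() below), and for numeric levels that have no registered name the stdlib returns the string "Level N", which then has to be parsed back into an int. A minimal standalone sketch of that round trip; the normalize_level helper is illustrative and not part of the module:

    import logging
    import re

    def normalize_level(level) -> int:
        # accept ints, registered names ("ERROR"), and the "Level N"
        # strings that logging.getLevelName() emits for unnamed levels
        if isinstance(level, int):
            return level
        name = str(level).upper()
        if name in logging._nameToLevel:
            return logging._nameToLevel[name]
        match = re.match(r"^LEVEL (\d+)$", name)
        if match:
            return int(match.group(1))
        raise ValueError(f"invalid log level name: {level!r}")

    print(logging.getLevelName(13))     # "Level 13" (no registered name)
    print(normalize_level("Level 13"))  # 13
    print(normalize_level("error"))     # 40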

@@ -98,29 +111,24 @@ def pickle_from_base64(data: str):
         return pickle.loads(base64.b64decode(data.encode("utf-8")))

     @classmethod
-    def from_dict(
-        cls, formatter: typing.Union[str, logging.Formatter, None] = None, **kwargs
-    ):
-        if isinstance(formatter, str):
-            # base64 formatter
-            kwargs["formatter"] = cls.pickle_from_base64(formatter)
-        elif isinstance(formatter, logging.Formatter):
-            kwargs["formatter"] = formatter
-        else:
-            kwargs["formatter"] = DefaultLogSetting.formatter
+    def from_dict(cls, **kwargs):
         kwargs = {k: v for k, v in kwargs.items() if k in cls.__annotations__}
         return cls(**kwargs)

     def to_dict_with_meta(self) -> dict:
-        data = asdict(self)
-        data["formatter"] = self.pickle_to_base64(self.formatter)
-        data["fmt"] = self.fmt
-        data["datefmt"] = self.datefmt
+        meta: dict = {
+            "create_time": self.create_time,
+            "fmt": self.fmt,
+            "datefmt": self.datefmt,
+        }
+        meta.update(asdict(self))
+        # base64 formatter
+        meta["formatter"] = self.pickle_to_base64(self.formatter)
         # int to str
-        data["level_specs"] = [
+        meta["level_specs"] = [
             logging.getLevelName(level) for level in self.level_specs
         ]
-        return data
+        return meta

     def __eq__(self, other):
         if not isinstance(other, LogSetting):
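With the formatter decoding moved from from_dict() into __post_init__(), any construction path now accepts either a logging.Formatter or its base64-pickled form. A sketch of the round trip: pickle_from_base64 matches the body visible in the context line above, while pickle_to_base64 is reconstructed by symmetry and may differ from the module's actual implementation:

    import base64
    import logging
    import pickle

    def pickle_to_base64(obj) -> str:
        # assumed mirror of LogSetting.pickle_to_base64
        return base64.b64encode(pickle.dumps(obj)).decode("utf-8")

    def pickle_from_base64(data: str):
        return pickle.loads(base64.b64decode(data.encode("utf-8")))

    fmt = logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
    blob = pickle_to_base64(fmt)         # wire-safe str, what to_dict_with_meta() ships
    restored = pickle_from_base64(blob)  # back to a logging.Formatter on the other side
    assert isinstance(restored, logging.Formatter)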
@@ -329,6 +337,7 @@ async def __aenter__(self):
     async def __aexit__(self, *_errors):
         await asyncio.sleep(0.01)
         await super().__aexit__(*_errors)
+        await asyncio.to_thread(self.close_opened_files)

     @staticmethod
     def default_settings():
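The new __aexit__ line pushes the blocking file closes onto a worker thread so the event loop is not stalled while handles are flushed. A tiny standalone illustration of the same pattern; blocking_cleanup and shutdown are placeholders, not module functions:

    import asyncio
    import time

    def blocking_cleanup():
        time.sleep(0.2)  # stand-in for flushing/closing file handles
        print("files closed")

    async def shutdown():
        # asyncio.to_thread runs the callable in the default executor,
        # so the loop keeps servicing other tasks meanwhile
        await asyncio.to_thread(blocking_cleanup)

    asyncio.run(shutdown())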
@@ -382,12 +391,16 @@ def get_targets(
         name: str,
         max_size=DefaultLogSetting.max_size,
         max_backups=DefaultLogSetting.max_backups,
+        level_spec: typing.Optional[int] = None,
     ):
         targets = []
-        if self.log_stream:
-            targets.append(self.log_stream)
-        elif name == self.name:
+        if name == self.name:
+            # server log always to stderr
             targets.append(sys.stderr)
+        elif self.log_stream:
+            if not level_spec:
+                # spec log only to file
+                targets.append(self.log_stream)
         if self.log_dir:
             if name in self._opened_files:
                 fw = self._opened_files[name]
@@ -408,13 +421,18 @@ def get_targets(
                     max_backups=max_backups,
                     compress=self.compress,
                 )
+                self._opened_files[name] = fw
+                targets.append(fw)
             except Exception as e:
                 self.send_log(
-                    f"error in get_targets({name!r}, {max_size!r}, {max_backups!r})",
+                    f"get targets error ({name!r}, {max_size!r}, {max_backups!r}) {e!r}",
                     e,
                     level=logging.ERROR,
                 )
-            targets.append(self._opened_files.setdefault(name, fw))
+        if level_spec is not None:
+            for t in targets:
+                if isinstance(t, RotatingFileWriter):
+                    setattr(t, "level_spec", level_spec)
         return targets

     def save_new_setting(self, name, setting: LogSetting):
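The level_spec argument only tags file writers: the stream target is skipped for spec aliases, and each RotatingFileWriter gets a level_spec attribute that the consumer later reads with getattr(target, "level_spec", 0). A simplified sketch of that tagging idea with plain objects; FakeWriter and tag_targets are illustrative stand-ins, not the module's RotatingFileWriter or get_targets:

    import logging

    class FakeWriter:
        # minimal stand-in for a file-backed target
        def __init__(self, path: str):
            self.path = path

    def tag_targets(targets, level_spec=None):
        # mirrors the new tail of get_targets(): only file writers get the attribute
        if level_spec is not None:
            for t in targets:
                if isinstance(t, FakeWriter):
                    setattr(t, "level_spec", level_spec)
        return targets

    plain = tag_targets([FakeWriter("logs/app.log")])
    errors_only = tag_targets([FakeWriter("logs/app_error.log")], logging.ERROR)
    print(getattr(plain[0], "level_spec", 0))        # 0 -> receives every record
    print(getattr(errors_only[0], "level_spec", 0))  # 40 -> ERROR and above only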
@@ -522,16 +540,36 @@ def write_queue_consumer(self):
                 for name, record_list in new_lines.items():
                     setting = self.get_setting(name)
                     _format = setting.formatter.format
-                    lines = [_format(record) for record in record_list]
+                    lines = [
+                        (record.levelno, _format(record)) for record in record_list
+                    ]
                     targets = self.get_targets(
                         name,
                         max_size=setting.max_size,
                         max_backups=setting.max_backups,
                     )
+                    if setting.level_specs:
+                        for levelno in setting.level_specs:
+                            levelname = (
+                                logging.getLevelName(levelno).lower().replace(" ", "-")
+                            )
+                            alias_name = f"{name}_{levelname}"
+                            targets.extend(
+                                self.get_targets(
+                                    alias_name,
+                                    max_size=setting.max_size,
+                                    max_backups=setting.max_backups,
+                                    level_spec=levelno,
+                                )
+                            )
+                    text_counter = 0
                     for log_file in targets:
                         try:
-                            lines_text = "\n".join(lines)
-                            log_file.write(f"{lines_text}\n")
+                            levelno = getattr(log_file, "level_spec", 0)
+                            _lines = [text for level, text in lines if level >= levelno]
+                            lines_text = "\n".join(_lines) + "\n"
+                            text_counter += len(lines_text)
+                            log_file.write(lines_text)
                             log_file.flush()
                         except Exception as e:
                             self.send_log(
@@ -541,9 +579,7 @@ def write_queue_consumer(self):
                             )
                     if name != self.name:
                         self._lines_counter[name] += len(lines)
-                        self._size_counter[name] += sum(
-                            [len(line) for line in lines]
-                        ) + len(lines)
+                        self._size_counter[name] += text_counter
                 if self._lines_counter:
                     now = time.time()
                     if now - last_log_time > interval:
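Two details from the consumer changes: alias targets are named f"{name}_{levelname}" with logging.getLevelName(levelno).lower().replace(" ", "-"), so logging.ERROR becomes app_error and a bare 13 becomes app_level-13, and each target only keeps records at or above its level_spec, with 0 meaning everything. A standalone sketch of both; names such as alias_for are illustrative:

    import logging

    def alias_for(name: str, levelno: int) -> str:
        levelname = logging.getLevelName(levelno).lower().replace(" ", "-")
        return f"{name}_{levelname}"

    print(alias_for("app", logging.ERROR))  # app_error
    print(alias_for("app", 13))             # app_level-13

    # (levelno, formatted text) pairs, as the consumer now builds them
    lines = [(logging.INFO, "info line"), (logging.ERROR, "boom")]

    for level_spec in (0, logging.ERROR):   # 0 = main target, 40 = the error alias
        kept = [text for level, text in lines if level >= level_spec]
        print(level_spec, kept)             # 0 keeps both, 40 keeps only "boom"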
@@ -617,8 +653,15 @@ def handle_signal(self, sig, frame):
         self.loop.call_soon_threadsafe(self._shutdown_ev.set)

     def __del__(self):
-        for f in self._opened_files.values():
-            f.close()
+        self.close_opened_files()
+
+    def close_opened_files(self):
+        while self._opened_files:
+            _, fw = self._opened_files.popitem()
+            try:
+                fw.close()
+            except Exception as e:
+                pass

     async def run_wrapper(self):
         async with self:
@@ -642,7 +685,7 @@ def __exit__(self, exc_type, exc_value, traceback):
         self._write_queue.put_nowait(STOP_SIG)
         self.shutdown()
         self._thread.join(timeout=1)
-        return self
+        self.close_opened_files()


 CONNECTED_HANDLERS: typing.Dict[
@@ -823,26 +866,27 @@ async def main():
     await ls.wait_closed()


-def sync_test():
-    with LogServer() as ls:
-        logger = get_logger("test_logger", host=ls.host, port=ls.port)
+async def async_test():
+    async with LogServer(log_dir="logs"):
+        logger = get_logger("test_async", level_specs=[logging.ERROR, logging.INFO])
         for i in range(5):
             logger.info(f"log server test message {i + 1}")
+            logger.error(f"log server test message {i + 1}")
+    # shutil.rmtree("logs", ignore_errors=True)


-async def async_test():
-    async with LogServer(log_dir="logs"):
-        logger = get_logger("test_logger", level_specs=[logging.ERROR])
+def sync_test():
+    # return asyncio.run(async_test())
+    with LogServer(log_dir="logs"):
+        logger = get_logger("test_sync", level_specs=[logging.ERROR, 13])
         for i in range(5):
             logger.info(f"log server test message {i + 1}")
-            logger.error(f"log server test message {i + 1}")
-    shutil.rmtree("logs")
+    # shutil.rmtree("logs", ignore_errors=True)


 def entrypoint():
-    return asyncio.run(main())
-    # return asyncio.run(async_test())
     # return sync_test()
+    return asyncio.run(main())


 if __name__ == "__main__":
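Putting it together, a client opts in per logger via the new level_specs argument. This usage sketch mirrors async_test() above; it assumes LogServer and get_logger are importable from morebuiltins.cmd.log_server as the tests suggest, the logger name "demo" and messages are illustrative, and the exact on-disk filenames for the alias targets depend on log_dir handling not shown in this diff:

    import asyncio
    import logging

    from morebuiltins.cmd.log_server import LogServer, get_logger

    async def demo():
        async with LogServer(log_dir="logs"):
            logger = get_logger("demo", level_specs=[logging.ERROR])
            logger.info("written to the main 'demo' target only")
            logger.error("also copied to the 'demo_error' alias target")

    asyncio.run(demo())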
