Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 83 additions & 9 deletions dissect/target/plugins/os/unix/log/utmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@

import ipaddress
import struct
from enum import IntEnum
from typing import TYPE_CHECKING, NamedTuple

from dissect.cstruct import cstruct
from dissect.util.ts import from_unix
from dissect.database.sqlite3 import SQLite3
from dissect.util.ts import from_unix, from_unix_us

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.fsutil import open_decompress
Expand All @@ -21,13 +23,16 @@

UTMP_FIELDS = [
("datetime", "ts"),
("datetime", "ts_logout"),
("string", "ut_type"),
("string", "ut_user"),
("varint", "ut_pid"),
("string", "ut_line"),
("string", "ut_id"),
("string", "ut_host"),
("net.ipaddress", "ut_addr"),
("string", "ut_service"),
("path", "source"),
]

BtmpRecord = TargetRecordDescriptor(
Expand Down Expand Up @@ -90,13 +95,15 @@

class UtmpRecord(NamedTuple):
ts: datetime
ts_logout: datetime | None # utmpdb specific
ut_type: str
ut_user: str
ut_pid: int
ut_pid: int | None
ut_line: str
ut_id: str
ut_id: str | None
ut_host: str
ut_addr: ipaddress.IPv4Address | ipaddress.IPv6Address | None
ut_service: str | None # utmpdb specific


class UtmpFile:
Expand Down Expand Up @@ -140,42 +147,103 @@ def __iter__(self) -> Iterator[UtmpRecord]:

yield UtmpRecord(
ts=from_unix(entry.ut_tv.tv_sec),
ts_logout=None,
ut_type=r_type,
ut_pid=entry.ut_pid,
ut_user=entry.ut_user.decode(errors="surrogateescape").strip("\x00"),
ut_line=entry.ut_line.decode(errors="surrogateescape").strip("\x00"),
ut_id=entry.ut_id.decode(errors="surrogateescape").strip("\x00"),
ut_host=ut_host,
ut_addr=ut_addr,
ut_service=None,
)

except EOFError: # noqa: PERF203
break


class WtmpDbEntryType(IntEnum):
"""IntEnum of WtmpDb entry Types.

This differs from utmp's Type enum as BOOT_TIME and RUNLEVEL are swapped
and USER_PROCESS is 0x3 instead of 0x7.

References:
- https://github.com/thkukuk/wtmpdb/blob/main/include/wtmpdb.h
"""

EMPTY = 0
BOOT_TIME = 1
RUN_LVL = 2 # called RUNLEVEL in wtmpdb.h, renamed for consistency with utmp
USER_PROCESS = 3


class WtmpDbFile:
"""Parser for WtmpDb files.

References:
- https://github.com/thkukuk/wtmpdb
- https://packages.debian.org/trixie/libpam-wtmpdb
"""

def __init__(self, path: Path):
self.path = path
self.db = SQLite3(path)

def __iter__(self) -> Iterator[UtmpRecord]:
"""Iterate over the ``wtmp`` SQLite3 table.

References:
- https://github.com/thkukuk/wtmpdb/blob/main/README.md#database
"""
if not (table := self.db.table("wtmp")):
return None

for row in table.rows():
yield UtmpRecord(
ts=from_unix_us(row.Login),
ts_logout=from_unix_us(row.Logout) if row.Logout else None,
ut_type=WtmpDbEntryType(row.Type).name,
ut_pid=None,
ut_user=row.User,
ut_line=row.TTY,
ut_id=None,
ut_host=row.RemoteHost or None,
ut_addr=row.RemoteHost or None,
ut_service=row.Service,
)


class UtmpPlugin(Plugin):
"""Unix utmp log plugin."""

def __init__(self, target: Target):
super().__init__(target)
self.btmp_paths = list(self.target.fs.path("/").glob("var/log/btmp*"))
self.wtmp_paths = list(self.target.fs.path("/").glob("var/log/wtmp*"))
self.wtmp_paths = list(self.target.fs.path("/").glob("var/log/wtmp*")) + list(
self.target.fs.path("/").glob("var/lib/wtmpdb/wtmp*")
)
self.utmp_paths = list(self.target.fs.path("/").glob("var/run/utmp*"))

def check_compatible(self) -> None:
if not any(self.btmp_paths + self.wtmp_paths + self.utmp_paths):
raise UnsupportedPluginError("No wtmp and/or btmp log files found")

def _build_record(self, record: TargetRecordDescriptor, entry: UtmpRecord) -> Iterator[BtmpRecord | WtmpRecord]:
def _build_record(
self, record: TargetRecordDescriptor, entry: UtmpRecord, source: Path
) -> Iterator[BtmpRecord | WtmpRecord]:
return record(
ts=entry.ts,
ts_logout=entry.ts_logout,
ut_type=entry.ut_type,
ut_pid=entry.ut_pid,
ut_user=entry.ut_user,
ut_line=entry.ut_line,
ut_id=entry.ut_id,
ut_host=entry.ut_host,
ut_addr=entry.ut_addr,
ut_service=entry.ut_service,
source=source,
_target=self.target,
)

Expand All @@ -195,12 +263,12 @@ def btmp(self) -> Iterator[BtmpRecord]:
continue

for entry in UtmpFile(path):
yield self._build_record(BtmpRecord, entry)
yield self._build_record(BtmpRecord, entry, path)

@alias("utmp")
@export(record=WtmpRecord)
def wtmp(self) -> Iterator[WtmpRecord]:
"""Yield contents of wtmp log files.
"""Yield contents of wtmp and wtmpdb log files.

The wtmp file contains the historical data of the utmp file. The utmp file contains information about users
logins at which terminals, logouts, system events and current status of the system, system boot time
Expand All @@ -209,11 +277,17 @@ def wtmp(self) -> Iterator[WtmpRecord]:
References:
- https://www.thegeekdiary.com/what-is-the-purpose-of-utmp-wtmp-and-btmp-files-in-linux/
"""
seen = set()

for path in self.wtmp_paths + self.utmp_paths:
if not path.is_file():
self.target.log.warning("Unable to parse wtmp file: %s is not a file", path)
continue

for entry in UtmpFile(path):
yield self._build_record(WtmpRecord, entry)
if any(seen_path.samefile(path) for seen_path in seen):
continue
seen.add(path)

iterator = WtmpDbFile if path.suffix == ".db" else UtmpFile
for entry in iterator(path):
yield self._build_record(WtmpRecord, entry, path)
3 changes: 3 additions & 0 deletions tests/_data/plugins/os/unix/log/wtmp/wtmp.db
Git LFS file not shown
42 changes: 41 additions & 1 deletion tests/plugins/os/unix/log/test_utmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from datetime import datetime, timezone
from typing import TYPE_CHECKING

from dissect.target.plugins.os.unix.log.utmp import UtmpPlugin
from dissect.target.plugins.os.unix.log.utmp import UtmpPlugin, WtmpDbEntryType
from tests._utils import absolute_path

if TYPE_CHECKING:
Expand Down Expand Up @@ -79,3 +79,43 @@ def test_utmp_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None:
target_linux.add_plugin(UtmpPlugin)
results = list(target_linux.utmp())
assert len(results) == 70


def test_wtmpdb(target_linux: Target, fs_linux: VirtualFilesystem) -> None:
"""Test if we can parse a /var/log/wtmp.db SQLite3 file from libpam-wtmpdb."""

fs_linux.map_file("/var/log/wtmp.db", absolute_path("_data/plugins/os/unix/log/wtmp/wtmp.db"))
fs_linux.map_file("/var/lib/wtmpdb/wtmp.db", absolute_path("_data/plugins/os/unix/log/wtmp/wtmp.db"))
fs_linux.map_file("/var/lib/wtmpdb/wtmp_20261231.db", absolute_path("_data/plugins/os/unix/log/wtmp/wtmp.db"))

plugin = target_linux.add_plugin(UtmpPlugin)
results = list(target_linux.wtmp())

assert len(results) == 3
assert list(map(str, plugin.wtmp_paths)) == [
"/var/log/wtmp.db",
"/var/lib/wtmpdb/wtmp.db",
"/var/lib/wtmpdb/wtmp_20261231.db",
]

assert results[0].ts == datetime(2026, 1, 28, 13, 13, 52, 776830, tzinfo=timezone.utc)
assert not results[0].ts_logout # user is still logged in
assert results[0].ut_type == WtmpDbEntryType.USER_PROCESS.name
assert results[0].ut_user == "root"
assert results[0].ut_line == "pts/0"
assert not results[0].ut_host
assert not results[0].ut_addr
assert results[0].ut_service == "su"
assert results[0].source == "/var/log/wtmp.db"
assert results[0].hostname == "localhost"

assert results[1].ts == datetime(2026, 1, 28, 13, 13, 52, 776830, tzinfo=timezone.utc)
assert results[1].ts_logout == datetime(2026, 1, 28, 13, 13, 52, 876830, tzinfo=timezone.utc)
assert results[1].ut_type == WtmpDbEntryType.USER_PROCESS.name
assert results[1].ut_user == "root"
assert results[1].ut_line == "ssh"
assert results[1].ut_host == "127.0.0.1"
assert results[1].ut_addr == "127.0.0.1"
assert results[1].ut_service == "sshd"
assert results[1].source == "/var/log/wtmp.db"
assert results[1].hostname == "localhost"