Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 105 additions & 35 deletions dissect/target/plugins/os/unix/log/lastlog.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,22 @@
from __future__ import annotations

from typing import TYPE_CHECKING, BinaryIO
from dataclasses import asdict, dataclass
from typing import TYPE_CHECKING

from dissect.cstruct import cstruct
from dissect.util import ts
from dissect.database.sqlite3 import SQLite3
from dissect.util.ts import from_unix

from dissect.target.exceptions import FileNotFoundError, UnsupportedPluginError
from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export

if TYPE_CHECKING:
from collections.abc import Iterator
from datetime import datetime
from pathlib import Path

from dissect.target.target import Target

LastLogRecord = TargetRecordDescriptor(
"linux/log/lastlog",
Expand All @@ -20,6 +26,8 @@
("string", "ut_user"), # name
("string", "ut_host"), # source
("string", "ut_tty"), # port
("string", "ut_service"),
("path", "source"),
],
)

Expand All @@ -44,56 +52,118 @@
c_lastlog = cstruct().load(lastlog_def)


@dataclass
class LastLogEntry:
ts: datetime
uid: int
ut_user: str | None
ut_tty: str | None
ut_host: str | None
ut_service: str | None # lastlog2 specific


class LastLogFile:
def __init__(self, fh: BinaryIO):
self.fh = fh
"""Lastlog sparse file iterator.

References:
- https://github.com/linux-pam/linux-pam/tree/master/modules/pam_lastlog
"""

def __init__(self, path: Path, users: dict | None):
self.path = path
self.fh = path.open()
self.users = users

def __iter__(self):
def __iter__(self) -> Iterator[LastLogEntry]:
idx = -1
while True:
try:
yield c_lastlog.entry(self.fh)
except EOFError: # noqa: PERF203
idx += 1
entry = c_lastlog.entry(self.fh)
user = self.users.get(idx)

if entry.ll_time.tv_sec == 0:
continue

yield LastLogEntry(
ts=from_unix(entry.ll_time.tv_sec),
uid=idx,
ut_user=user,
ut_tty=entry.ut_user.decode().strip("\x00") or None,
ut_host=entry.ut_host.decode(errors="ignore").strip("\x00") or None,
ut_service=None,
)

except EOFError:
break


class LastLogDb:
"""Lastlog2 database file iterator.

References:
- https://github.com/util-linux/util-linux/tree/master/liblastlog2
- https://github.com/util-linux/util-linux/tree/master/pam_lastlog2
"""

def __init__(self, path: Path, users: dict | None):
self.path = path
self.db = SQLite3(path)
self.users: dict[str, int] = {v: k for k, v in users.items()}

def __iter__(self) -> Iterator[LastLogEntry]:
if not (table := self.db.table("Lastlog2")):
return None

for row in table.rows():
yield LastLogEntry(
ts=from_unix(row.Time),
uid=self.users.get(row.Name),
ut_user=row.Name,
ut_tty=row.TTY,
ut_host=row.RemoteHost or None,
ut_service=row.Service,
)


class LastLogPlugin(Plugin):
"""Unix lastlog plugin."""
"""UNIX lastlog plugin."""

def __init__(self, target: Target):
super().__init__(target)

self.paths = list(self.target.fs.path("/").glob("var/log/lastlog*")) + list(
self.target.fs.path("/").glob("var/lib/lastlog/lastlog2*")
)

def check_compatible(self) -> None:
lastlog = self.target.fs.path("/var/log/lastlog")
if not lastlog.exists():
raise UnsupportedPluginError("No lastlog file found")
if not self.paths:
raise UnsupportedPluginError("No lastlog file(s) found on target")

@export(record=LastLogRecord)
def lastlog(self) -> Iterator[LastLogRecord]:
"""Return last logins information from /var/log/lastlog.
"""Return login information from ``/var/log/lastlog`` and ``/var/lib/lastlog/lastlog2.db`` files.

The lastlog file contains the most recent logins of all users on a Unix based operating system.
Lastlog files contain the most recent logins of all users on a UNIX based operating system.

Newer UNIX distributions use ``lastlog2`` and ``liblastlog2`` which use SQLite3 database files.

References:
- https://www.tutorialspoint.com/unix_commands/lastlog.htm
"""
try:
lastlog = self.target.fs.open("/var/log/lastlog")
except FileNotFoundError:
return

users = {}
for user in self.target.users():
users[user.uid] = user.name
seen = set()
users: dict[int, str] = (
{user.uid: user.name for user in self.target.users()} if self.target.has_function("users") else {}
)

log = LastLogFile(lastlog)

for idx, entry in enumerate(log):
# if ts=0 the uid has never logged in before
if entry.ut_host.strip(b"\x00") == b"" or entry.ll_time.tv_sec == 0:
for path in self.paths:
if any(seen_path.samefile(path) for seen_path in seen):
continue

yield LastLogRecord(
ts=ts.from_unix(entry.ll_time.tv_sec),
uid=idx,
ut_user=users.get(idx),
ut_tty=entry.ut_user.decode().strip("\x00"),
ut_host=entry.ut_host.decode(errors="ignore").strip("\x00"),
_target=self.target,
)
iterator = LastLogDb if path.suffix == ".db" else LastLogFile
for entry in iterator(path, users):
yield LastLogRecord(
**asdict(entry),
source=path,
_target=self.target,
)
3 changes: 3 additions & 0 deletions tests/_data/plugins/os/unix/log/lastlog/lastlog2.db
Git LFS file not shown
55 changes: 50 additions & 5 deletions tests/plugins/os/unix/log/test_lastlog.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,62 @@
from dissect.target.target import Target


def test_lastlog_plugin(target_linux: Target, fs_linux: VirtualFilesystem) -> None:
def test_lastlog_sparse(target_linux_users: Target, fs_linux: VirtualFilesystem) -> None:
"""Test if we can correctly parse a sparse lastlog file."""

data_file = absolute_path("_data/plugins/os/unix/log/lastlog/lastlog")
fs_linux.map_file("/var/log/lastlog", data_file)

target_linux.add_plugin(LastLogPlugin)
target_linux_users.add_plugin(LastLogPlugin)

results = list(target_linux.lastlog())
assert len(results) == 1
results = sorted(target_linux_users.lastlog(), key=lambda r: r.ts)
assert len(results) == 3

assert results[0].ts == datetime(2021, 12, 8, 16, 14, 6, tzinfo=timezone.utc)
assert results[0].uid == 1001
assert results[0].ut_user is None
assert results[0].ut_user is None # since 1001 is not defined in target_linux_users /etc/passwd.
assert results[0].ut_host == "127.0.0.1"
assert results[0].ut_tty == "pts/0"
assert results[0].source == "/var/log/lastlog"

assert results[1].ts == datetime(2021, 12, 8, 16, 20, 23, tzinfo=timezone.utc)
assert results[1].uid == 0
assert results[1].ut_user == "root"
assert results[1].ut_host is None
assert results[1].ut_tty == "tty1"
assert results[1].source == "/var/log/lastlog"

assert results[2].ts == datetime(2022, 12, 7, 16, 33, 30, tzinfo=timezone.utc)
assert results[2].uid == 1000
assert results[2].ut_user == "user"
assert results[2].ut_host is None
assert results[2].ut_tty == "tty1"
assert results[2].source == "/var/log/lastlog"


def test_lastlog_sqlite(target_linux_users: Target, fs_linux: VirtualFilesystem) -> None:
"""Test if we can parse a lastlog2 SQLite3 database."""

fs_linux.map_file("/var/lib/lastlog/lastlog2.db", absolute_path("_data/plugins/os/unix/log/lastlog/lastlog2.db"))

target_linux_users.add_plugin(LastLogPlugin)

records = sorted(target_linux_users.lastlog(), key=lambda r: r.ts)

assert len(records) == 2

assert records[0].ts == datetime(2026, 1, 28, 15, 2, 33, tzinfo=timezone.utc)
assert records[0].uid == 0 # reverse lookup from /etc/passwd
assert records[0].ut_user == "root"
assert records[0].ut_host is None
assert records[0].ut_tty == "pts/0"
assert records[0].ut_service == "su"
assert records[0].source == "/var/lib/lastlog/lastlog2.db"

assert records[1].ts == datetime(2026, 2, 9, 4, 49, 13, tzinfo=timezone.utc)
assert records[1].uid == 1000 # reverse lookup from /etc/passwd
assert records[1].ut_user == "user"
assert records[1].ut_host == "127.0.0.1"
assert records[1].ut_tty == "ssh"
assert records[1].ut_service == "sshd"
assert records[1].source == "/var/lib/lastlog/lastlog2.db"