Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions SConstruct
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ Clean(["."], cache_dir)
# Build common module
SConscript(['common/SConscript'])
Import('_common')
common = [_common, 'json11', 'zmq']
common = [_common, 'json11']
Export('common')

# Build messaging (cereal + msgq + socketmaster + their dependencies)
Expand All @@ -189,7 +189,7 @@ SConscript(['opendbc_repo/SConscript'], exports={'env': env_swaglog})
SConscript(['cereal/SConscript'])

Import('socketmaster', 'msgq')
messaging = [socketmaster, msgq, 'capnp', 'kj',]
messaging = [socketmaster, msgq, 'zmq', 'capnp', 'kj']
Export('messaging')


Expand Down
2 changes: 1 addition & 1 deletion cereal/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ cereal = env.Library('cereal', [f'gen/cpp/{s}.c++' for s in schema_files])

# Build messaging
services_h = env.Command(['services.h'], ['services.py'], 'python3 ' + cereal_dir.path + '/services.py > $TARGET')
env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, common, 'pthread'])
env.Program('messaging/bridge', ['messaging/bridge.cc', 'messaging/msgq_to_zmq.cc'], LIBS=[msgq, 'zmq', common, 'pthread'])

socketmaster = env.Library('socketmaster', ['messaging/socketmaster.cc'])

Expand Down
4 changes: 2 additions & 2 deletions common/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ Export('_common')
if GetOption('extras'):
env.Program('tests/test_common',
['tests/test_runner.cc', 'tests/test_params.cc', 'tests/test_util.cc', 'tests/test_swaglog.cc'],
LIBS=[_common, 'json11', 'zmq', 'pthread'])
LIBS=[_common, 'json11', 'pthread'])

# Cython bindings
params_python = envCython.Program('params_pyx.so', 'params_pyx.pyx', LIBS=envCython['LIBS'] + [_common, 'zmq', 'json11'])
params_python = envCython.Program('params_pyx.so', 'params_pyx.pyx', LIBS=envCython['LIBS'] + [_common, 'json11'])

common_python = [params_python]

Expand Down
88 changes: 88 additions & 0 deletions common/ipc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import os
import socket


class PushSocket:
"""Non-blocking PUSH socket using Unix datagram sockets."""

def __init__(self):
self.sock: socket.socket | None = None
self.path: str | None = None

def connect(self, ipc_path: str):
"""Connect to a Unix domain socket.

Args:
ipc_path: Socket path in format 'ipc:///path/to/socket' or '/path/to/socket'
"""
self.path = ipc_path.replace("ipc://", "")
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.sock.setblocking(False)

def send(self, data: bytes) -> bool:
"""Send data to the socket (non-blocking).

Returns True if sent successfully, False if dropped.
Max message size is ~64KB on Linux (kernel limit for datagrams).
"""
if self.sock is None or self.path is None:
return False
try:
self.sock.sendto(data, self.path)
return True
except (BlockingIOError, ConnectionRefusedError, FileNotFoundError, OSError):
# Drop message on any send error (matches ZMQ NOBLOCK behavior)
# OSError includes EMSGSIZE (message too long) for large datagrams
return False

def close(self):
if self.sock:
self.sock.close()
self.sock = None


class PullSocket:
"""Blocking/non-blocking PULL socket using Unix datagram sockets."""

def __init__(self):
self.sock: socket.socket | None = None
self.path: str | None = None

def bind(self, ipc_path: str):
"""Bind to a Unix domain socket.

Args:
ipc_path: Socket path in format 'ipc:///path/to/socket' or '/path/to/socket'
"""
self.path = ipc_path.replace("ipc://", "")
if os.path.exists(self.path):
os.unlink(self.path)
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self.sock.bind(self.path)

def recv(self, flags: int = 0) -> bytes:
"""Receive data from the socket.

Args:
flags: If non-zero, use non-blocking mode

Returns:
Received bytes

Raises:
BlockingIOError: If non-blocking and no data available
"""
if self.sock is None:
raise RuntimeError("Socket not bound")
if flags: # NOBLOCK
self.sock.setblocking(False)
else:
self.sock.setblocking(True)
return self.sock.recv(65536)

def close(self):
if self.sock:
self.sock.close()
self.sock = None
if self.path and os.path.exists(self.path):
os.unlink(self.path)
47 changes: 34 additions & 13 deletions common/swaglog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,38 @@
#include <mutex>
#include <string>

#include <zmq.h>
#include <clocale>
#include <cstring>
#include <fcntl.h>
#include <stdarg.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#include "third_party/json11/json11.hpp"
#include "common/version.h"
#include "system/hardware/hw.h"

class SwaglogState {
public:
SwaglogState() {
zctx = zmq_ctx_new();
sock = zmq_socket(zctx, ZMQ_PUSH);

// Timeout on shutdown for messages to be received by the logging process
int timeout = 100;
zmq_setsockopt(sock, ZMQ_LINGER, &timeout, sizeof(timeout));
zmq_connect(sock, Path::swaglog_ipc().c_str());
// Create Unix datagram socket
sock_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (sock_fd >= 0) {
// Set non-blocking
int flags = fcntl(sock_fd, F_GETFL, 0);
fcntl(sock_fd, F_SETFL, flags | O_NONBLOCK);

// Set up destination address
memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sun_family = AF_UNIX;
std::string ipc_path = Path::swaglog_ipc();
// Convert "ipc:///path" to "/path"
if (ipc_path.rfind("ipc://", 0) == 0) {
ipc_path = ipc_path.substr(6);
}
strncpy(sock_addr.sun_path, ipc_path.c_str(), sizeof(sock_addr.sun_path) - 1);
}

// workaround for https://github.com/dropbox/json11/issues/38
setlocale(LC_NUMERIC, "C");
Expand Down Expand Up @@ -62,21 +78,26 @@ class SwaglogState {
}

~SwaglogState() {
zmq_close(sock);
zmq_ctx_destroy(zctx);
if (sock_fd >= 0) {
close(sock_fd);
}
}

void log(int levelnum, const char* filename, int lineno, const char* func, const char* msg, const std::string& log_s) {
std::lock_guard lk(lock);
if (levelnum >= print_level) {
printf("%s: %s\n", filename, msg);
}
zmq_send(sock, log_s.data(), log_s.length(), ZMQ_NOBLOCK);
if (sock_fd >= 0) {
// Non-blocking sendto, silently drops on failure (matches ZMQ NOBLOCK behavior)
sendto(sock_fd, log_s.data(), log_s.length(), 0,
(struct sockaddr*)&sock_addr, sizeof(sock_addr));
}
}

std::mutex lock;
void* zctx = nullptr;
void* sock = nullptr;
int sock_fd = -1;
struct sockaddr_un sock_addr;
int print_level;
json11::Json::object ctx_j;
};
Expand Down
24 changes: 5 additions & 19 deletions common/swaglog.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import logging
import os
import time
import warnings
from pathlib import Path
from logging.handlers import BaseRotatingHandler

import zmq

from openpilot.common.ipc import PushSocket
from openpilot.common.logging_extra import SwagLogger, SwagFormatter, SwagLogFileFormatter
from openpilot.system.hardware.hw import Paths

Expand Down Expand Up @@ -68,8 +66,6 @@ def __init__(self, formatter):
logging.Handler.__init__(self)
self.setFormatter(formatter)
self.pid = None

self.zctx = None
self.sock = None

def __del__(self):
Expand All @@ -78,30 +74,20 @@ def __del__(self):
def close(self):
if self.sock is not None:
self.sock.close()
if self.zctx is not None:
self.zctx.term()
self.sock = None

def connect(self):
self.zctx = zmq.Context()
self.sock = self.zctx.socket(zmq.PUSH)
self.sock.setsockopt(zmq.LINGER, 10)
self.sock = PushSocket()
self.sock.connect(Paths.swaglog_ipc())
self.pid = os.getpid()

def emit(self, record):
if os.getpid() != self.pid:
# TODO suppresses warning about forking proc with zmq socket, fix root cause
warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<zmq.*>")
self.connect()

msg = self.format(record).rstrip('\n')
# print("SEND".format(repr(msg)))
try:
s = chr(record.levelno)+msg
self.sock.send(s.encode('utf8'), zmq.NOBLOCK)
except zmq.error.Again:
# drop :/
pass
s = chr(record.levelno) + msg
self.sock.send(s.encode('utf8')) # Non-blocking, drops on failure


class ForwardingHandler(logging.Handler):
Expand Down
Loading
Loading