From 7dcf057daf06a14b31daa9a74d559c601781efd5 Mon Sep 17 00:00:00 2001 From: Ilya Gavrilov Date: Wed, 24 Jan 2024 15:06:58 +0100 Subject: [PATCH] updates for consumer integration --- packet_sorter.go | 4 +- packet_unix_socket.go | 127 +++++++++++++++++++++++++++--------------- pkg/unixpacket/pkt.go | 24 ++++++++ 3 files changed, 109 insertions(+), 46 deletions(-) create mode 100644 pkg/unixpacket/pkt.go diff --git a/packet_sorter.go b/packet_sorter.go index f054728..ee3fd94 100644 --- a/packet_sorter.go +++ b/packet_sorter.go @@ -152,7 +152,9 @@ func (s *PacketSorter) initCbufPcap() { } func (s *PacketSorter) initSocketPcap() { - s.socketPcap = NewSocketPcap(misc.GetPacketSocketPath()) + unixSocketFile := misc.GetPacketSocketPath() + _ = os.Remove(unixSocketFile) + s.socketPcap = NewSocketPcap(unixSocketFile) } func (s *PacketSorter) Close() { diff --git a/packet_unix_socket.go b/packet_unix_socket.go index 2f1b194..57cc83b 100644 --- a/packet_unix_socket.go +++ b/packet_unix_socket.go @@ -6,19 +6,21 @@ import ( "sync" "syscall" "time" - "unsafe" "github.com/kubeshark/gopacket" + "github.com/kubeshark/tracer/pkg/unixpacket" "github.com/rs/zerolog/log" ) const ( maxUnixPacketClients = 10 - packetHeaderSize = int(unsafe.Sizeof(PacketHeader{})) ) type SocketPcapConnection struct { packetCounter uint64 + writeChannel chan []byte + packetSent uint64 + packetDropped uint64 } type SocketPcap struct { @@ -28,9 +30,81 @@ type SocketPcap struct { sync.Mutex } -type PacketHeader struct { - packetCounter uint64 - timestamp uint64 +func (s *SocketPcapConnection) Run(conn *net.UnixConn, sock *SocketPcap) { + for { + select { + case buf := <-s.writeChannel: + _, err := conn.Write(buf) + if err != nil { + if errors.Is(err, syscall.EPIPE) { + log.Info().Str("Address", conn.RemoteAddr().String()).Msg("Unix socket connection closed:") + } else { + log.Error().Err(err).Str("Address", conn.RemoteAddr().String()).Msg("Unix socket connection error:") + } + sock.Disconnected(conn) + return + } + + } + } +} + +func (s *SocketPcap) WritePacket(pkt gopacket.SerializeBuffer) error { + s.Lock() + defer s.Unlock() + defer func() { + s.packetCounter++ + }() + if len(s.connections) == 0 { + return nil + } + + hdrBytes, err := pkt.PrependBytes(unixpacket.PacketHeaderSize) + if err != nil { + return err + } + + p := unixpacket.PacketUnixSocket(hdrBytes) + hdr := p.GetHeader() + hdr.Timestamp = uint64(time.Now().UnixNano()) + // clear buffer at the end as soon as it is prepended with specific data + defer func() { + _ = pkt.Clear() + }() + + buf := pkt.Bytes() + for _, conn := range s.connections { + copyBuf := make([]byte, len(buf)) + copy(copyBuf, buf) + p = unixpacket.PacketUnixSocket(copyBuf) + hdr = p.GetHeader() + hdr.PacketCounter = conn.packetCounter + conn.packetCounter++ + select { + case conn.writeChannel <- copyBuf: + conn.packetSent++ + default: + conn.packetDropped++ + } + } + return nil +} + +func (s *SocketPcap) Connected(conn *net.UnixConn) { + ch := make(chan []byte, 8) + s.Lock() + defer s.Unlock() + c := &SocketPcapConnection{ + writeChannel: ch, + } + s.connections[conn] = c + go c.Run(conn, s) +} + +func (s *SocketPcap) Disconnected(conn *net.UnixConn) { + s.Lock() + defer s.Unlock() + delete(s.connections, conn) } func NewSocketPcap(unixSocketFileName string) *SocketPcap { @@ -61,47 +135,10 @@ func (c *SocketPcap) acceptClients(l *net.UnixListener) { if c.clientsConnected == maxUnixPacketClients { log.Info().Str("Address", conn.RemoteAddr().String()).Msg("Unix socket max connections exceeded, closing:") conn.Close() - } else { - c.connections[conn] = &SocketPcapConnection{} - c.clientsConnected++ - } - c.Unlock() - } -} - -func (c *SocketPcap) WritePacket(buf gopacket.SerializeBuffer) error { - var err error - var hdrBytes []byte - c.Lock() - defer c.Unlock() - c.packetCounter++ - if len(c.connections) > 0 { - hdrBytes, err = buf.PrependBytes(packetHeaderSize) - if err != nil { - return err - } - // clear buffer at the end as soon as it is prepended with specific data - defer func() { - _ = buf.Clear() - }() - } - for sock, conn := range c.connections { - hdr := (*PacketHeader)(unsafe.Pointer(&hdrBytes)) - hdr.packetCounter = conn.packetCounter - - _, err = sock.Write(buf.Bytes()) - - if err != nil { - if errors.Is(err, syscall.EPIPE) { - log.Info().Str("Address", sock.RemoteAddr().String()).Msg("Unix socket connection closed:") - } else { - log.Error().Err(err).Str("Address", sock.RemoteAddr().String()).Msg("Unix socket connection error:") - } - delete(c.connections, sock) + c.Unlock() continue } - conn.packetCounter++ + c.Unlock() + c.Connected(conn) } - - return nil } diff --git a/pkg/unixpacket/pkt.go b/pkg/unixpacket/pkt.go new file mode 100644 index 0000000..8642e3a --- /dev/null +++ b/pkg/unixpacket/pkt.go @@ -0,0 +1,24 @@ +package unixpacket + +import ( + "unsafe" +) + +const PacketHeaderSize = int(unsafe.Sizeof(PacketUnixSocketHeader{})) + +type PacketUnixSocketHeader struct { + PacketCounter uint64 + Timestamp uint64 +} + +type PacketUnixSocket []byte + +func (pkt *PacketUnixSocket) GetHeader() *PacketUnixSocketHeader { + data := []byte(*pkt) + return (*PacketUnixSocketHeader)(unsafe.Pointer(&data[0])) +} + +func (pkt *PacketUnixSocket) GetData() []byte { + data := []byte(*pkt) + return data[PacketHeaderSize:] +}