Skip to content

Commit

Permalink
updates for consumer integration
Browse files Browse the repository at this point in the history
  • Loading branch information
iluxa committed Jan 24, 2024
1 parent c02a284 commit 7dcf057
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 46 deletions.
4 changes: 3 additions & 1 deletion packet_sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,9 @@ func (s *PacketSorter) initCbufPcap() {
}

func (s *PacketSorter) initSocketPcap() {
s.socketPcap = NewSocketPcap(misc.GetPacketSocketPath())
unixSocketFile := misc.GetPacketSocketPath()
_ = os.Remove(unixSocketFile)
s.socketPcap = NewSocketPcap(unixSocketFile)
}

func (s *PacketSorter) Close() {
Expand Down
127 changes: 82 additions & 45 deletions packet_unix_socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,21 @@ import (
"sync"
"syscall"
"time"
"unsafe"

"github.com/kubeshark/gopacket"
"github.com/kubeshark/tracer/pkg/unixpacket"
"github.com/rs/zerolog/log"
)

const (
maxUnixPacketClients = 10
packetHeaderSize = int(unsafe.Sizeof(PacketHeader{}))
)

type SocketPcapConnection struct {
packetCounter uint64
writeChannel chan []byte
packetSent uint64
packetDropped uint64
}

type SocketPcap struct {
Expand All @@ -28,9 +30,81 @@ type SocketPcap struct {
sync.Mutex
}

type PacketHeader struct {
packetCounter uint64
timestamp uint64
func (s *SocketPcapConnection) Run(conn *net.UnixConn, sock *SocketPcap) {
for {
select {
case buf := <-s.writeChannel:
_, err := conn.Write(buf)
if err != nil {
if errors.Is(err, syscall.EPIPE) {
log.Info().Str("Address", conn.RemoteAddr().String()).Msg("Unix socket connection closed:")
} else {
log.Error().Err(err).Str("Address", conn.RemoteAddr().String()).Msg("Unix socket connection error:")
}
sock.Disconnected(conn)
return
}

}
}
}

func (s *SocketPcap) WritePacket(pkt gopacket.SerializeBuffer) error {
s.Lock()
defer s.Unlock()
defer func() {
s.packetCounter++
}()
if len(s.connections) == 0 {
return nil
}

hdrBytes, err := pkt.PrependBytes(unixpacket.PacketHeaderSize)
if err != nil {
return err
}

p := unixpacket.PacketUnixSocket(hdrBytes)
hdr := p.GetHeader()
hdr.Timestamp = uint64(time.Now().UnixNano())
// clear buffer at the end as soon as it is prepended with specific data
defer func() {
_ = pkt.Clear()
}()

buf := pkt.Bytes()
for _, conn := range s.connections {
copyBuf := make([]byte, len(buf))
copy(copyBuf, buf)
p = unixpacket.PacketUnixSocket(copyBuf)
hdr = p.GetHeader()
hdr.PacketCounter = conn.packetCounter
conn.packetCounter++
select {
case conn.writeChannel <- copyBuf:
conn.packetSent++
default:
conn.packetDropped++
}
}
return nil
}

func (s *SocketPcap) Connected(conn *net.UnixConn) {
ch := make(chan []byte, 8)
s.Lock()
defer s.Unlock()
c := &SocketPcapConnection{
writeChannel: ch,
}
s.connections[conn] = c
go c.Run(conn, s)
}

func (s *SocketPcap) Disconnected(conn *net.UnixConn) {
s.Lock()
defer s.Unlock()
delete(s.connections, conn)
}

func NewSocketPcap(unixSocketFileName string) *SocketPcap {
Expand Down Expand Up @@ -61,47 +135,10 @@ func (c *SocketPcap) acceptClients(l *net.UnixListener) {
if c.clientsConnected == maxUnixPacketClients {
log.Info().Str("Address", conn.RemoteAddr().String()).Msg("Unix socket max connections exceeded, closing:")
conn.Close()
} else {
c.connections[conn] = &SocketPcapConnection{}
c.clientsConnected++
}
c.Unlock()
}
}

func (c *SocketPcap) WritePacket(buf gopacket.SerializeBuffer) error {
var err error
var hdrBytes []byte
c.Lock()
defer c.Unlock()
c.packetCounter++
if len(c.connections) > 0 {
hdrBytes, err = buf.PrependBytes(packetHeaderSize)
if err != nil {
return err
}
// clear buffer at the end as soon as it is prepended with specific data
defer func() {
_ = buf.Clear()
}()
}
for sock, conn := range c.connections {
hdr := (*PacketHeader)(unsafe.Pointer(&hdrBytes))
hdr.packetCounter = conn.packetCounter

_, err = sock.Write(buf.Bytes())

if err != nil {
if errors.Is(err, syscall.EPIPE) {
log.Info().Str("Address", sock.RemoteAddr().String()).Msg("Unix socket connection closed:")
} else {
log.Error().Err(err).Str("Address", sock.RemoteAddr().String()).Msg("Unix socket connection error:")
}
delete(c.connections, sock)
c.Unlock()
continue
}
conn.packetCounter++
c.Unlock()
c.Connected(conn)
}

return nil
}
24 changes: 24 additions & 0 deletions pkg/unixpacket/pkt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package unixpacket

import (
"unsafe"
)

const PacketHeaderSize = int(unsafe.Sizeof(PacketUnixSocketHeader{}))

type PacketUnixSocketHeader struct {
PacketCounter uint64
Timestamp uint64
}

type PacketUnixSocket []byte

func (pkt *PacketUnixSocket) GetHeader() *PacketUnixSocketHeader {
data := []byte(*pkt)
return (*PacketUnixSocketHeader)(unsafe.Pointer(&data[0]))
}

func (pkt *PacketUnixSocket) GetData() []byte {
data := []byte(*pkt)
return data[PacketHeaderSize:]
}

0 comments on commit 7dcf057

Please sign in to comment.