216 changes: 203 additions & 13 deletions pkg/stanza/split/split.go
@@ -11,6 +11,7 @@ import (
"regexp"

"golang.org/x/text/encoding"
"golang.org/x/text/encoding/unicode"
)

// Config is the configuration for a split func
@@ -41,33 +42,110 @@ func (c Config) Func(enc encoding.Encoding, flushAtEOF bool, maxLogSize int) (bufio.SplitFunc, error) {
if err != nil {
return nil, fmt.Errorf("compile line end regex: %w", err)
}
-return LineEndSplitFunc(re, c.OmitPattern, flushAtEOF), nil
return LineEndSplitFunc(re, c.OmitPattern, flushAtEOF, enc), nil
}

if c.LineEndPattern == "" && c.LineStartPattern != "" {
re, err := regexp.Compile("(?m)" + c.LineStartPattern)
if err != nil {
return nil, fmt.Errorf("compile line start regex: %w", err)
}
-return LineStartSplitFunc(re, c.OmitPattern, flushAtEOF), nil
return LineStartSplitFunc(re, c.OmitPattern, flushAtEOF, enc), nil
}

return nil, errors.New("only one of line_start_pattern or line_end_pattern can be set")
}

// LineStartSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that start with a match to the regex pattern provided
-func LineStartSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool) bufio.SplitFunc {
func LineStartSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool, enc encoding.Encoding) bufio.SplitFunc {
decoder := enc.NewDecoder()
// Check whether the encoding is UTF-8; in that case we can match the pattern directly on the raw bytes
isUTF8 := enc == unicode.UTF8

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-firstLoc := re.FindIndex(data)
var firstLoc []int
var firstMatchStart, firstMatchEnd int

if isUTF8 {
// For UTF-8, match directly on bytes
firstLoc = re.FindIndex(data)
if firstLoc != nil {
firstMatchStart, firstMatchEnd = firstLoc[0], firstLoc[1]
}
} else {
// For other encodings, decode and match on decoded string
decoded, decodeErr := decoder.Bytes(data)
if decodeErr != nil {
// If decode fails, it's likely due to incomplete data at a buffer boundary
if !atEOF {
return 0, nil, nil // read more data
}
// At EOF, if we can't decode, try decoding a truncated buffer.
// UTF-16LE needs an even number of bytes.
truncatedLen := len(data)
if truncatedLen%2 != 0 {
truncatedLen--
}
if truncatedLen > 0 {
decoded, decodeErr = decoder.Bytes(data[:truncatedLen])
if decodeErr == nil {
data = data[:truncatedLen]
} else {
// If we still can't decode, flush at EOF
if flushAtEOF && len(data) > 0 {
return len(data), data, nil
}
return 0, nil, nil
}
} else {
// If we still can't decode, flush at EOF
if flushAtEOF && len(data) > 0 {
return len(data), data, nil
}
return 0, nil, nil
}
}
decodedStr := string(decoded)
firstLoc = re.FindStringIndex(decodedStr)
if firstLoc != nil {
// Map decoded string positions back to byte positions
// We need to encode the substring up to the match position to find the byte offset
matchStartStr := decodedStr[:firstLoc[0]]
matchEndStr := decodedStr[:firstLoc[1]]
encoder := enc.NewEncoder()
// Allocate a buffer for encoding (UTF-16LE uses 2 bytes per ASCII char; 4x leaves headroom)
startBuf := make([]byte, len(matchStartStr)*4)
nDst, _, err := encoder.Transform(startBuf, []byte(matchStartStr), true)
if err != nil {
// If encoding fails, fall back to UTF-8 matching
firstLoc = re.FindIndex(data)
if firstLoc != nil {
firstMatchStart, firstMatchEnd = firstLoc[0], firstLoc[1]
}
} else {
endBuf := make([]byte, len(matchEndStr)*4)
nDstEnd, _, err := encoder.Transform(endBuf, []byte(matchEndStr), true)
if err != nil {
firstLoc = re.FindIndex(data)
if firstLoc != nil {
firstMatchStart, firstMatchEnd = firstLoc[0], firstLoc[1]
}
} else {
firstMatchStart = nDst
firstMatchEnd = nDstEnd
}
}
}
}

if firstLoc == nil {
// Flush if no more data is expected
if len(data) != 0 && atEOF && flushAtEOF {
return len(data), data, nil
}
return 0, nil, nil // read more data and try again.
}
-firstMatchStart, firstMatchEnd := firstLoc[0], firstLoc[1]

if firstMatchStart != 0 {
// the beginning of the file does not match the start pattern, so return a token up to the first match so we don't lose data
@@ -94,12 +172,47 @@ func LineStartSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool) bufio.SplitFunc {
return len(data), data, nil
}

-secondLocOfset := firstMatchEnd + 1
-secondLoc := re.FindIndex(data[secondLocOfset:])
// Find second match
var secondLoc []int
var secondMatchStart int

if isUTF8 {
secondLocOffset := firstMatchEnd + 1
secondLoc = re.FindIndex(data[secondLocOffset:])
if secondLoc != nil {
secondMatchStart = secondLoc[0] + secondLocOffset
}
} else {
// Decode remaining data and find second match
remainingData := data[firstMatchEnd:]
decoded, decodeErr := decoder.Bytes(remainingData)
if decodeErr != nil {
if !atEOF {
return 0, nil, nil // read more data
}
// At EOF, no second match found
secondLoc = nil
} else {
decodedStr := string(decoded)
secondLoc = re.FindStringIndex(decodedStr)
if secondLoc != nil {
// Map decoded string position back to byte position
matchStartStr := decodedStr[:secondLoc[0]]
encoder := enc.NewEncoder()
startBuf := make([]byte, len(matchStartStr)*4)
nDst, _, err := encoder.Transform(startBuf, []byte(matchStartStr), true)
if err != nil {
return 0, nil, nil
}
secondMatchStart = firstMatchEnd + nDst
}
}
}

if secondLoc == nil {
return 0, nil, nil // read more data and try again
}
-secondMatchStart := secondLoc[0] + secondLocOfset

if omitPattern {
return secondMatchStart, data[firstMatchEnd:secondMatchStart], nil
}
@@ -112,9 +225,86 @@ func LineStartSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool) bufio.SplitFunc {

// LineEndSplitFunc creates a bufio.SplitFunc that splits an incoming stream into
// tokens that end with a match to the regex pattern provided
-func LineEndSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool) bufio.SplitFunc {
func LineEndSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool, enc encoding.Encoding) bufio.SplitFunc {
decoder := enc.NewDecoder()
// Check whether the encoding is UTF-8; in that case we can match the pattern directly on the raw bytes
isUTF8 := enc == unicode.UTF8

return func(data []byte, atEOF bool) (advance int, token []byte, err error) {
-loc := re.FindIndex(data)
var loc []int
var matchStart, matchEnd int

if isUTF8 {
// For UTF-8, match directly on bytes
loc = re.FindIndex(data)
if loc != nil {
matchStart, matchEnd = loc[0], loc[1]
}
} else {
// For other encodings, decode and match on decoded string
decoded, decodeErr := decoder.Bytes(data)
if decodeErr != nil {
// If decode fails, it's likely due to incomplete data at a buffer boundary
if !atEOF {
return 0, nil, nil // read more data
}
// At EOF, if we can't decode, try decoding a truncated buffer.
// UTF-16LE needs an even number of bytes.
truncatedLen := len(data)
if truncatedLen%2 != 0 {
truncatedLen--
}
if truncatedLen > 0 {
decoded, decodeErr = decoder.Bytes(data[:truncatedLen])
if decodeErr == nil {
data = data[:truncatedLen]
} else {
// If we still can't decode, flush at EOF
if flushAtEOF && len(data) > 0 {
return len(data), data, nil
}
return 0, nil, nil
}
} else {
// If we still can't decode, flush at EOF
if flushAtEOF && len(data) > 0 {
return len(data), data, nil
}
return 0, nil, nil
}
}
decodedStr := string(decoded)
loc = re.FindStringIndex(decodedStr)
if loc != nil {
// Map decoded string positions back to byte positions
matchStartStr := decodedStr[:loc[0]]
matchEndStr := decodedStr[:loc[1]]
encoder := enc.NewEncoder()
// Allocate a buffer for encoding (UTF-16LE uses 2 bytes per ASCII char; 4x leaves headroom)
startBuf := make([]byte, len(matchStartStr)*4)
nDst, _, err := encoder.Transform(startBuf, []byte(matchStartStr), true)
if err != nil {
// If encoding fails, fall back to UTF-8 matching
loc = re.FindIndex(data)
if loc != nil {
matchStart, matchEnd = loc[0], loc[1]
}
} else {
endBuf := make([]byte, len(matchEndStr)*4)
nDstEnd, _, err := encoder.Transform(endBuf, []byte(matchEndStr), true)
if err != nil {
loc = re.FindIndex(data)
if loc != nil {
matchStart, matchEnd = loc[0], loc[1]
}
} else {
matchStart = nDst
matchEnd = nDstEnd
}
}
}
}

if loc == nil {
// Flush if no more data is expected
if len(data) != 0 && atEOF && flushAtEOF {
@@ -125,15 +315,15 @@ func LineEndSplitFunc(re *regexp.Regexp, omitPattern, flushAtEOF bool) bufio.SplitFunc {

// If the match goes up to the end of the current buffer, do another
// read until we can capture the entire match
-if loc[1] == len(data)-1 && !atEOF {
if matchEnd == len(data)-1 && !atEOF {
return 0, nil, nil
}

if omitPattern {
-return loc[1], data[:loc[0]], nil
return matchEnd, data[:matchStart], nil
}

-return loc[1], data[:loc[1]], nil
return matchEnd, data[:matchEnd], nil
}
}

75 changes: 75 additions & 0 deletions receiver/filelogreceiver/filelog_test.go
@@ -36,6 +36,7 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/json"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/regex"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/filelogreceiver/internal/metadata"
"golang.org/x/text/encoding/unicode"
)

func TestDefaultConfig(t *testing.T) {
@@ -373,3 +374,77 @@ func (g *fileLogGenerator) Generate() []receivertest.UniqueIDAttrVal {
require.NoError(g.t, err)
return []receivertest.UniqueIDAttrVal{id}
}

func TestUTF16LEMultilineSAPAuditLog(t *testing.T) {
t.Parallel()

// Create a UTF-16LE encoded file with 10 SAP audit log records
// Each record starts with pattern: ([23])[A-Z][A-Z][A-Z0-9]\d{14}00
sapRecord := "2AUK20250227000000002316500018D110.102.8BATCH_ALRI SAPMSSY1 0501Z91_VALR_IF&&Z91_VAL_PLSTATUS 10.122.81.29 "

// Create 10 records concatenated (no newlines between them)
allRecords := ""
for i := 0; i < 10; i++ {
allRecords += sapRecord
}

// Encode to UTF-16LE
encoder := unicode.UTF16(unicode.LittleEndian, unicode.IgnoreBOM).NewEncoder()
encodedBuf := make([]byte, len(allRecords)*4) // UTF-16LE uses 2 bytes per ASCII char; 4x leaves headroom
nDst, _, err := encoder.Transform(encodedBuf, []byte(allRecords), true)
require.NoError(t, err)
encoded := encodedBuf[:nDst]

tempDir := t.TempDir()
auditLogFile := filepath.Join(tempDir, "auditlog.txt")
err = os.WriteFile(auditLogFile, encoded, 0o600)
require.NoError(t, err)

// Configure filelog receiver
cfg := createDefaultConfig()
cfg.InputConfig.Include = []string{auditLogFile}
cfg.InputConfig.Encoding = "utf-16le"
cfg.InputConfig.StartAt = "beginning"
cfg.InputConfig.SplitConfig.LineStartPattern = "([23])[A-Z][A-Z][A-Z0-9]\\d{14}00"

f := NewFactory()
sink := new(consumertest.LogsSink)
rcvr, err := f.CreateLogs(t.Context(), receivertest.NewNopSettings(metadata.Type), cfg, sink)
require.NoError(t, err, "failed to create receiver")
require.NoError(t, rcvr.Start(t.Context(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, rcvr.Shutdown(t.Context()))
}()

// Wait for logs to be processed
require.Eventually(t, expectNLogs(sink, 10), 2*time.Second, 10*time.Millisecond,
"expected 10 log events but got %d", sink.LogRecordCount(),
)

// Verify we got exactly 10 log events
assert.Equal(t, 10, sink.LogRecordCount(), "expected 10 log events")

// Verify each log event contains the SAP record pattern
allLogs := sink.AllLogs()
require.GreaterOrEqual(t, len(allLogs), 1, "expected at least 1 log resource")

// Count total log records across all resources
totalRecords := 0
for i := 0; i < len(allLogs); i++ {
resourceLogs := allLogs[i].ResourceLogs()
for j := 0; j < resourceLogs.Len(); j++ {
scopeLogs := resourceLogs.At(j).ScopeLogs()
for k := 0; k < scopeLogs.Len(); k++ {
logRecords := scopeLogs.At(k).LogRecords()
totalRecords += logRecords.Len()
// Verify first record contains the pattern
if logRecords.Len() > 0 {
firstRecord := logRecords.At(0)
body := firstRecord.Body().AsString()
assert.Contains(t, body, "2AUK2025022700000000", "first record should contain SAP audit log pattern")
}
}
}
}
assert.Equal(t, 10, totalRecords, "expected 10 log records total")
}