# Performance Analysis: UnmarshalLogs Implementation

## Current Implementation Issues

### 1. Structure Overhead
The current `UnmarshalLogs` implementation creates a **new ResourceLogs and ScopeLogs for every single log line**:

```go
func (ex *ext) handleLogLine(logs plog.Logs, logLine []byte) error {
	// ...
	rl := logs.ResourceLogs().AppendEmpty()   // NEW ResourceLogs for each log
	r := rl.Resource()
	scopeLogs := rl.ScopeLogs().AppendEmpty() // NEW ScopeLogs for each log
	// ...
}
```

**Performance Impact:**
- For N log lines, this creates N ResourceLogs and N ScopeLogs structures
- Each ResourceLogs contains:
  - Resource attributes map
  - Schema URL
  - ScopeLogs slice
- Each ScopeLogs contains:
  - InstrumentationScope (name, version, attributes)
  - Schema URL
  - LogRecords slice

**Memory Overhead:** For 1000 log lines, this creates:
- 1000 ResourceLogs structures
- 1000 ScopeLogs structures
- Significant overhead in slice allocations and memory fragmentation

### 2. plog.Logs Internal Structure

From the generated code analysis:
- `plog.Logs` wraps `internal.ExportLogsServiceRequest`
- It contains a `ResourceLogsSlice` (slice of ResourceLogs)
- Each ResourceLogs contains:
  - Resource (attributes map)
  - ScopeLogsSlice (slice of ScopeLogs)
- Each ScopeLogs contains:
  - InstrumentationScope
  - LogRecordSlice (slice of LogRecords)

**Structure Hierarchy:**
```
plog.Logs
└── ResourceLogsSlice[]
    └── ResourceLogs
        ├── Resource (attributes)
        └── ScopeLogsSlice[]
            └── ScopeLogs
                ├── InstrumentationScope
                └── LogRecordSlice[]
                    └── LogRecord
```

### 3. Interface Constraints

The `LogsUnmarshalerExtension` interface embeds `plog.Unmarshaler`:
```go
type LogsUnmarshalerExtension interface {
	extension.Extension
	plog.Unmarshaler // UnmarshalLogs(buf []byte) (plog.Logs, error)
}
```

**Key Constraint:** The interface **requires returning a complete `plog.Logs` batch**. Streaming individual logs is not possible within this interface design.

## Performance Optimization Opportunities

### Option 1: Reuse ResourceLogs/ScopeLogs (Recommended)

Instead of creating new structures for each log, reuse a single ResourceLogs and ScopeLogs, appending all log records to the same scope:

```go
func (ex *ext) UnmarshalLogs(buf []byte) (plog.Logs, error) {
	logs := plog.NewLogs()

	// Create ONE ResourceLogs and ONE ScopeLogs for all logs
	rl := logs.ResourceLogs().AppendEmpty()
	scopeLogs := rl.ScopeLogs().AppendEmpty()

	scanner := bufio.NewScanner(bytes.NewReader(buf))
	for scanner.Scan() {
		line := scanner.Bytes()
		if err := ex.handleLogLine(rl, scopeLogs, line); err != nil {
			return plog.Logs{}, err
		}
	}

	return logs, nil
}
```

**Benefits:**
- Reduces structure overhead from O(N) to O(1) for ResourceLogs/ScopeLogs
- Only LogRecords are allocated per log line
- Significantly reduces memory allocations
- Better memory locality (all log records in same slice)

**Limitations:**
- All logs share the same resource attributes (if logs have different resources, they would need separate ResourceLogs)
- All logs share the same scope (if logs have different scopes, they would need separate ScopeLogs)

### Option 2: Grouping by Resource/Scope (Advanced)

For logs with different resources or scopes, group them intelligently:

```go
// Group logs by resource attributes hash
resourceGroups := make(map[string]plog.ResourceLogs)
scopeGroups := make(map[string]plog.ScopeLogs)

// For each log, determine resource/scope key and append to appropriate group
```

**Trade-offs:**
- More complex implementation
- Requires hashing/comparison overhead
- Better semantic grouping
- May not be necessary if all logs share same resource/scope

### Option 3: Streaming Interface (Not Possible)

A streaming interface would require changing the OpenTelemetry Collector API:

```go
// NOT POSSIBLE with current interface
type StreamingLogsUnmarshaler interface {
	UnmarshalLogs(buf []byte, callback func(plog.LogRecord) error) error
}
```

This would require:
- Modifying the collector framework
- Changing all consumers
- Breaking backward compatibility

## Recommendation

**Implement Option 1** (Reuse ResourceLogs/ScopeLogs):
1. Most logs from the same source share the same resource/scope
2. Massive reduction in memory overhead
3. Simple implementation
4. No API changes required

If logs have different resources/scopes, we can enhance with Option 2 later.

## Expected Performance Improvements

For a buffer with 1000 log lines:

**Before:**
- 1000 ResourceLogs allocations
- 1000 ScopeLogs allocations
- 1000 LogRecord allocations
- Total: ~3000 structure allocations

**After:**
- 1 ResourceLogs allocation
- 1 ScopeLogs allocation
- 1000 LogRecord allocations
- Total: ~1002 structure allocations

**Allocation Reduction:** ~67% fewer structure allocations (≈3000 → ≈1002), with a corresponding reduction in memory overhead and GC pressure

@@ -231,27 +231,26 @@ See the struct of the Cloud Audit Log payload in [AuditLog](https://cloud.google
| `dest_gke_details.pod.pod_workload.workload_type` | `gcp.vpc.flow.destination.gke.pod.workload.type` | not yet supported |
| `dest_gke_details.service.service_name` | `gcp.vpc.flow.destination.gke.service.name` | not yet supported |
| `dest_gke_details.service.service_namespace` | `gcp.vpc.flow.destination.gke.service.namespace` | not yet supported |
| `src_google_service.type` | `gcp.vpc.flow.source.google_service.type` | supported |
| `src_google_service.service_name` | `gcp.vpc.flow.source.google_service.name` | supported |
| `src_google_service.connectivity` | `gcp.vpc.flow.source.google_service.connectivity` | supported |
| `src_google_service.private_domain` | `gcp.vpc.flow.source.google_service.domain.private` | not yet supported |
| `dest_google_service.type` | `gcp.vpc.flow.destination.google_service.type` | supported |
| `dest_google_service.service_name` | `gcp.vpc.flow.destination.google_service.name` | supported |
| `dest_google_service.connectivity` | `gcp.vpc.flow.destination.google_service.connectivity` | supported |
| `src_instance.project_id` | `gcp.vpc.flow.source.instance.project.id` | supported |
| `src_instance.region` | `gcp.vpc.flow.source.instance.vm.region` | supported |
| `src_instance.vm_name` | `gcp.vpc.flow.source.instance.vm.name` | supported |
| `src_instance.zone` | `gcp.vpc.flow.source.instance.vm.zone` | supported |
| `src_instance.managed_instance_group.name` | `gcp.vpc.flow.source.instance.managed_instance_group.name` | supported |
| `src_instance.managed_instance_group.region` | `gcp.vpc.flow.source.instance.managed_instance_group.region` | supported |
| `src_instance.managed_instance_group.zone` | `gcp.vpc.flow.source.instance.managed_instance_group.zone` | supported |
| `dest_instance.project_id` | `gcp.vpc.flow.destination.instance.project.id` | supported |
| `dest_instance.region` | `gcp.vpc.flow.destination.instance.vm.region` | supported |
| `dest_instance.vm_name` | `gcp.vpc.flow.destination.instance.vm.name` | supported |
| `dest_instance.zone` | `gcp.vpc.flow.destination.instance.vm.zone` | supported |
| `dest_instance.managed_instance_group.name` | `gcp.vpc.flow.destination.instance.managed_instance_group.name` | supported |
| `dest_instance.managed_instance_group.region` | `gcp.vpc.flow.destination.instance.managed_instance_group.region` | supported |
| `dest_instance.managed_instance_group.zone` | `gcp.vpc.flow.destination.instance.managed_instance_group.zone` | supported |
| `src_location.asn` | `gcp.vpc.flow.source.asn` | supported |
| `src_location.city` | `gcp.vpc.flow.source.geo.city` | supported |
@@ -6,6 +6,7 @@ package googlecloudlogentryencodingextension
import (
	"bytes"
	"os"
	"path/filepath"
	"testing"

	gojson "github.com/goccy/go-json"
@@ -186,6 +187,16 @@ func TestPayloads(t *testing.T) {
			logFilename:      "testdata/vpc-flow-log/vpc-flow-log-w-internet-routing-details.json",
			expectedFilename: "testdata/vpc-flow-log/vpc-flow-log-w-internet-routing-details_expected.yaml",
		},
		{
			name:             "vpc flow log - google services",
			logFilename:      "testdata/vpc-flow-log/vpc-flow-log-google-service.ndjson",
			expectedFilename: "testdata/vpc-flow-log/vpc-flow-log-google-service_expected.yaml",
		},
		{
			name:             "vpc flow log - managed instance mig regions",
			logFilename:      "testdata/vpc-flow-log/vpc-flow-log-managed-instance.ndjson",
			expectedFilename: "testdata/vpc-flow-log/vpc-flow-log-managed-instance_expected.yaml",
		},
	}

	extension := newTestExtension(t, Config{})
@@ -196,9 +207,33 @@
			data, err := os.ReadFile(tt.logFilename)
			require.NoError(t, err)

			content := bytes.NewBuffer(nil)
			normalizeNDJSON := func(raw []byte) {
				lines := bytes.Split(raw, []byte{'\n'})
				for _, line := range lines {
					line = bytes.TrimSpace(line)
					if len(line) == 0 {
						continue
					}
					content.Write(line)
					content.WriteByte('\n')
				}
			}

			isNDJSON := filepath.Ext(tt.logFilename) == ".ndjson"
			var compactionErr error
			if !isNDJSON {
				compactionErr = gojson.Compact(content, data)
			}

			if isNDJSON || compactionErr != nil {
				if !isNDJSON {
					content.Reset()
				}
				// NDJSON fixtures contain one JSON object per line, so we strip
				// whitespace from each line while preserving record boundaries.
				normalizeNDJSON(data)
			}

			logs, err := extension.UnmarshalLogs(content.Bytes())
			require.NoError(t, err)
@@ -0,0 +1,76 @@
//go:build generate_vpc_goldens

package googlecloudlogentryencodingextension

import (
	"bytes"
	"os"
	"path/filepath"
	"testing"

	"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden"
)

// TestGenerateVPCGoldens refreshes the YAML goldens used by the integration
// tests that exercise VPC flow parsing. It is guarded by the
// `generate_vpc_goldens` build tag so it only runs when explicitly invoked,
// e.g. `go test -tags generate_vpc_goldens ./extension/... -run TestGenerateVPCGoldens`.
// The fixtures are stored as newline-delimited JSON logs; the helper rewrites
// each one to match the encoder output.
func TestGenerateVPCGoldens(t *testing.T) {
	t.Parallel()

	cfg := createDefaultConfig().(*Config)
	ext := newExtension(cfg)

	root := "testdata/vpc-flow-log"
	testCases := []struct {
		name     string
		jsonFile string
		outFile  string
	}{
		{
			name:     "google_service",
			jsonFile: "vpc-flow-log-google-service.ndjson",
			outFile:  "vpc-flow-log-google-service_expected.yaml",
		},
		{
			name:     "managed_instance",
			jsonFile: "vpc-flow-log-managed-instance.ndjson",
			outFile:  "vpc-flow-log-managed-instance_expected.yaml",
		},
	}

	for _, tc := range testCases {
		tc := tc
		t.Run(tc.name, func(t *testing.T) {
			t.Parallel()

			raw, err := os.ReadFile(filepath.Join(root, tc.jsonFile))
			if err != nil {
				t.Fatalf("failed to read fixture: %v", err)
			}

			lines := bytes.Split(raw, []byte{'\n'})
			var buf bytes.Buffer
			for _, line := range lines {
				line = bytes.TrimSpace(line)
				if len(line) == 0 {
					continue
				}
				buf.Write(line)
				buf.WriteByte('\n')
			}
			data := bytes.TrimSpace(buf.Bytes())

			logs, err := ext.UnmarshalLogs(data)
			if err != nil {
				t.Fatalf("failed to unmarshal logs: %v", err)
			}

			if err := golden.WriteLogsToFile(filepath.Join(root, tc.outFile), logs); err != nil {
				t.Fatalf("failed to write golden file: %v", err)
			}
		})
	}
}