diff --git a/.chloggen/fix_43693.yaml b/.chloggen/fix_43693.yaml
new file mode 100644
index 0000000000000..e0e75600c9cef
--- /dev/null
+++ b/.chloggen/fix_43693.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: "bug_fix"
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: pkg/stanza
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: "Add an `on_truncate` setting that detects copytruncate rotations and controls how reader offsets are reset, ensuring continuous log collection."
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43693]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext:
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 580648fd84701..59de13e50d4c5 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -306,7 +306,7 @@ receiver/ntpreceiver/ @open-telemetry
 receiver/oracledbreceiver/ @open-telemetry/collector-contrib-approvers @dmitryax @crobert-1 @atoulme
 receiver/osqueryreceiver/ @open-telemetry/collector-contrib-approvers @nslaughter @smithclay
 receiver/otelarrowreceiver/ @open-telemetry/collector-contrib-approvers @jmacd @moh-osman3
-receiver/otlpjsonfilereceiver/ @open-telemetry/collector-contrib-approvers @atoulme
+receiver/otlpjsonfilereceiver/ @open-telemetry/collector-contrib-approvers @atoulme @paulojmdias
 receiver/podmanreceiver/ @open-telemetry/collector-contrib-approvers @rogercoll
 receiver/postgresqlreceiver/ @open-telemetry/collector-contrib-approvers @antonblock @ishleenk17
 receiver/pprofreceiver/ @open-telemetry/collector-contrib-approvers @MovieStoreGuy @atoulme
diff --git a/pkg/stanza/fileconsumer/config.go b/pkg/stanza/fileconsumer/config.go
index 7c7c3e059519a..becf6dd6969c9 100644
--- a/pkg/stanza/fileconsumer/config.go
+++ b/pkg/stanza/fileconsumer/config.go
@@ -38,6 +38,21 @@ const (
 	defaultPollInterval = 200 * time.Millisecond
 )
 
+// OnTruncate defines the behavior when a file with the same fingerprint
+// is detected but with a smaller size (indicating a copytruncate rotation).
+const (
+	// OnTruncateIgnore keeps the current behavior: do not read any data until
+	// the file grows past the original offset, then read only the new data.
+	OnTruncateIgnore = "ignore"
+	// OnTruncateReadWholeFile reads the whole file from the beginning when truncation is detected.
+	OnTruncateReadWholeFile = "read_whole_file"
+	// OnTruncateReadNew stores the new (lower) offset, so that the next time the file grows,
+	// any new data past the new offset is read.
+	OnTruncateReadNew = "read_new"
+)
+
+const defaultOnTruncate = OnTruncateIgnore
+
 var allowFileDeletion = featuregate.GlobalRegistry().MustRegister(
 	"filelog.allowFileDeletion",
 	featuregate.StageAlpha,
@@ -63,6 +78,7 @@ func NewConfig() *Config {
 		MaxLogSize:  reader.DefaultMaxLogSize,
 		Encoding:    defaultEncoding,
 		FlushPeriod: reader.DefaultFlushPeriod,
+		OnTruncate:  defaultOnTruncate,
 		Resolver: attrs.Resolver{
 			IncludeFileName: true,
 		},
@@ -91,6 +107,7 @@ type Config struct {
 	Compression    string `mapstructure:"compression,omitempty"`
 	PollsToArchive int    `mapstructure:"polls_to_archive,omitempty"`
 	AcquireFSLock  bool   `mapstructure:"acquire_fs_lock,omitempty"`
+	OnTruncate     string `mapstructure:"on_truncate,omitempty"`
 }
 
 type HeaderConfig struct {
@@ -192,6 +209,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
 		telemetryBuilder: telemetryBuilder,
 		noTracking:       o.noTracking,
 		pollsToArchive:   c.PollsToArchive,
+		onTruncate:       c.OnTruncate,
 	}, nil
 }
 
@@ -247,6 +265,13 @@ func (c Config) validate() error {
 		return fmt.Errorf("'include_file_owner_name' or 'include_file_owner_group_name' it's not supported for windows: %w", err)
 	}
 
+	switch c.OnTruncate {
+	case OnTruncateIgnore, OnTruncateReadWholeFile, OnTruncateReadNew:
+		// Valid values
+	default:
+		return fmt.Errorf("'on_truncate' must be one of: %s, %s, %s", OnTruncateIgnore, OnTruncateReadWholeFile, OnTruncateReadNew)
+	}
+
 	return nil
 }
diff --git a/pkg/stanza/fileconsumer/file.go b/pkg/stanza/fileconsumer/file.go
index 2da1c723766bb..2f5a18f6debbd 100644
--- a/pkg/stanza/fileconsumer/file.go
+++ b/pkg/stanza/fileconsumer/file.go
@@ -37,6 +37,7 @@ type Manager struct {
 	maxBatches       int
 	maxBatchFiles    int
 	pollsToArchive   int
+	onTruncate       string
 	telemetryBuilder *metadata.TelemetryBuilder
 }
 
@@ -252,6 +253,34 @@ func (m *Manager) handleUnmatchedFiles(ctx context.Context) {
 
 	var err error
 	if md != nil {
+		// Check if file was copy-truncated since last seen
+		if info, statErr := file.Stat(); statErr == nil && md.Offset > info.Size() {
+			// File has been truncated (copytruncate rotation)
+			switch m.onTruncate {
+			case OnTruncateReadWholeFile:
+				m.set.Logger.Debug("File has been rotated (truncated). Resetting offset to 0",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+				md.Offset = 0
+			case OnTruncateReadNew:
+				m.set.Logger.Debug("File has been rotated (truncated). Storing new offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+					zap.Int64("new_offset", info.Size()),
+				)
+				md.Offset = info.Size()
+			case OnTruncateIgnore:
+				// Keep the old offset - no data will be read until the file grows past the original offset
+				m.set.Logger.Debug("File has been rotated (truncated). Keeping original offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+			}
+		}
 		reader, err = m.readerFactory.NewReaderFromMetadata(file, md)
 		if m.tracker.Name() != tracker.NoStateTracker {
 			m.set.Logger.Info("File found in archive. Started watching file again", zap.String("path", file.Name()))
@@ -289,11 +318,68 @@ func (m *Manager) newReader(ctx context.Context, file *os.File, fp *fingerprint.
 					zap.String("rotated_path", file.Name()))
 			}
 		}
-		return m.readerFactory.NewReaderFromMetadata(file, oldReader.Close())
+		// Close the old reader and adjust metadata if the file was copy-truncated.
+		md := oldReader.Close()
+		if info, err := file.Stat(); err == nil && md.Offset > info.Size() {
+			// File has been truncated (copytruncate rotation)
+			switch m.onTruncate {
+			case OnTruncateReadWholeFile:
+				m.set.Logger.Warn("File has been rotated (truncated). Resetting offset to 0",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+				md.Offset = 0
+			case OnTruncateReadNew:
+				m.set.Logger.Warn("File has been rotated (truncated). Storing new offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+					zap.Int64("new_offset", info.Size()),
+				)
+				md.Offset = info.Size()
+			case OnTruncateIgnore:
+				// Keep the old offset - no data will be read until the file grows past the original offset
+				m.set.Logger.Warn("File has been rotated (truncated). Keeping original offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", md.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+			}
+		}
+		return m.readerFactory.NewReaderFromMetadata(file, md)
 	}
 
 	// Check for closed files for match
 	if oldMetadata := m.tracker.GetClosedFile(fp); oldMetadata != nil {
+		// Check if file was copy-truncated since last seen
+		if info, statErr := file.Stat(); statErr == nil && oldMetadata.Offset > info.Size() {
+			// File has been truncated (copytruncate rotation)
+			switch m.onTruncate {
+			case OnTruncateReadWholeFile:
+				m.set.Logger.Debug("File has been rotated (truncated). Resetting offset to 0",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", oldMetadata.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+				oldMetadata.Offset = 0
+			case OnTruncateReadNew:
+				m.set.Logger.Debug("File has been rotated (truncated). Storing new offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", oldMetadata.Offset),
+					zap.Int64("current_file_size", info.Size()),
+					zap.Int64("new_offset", info.Size()),
+				)
+				oldMetadata.Offset = info.Size()
+			case OnTruncateIgnore:
+				// Keep the old offset - no data will be read until the file grows past the original offset
+				m.set.Logger.Debug("File has been rotated (truncated). Keeping original offset",
+					zap.String("path", file.Name()),
+					zap.Int64("stored_offset", oldMetadata.Offset),
+					zap.Int64("current_file_size", info.Size()),
+				)
+			}
+		}
 		r, err := m.readerFactory.NewReaderFromMetadata(file, oldMetadata)
 		if err != nil {
 			return nil, err
diff --git a/pkg/stanza/fileconsumer/file_test.go b/pkg/stanza/fileconsumer/file_test.go
index f2ef4b312dd9c..184b3c3edc9ee 100644
--- a/pkg/stanza/fileconsumer/file_test.go
+++ b/pkg/stanza/fileconsumer/file_test.go
@@ -4,6 +4,7 @@
 package fileconsumer
 
 import (
+	"bytes"
 	"compress/gzip"
 	"context"
 	"fmt"
@@ -1664,3 +1665,63 @@ func TestArchive(t *testing.T) {
 
 	sink.ExpectCalls(t, log3, log4)
 }
+
+func TestCopyTruncateResetsOffsetOnRestart_IdenticalFirstKB(t *testing.T) {
+	t.Parallel()
+
+	line := string(bytes.Repeat([]byte("a"), 1024)) // identical 1024B lines
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.FingerprintSize = 1000 // identical prefix across rotations
+
+	// Manager #1 (manual polling, no background goroutine)
+	op1, sink1 := testManager(t, cfg)
+	op1.persister = testutil.NewUnscopedMockPersister()
+
+	// Create file and write 20 lines
+	log := filetest.OpenTemp(t, tempDir)
+	for range 20 {
+		filetest.WriteString(t, log, line+"\n")
+	}
+
+	// First poll: read the existing 20 lines
+	op1.poll(t.Context())
+	for range 20 {
+		sink1.ExpectToken(t, []byte(line))
+	}
+
+	// Simulate copytruncate
+	rotated := log.Name() + ".1"
+	origData, err := os.ReadFile(log.Name())
+	require.NoError(t, err)
+	require.NoError(t, os.WriteFile(rotated, origData, 0o600))
+	require.NoError(t, log.Truncate(0))
+	_, err = log.Seek(0, 0)
+	require.NoError(t, err)
+	for range 10 {
+		filetest.WriteString(t, log, line+"\n")
+	}
+
+	// Persist metadata as if we were running; then stop op1.
+	// (poll() already saves checkpoints when persister is set.)
+	// Ensure any internal rotations are finalized.
+	op1.poll(t.Context())
+	require.NoError(t, op1.Stop())
+
+	// Restart with read_whole_file so the truncated file is re-read from offset 0
+	cfg.OnTruncate = OnTruncateReadWholeFile
+
+	// Manager #2 (manual polling) resumes from persisted metadata
+	op2, sink2 := testManager(t, cfg)
+	op2.persister = op1.persister
+	op2.tracker.LoadMetadata(op1.tracker.GetMetadata())
+
+	// On poll, offset > current size is detected and reset to 0.
+	op2.poll(t.Context())
+	for range 10 {
+		sink2.ExpectToken(t, []byte(line))
+	}
+	sink2.ExpectNoCalls(t)
+}
diff --git a/pkg/stanza/fileconsumer/rotation_test.go b/pkg/stanza/fileconsumer/rotation_test.go
index 2ce406cca2c46..e72f76f44f934 100644
--- a/pkg/stanza/fileconsumer/rotation_test.go
+++ b/pkg/stanza/fileconsumer/rotation_test.go
@@ -4,6 +4,7 @@
 package fileconsumer
 
 import (
+	"bytes"
 	"fmt"
 	"io"
 	"os"
@@ -495,3 +496,169 @@ func TestFileMovedWhileOff_BigFiles(t *testing.T) {
 	sink2.ExpectTokens(t, log2, log3)
 	require.NoError(t, operator2.Stop())
 }
+
+// TestOnTruncateReadWholeFile tests that when on_truncate is set to "read_whole_file",
+// the whole file is read from the beginning after a copytruncate rotation
+func TestOnTruncateReadWholeFile(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.OnTruncate = OnTruncateReadWholeFile
+	cfg.PollInterval = 100 * time.Millisecond
+
+	temp := filetest.OpenTemp(t, tempDir)
+	tempCopy := filetest.OpenTempWithPattern(t, tempDir, "*.copy.log")
+
+	// Write initial logs
+	log1 := []byte("log line 1")
+	log2 := []byte("log line 2")
+	filetest.WriteString(t, temp, string(log1)+"\n")
+	filetest.WriteString(t, temp, string(log2)+"\n")
+
+	// Start the operator and expect initial logs
+	operator, sink := testManager(t, cfg)
+	persister := testutil.NewUnscopedMockPersister()
+	require.NoError(t, operator.Start(persister))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectTokens(t, log1, log2)
+
+	// Perform copytruncate: copy file to backup and truncate original
+	require.NoError(t, temp.Sync())
+	_, err := temp.Seek(0, 0)
+	require.NoError(t, err)
+	_, err = io.Copy(tempCopy, temp)
+	require.NoError(t, err)
+	require.NoError(t, tempCopy.Close())
+	require.NoError(t, temp.Truncate(0))
+	_, err = temp.Seek(0, 0)
+	require.NoError(t, err)
+
+	// Write new logs to the truncated file
+	log3 := []byte("log line 3")
+	log4 := []byte("log line 4")
+	filetest.WriteString(t, temp, string(log3)+"\n")
+	filetest.WriteString(t, temp, string(log4)+"\n")
+
+	// With read_whole_file, we should see log3 and log4
+	// (the file is re-read from the beginning after truncation)
+	sink.ExpectTokens(t, log3, log4)
+}
+
+// TestOnTruncateReadNew tests that when on_truncate is set to "read_new",
+// only new data added after the truncation point is read
+func TestOnTruncateReadNew(t *testing.T) {
+	t.Parallel()
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.OnTruncate = OnTruncateReadNew
+	cfg.PollInterval = 100 * time.Millisecond
+
+	temp := filetest.OpenTemp(t, tempDir)
+	tempCopy := filetest.OpenTempWithPattern(t, tempDir, "*.copy.log")
+
+	// Write initial logs
+	log1 := []byte("log line 1")
+	log2 := []byte("log line 2")
+	filetest.WriteString(t, temp, string(log1)+"\n")
+	filetest.WriteString(t, temp, string(log2)+"\n")
+
+	// Start the operator and expect initial logs
+	operator, sink := testManager(t, cfg)
+	persister := testutil.NewUnscopedMockPersister()
+	require.NoError(t, operator.Start(persister))
+	defer func() {
+		require.NoError(t, operator.Stop())
+	}()
+
+	sink.ExpectTokens(t, log1, log2)
+
+	// Perform copytruncate: copy file to backup and truncate original
+	require.NoError(t, temp.Sync())
+	_, err := temp.Seek(0, 0)
+	require.NoError(t, err)
+	_, err = io.Copy(tempCopy, temp)
+	require.NoError(t, err)
+	require.NoError(t, tempCopy.Close())
+	require.NoError(t, temp.Truncate(0))
+	_, err = temp.Seek(0, 0)
+	require.NoError(t, err)
+
+	// Let a poll observe the truncated (empty) file so the stored offset is reset to 0
+	time.Sleep(2 * cfg.PollInterval)
+
+	// Write new logs to the truncated file (less than the previous offset)
+	log3 := []byte("log3")
+	filetest.WriteString(t, temp, string(log3)+"\n")
+
+	// With read_new, we should see log3 because the offset was reset to the
+	// current file size (0) when truncation was detected
+	sink.ExpectTokens(t, log3)
+
+	// Write more data to verify continued reading
+	log4 := []byte("log line 4")
+	filetest.WriteString(t, temp, string(log4)+"\n")
+	sink.ExpectTokens(t, log4)
+}
+
+// TestOnTruncateIgnore tests that when on_truncate is set to "ignore" (default),
+// the old offset is kept, meaning no data is read until the file grows past it
+func TestOnTruncateIgnore(t *testing.T) {
+	t.Parallel()
+
+	identicalPrefix := string(bytes.Repeat([]byte("x"), 100)) // 100 bytes of 'x'
+
+	tempDir := t.TempDir()
+	cfg := NewConfig().includeDir(tempDir)
+	cfg.StartAt = "beginning"
+	cfg.OnTruncate = OnTruncateIgnore
+	cfg.FingerprintSize = 100 // Match the prefix size
+
+	// Manager #1 (manual polling)
+	op1, sink1 := testManager(t, cfg)
+	op1.persister = testutil.NewUnscopedMockPersister()
+
+	// Create file and write initial logs
+	temp := filetest.OpenTemp(t, tempDir)
+	log1 := []byte(identicalPrefix + " - log line 1")
+	log2 := []byte(identicalPrefix + " - log line 2")
+	filetest.WriteString(t, temp, string(log1)+"\n")
+	filetest.WriteString(t, temp, string(log2)+"\n")
+
+	// First poll: read the existing logs
+	op1.poll(t.Context())
+	sink1.ExpectTokens(t, log1, log2)
+
+	// Stop op1 and persist metadata
+	require.NoError(t, op1.Stop())
+
+	// Simulate copytruncate - truncate file and write new shorter content
+	require.NoError(t, temp.Truncate(0))
+	_, err := temp.Seek(0, 0)
+	require.NoError(t, err)
+
+	// Write the SAME prefix to maintain fingerprint, but with new shorter content
+	log3 := []byte(identicalPrefix + " - new short log")
+	filetest.WriteString(t, temp, string(log3)+"\n")
+
+	// Manager #2 resumes from persisted metadata
+	op2, sink2 := testManager(t, cfg)
+	op2.persister = op1.persister
+
+	// Load metadata from op1's tracker into op2's tracker
+	metadata := op1.tracker.GetMetadata()
+	op2.tracker.LoadMetadata(metadata)
+
+	// On poll, with "ignore" mode, the old offset should be kept.
+	// Since the file is smaller than the old offset, nothing should be read
+	op2.poll(t.Context())
+	sink2.ExpectNoCalls(t)
+
+	require.NoError(t, op2.Stop())
+}
diff --git a/receiver/filelogreceiver/README.md b/receiver/filelogreceiver/README.md
index 245dc931551cf..7598e41e113b3 100644
--- a/receiver/filelogreceiver/README.md
+++ b/receiver/filelogreceiver/README.md
@@ -67,6 +67,7 @@ Tails and parses logs from files.
 | `ordering_criteria.sort_by.ascending` | | Sort direction |
 | `compression` | | Indicate the compression format of input files. If set accordingly, files will be read using a reader that uncompresses the file before scanning its content. Options are ``, `gzip`, or `auto`. `auto` auto-detects file compression type. Currently, gzip files are the only compressed files auto-detected, based on ".gz" filename extension. `auto` option is useful when ingesting a mix of compressed and uncompressed files with the same filelogreceiver. |
 | `polls_to_archive` | `0` | This settings controls the number of poll cycles to store on disk, rather than being discarded. By default, the receiver will purge the record of readers that have existed for 3 generations. Refer [archiving](#archiving) and [polling](../../pkg/stanza/fileconsumer/design.md#polling) for more details. **Note: This feature is experimental.** |
+| `on_truncate` | `ignore` | Behavior when a file with the same fingerprint is detected but with a smaller size (indicating a copytruncate rotation). Options are `ignore`, `read_whole_file`, or `read_new`. See [handling copytruncate rotation](#handling-copytruncate-rotation) for more details. |
 
 Note that _by default_, no logs will be read from a file that is not actively being written to because `start_at` defaults to `end`.
 
@@ -126,7 +127,42 @@ All time parameters must have the unit of time specified. e.g.: `200ms`, `1s`, `
 
 ### Log Rotation
 
 File Log Receiver can read files that are being rotated.
+
+### Handling Copytruncate Rotation
+
+When log files are rotated using the `copytruncate` strategy (where the file is copied and then truncated in place), the receiver can detect when a file has been truncated by comparing the stored offset with the current file size. The `on_truncate` setting controls how the receiver behaves when truncation is detected:
+
+- `ignore` (default): The receiver keeps the original offset and will not read any data until the file grows past the original offset. This prevents duplicate log ingestion when a file is rotated.
+- `read_whole_file`: The receiver resets the offset to 0 and reads the entire file from the beginning. Use this mode when you want to ensure no data loss, even if it means potentially re-reading some logs.
+- `read_new`: The receiver updates the offset to the current file size (the position after truncation). This allows reading new data that is written after the truncation without re-reading existing content.
+
+**Example configuration:**
+
+```yaml
+receivers:
+  filelog:
+    include: [ /var/log/myapp/*.log ]
+    on_truncate: read_whole_file # Read entire file after copytruncate rotation
+    operators:
+      - type: json_parser
+```
+
+**When to use each mode:**
+
+- Use `ignore` when you want to avoid duplicate logs and your log rotation strategy ensures that rotated files are properly renamed or moved.
+- Use `read_whole_file` when data completeness is critical and you can tolerate duplicate logs, or when you have deduplication logic downstream.
+- Use `read_new` when files are expected to be deleted after rotation and you want to read only new data written after the truncation point.
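+
+For reference, a typical `logrotate` rule that produces this kind of rotation uses the `copytruncate` directive. The snippet below is illustrative only; the path and retention settings are placeholders to adapt to your environment:
+
+```
+/var/log/myapp/*.log {
+    daily
+    rotate 7
+    copytruncate
+}
+```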
 
 ## Example - Tailing a simple json file
 
diff --git a/receiver/otlpjsonfilereceiver/README.md b/receiver/otlpjsonfilereceiver/README.md
index e47b8c3627c2a..34de10756412a 100644
--- a/receiver/otlpjsonfilereceiver/README.md
+++ b/receiver/otlpjsonfilereceiver/README.md
@@ -8,7 +8,7 @@
 | Distributions | [contrib] |
 | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Areceiver%2Fotlpjsonfile%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Areceiver%2Fotlpjsonfile) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Areceiver%2Fotlpjsonfile%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Areceiver%2Fotlpjsonfile) |
 | Code coverage | [![codecov](https://codecov.io/github/open-telemetry/opentelemetry-collector-contrib/graph/main/badge.svg?component=receiver_otlpjsonfile)](https://app.codecov.io/gh/open-telemetry/opentelemetry-collector-contrib/tree/main/?components%5B0%5D=receiver_otlpjsonfile&displayType=list) |
-| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme) \| Seeking more code owners! |
+| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@atoulme](https://www.github.com/atoulme), [@paulojmdias](https://www.github.com/paulojmdias) \| Seeking more code owners! |
 
 [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development
 [alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
@@ -41,4 +41,40 @@ receivers:
       - "/var/log/*.log"
     exclude:
       - "/var/log/example.log"
-```
\ No newline at end of file
+```
+
+## Configuration
+
+| Field | Default | Description |
+|-------|---------|-------------|
+| `include` | required | A list of file glob patterns that match the file paths to be read. |
+| `exclude` | [] | A list of file glob patterns to exclude from reading. This is applied against the paths matched by `include`. |
+| `encoding` | `utf-8` | The encoding of the file being read. See the list of [supported encodings](../filelogreceiver/README.md#supported-encodings) for available options. |
+| `poll_interval` | 200ms | The duration between filesystem polls. |
+| `start_at` | `end` | At startup, where to start reading logs from the file. Options are `beginning` or `end`. |
+| `fingerprint_size` | `1kb` | The number of bytes with which to identify a file. The first bytes in the file are used as the fingerprint. |
+| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. |
+| `max_log_size` | `1MiB` | The maximum size of a log entry to read. A log entry will be truncated if it is larger than `max_log_size`. |
+| `storage` | none | The ID of a storage extension to be used to store file offsets. |
+| `on_truncate` | `ignore` | Behavior when a file with the same fingerprint is detected but with a smaller size (indicating a copytruncate rotation). Options are `ignore`, `read_whole_file`, or `read_new`. See [handling copytruncate rotation](#handling-copytruncate-rotation). |
+| `replay_file` | `false` | If `true`, the receiver will not track file offsets and will re-read files from the beginning on every poll. |
+
+For additional configuration options, see the [File Log Receiver documentation](../filelogreceiver/README.md#configuration).
+
+### Handling Copytruncate Rotation
+
+When log files are rotated using the `copytruncate` strategy (where the file is copied and then truncated in place), the receiver can detect when a file has been truncated by comparing the stored offset with the current file size. The `on_truncate` setting controls how the receiver behaves when truncation is detected:
+
+- `ignore` (default): The receiver keeps the original offset and will not read any data until the file grows past the original offset. This prevents duplicate log ingestion when a file is rotated.
+- `read_whole_file`: The receiver resets the offset to 0 and reads the entire file from the beginning. Use this mode when you want to ensure no data loss, even if it means potentially re-reading some logs.
+- `read_new`: The receiver updates the offset to the current file size (the position after truncation). This allows reading new data that is written after the truncation without re-reading existing content.
+
+**Example configuration:**
+
+```yaml
+receivers:
+  otlpjsonfile:
+    include:
+      - /var/log/otlp/*.json
+    on_truncate: read_whole_file # Read entire file after copytruncate rotation
+```
diff --git a/receiver/otlpjsonfilereceiver/file_test.go b/receiver/otlpjsonfilereceiver/file_test.go
index 4451835095363..c5a2b5196b3e4 100644
--- a/receiver/otlpjsonfilereceiver/file_test.go
+++ b/receiver/otlpjsonfilereceiver/file_test.go
@@ -209,6 +209,7 @@ func testdataConfigYamlAsMap() *Config {
 			MaxLogSize:         1024 * 1024,
 			MaxConcurrentFiles: 1024,
 			FlushPeriod:        500 * time.Millisecond,
+			OnTruncate:         "ignore",
 			Criteria: matcher.Criteria{
 				Include: []string{"/var/log/*.log"},
 				Exclude: []string{"/var/log/example.log"},
diff --git a/receiver/otlpjsonfilereceiver/metadata.yaml b/receiver/otlpjsonfilereceiver/metadata.yaml
index b5300ce26b090..373f3c586e324 100644
--- a/receiver/otlpjsonfilereceiver/metadata.yaml
+++ b/receiver/otlpjsonfilereceiver/metadata.yaml
@@ -7,7 +7,7 @@ status:
     development: [profiles]
   distributions: [contrib]
   codeowners:
-    active: [atoulme]
+    active: [atoulme, paulojmdias]
     seeking_new: true
 tests:
   config:
diff --git a/receiver/otlpjsonfilereceiver/testdata/config.yaml b/receiver/otlpjsonfilereceiver/testdata/config.yaml
index f9c4c75dcc682..018cdd26b3caa 100644
--- a/receiver/otlpjsonfilereceiver/testdata/config.yaml
+++ b/receiver/otlpjsonfilereceiver/testdata/config.yaml
@@ -13,6 +13,7 @@ otlpjsonfile/all:
   max_log_size: 10000
   max_concurrent_files: 4
   encoding: "UTF-8"
+  on_truncate: "ignore"
   multiline:
     line_start_pattern: "<"
     line_end_pattern: ">"