Skip to content

Commit

Permalink
Merge branch 'main' into renovate/github.com-vmware-govmomi-0.x
Browse files Browse the repository at this point in the history
  • Loading branch information
codeboten authored Aug 22, 2024
2 parents 4f67318 + af9dcb3 commit 3138a78
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 1 deletion.
27 changes: 27 additions & 0 deletions .chloggen/filelog-receiver-fs-lock.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: filelogreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: If acquire_fs_lock is true, attempt to acquire a shared lock before reading a file.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [34801]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Unix only. If a lock cannot be acquired then the file will be ignored until the next poll cycle.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
4 changes: 3 additions & 1 deletion .github/workflows/scripts/add-labels.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,10 @@ LABELS=$(echo "${COMMENT}" | sed -E 's%^/label%%')

for LABEL_REQ in ${LABELS}; do
LABEL=$(echo "${LABEL_REQ}" | sed -E s/^[+-]?//)
SHOULD_ADD=true
# Trim newlines from label that would cause matching to fail
LABEL=$(echo "${LABEL}" | tr -d '\n')

SHOULD_ADD=true
if [[ "${LABEL_REQ:0:1}" = "-" ]]; then
SHOULD_ADD=false
fi
Expand Down
1 change: 1 addition & 0 deletions pkg/stanza/docs/operators/file_input.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ The `file_input` operator reads logs from files. It will place the lines read in
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently (minimum = 2). If the number of files matched in the `include` pattern exceeds half of this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. |
| `acquire_fs_lock` | `false` | Whether to attempt to acquire a filesystem lock before reading a file (Unix only). |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `header` | nil | Specifies options for parsing header metadata. Requires that the `filelog.allowHeaderMetadataParsing` feature gate is enabled. See below for details. |
Expand Down
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type Config struct {
DeleteAfterRead bool `mapstructure:"delete_after_read,omitempty"`
IncludeFileRecordNumber bool `mapstructure:"include_file_record_number,omitempty"`
Compression string `mapstructure:"compression,omitempty"`
AcquireFSLock bool `mapstructure:"acquire_fs_lock,omitempty"`
}

type HeaderConfig struct {
Expand Down Expand Up @@ -170,6 +171,7 @@ func (c Config) Build(set component.TelemetrySettings, emit emit.Callback, opts
DeleteAtEOF: c.DeleteAfterRead,
IncludeFileRecordNumber: c.IncludeFileRecordNumber,
Compression: c.Compression,
AcquireFSLock: c.AcquireFSLock,
}

var t tracker.Tracker
Expand Down
1 change: 1 addition & 0 deletions pkg/stanza/fileconsumer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func TestNewConfig(t *testing.T) {
assert.False(t, cfg.IncludeFileOwnerName)
assert.False(t, cfg.IncludeFileOwnerGroupName)
assert.False(t, cfg.IncludeFileRecordNumber)
assert.False(t, cfg.AcquireFSLock)
}

func TestUnmarshal(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type Factory struct {
DeleteAtEOF bool
IncludeFileRecordNumber bool
Compression string
AcquireFSLock bool
}

func (f *Factory) NewFingerprint(file *os.File) (*fingerprint.Fingerprint, error) {
Expand Down Expand Up @@ -77,6 +78,7 @@ func (f *Factory) NewReaderFromMetadata(file *os.File, m *Metadata) (r *Reader,
deleteAtEOF: f.DeleteAtEOF,
includeFileRecordNum: f.IncludeFileRecordNumber,
compression: f.Compression,
acquireFSLock: f.AcquireFSLock,
}
r.set.Logger = r.set.Logger.With(zap.String("path", r.fileName))

Expand Down
8 changes: 8 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,18 @@ type Reader struct {
needsUpdateFingerprint bool
includeFileRecordNum bool
compression string
acquireFSLock bool
}

// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
if r.acquireFSLock {
if !r.tryLockFile() {
return
}
defer r.unlockFile()
}

switch r.compression {
case "gzip":
// We need to create a gzip reader each time ReadToEnd is called because the underlying
Expand Down
13 changes: 13 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build !unix

package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

func (r *Reader) tryLockFile() bool {
return true
}

func (r *Reader) unlockFile() {
}
30 changes: 30 additions & 0 deletions pkg/stanza/fileconsumer/internal/reader/reader_unix.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:build unix

package reader // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/fileconsumer/internal/reader"

import (
"errors"

"go.uber.org/zap"
"golang.org/x/sys/unix"
)

func (r *Reader) tryLockFile() bool {
if err := unix.Flock(int(r.file.Fd()), unix.LOCK_SH|unix.LOCK_NB); err != nil {
if !errors.Is(err, unix.EWOULDBLOCK) {
r.set.Logger.Error("Failed to lock", zap.Error(err))
}
return false
}

return true
}

func (r *Reader) unlockFile() {
if err := unix.Flock(int(r.file.Fd()), unix.LOCK_UN); err != nil {
r.set.Logger.Error("Failed to unlock", zap.Error(err))
}
}
1 change: 1 addition & 0 deletions receiver/filelogreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Tails and parses logs from files.
| `max_concurrent_files` | 1024 | The maximum number of log files from which logs will be read concurrently. If the number of files matched in the `include` pattern exceeds this number, then files will be processed in batches. |
| `max_batches` | 0 | Only applicable when files must be batched in order to respect `max_concurrent_files`. This value limits the number of batches that will be processed during a single poll interval. A value of 0 indicates no limit. |
| `delete_after_read` | `false` | If `true`, each log file will be read and then immediately deleted. Requires that the `filelog.allowFileDeletion` feature gate is enabled. Must be `false` when `start_at` is set to `end`. |
| `acquire_fs_lock` | `false` | Whether to attempt to acquire a filesystem lock before reading a file (Unix only). |
| `attributes` | {} | A map of `key: value` pairs to add to the entry's attributes. |
| `resource` | {} | A map of `key: value` pairs to add to the entry's resource. |
| `operators` | [] | An array of [operators](../../pkg/stanza/docs/operators/README.md#what-operators-are-available). See below for more details. |
Expand Down

0 comments on commit 3138a78

Please sign in to comment.