fix: join multiparts on file beginning (#3784)
* fix: join multiparts on file beginning

Signed-off-by: Dominik Rosiek <[email protected]>

* chore: changelog

Signed-off-by: Dominik Rosiek <[email protected]>

* Apply suggestions from code review

* Apply suggestions from code review

---------

Signed-off-by: Dominik Rosiek <[email protected]>
(cherry picked from commit 58f19f8)
sumo-drosiek authored and Dominik Rosiek committed Jun 28, 2024
1 parent 8078d95 commit 9fc019e
Showing 9 changed files with 776 additions and 1 deletion.
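Every hunk in this commit revolves around the `max_unmatched_batch_size` option of the `recombine` operator in the OpenTelemetry Collector's stanza-based log pipelines. As a reference point, here is a minimal sketch of such an operator; the `id` and the regex are hypothetical, and the per-value behavior in the comments is a plausible reading of the upstream operator docs, not something this commit itself states:

- type: recombine
  id: example-merge-multiline                            # hypothetical id, not from this diff
  combine_field: body
  source_identifier: attributes["log.file.path"]
  is_first_entry: body matches "^\\d{4}-\\d{2}-\\d{2}"   # hypothetical first-line regex
  # Caps how many consecutive entries seen before the first is_first_entry /
  # is_last_entry match are combined into a single entry: 0 appears to mean
  # "no cap" (buffer and join them all), 1 to flush each unmatched entry alone.
  max_unmatched_batch_size: 0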
1 change: 1 addition & 0 deletions .changelog/3784.fixed.txt
@@ -0,0 +1 @@
+fix: join multiparts on file beginning
@@ -67,7 +67,7 @@ processors:
        is_first_entry: attributes.log matches {{ .Values.sumologic.logs.multiline.first_line_regex | quote }}
        source_identifier: resource["cloudwatch.log.stream"]
        type: recombine
-       max_unmatched_batch_size: 1
+       max_unmatched_batch_size: 0
receivers:
  awscloudwatch:
    region: {{ .Values.sumologic.logs.collector.otelcloudwatch.region }}
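Assuming the semantics sketched above, flipping this CloudWatch recombine from 1 to 0 means entries that arrive before the first line matching `first_line_regex` are now buffered and joined instead of being emitted one by one — the "join multiparts on file beginning" of the commit title. A hypothetical illustration, not taken from this diff:

# Entries arriving before any line matches first_line_regex:
#   "    at com.example.Foo(Foo.java:13)"
#   "    at com.example.Bar(Bar.java:37)"
# max_unmatched_batch_size: 1  ->  two separate log records
# max_unmatched_batch_size: 0  ->  one combined record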
2 changes: 2 additions & 0 deletions deploy/helm/sumologic/conf/logs/collector/otelcol/config.yaml
@@ -122,6 +122,7 @@ receivers:
        output: strip-trailing-newline
        source_identifier: attributes["log.file.path"]
        type: recombine
+       max_unmatched_batch_size: 1

      ## merge-cri-lines stitches back together log lines split by CRI logging drivers.
      ## Input Body (JSON): { "log": "2001-02-03 04:05:06 very long li", "logtag": "P" }
@@ -135,6 +136,7 @@ receivers:
        overwrite_with: newest
        source_identifier: attributes["log.file.path"]
        type: recombine
+       max_unmatched_batch_size: 1

      ## strip-trailing-newline removes the trailing "\n" from the `log` key. This is required for logs coming from Docker container runtime.
      ## Input Body (JSON): { "log": "2001-02-03 04:05:06 very long line that was split by the logging driver\n", "stream": "stdout" }
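Extending the Input/Output comments above, a sketch of what `merge-cri-lines` produces; the second input line is invented to complete the pair:

# Input entries (CRI format; logtag "P" = partial, "F" = final):
#   { "log": "2001-02-03 04:05:06 very long li", "logtag": "P" }
#   { "log": "ne that was split by the logging driver", "logtag": "F" }
# Output entry, because is_last_entry is body.logtag == "F":
#   { "log": "2001-02-03 04:05:06 very long line that was split by the logging driver", "logtag": "F" }
# The added max_unmatched_batch_size: 1 bounds how many leading entries may be
# batched together before a match is seen (per the upstream option description).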
2 changes: 2 additions & 0 deletions tests/helm/testdata/goldenfile/logs_otc/basic.output.yaml
@@ -97,13 +97,15 @@ data:
            combine_with: ""
            id: merge-docker-lines
            is_last_entry: body.log matches "\n$"
+           max_unmatched_batch_size: 0
            output: strip-trailing-newline
            source_identifier: attributes["log.file.path"]
            type: recombine
          - combine_field: body.log
            combine_with: ""
            id: merge-cri-lines
            is_last_entry: body.logtag == "F"
+           max_unmatched_batch_size: 1
            output: extract-metadata-from-filepath
            overwrite_with: newest
            source_identifier: attributes["log.file.path"]
250 changes: 250 additions & 0 deletions tests/helm/testdata/goldenfile/logs_otc/debug.output.yaml
@@ -0,0 +1,250 @@
---
# Source: sumologic/templates/logs/collector/otelcol/configmap.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: RELEASE-NAME-sumologic-otelcol-logs-collector
  namespace: sumologic
  labels:
    app: RELEASE-NAME-sumologic-otelcol-logs-collector
    chart: "sumologic-%CURRENT_CHART_VERSION%"
    release: "RELEASE-NAME"
    heritage: "Helm"
data:
  config.yaml: |
    exporters:
      debug:
        verbosity: detailed
      otlphttp:
        disable_keep_alives: true
        endpoint: http://${LOGS_METADATA_SVC}.${NAMESPACE}.svc.cluster.local.:4318
        sending_queue:
          queue_size: 10
    extensions:
      file_storage:
        compaction:
          directory: /var/lib/storage/otc
          on_rebound: true
        directory: /var/lib/storage/otc
        timeout: 10s
      health_check: {}
      pprof: {}
    processors:
      batch:
        send_batch_max_size: 2000
        send_batch_size: 1000
        timeout: 1s
      logstransform/systemd:
        operators:
          - from: body._SYSTEMD_UNIT
            to: attributes._SYSTEMD_UNIT
            type: copy
          - from: body.SYSLOG_FACILITY
            to: attributes.SYSLOG_FACILITY
            type: copy
          - from: body._HOSTNAME
            to: attributes._HOSTNAME
            type: copy
          - from: body.PRIORITY
            to: attributes.PRIORITY
            type: copy
          - field: attributes["fluent.tag"]
            type: add
            value: EXPR("host." + attributes["_SYSTEMD_UNIT"])
          - field: body.__CURSOR
            type: remove
          - field: body.__MONOTONIC_TIMESTAMP
            type: remove
    receivers:
      filelog/containers:
        exclude:
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-mock*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-otelcol-logs*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-otelcol-logs-collector*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-otelcol-metrics*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-metrics*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-otelcol-instrumentation*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-traces-gateway*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-traces-sampler*/*/*.log
          - /var/log/pods/sumologic_RELEASE-NAME-sumologic-otelcol-events*/*/*.log
        include:
          - /var/log/pods/*/*/*.log
        include_file_name: false
        include_file_path: true
        operators:
          - id: get-format
            routes:
              - expr: body matches "^\\{"
                output: parser-docker
              - expr: body matches "^[^ Z]+ "
                output: parser-crio
              - expr: body matches "^[^ Z]+Z"
                output: parser-containerd
            type: router
          - id: parser-crio
            output: merge-cri-lines
            parse_to: body
            regex: ^(?P<time>[^ Z]+) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*)( |)(?P<log>.*)$
            timestamp:
              layout: "2006-01-02T15:04:05.000000000-07:00"
              layout_type: gotime
              parse_from: body.time
            type: regex_parser
          - id: parser-containerd
            output: merge-cri-lines
            parse_to: body
            regex: ^(?P<time>[^ ^Z]+Z) (?P<stream>stdout|stderr) (?P<logtag>[^ ]*)( |)(?P<log>.*)$
            timestamp:
              layout: '%Y-%m-%dT%H:%M:%S.%LZ'
              parse_from: body.time
            type: regex_parser
          - id: parser-docker
            output: merge-docker-lines
            parse_to: body
            timestamp:
              layout: '%Y-%m-%dT%H:%M:%S.%LZ'
              parse_from: body.time
            type: json_parser
          - combine_field: body.log
            combine_with: ""
            id: merge-docker-lines
            is_last_entry: body.log matches "\n$"
            max_unmatched_batch_size: 0
            output: strip-trailing-newline
            source_identifier: attributes["log.file.path"]
            type: recombine
          - combine_field: body.log
            combine_with: ""
            id: merge-cri-lines
            is_last_entry: body.logtag == "F"
            max_unmatched_batch_size: 0
            output: extract-metadata-from-filepath
            overwrite_with: newest
            source_identifier: attributes["log.file.path"]
            type: recombine
          - id: strip-trailing-newline
            output: extract-metadata-from-filepath
            parse_from: body.log
            parse_to: body
            regex: |-
              ^(?P<log>.*)
              $
            type: regex_parser
          - id: extract-metadata-from-filepath
            parse_from: attributes["log.file.path"]
            regex: ^.*\/(?P<namespace>[^_]+)_(?P<pod_name>[^_]+)_(?P<uid>[a-f0-9\-]+)\/(?P<container_name>[^\._]+)\/(?P<run_id>\d+)\.log$
            type: regex_parser
          - from: body.stream
            id: move-attributes
            to: attributes["stream"]
            type: move
          - from: attributes.container_name
            to: attributes["k8s.container.name"]
            type: move
          - from: attributes.namespace
            to: attributes["k8s.namespace.name"]
            type: move
          - from: attributes.pod_name
            to: attributes["k8s.pod.name"]
            type: move
          - from: body.log
            to: body
            type: move
          - field: attributes.run_id
            type: remove
          - field: attributes.uid
            type: remove
          - default: merge-multiline-logs
            id: multiline
            routes: null
            type: router
          - combine_field: body
            combine_with: |2+

            id: merge-multiline-logs
            is_first_entry: body matches "^\\[?\\d{4}-\\d{1,2}-\\d{1,2}.\\d{2}:\\d{2}:\\d{2}"
            max_unmatched_batch_size: 1
            output: clean-up-log-file-path
            source_identifier: attributes["log.file.path"]
            type: recombine
          - field: attributes["log.file.path"]
            id: clean-up-log-file-path
            type: remove
        storage: file_storage
      journald:
        directory: /var/log/journal
        units:
          - addon-config.service
          - addon-run.service
          - cfn-etcd-environment.service
          - cfn-signal.service
          - clean-ca-certificates.service
          - containerd.service
          - coreos-metadata.service
          - coreos-setup-environment.service
          - coreos-tmpfiles.service
          - dbus.service
          - docker.service
          - efs.service
          - etcd-member.service
          - etcd.service
          - etcd2.service
          - etcd3.service
          - etcdadm-check.service
          - etcdadm-reconfigure.service
          - etcdadm-save.service
          - etcdadm-update-status.service
          - flanneld.service
          - format-etcd2-volume.service
          - kube-node-taint-and-uncordon.service
          - kubelet.service
          - ldconfig.service
          - locksmithd.service
          - logrotate.service
          - lvm2-monitor.service
          - mdmon.service
          - nfs-idmapd.service
          - nfs-mountd.service
          - nfs-server.service
          - nfs-utils.service
          - node-problem-detector.service
          - ntp.service
          - oem-cloudinit.service
          - rkt-gc.service
          - rkt-metadata.service
          - rpc-idmapd.service
          - rpc-mountd.service
          - rpc-statd.service
          - rpcbind.service
          - set-aws-environment.service
          - system-cloudinit.service
          - systemd-timesyncd.service
          - update-ca-certificates.service
          - user-cloudinit.service
          - var-lib-etcd2.service
    service:
      extensions:
        - health_check
        - file_storage
        - pprof
      pipelines:
        logs/containers:
          exporters:
            - otlphttp
            - debug
          processors:
            - batch
          receivers:
            - filelog/containers
        logs/systemd:
          exporters:
            - otlphttp
            - debug
          processors:
            - logstransform/systemd
            - batch
          receivers:
            - journald
      telemetry:
        logs:
          level: info
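Condensed from the rendered config above, the three recombine operators and the values this commit gives them in this goldenfile (values copied from the output; trailing comments are annotation, not part of the file):

- id: merge-docker-lines        # is_last_entry: body.log matches "\n$"
  max_unmatched_batch_size: 0
- id: merge-cri-lines           # is_last_entry: body.logtag == "F"
  max_unmatched_batch_size: 0
- id: merge-multiline-logs      # is_first_entry: leading-timestamp regex
  max_unmatched_batch_size: 1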
@@ -97,13 +97,15 @@ data:
            combine_with: ""
            id: merge-docker-lines
            is_last_entry: body.log matches "\n$"
+           max_unmatched_batch_size: 0
            output: strip-trailing-newline
            source_identifier: attributes["log.file.path"]
            type: recombine
          - combine_field: body.log
            combine_with: ""
            id: merge-cri-lines
            is_last_entry: body.logtag == "F"
+           max_unmatched_batch_size: 0
            output: extract-metadata-from-filepath
            overwrite_with: newest
            source_identifier: attributes["log.file.path"]