Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Create k8s asset collector #38

Merged
merged 12 commits into from
Mar 23, 2023
28 changes: 28 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
FROM ubuntu:20.04

RUN mkdir -p /usr/share/inputrunner
COPY inputrunner /usr/share/inputrunner/inputrunner

RUN mkdir -p /usr/share/inputrunner/data /usr/share/inputrunner/logs && \
chown -R root:root /usr/share/inputrunner && \
find /usr/share/inputrunner -type d -exec chmod 0755 {} \; && \
find /usr/share/inputrunner -type f -exec chmod 0644 {} \; && \
chmod 0775 /usr/share/inputrunner/data /usr/share/inputrunner/logs


RUN chmod 0755 /usr/share/inputrunner/inputrunner
RUN for iter in {1..10}; do \
apt-get update -y && \
DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends --yes ca-certificates curl coreutils gawk libcap2-bin xz-utils && \
apt-get clean all && \
exit_code=0 && break || exit_code=$? && echo "apt-get error: retry $iter in 10s" && sleep 10; \
done; \
(exit $exit_code)


RUN groupadd --gid 1000 inputrunner
RUN useradd -M --uid 1000 --gid 1000 --groups 0 --home /usr/share/inputrunner inputrunner
USER inputrunner

WORKDIR /usr/share/inputrunner
CMD [ "/bin/bash", "-c", "./inputrunner", "run" ]
166 changes: 166 additions & 0 deletions deploy/inputrunner-kubernetes-manifest.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
name: inputrunnerconfig
namespace: kube-system
labels:
k8s-app: inputrunner
data:
inputrunner.yml: |-
inputrunner.inputs:
- type: assets_k8s
period: 600s
kube_config: ""
asset_types: ["node", "pod"]

output.elasticsearch:
hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
username: ${ELASTICSEARCH_USERNAME}
password: ${ELASTICSEARCH_PASSWORD}
ssl.verification_mode: "none"

logging.level: info
logging.to_files: false
logging.to_stderr: true
logging.selectors: ["*"]
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: inputrunner
namespace: kube-system
labels:
k8s-app: inputrunner
spec:
replicas: 1
selector:
matchLabels:
k8s-app: inputrunner
template:
metadata:
labels:
k8s-app: inputrunner
spec:
serviceAccountName: inputrunner
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: inputrunner
image: michaelkatsoulis/inputrunner:latest
env:
# The basic authentication username used to connect to Elasticsearch
# This user needs the privileges required to publish events to Elasticsearch.
- name: ELASTICSEARCH_USERNAME
value: "elastic"
# The basic authentication password used to connect to Elasticsearch
- name: ELASTICSEARCH_PASSWORD
value: "changeme"
# The Elasticsearch host to communicate with
- name: ELASTICSEARCH_HOST
value: "elasticsearch"
# The Elasticsearch port to communicate with
- name: ELASTICSEARCH_PORT
value: "9200"
volumeMounts:
- name: config
mountPath: /usr/share/inputrunner/inputrunner.yml
readOnly: true
subPath: inputrunner.yml
volumes:
- name: config
configMap:
defaultMode: 0640
name: inputrunnerconfig
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: inputrunner
namespace: kube-system
labels:
k8s-app: inputrunner
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: inputrunner
labels:
k8s-app: inputrunner
rules:
- apiGroups: [""]
resources:
- nodes
- namespaces
- events
- pods
- services
- configmaps
# Needed for cloudbeat
ChrsMark marked this conversation as resolved.
Show resolved Hide resolved
- serviceaccounts
- persistentvolumes
- persistentvolumeclaims
verbs: ["get", "list", "watch"]
# Enable this rule only if planing to use kubernetes_secrets provider
#- apiGroups: [""]
# resources:
# - secrets
# verbs: ["get"]
- apiGroups: ["extensions"]
resources:
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
- deployments
- replicasets
- daemonsets
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
resources:
- nodes/stats
verbs:
- get
- apiGroups: [ "batch" ]
resources:
- jobs
- cronjobs
verbs: [ "get", "list", "watch" ]
# Needed for apiserver
- nonResourceURLs:
- "/metrics"
verbs:
- get
# Needed for cloudbeat
- apiGroups: ["rbac.authorization.k8s.io"]
resources:
- clusterrolebindings
- clusterroles
- rolebindings
- roles
verbs: ["get", "list", "watch"]
# Needed for cloudbeat
- apiGroups: ["policy"]
resources:
- podsecuritypolicies
verbs: ["get", "list", "watch"]
- apiGroups: [ "storage.k8s.io" ]
resources:
- storageclasses
verbs: [ "get", "list", "watch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: inputrunner
subjects:
- kind: ServiceAccount
name: inputrunner
namespace: kube-system
roleRef:
kind: ClusterRole
name: inputrunner
apiGroup: rbac.authorization.k8s.io
---
20 changes: 20 additions & 0 deletions input/assets/internal/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
stateless "github.com/elastic/inputrunner/input/v2/input-stateless"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

type AssetOption func(beat.Event) beat.Event
Expand Down Expand Up @@ -95,3 +96,22 @@ func WithAssetMetadata(value mapstr.M) AssetOption {
return e
}
}

func WithNodeData(name, providerId string, startTime *metav1.Time) AssetOption {
return func(e beat.Event) beat.Event {
e.Fields["kubernetes.node.name"] = name
e.Fields["kubernetes.node.providerId"] = providerId
e.Fields["kubernetes.node.start_time"] = startTime
return e
}
}

func WithPodData(name, uid, namespace string, startTime *metav1.Time) AssetOption {
return func(e beat.Event) beat.Event {
e.Fields["kubernetes.pod.name"] = name
e.Fields["kubernetes.pod.uid"] = uid
e.Fields["kubernetes.pod.start_time"] = startTime
e.Fields["kubernetes.namespace"] = namespace
return e
}
}
30 changes: 28 additions & 2 deletions input/assets/internal/publish_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@
package internal

import (
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/inputrunner/input/testutil"
"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"testing"
"time"
)

var startTime = metav1.Time{Time: time.Date(2021, 8, 15, 14, 30, 45, 100, time.Local)}

func TestPublish(t *testing.T) {
for _, tt := range []struct {
name string
Expand Down Expand Up @@ -114,6 +117,29 @@ func TestPublish(t *testing.T) {
"asset.metadata": mapstr.M{"foo": "bar"},
}},
},
{
name: "with valid node data",
opts: []AssetOption{
WithNodeData("ip-172-31-29-242.us-east-2.compute.internal", "aws:///us-east-2b/i-0699b78f46f0fa248", &startTime),
},
expectedEvent: beat.Event{Fields: mapstr.M{
"kubernetes.node.name": "ip-172-31-29-242.us-east-2.compute.internal",
"kubernetes.node.providerId": "aws:///us-east-2b/i-0699b78f46f0fa248",
"kubernetes.node.start_time": &startTime,
}},
},
{
name: "with valid pod data",
opts: []AssetOption{
WithPodData("nginx", "a375d24b-fa20-4ea6-a0ee-1d38671d2c09", "default", &startTime),
},
expectedEvent: beat.Event{Fields: mapstr.M{
"kubernetes.pod.name": "nginx",
"kubernetes.pod.uid": "a375d24b-fa20-4ea6-a0ee-1d38671d2c09",
"kubernetes.pod.start_time": &startTime,
"kubernetes.namespace": "default",
}},
},
} {
t.Run(tt.name, func(t *testing.T) {
publisher := testutil.NewInMemoryPublisher()
Expand Down
49 changes: 49 additions & 0 deletions input/assets/k8s/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# K8s Assets Input

## What does it do?

The K8s Assets Input collects data about resources running on a K8s cluster.
Information about the following resources is currently collected:

- K8s Nodes


## Asset schema

### K8s Nodes

Field | Example |
------|---------|
"asset.id": "aws:///us-east-2b/i-0699b78f46f0fa248",
"asset.ean": "k8s.node:aws:///us-east-2b/i-0699b78f46f0fa248",
"asset.name": "ip-172-31-29-242.us-east-2.compute.internal",
"asset.type": "k8s.node",
"input": {
"type": "assets_k8s"
}


In order to run set the following configuration in inputrunner.yml

inputrunner.inputs:
- type: assets_k8s
period: 600s
kube_config: /Users/michaliskatsoulis/go/src/github.com/elastic/inputrunner/kube_config

output.elasticsearch:
hosts: ["localhost:9200"]
protocol: "https"
username: "elastic"
password: "changeme"
ssl.verification_mode: "none"


logging.level: info
logging.to_files: false
logging.to_stderr: true
logging.selectors: ["*"]


The kube_config path must contain a kube config file so that if the inputrunner runs as a process anywhere, can access the cluster.
In case it runs as a pod in the same k8s cluster it needs to monitor, then the kube_config is collected from withing the cluster(inClusterconfig)
and the values should be left empty.
Loading