This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

Commit

Add Dockerfile and inputrunner manifest
MichaelKatsoulis committed Mar 8, 2023
1 parent 8453c0d commit 173686a
Showing 3 changed files with 210 additions and 6 deletions.
28 changes: 28 additions & 0 deletions Dockerfile
@@ -0,0 +1,28 @@
FROM ubuntu:20.04

RUN mkdir -p /usr/share/inputrunner
COPY inputrunner /usr/share/inputrunner/inputrunner

RUN mkdir -p /usr/share/inputrunner/data /usr/share/inputrunner/logs && \
    chown -R root:root /usr/share/inputrunner && \
    find /usr/share/inputrunner -type d -exec chmod 0755 {} \; && \
    find /usr/share/inputrunner -type f -exec chmod 0644 {} \; && \
    chmod 0775 /usr/share/inputrunner/data /usr/share/inputrunner/logs


RUN chmod 0755 /usr/share/inputrunner/inputrunner
RUN for iter in {1..10}; do \
      apt-get update -y && \
      DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends --yes ca-certificates curl coreutils gawk libcap2-bin xz-utils && \
      apt-get clean all && \
      exit_code=0 && break || exit_code=$? && echo "apt-get error: retry $iter in 10s" && sleep 10; \
    done; \
    (exit $exit_code)


RUN groupadd --gid 1000 inputrunner
RUN useradd -M --uid 1000 --gid 1000 --groups 0 --home /usr/share/inputrunner inputrunner
USER inputrunner

WORKDIR /usr/share/inputrunner
CMD [ "/bin/bash", "-c", "./inputrunner", "run" ]
168 changes: 168 additions & 0 deletions deploy/inputrunner-kubernetes-manifest.yml
@@ -0,0 +1,168 @@
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: inputrunnerconfig
  namespace: kube-system
  labels:
    k8s-app: inputrunner
data:
  inputrunner.yml: |-
    inputrunner.inputs:
      - type: assets_k8s
        period: 600s
        kube_config: ""
        asset_types: ["node", "pod"]
    output.elasticsearch:
      hosts: ['${ELASTICSEARCH_HOST:elasticsearch}:${ELASTICSEARCH_PORT:9200}']
      username: ${ELASTICSEARCH_USERNAME}
      password: ${ELASTICSEARCH_PASSWORD}
      ssl.verification_mode: "none"
    logging.level: info
    logging.to_files: false
    logging.to_stderr: true
    logging.selectors: ["*"]
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: inputrunner
  namespace: kube-system
  labels:
    k8s-app: inputrunner
spec:
  replicas: 1
  selector:
    matchLabels:
      k8s-app: inputrunner
  template:
    metadata:
      labels:
        k8s-app: inputrunner
    spec:
      serviceAccountName: inputrunner
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      containers:
        - name: inputrunner
          image: inputrunner:latest
          imagePullPolicy: Never
          # command: ['sleep 3600']
          env:
            # The basic authentication username used to connect to Elasticsearch
            # This user needs the privileges required to publish events to Elasticsearch.
            - name: ELASTICSEARCH_USERNAME
              value: "elastic"
            # The basic authentication password used to connect to Elasticsearch
            - name: ELASTICSEARCH_PASSWORD
              value: "changeme"
            # The Elasticsearch host to communicate with
            - name: ELASTICSEARCH_HOST
              value: "https://elasticsearch"
            # The Elasticsearch port to communicate with
            - name: ELASTICSEARCH_PORT
              value: "9200"
          volumeMounts:
            - name: config
              mountPath: /usr/share/inputrunner/inputrunner.yml
              readOnly: true
              subPath: inputrunner.yml
      volumes:
        - name: config
          configMap:
            defaultMode: 0640
            name: inputrunnerconfig
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: inputrunner
  namespace: kube-system
  labels:
    k8s-app: inputrunner
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: inputrunner
  labels:
    k8s-app: inputrunner
rules:
  - apiGroups: [""]
    resources:
      - nodes
      - namespaces
      - events
      - pods
      - services
      - configmaps
      # Needed for cloudbeat
      - serviceaccounts
      - persistentvolumes
      - persistentvolumeclaims
    verbs: ["get", "list", "watch"]
  # Enable this rule only if planning to use the kubernetes_secrets provider
  #- apiGroups: [""]
  #  resources:
  #    - secrets
  #  verbs: ["get"]
- apiGroups: ["extensions"]
resources:
- replicasets
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources:
- statefulsets
- deployments
- replicasets
- daemonsets
verbs: ["get", "list", "watch"]
- apiGroups:
- ""
resources:
- nodes/stats
verbs:
- get
- apiGroups: [ "batch" ]
resources:
- jobs
- cronjobs
verbs: [ "get", "list", "watch" ]
# Needed for apiserver
- nonResourceURLs:
- "/metrics"
verbs:
- get
# Needed for cloudbeat
- apiGroups: ["rbac.authorization.k8s.io"]
resources:
- clusterrolebindings
- clusterroles
- rolebindings
- roles
verbs: ["get", "list", "watch"]
# Needed for cloudbeat
- apiGroups: ["policy"]
resources:
- podsecuritypolicies
verbs: ["get", "list", "watch"]
- apiGroups: [ "storage.k8s.io" ]
resources:
- storageclasses
verbs: [ "get", "list", "watch" ]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: inputrunner
subjects:
- kind: ServiceAccount
name: inputrunner
namespace: kube-system
roleRef:
kind: ClusterRole
name: inputrunner
apiGroup: rbac.authorization.k8s.io
---
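
A minimal sketch of rolling the manifest out and confirming the input is running, assuming the image has already been loaded onto the node and the placeholder Elasticsearch credentials in the Deployment have been adjusted:

    kubectl apply -f deploy/inputrunner-kubernetes-manifest.yml

    # The Deployment lands in kube-system with the k8s-app=inputrunner label
    kubectl -n kube-system get pods -l k8s-app=inputrunner

    # logging.to_stderr: true routes the collection messages to the container logs
    kubectl -n kube-system logs -f deploy/inputrunner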
20 changes: 14 additions & 6 deletions input/assets_k8s/assets_k8s.go
@@ -118,9 +118,9 @@ func (s *assetsK8s) Run(inputCtx input.Context, publisher stateless.Publisher) e
// getKubernetesClient returns a kubernetes client. If inCluster is true, it returns an
// in cluster configuration based on the secrets mounted in the Pod. If kubeConfig is passed,
// it parses the config file to get the config required to build a client.
-func getKubernetesClient(kubeconfigPath string) (kubernetes.Interface, error) {
-
-    cfg, err := BuildConfig(kubeconfigPath)
+func getKubernetesClient(kubeconfigPath string, log *logp.Logger) (kubernetes.Interface, error) {
+    log.Infof("Provided kube config path is %s", kubeconfigPath)
+    cfg, err := BuildConfig(kubeconfigPath, log)
    if err != nil {
        return nil, fmt.Errorf("unable to build kubernetes config: %w", err)
    }
@@ -135,15 +135,17 @@ func getKubernetesClient(kubeconfigPath string) (kubernetes.Interface, error) {

func collectK8sAssets(ctx context.Context, kubeconfigPath string, log *logp.Logger, cfg config, publisher stateless.Publisher) {

-    client, err := getKubernetesClient(kubeconfigPath)
+    client, err := getKubernetesClient(kubeconfigPath, log)
    if err != nil {
        log.Errorf("unable to build kubernetes clientset: %w", err)
    }
-
+    log.Infof("Enabled asset types are %+v", cfg.AssetTypes)
    if IsTypeEnabled(cfg.AssetTypes, "node") {
+        log.Info("Node type enabled. Starting collecting")
        go collectK8sNodes(ctx, log, client, publisher)
    }
    if IsTypeEnabled(cfg.AssetTypes, "pod") {
+        log.Info("Pod type enabled. Starting collecting")
        go collectK8sPods(ctx, log, client, publisher)
    }
}
@@ -154,6 +156,7 @@ func collectK8sNodes(ctx context.Context, log *logp.Logger, client kubernetes.In
    // collect the nodes using the client
    nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
    if err != nil {
+        log.Errorf("Cannot list k8s nodes: %+v", err)
        return err
    }

@@ -170,6 +173,7 @@ func collectK8sNodes(ctx context.Context, log *logp.Logger, client kubernetes.In
            "kubernetes.node.providerId": assetProviderId,
            "kubernetes.node.start_time": assetStartTime,
        }
+        log.Info("Publishing nodes assets\n")
        publishK8sAsset(node.Name, "k8s.node", assetId, assetParents, assetChildren, publisher, assetSpecificMap)
    }
    return nil
@@ -178,6 +182,7 @@ func collectK8sNodes(ctx context.Context, log *logp.Logger, client kubernetes.In
func collectK8sPods(ctx context.Context, log *logp.Logger, client kubernetes.Interface, publisher stateless.Publisher) error {
    pods, err := client.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{})
    if err != nil {
+        log.Errorf("Cannot list k8s pods: %+v", err)
        return err
    }

@@ -199,6 +204,7 @@ func collectK8sPods(ctx context.Context, log *logp.Logger, client kubernetes.Int
            "kubernetes.pod.start_time": assetStartTime,
            "kubernetes.namespace": namespace,
        }
+        log.Info("Publishing pod assets\n")
        publishK8sAsset(assetName, "k8s.pod", assetId, assetParents, assetChildren, publisher, assetSpecificMap)

@@ -236,12 +242,14 @@ func publishK8sAsset(assetName, assetType, assetId string, assetParents, assetCh
// If inClusterConfig fails, we fallback to the default config.
// This is a copy of `clientcmd.BuildConfigFromFlags` of `client-go` but without the annoying
// klog messages that are not possible to be disabled.
-func BuildConfig(kubeconfigPath string) (*restclient.Config, error) {
+func BuildConfig(kubeconfigPath string, log *logp.Logger) (*restclient.Config, error) {
    if kubeconfigPath == "" {
        kubeconfig, err := restclient.InClusterConfig()
        if err == nil {
+            log.Info("Using incluster config")
            return kubeconfig, nil
        }
+        log.Infof("There was an error getting incluster config: %+v", err)
    }
    return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
        &clientcmd.ClientConfigLoadingRules{ExplicitPath: kubeconfigPath},
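
Since BuildConfig now falls back from the in-cluster configuration to an explicit kubeconfig, the collector can also be exercised outside a cluster. A rough sketch of that path, assuming the binary reads inputrunner.yml from its working directory (as the ConfigMap mount suggests) and that kube_config accepts a kubeconfig path:

    # In inputrunner.yml, replace the empty kube_config (which selects the in-cluster
    # service account) with an explicit path, e.g. kube_config: "/home/<user>/.kube/config"
    # (the path is illustrative), then start the runner the same way the container CMD does:
    ./inputrunner run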