Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(agent): Add basic support for EKS Pods on Fargate #472

Merged
merged 4 commits into from
Feb 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions instana/agent/aws_eks_fargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# (c) Copyright IBM Corp. 2023

"""
The Instana agent (for AWS EKS Fargate) that manages
monitoring state and reporting that data.
"""
import os
import time
from instana.options import EKSFargateOptions
from instana.collector.aws_eks_fargate import EKSFargateCollector
from instana.collector.helpers.eks.process import get_pod_name
from instana.log import logger
from instana.util import to_json
from instana.agent.base import BaseAgent
from instana.version import VERSION


class EKSFargateAgent(BaseAgent):
""" In-process agent for AWS Fargate """
def __init__(self):
super(EKSFargateAgent, self).__init__()

self.options = EKSFargateOptions()
self.collector = None
self.report_headers = None
self._can_send = False
self.podname = get_pod_name()

# Update log level (if INSTANA_LOG_LEVEL was set)
self.update_log_level()

logger.info("Stan is on the EKS Pod on AWS Fargate scene. Starting Instana instrumentation version: %s", VERSION)

if self._validate_options():
self._can_send = True
self.collector = EKSFargateCollector(self)
self.collector.start()
else:
logger.warning("Required INSTANA_AGENT_KEY and/or INSTANA_ENDPOINT_URL environment variables not set. "
"We will not be able to monitor this Pod.")

def can_send(self):
"""
Are we in a state where we can send data?
@return: Boolean
"""
return self._can_send

def get_from_structure(self):
"""
Retrieves the From data that is reported alongside monitoring data.
@return: dict()
"""

return {'hl': True, 'cp': 'k8s', 'e': self.podname}

def report_data_payload(self, payload):
"""
Used to report metrics and span data to the endpoint URL in self.options.endpoint_url
"""
response = None
try:
if self.report_headers is None:
# Prepare request headers
self.report_headers = dict()
self.report_headers["Content-Type"] = "application/json"
self.report_headers["X-Instana-Host"] = self.podname
self.report_headers["X-Instana-Key"] = self.options.agent_key

response = self.client.post(self.__data_bundle_url(),
data=to_json(payload),
headers=self.report_headers,
timeout=self.options.timeout,
verify=self.options.ssl_verify,
proxies=self.options.endpoint_proxy)

if not 200 <= response.status_code < 300:
logger.info("report_data_payload: Instana responded with status code %s", response.status_code)
except Exception as exc:
logger.debug("report_data_payload: connection error (%s)", type(exc))
return response

def _validate_options(self):
"""
Validate that the options used by this Agent are valid. e.g. can we report data?
"""
return self.options.endpoint_url is not None and self.options.agent_key is not None

def __data_bundle_url(self):
"""
URL for posting metrics to the host agent. Only valid when announced.
"""
return "%s/bundle" % self.options.endpoint_url
53 changes: 53 additions & 0 deletions instana/collector/aws_eks_fargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# (c) Copyright IBM Corp. 2023

"""
Collector for EKS Pods on AWS Fargate: Manages the periodic collection of metrics & snapshot data
"""

from time import time
from instana.log import logger
from instana.collector.base import BaseCollector
from instana.collector.helpers.eks.process import EKSFargateProcessHelper
from instana.collector.helpers.runtime import RuntimeHelper
from instana.util import DictionaryOfStan


class EKSFargateCollector(BaseCollector):
""" Collector for EKS Pods on AWS Fargate """

def __init__(self, agent):
super(EKSFargateCollector, self).__init__(agent)
logger.debug("Loading Collector for EKS Pods on AWS Fargate ")

self.snapshot_data = DictionaryOfStan()
self.snapshot_data_sent = False
self.podname = agent.podname
self.helpers.append(EKSFargateProcessHelper(self))
self.helpers.append(RuntimeHelper(self))

def should_send_snapshot_data(self):
return int(time()) - self.snapshot_data_last_sent > self.snapshot_data_interval

def prepare_payload(self):
payload = DictionaryOfStan()
payload["spans"] = []
payload["metrics"]["plugins"] = []

try:
if not self.span_queue.empty():
payload["spans"] = self.queued_spans()

with_snapshot = self.should_send_snapshot_data()

plugins = []
for helper in self.helpers:
plugins.extend(helper.collect_metrics(with_snapshot=with_snapshot))

payload["metrics"]["plugins"] = plugins

if with_snapshot:
self.snapshot_data_last_sent = int(time())
except Exception:
logger.debug("prepare_payload error", exc_info=True)

return payload
Empty file.
30 changes: 30 additions & 0 deletions instana/collector/helpers/eks/process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# (c) Copyright IBM Corp. 2024

""" Module to handle the collection of containerized process metrics for EKS Pods on AWS Fargate """
import os
from instana.collector.helpers.process import ProcessHelper
from instana.log import logger


def get_pod_name():
podname = os.environ.get('HOSTNAME', '')

if not podname:
logger.warning("Failed to determine podname from EKS hostname.")
return podname


class EKSFargateProcessHelper(ProcessHelper):
""" Helper class to extend the generic process helper class with the corresponding fargate attributes """

def collect_metrics(self, **kwargs):
plugin_data = dict()
try:
plugin_data = super(EKSFargateProcessHelper, self).collect_metrics(**kwargs)
plugin_data["data"]["containerType"] = "docker"

if kwargs.get("with_snapshot"):
plugin_data["data"]["com.instana.plugin.host.name"] = get_pod_name()
except Exception:
logger.debug("EKSFargateProcessHelper.collect_metrics: ", exc_info=True)
return [plugin_data]
4 changes: 4 additions & 0 deletions instana/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ def __init__(self, **kwds):

self.zone = os.environ.get("INSTANA_ZONE", None)

class EKSFargateOptions(AWSFargateOptions):
""" Options class for EKS Pods on AWS Fargate. Holds settings specific to EKS Pods on AWS Fargate. """
def __init__(self, **kwds):
super(EKSFargateOptions, self).__init__()

class GCROptions(ServerlessOptions):
""" Options class for Google Cloud Run. Holds settings specific to Google Cloud Run. """
Expand Down
7 changes: 7 additions & 0 deletions instana/singletons.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
aws_env = os.environ.get("AWS_EXECUTION_ENV", "")
env_is_test = "INSTANA_TEST" in os.environ
env_is_aws_fargate = aws_env == "AWS_ECS_FARGATE"
env_is_aws_eks_fargate = (os.environ.get("INSTANA_TRACER_ENVIRONMENT") == "AWS_EKS_FARGATE")
env_is_aws_lambda = "AWS_Lambda_" in aws_env
k_service = os.environ.get("K_SERVICE")
k_configuration = os.environ.get("K_CONFIGURATION")
Expand Down Expand Up @@ -53,6 +54,12 @@

agent = GCRAgent(service=k_service, configuration=k_configuration, revision=k_revision)
span_recorder = StanRecorder(agent)
elif env_is_aws_eks_fargate:
from .agent.aws_eks_fargate import EKSFargateAgent
from .recorder import StanRecorder

agent = EKSFargateAgent()
span_recorder = StanRecorder(agent)
else:
from .agent.host import HostAgent
from .recorder import StanRecorder
Expand Down
120 changes: 120 additions & 0 deletions tests/platforms/test_eksfargate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
# (c) Copyright IBM Corp. 2024

import os
import logging
import unittest

from instana.tracer import InstanaTracer
from instana.options import EKSFargateOptions
from instana.recorder import StanRecorder
from instana.agent.aws_eks_fargate import EKSFargateAgent
from instana.singletons import get_agent, set_agent, get_tracer, set_tracer


class TestFargate(unittest.TestCase):
def __init__(self, methodName='runTest'):
super(TestFargate, self).__init__(methodName)
self.agent = None
self.span_recorder = None
self.tracer = None

self.original_agent = get_agent()
self.original_tracer = get_tracer()

def setUp(self):
os.environ["INSTANA_TRACER_ENVIRONMENT"] = "AWS_EKS_FARGATE"
os.environ["INSTANA_ENDPOINT_URL"] = "https://localhost/notreal"
os.environ["INSTANA_AGENT_KEY"] = "Fake_Key"

def tearDown(self):
""" Reset all environment variables of consequence """
variable_names = (
"INSTANA_TRACER_ENVIRONMENT",
"AWS_EXECUTION_ENV", "INSTANA_EXTRA_HTTP_HEADERS",
"INSTANA_ENDPOINT_URL", "INSTANA_ENDPOINT_PROXY",
"INSTANA_AGENT_KEY", "INSTANA_LOG_LEVEL",
"INSTANA_SECRETS", "INSTANA_DEBUG", "INSTANA_TAGS"
)

for variable_name in variable_names:
if variable_name in os.environ:
os.environ.pop(variable_name)

set_agent(self.original_agent)
set_tracer(self.original_tracer)

def create_agent_and_setup_tracer(self):
self.agent = EKSFargateAgent()
self.span_recorder = StanRecorder(self.agent)
self.tracer = InstanaTracer(recorder=self.span_recorder)
set_agent(self.agent)
set_tracer(self.tracer)

def test_has_options(self):
self.create_agent_and_setup_tracer()
self.assertTrue(hasattr(self.agent, 'options'))
self.assertTrue(isinstance(self.agent.options, EKSFargateOptions))

def test_missing_variables(self):
with self.assertLogs("instana", level=logging.WARN) as context:
os.environ.pop("INSTANA_ENDPOINT_URL")
agent = EKSFargateAgent()
self.assertFalse(agent.can_send())
self.assertIsNone(agent.collector)
self.assertIn('environment variables not set', context.output[0])

os.environ["INSTANA_ENDPOINT_URL"] = "https://localhost/notreal"
with self.assertLogs("instana", level=logging.WARN) as context:
os.environ.pop("INSTANA_AGENT_KEY")
agent = EKSFargateAgent()
self.assertFalse(agent.can_send())
self.assertIsNone(agent.collector)
self.assertIn('environment variables not set', context.output[0])

def test_default_secrets(self):
self.create_agent_and_setup_tracer()
self.assertIsNone(self.agent.options.secrets)
self.assertTrue(hasattr(self.agent.options, 'secrets_matcher'))
self.assertEqual(self.agent.options.secrets_matcher, 'contains-ignore-case')
self.assertTrue(hasattr(self.agent.options, 'secrets_list'))
self.assertListEqual(self.agent.options.secrets_list, ['key', 'pass', 'secret'])

def test_custom_secrets(self):
os.environ["INSTANA_SECRETS"] = "equals:love,war,games"
self.create_agent_and_setup_tracer()

self.assertTrue(hasattr(self.agent.options, 'secrets_matcher'))
self.assertEqual(self.agent.options.secrets_matcher, 'equals')
self.assertTrue(hasattr(self.agent.options, 'secrets_list'))
self.assertListEqual(self.agent.options.secrets_list, ['love', 'war', 'games'])

def test_default_tags(self):
self.create_agent_and_setup_tracer()
self.assertTrue(hasattr(self.agent.options, 'tags'))
self.assertIsNone(self.agent.options.tags)

def test_has_extra_http_headers(self):
self.create_agent_and_setup_tracer()
self.assertTrue(hasattr(self.agent, 'options'))
self.assertTrue(hasattr(self.agent.options, 'extra_http_headers'))

def test_agent_extra_http_headers(self):
os.environ['INSTANA_EXTRA_HTTP_HEADERS'] = "X-Test-Header;X-Another-Header;X-And-Another-Header"
self.create_agent_and_setup_tracer()
self.assertIsNotNone(self.agent.options.extra_http_headers)
should_headers = ['x-test-header', 'x-another-header', 'x-and-another-header']
self.assertListEqual(should_headers, self.agent.options.extra_http_headers)

def test_agent_default_log_level(self):
self.create_agent_and_setup_tracer()
self.assertEqual(self.agent.options.log_level, logging.WARNING)

def test_agent_custom_log_level(self):
os.environ['INSTANA_LOG_LEVEL'] = "eRror"
self.create_agent_and_setup_tracer()
self.assertEqual(self.agent.options.log_level, logging.ERROR)

def test_custom_proxy(self):
os.environ["INSTANA_ENDPOINT_PROXY"] = "http://myproxy.123"
self.create_agent_and_setup_tracer()
self.assertDictEqual(self.agent.options.endpoint_proxy, {'https': "http://myproxy.123"})
Loading
Loading