Skip to content

Commit

Permalink
[AMLII-2170] fix removed/renamed function flush (#868)
Browse files Browse the repository at this point in the history
* fix missing function that was renamed

* fix lint

* change variable name back to original
  • Loading branch information
andrewqian2001datadog authored Nov 15, 2024
1 parent ab20e29 commit 362e187
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 33 deletions.
9 changes: 6 additions & 3 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
DEFAULT_PORT = 8125

# Buffering-related values (in seconds)
DEFAULT_FLUSH_INTERVAL = 0.3
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001

# Env var to enable/disable sending the container ID field
Expand Down Expand Up @@ -145,7 +145,7 @@ def __init__(
host=DEFAULT_HOST, # type: Text
port=DEFAULT_PORT, # type: int
max_buffer_size=None, # type: None
flush_interval=DEFAULT_FLUSH_INTERVAL, # type: float
flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, # type: float
disable_aggregation=True, # type: bool
disable_buffering=True, # type: bool
namespace=None, # type: Optional[Text]
Expand Down Expand Up @@ -643,7 +643,7 @@ def disable_aggregation(self):
self._stop_flush_thread()
log.debug("Statsd aggregation is disabled")

def enable_aggregation(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def enable_aggregation(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
with self._config_lock:
if not self._disable_aggregation:
return
Expand Down Expand Up @@ -805,6 +805,9 @@ def _reset_buffer(self):
self._current_buffer_total_size = 0
self._buffer = []

def flush(self):
self.flush_buffered_metrics()

def flush_buffered_metrics(self):
"""
Flush the metrics buffer by sending the data to the server.
Expand Down
60 changes: 30 additions & 30 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
# Datadog libraries
from datadog import initialize, statsd
from datadog import __version__ as version
from datadog.dogstatsd.base import DEFAULT_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.base import DEFAULT_BUFFERING_FLUSH_INTERVAL, DogStatsd, MIN_SEND_BUFFER_SIZE, UDP_OPTIMAL_PAYLOAD_LENGTH, UDS_OPTIMAL_PAYLOAD_LENGTH
from datadog.dogstatsd.context import TimedContextManagerDecorator
from datadog.util.compat import is_higher_py35, is_p3k
from tests.util.contextmanagers import preserve_environment_variable, EnvVars
Expand All @@ -41,7 +41,7 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
self.payloads = deque()

self._flush_interval = flush_interval
Expand Down Expand Up @@ -331,42 +331,42 @@ def test_gauge_with_invalid_ts_should_be_ignored(self):

def test_counter(self):
self.statsd.increment('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views')
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-1|c\n', self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement('page.views', 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:-12|c\n', self.recv(2))

def test_count(self):
self.statsd.count('page.views', 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry('page.views:11|c\n', self.recv(2))

def test_count_with_ts(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|T1066\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.count_with_timestamp("page.views", 11, timestamp=2121)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|T2121\n", self.recv(2))

def test_count_with_invalid_ts_should_be_ignored(self):
self.statsd.count_with_timestamp("page.views", 1, timestamp=-1066)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c\n", self.recv(2))

def test_histogram(self):
Expand Down Expand Up @@ -399,7 +399,7 @@ def test_sample_rate(self):
for _ in range(10000):
self.statsd.increment('sampled_counter', sample_rate=0.3)

self.statsd.flush_buffered_metrics()
self.statsd.flush()

total_metrics = 0
payload = self.recv()
Expand Down Expand Up @@ -667,7 +667,7 @@ def test_socket_error(self):
self.statsd.socket = BrokenSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
mock_log.warning.assert_called_once_with(
Expand All @@ -679,7 +679,7 @@ def test_socket_overflown(self):
self.statsd.socket = OverflownSocket()
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket send would block: %s, dropping the packet", mock.ANY)]
Expand All @@ -689,7 +689,7 @@ def test_socket_message_too_long(self):
self.statsd.socket = BrokenSocket(error_number=errno.EMSGSIZE)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [
Expand All @@ -705,7 +705,7 @@ def test_socket_no_buffer_space(self):
self.statsd.socket = BrokenSocket(error_number=errno.ENOBUFS)
with mock.patch("datadog.dogstatsd.base.log") as mock_log:
self.statsd.gauge('no error', 1)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

mock_log.error.assert_not_called()
calls = [call("Socket buffer full: %s, dropping the packet", mock.ANY)]
Expand All @@ -720,7 +720,7 @@ def test_uds_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd(socket_path="/fake/uds/socket/path")
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_UNIX, socket.SOCK_DGRAM)
Expand All @@ -740,7 +740,7 @@ def test_udp_socket_ensures_min_receive_buffer(self, mock_socket_create):

datadog = DogStatsd()
datadog.gauge('some value', 1)
datadog.flush_buffered_metrics()
datadog.flush()

# Sanity check
mock_socket_create.assert_called_once_with(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
Expand Down Expand Up @@ -837,7 +837,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

# Ignore telemetry packet
packet = self.recv(2).split("\n")[0]
Expand Down Expand Up @@ -881,7 +881,7 @@ def func(arg1, arg2, kwarg1=1, kwarg2=1):
return (arg1, arg2, kwarg1, kwarg2)

func(1, 2, kwarg2=3)
self.statsd.flush_buffered_metrics()
self.statsd.flush()

packet = self.recv()
name_value, type_ = packet.rstrip('\n').split('|')
Expand Down Expand Up @@ -1068,7 +1068,7 @@ def test_flush(self):

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
Expand Down Expand Up @@ -1096,7 +1096,7 @@ def test_aggregation_buffering_simultaneously(self):
dogstatsd.increment('test.aggregation_and_buffering')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush_aggregated_metrics()
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval(self):
Expand Down Expand Up @@ -1139,7 +1139,7 @@ def test_flush_disable(self):
dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(DEFAULT_FLUSH_INTERVAL)
time.sleep(DEFAULT_BUFFERING_FLUSH_INTERVAL)
self.assertIsNone(fake_socket.recv(no_wait=True))

time.sleep(0.3)
Expand Down Expand Up @@ -1697,7 +1697,7 @@ def test_entity_id_and_container_id(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id\n'
self.assertEqual(metric, dogstatsd.socket.recv())
Expand All @@ -1712,7 +1712,7 @@ def test_entity_id_and_container_id_and_external_env(self):
dogstatsd._container_id = "ci-fake-container-id"

dogstatsd.increment("page.views")
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()
tags = "dd.internal.entity_id:04652bb7-19b7-11e9-9cc6-42010a9c016d"
metric = 'page.views:1|c|#' + tags + '|c:ci-fake-container-id' + '|e:it-false,cn-container-name,pu-04652bb7-19b7-11e9-9cc6-42010a9c016d' + '\n'
self.assertEqual(metric, dogstatsd.socket.recv())
Expand Down Expand Up @@ -1795,7 +1795,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
# Make call with no tags passed; only the globally configured tags will be used.
global_tags_str = ','.join([t for t in global_tags])
dogstatsd.gauge('gt', 123.4)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

# Protect against the no tags case.
metric = 'gt:123.4|g|#{}\n'.format(global_tags_str) if global_tags_str else 'gt:123.4|g\n'
Expand All @@ -1813,7 +1813,7 @@ def test_dogstatsd_initialization_with_dd_env_service_version(self):
passed_tags = ['env:prod', 'version:def456', 'custom_tag:toad']
all_tags_str = ','.join([t for t in passed_tags + global_tags])
dogstatsd.gauge('gt', 123.4, tags=passed_tags)
dogstatsd.flush_buffered_metrics()
dogstatsd.flush()

metric = 'gt:123.4|g|#{}\n'.format(all_tags_str)
self.assertEqual(metric, dogstatsd.socket.recv())
Expand Down Expand Up @@ -1919,22 +1919,22 @@ def test_counter_with_container_field(self):
self.statsd._container_id = "ci-fake-container-id"

self.statsd.increment("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.increment("page.views", 11)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:11|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views")
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-1|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._reset_telemetry()
self.statsd.decrement("page.views", 12)
self.statsd.flush_buffered_metrics()
self.statsd.flush()
self.assert_equal_telemetry("page.views:-12|c|c:ci-fake-container-id\n", self.recv(2))

self.statsd._container_id = None
Expand Down

0 comments on commit 362e187

Please sign in to comment.