Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UDS SOCK_STREAM support to the DogStatsD client #869

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions datadog/dogstatsd/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import socket
import errno
import struct
import threading
import time
from threading import Lock, RLock
Expand Down Expand Up @@ -49,6 +50,11 @@
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 8125

# Socket prefixes
UNIX_ADDRESS_SCHEME = "unix://"
UNIX_ADDRESS_DATAGRAM_SCHEME = "unixgram://"
UNIX_ADDRESS_STREAM_SCHEME = "unixstream://"

# Buffering-related values (in seconds)
DEFAULT_BUFFERING_FLUSH_INTERVAL = 0.3
MIN_FLUSH_INTERVAL = 0.0001
Expand Down Expand Up @@ -489,6 +495,30 @@ def socket_path(self, path):
self._transport = "uds"
self._max_payload_size = self._max_buffer_len or UDS_OPTIMAL_PAYLOAD_LENGTH

@property
def socket(self):
return self._socket

@socket.setter
def socket(self, new_socket):
self._socket = new_socket
if new_socket:
self._socket_kind = new_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
else:
self._socket_kind = None

@property
def telemetry_socket(self):
return self._telemetry_socket

@telemetry_socket.setter
def telemetry_socket(self, t_socket):
self._telemetry_socket = t_socket
if t_socket:
self._telemetry_socket_kind = t_socket.getsockopt(socket.SOL_SOCKET, socket.SO_TYPE)
else:
self._telemetry_socket_kind = None

def enable_background_sender(self, sender_queue_size=0, sender_queue_timeout=0):
"""
Use a background thread to communicate with the dogstatsd server.
Expand Down Expand Up @@ -731,11 +761,37 @@ def _ensure_min_send_buffer_size(cls, sock, min_size=MIN_SEND_BUFFER_SIZE):

@classmethod
def _get_uds_socket(cls, socket_path, timeout):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect(socket_path)
return sock
valid_socket_kinds = [socket.SOCK_DGRAM, socket.SOCK_STREAM]
if socket_path.startswith(UNIX_ADDRESS_DATAGRAM_SCHEME):
valid_socket_kinds = [socket.SOCK_DGRAM]
socket_path = socket_path[len(UNIX_ADDRESS_DATAGRAM_SCHEME):]
elif socket_path.startswith(UNIX_ADDRESS_STREAM_SCHEME):
valid_socket_kinds = [socket.SOCK_STREAM]
socket_path = socket_path[len(UNIX_ADDRESS_STREAM_SCHEME):]
elif socket_path.startswith(UNIX_ADDRESS_SCHEME):
socket_path = socket_path[len(UNIX_ADDRESS_SCHEME):]

last_error = ValueError("Invalid socket path")
for socket_kind in valid_socket_kinds:
# py2 stores socket kinds differently than py3, determine the name independently from version
sk_name = {socket.SOCK_STREAM: "stream", socket.SOCK_DGRAM: "datagram"}[socket_kind]

try:
sock = socket.socket(socket.AF_UNIX, socket_kind)
sock.settimeout(timeout)
cls._ensure_min_send_buffer_size(sock)
sock.connect(socket_path)
log.debug("Connected to socket %s with kind %s", socket_path, sk_name)
return sock
except Exception as e:
if sock is not None:
sock.close()
log.debug("Failed to connect to %s with kind %s: %s", socket_path, sk_name, e)
if e.errno == errno.EPROTOTYPE:
last_error = e
continue
raise e
raise last_error

@classmethod
def _get_udp_socket(cls, host, port, timeout):
Expand Down Expand Up @@ -1219,11 +1275,16 @@ def _xmit_packet(self, packet, is_telemetry):
try:
if is_telemetry and self._dedicated_telemetry_destination():
mysocket = self.telemetry_socket or self.get_socket(telemetry=True)
socket_kind = self._telemetry_socket_kind
else:
# If set, use socket directly
mysocket = self.socket or self.get_socket()
socket_kind = self._socket_kind

mysocket.send(packet.encode(self.encoding))
encoded_packet = packet.encode(self.encoding)
if socket_kind == socket.SOCK_STREAM:
mysocket.send(struct.pack('<I', len(encoded_packet)))
mysocket.send(encoded_packet)

if not is_telemetry and self._telemetry:
self.packets_sent += 1
Expand Down Expand Up @@ -1256,7 +1317,7 @@ def _xmit_packet(self, packet, is_telemetry):
)
self.close_socket()
except Exception as exc:
print("Unexpected error: %s", exc)
print("Unexpected error: ", exc)
log.error("Unexpected error: %s", str(exc))

if not is_telemetry and self._telemetry:
Expand Down
51 changes: 47 additions & 4 deletions tests/integration/dogstatsd/test_statsd_sender.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
from contextlib import closing
import itertools
import os
import shutil
import socket
import tempfile
from threading import Thread
import uuid

import pytest

from datadog.dogstatsd.base import DogStatsd

@pytest.mark.parametrize(
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False])),
"disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind",
list(itertools.product([True, False], [True, False], [True, False], [0, 1], [True, False], [socket.SOCK_DGRAM, socket.SOCK_STREAM])),
)
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop):
def test_sender_mode(disable_background_sender, disable_buffering, wait_for_pending, socket_timeout, stop, socket_kind):
# Test basic sender operation with an assortment of options
foo, bar = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM, 0)
foo, bar = socket.socketpair(socket.AF_UNIX, socket_kind, 0)
statsd = DogStatsd(
telemetry_min_flush_interval=0,
disable_background_sender=disable_background_sender,
Expand Down Expand Up @@ -101,3 +106,41 @@ def test_buffering_with_context():
bar.settimeout(5)
msg = bar.recv(8192)
assert msg == b"first:1|c\n"

@pytest.fixture()
def socket_dir():
tempdir = tempfile.mkdtemp()
yield tempdir
shutil.rmtree(tempdir)

@pytest.mark.parametrize(
"socket_prefix, socket_kind, success",
[
("", socket.SOCK_DGRAM, True),
("", socket.SOCK_STREAM, True),
("unix://", socket.SOCK_DGRAM, True),
("unix://", socket.SOCK_STREAM, True),
("unixstream://", socket.SOCK_DGRAM, False),
("unixstream://", socket.SOCK_STREAM, True),
("unixgram://", socket.SOCK_DGRAM, True),
("unixgram://", socket.SOCK_STREAM, False)
]
)
def test_socket_connection(socket_dir, socket_prefix, socket_kind, success):
socket_path = os.path.join(socket_dir, str(uuid.uuid1()) + ".sock")
listener_socket = socket.socket(socket.AF_UNIX, socket_kind)
listener_socket.bind(socket_path)

if socket_kind == socket.SOCK_STREAM:
listener_socket.listen(1)

with closing(listener_socket):
statsd = DogStatsd(
socket_path = socket_prefix + socket_path
)

if success:
assert statsd.get_socket() is not None
else:
with pytest.raises(socket.error):
statsd.get_socket()
70 changes: 57 additions & 13 deletions tests/unit/dogstatsd/test_statsd.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# Standard libraries
from collections import deque
from contextlib import closing
import struct
from threading import Thread
import errno
import os
Expand Down Expand Up @@ -41,11 +42,12 @@ class FakeSocket(object):

FLUSH_GRACE_PERIOD = 0.2

def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL):
def __init__(self, flush_interval=DEFAULT_BUFFERING_FLUSH_INTERVAL, socket_kind=socket.SOCK_DGRAM):
self.payloads = deque()

self._flush_interval = flush_interval
self._flush_wait = False
self._socket_kind = socket_kind
self.timeout = () # unit tuple = settimeout was not called

def send(self, payload):
Expand All @@ -64,17 +66,29 @@ def recv(self, count=1, reset_wait=False, no_wait=False):
time.sleep(self._flush_interval+self.FLUSH_GRACE_PERIOD)
self._flush_wait = True

if count > len(self.payloads):
payload_len = len(self.payloads)
if self._socket_kind == socket.SOCK_STREAM:
if payload_len % 2 != 0 or count > (payload_len / 2):
return None
elif count > len(self.payloads):
return None

out = []
for _ in range(count):
out.append(self.payloads.popleft().decode('utf-8'))
if self._socket_kind == socket.SOCK_DGRAM:
out.append(self.payloads.popleft().decode('utf-8'))
else:
length = struct.unpack('<I', self.payloads.popleft())[0]
pl = self.payloads.popleft()[:length].decode('utf-8')
out.append(pl)
return '\n'.join(out)

def close(self):
pass

def getsockopt(self, *args):
return self._socket_kind

def __repr__(self):
return str(self.payloads)

Expand Down Expand Up @@ -1061,19 +1075,31 @@ def test_batching(self):
telemetry=telemetry_metrics(metrics=2, bytes_sent=len(expected))
)

def test_flush(self):
def test_flush_dgram(self):
self._test_flush(socket.SOCK_DGRAM)

def test_flush_stream(self):
self._test_flush(socket.SOCK_STREAM)

def _test_flush(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
self.assertIsNone(fake_socket.recv(no_wait=True))
dogstatsd.flush()
self.assert_equal_telemetry('page.views:1|c\n', fake_socket.recv(2))

def test_flush_interval(self):
def test_flush_interval_dgram(self):
self._test_flush_interval(socket.SOCK_DGRAM)

def test_flush_interval_stream(self):
self._test_flush_interval(socket.SOCK_STREAM)

def _test_flush_interval(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket

dogstatsd.increment('page.views')
Expand All @@ -1088,9 +1114,15 @@ def test_flush_interval(self):
fake_socket.recv(2, no_wait=True)
)

def test_aggregation_buffering_simultaneously(self):
def test_aggregation_buffering_simultaneously_dgram(self):
self._test_aggregation_buffering_simultaneously(socket.SOCK_DGRAM)

def test_aggregation_buffering_simultaneously_stream(self):
self._test_aggregation_buffering_simultaneously(socket.SOCK_STREAM)

def _test_aggregation_buffering_simultaneously(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering')
Expand All @@ -1099,9 +1131,15 @@ def test_aggregation_buffering_simultaneously(self):
dogstatsd.flush()
self.assert_equal_telemetry('test.aggregation_and_buffering:10|c\n', fake_socket.recv(2))

def test_aggregation_buffering_simultaneously_with_interval(self):
def test_aggregation_buffering_simultaneously_with_interval_dgram(self):
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_DGRAM)

def test_aggregation_buffering_simultaneously_with_interval_stream(self):
self._test_aggregation_buffering_simultaneously_with_interval(socket.SOCK_STREAM)

def _test_aggregation_buffering_simultaneously_with_interval(self, socket_kind):
dogstatsd = DogStatsd(disable_buffering=False, disable_aggregation=False, flush_interval=1, telemetry_min_flush_interval=0)
fake_socket = FakeSocket()
fake_socket = FakeSocket(socket_kind=socket_kind)
dogstatsd.socket = fake_socket
for _ in range(10):
dogstatsd.increment('test.aggregation_and_buffering_with_interval')
Expand Down Expand Up @@ -1185,12 +1223,18 @@ def test_batching_sequential(self):
)
)

def test_batching_runtime_changes(self):
def test_batching_runtime_changes_dgram(self):
self._test_batching_runtime_changes(socket.SOCK_DGRAM)

def test_batching_runtime_changes_stream(self):
self._test_batching_runtime_changes(socket.SOCK_STREAM)

def _test_batching_runtime_changes(self, socket_kind):
dogstatsd = DogStatsd(
disable_buffering=True,
telemetry_min_flush_interval=0
)
dogstatsd.socket = FakeSocket()
dogstatsd.socket = FakeSocket(socket_kind=socket_kind)

# Send some unbuffered metrics and verify we got it immediately
last_telemetry_size = self.send_and_assert(
Expand Down
Loading