Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add custom intervals (step_size) and support multiple data types #5

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 23 additions & 11 deletions collectd.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

logger = logging.getLogger("collectd")

SEND_INTERVAL = 10 # seconds
MAX_PACKET_SIZE = 1024 # bytes

PLUGIN_TYPE = "gauge"
Expand All @@ -37,6 +36,14 @@
VALUE_GAUGE = 1
VALUE_DERIVE = 2
VALUE_ABSOLUTE = 3

VALUE_LOOKUP = {
'counter': VALUE_COUNTER,
'gauge': VALUE_GAUGE,
'derive': VALUE_DERIVE,
'absolute': VALUE_ABSOLUTE,
}

VALUE_CODES = {
VALUE_COUNTER: "!Q",
VALUE_GAUGE: "<d",
Expand All @@ -52,10 +59,14 @@ def pack_string(type_code, string):
return struct.pack("!HH", type_code, 5 + len(string)) + string + "\0"

def pack_value(name, value):
value_type = VALUE_LOOKUP[PLUGIN_TYPE]
value_fmt = VALUE_CODES[value_type]

return "".join([
pack(TYPE_TYPE_INSTANCE, name),
struct.pack("!HHH", TYPE_VALUES, 15, 1),
struct.pack("<Bd", VALUE_GAUGE, value)
struct.pack("<B", value_type),
struct.pack(value_fmt, value)
])

def pack(id, value):
Expand All @@ -68,19 +79,19 @@ def pack(id, value):
else:
raise AssertionError("invalid type code " + str(id))

def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
def message_start(when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10):
return "".join([
pack(TYPE_HOST, host),
pack(TYPE_TIME, when or time.time()),
pack(TYPE_PLUGIN, plugin_name),
pack(TYPE_PLUGIN_INSTANCE, plugin_inst),
pack(TYPE_TYPE, PLUGIN_TYPE),
pack(TYPE_INTERVAL, SEND_INTERVAL)
pack(TYPE_INTERVAL, send_interval)
])

def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any"):
def messages(counts, when=None, host=socket.gethostname(), plugin_inst="", plugin_name="any", send_interval=10):
packets = []
start = message_start(when, host, plugin_inst, plugin_name)
start = message_start(when, host, plugin_inst, plugin_name, send_interval)
parts = [pack(name, count) for name,count in counts.items()]
parts = [p for p in parts if len(start) + len(p) <= MAX_PACKET_SIZE]
if parts:
Expand Down Expand Up @@ -158,7 +169,7 @@ class Connection(object):
@synchronized
def __new__(cls, hostname = socket.gethostname(),
collectd_host = "localhost", collectd_port = 25826,
plugin_inst = "", plugin_name = "any"):
plugin_inst = "", plugin_name = "any", send_interval=10):
id = (hostname, collectd_host, collectd_port, plugin_inst, plugin_name)
if id in cls.instances:
return cls.instances[id]
Expand All @@ -169,14 +180,15 @@ def __new__(cls, hostname = socket.gethostname(),

def __init__(self, hostname = socket.gethostname(),
collectd_host = "localhost", collectd_port = 25826,
plugin_inst = "", plugin_name = "any"):
plugin_inst = "", plugin_name = "any", send_interval=10):
if "_counters" not in self.__dict__:
self._lock = RLock()
self._counters = {}
self._plugin_inst = plugin_inst
self._plugin_name = plugin_name
self._hostname = hostname
self._collectd_addr = (collectd_host, collectd_port)
self._send_interval = send_interval

@synchronized
def __getattr__(self, name):
Expand Down Expand Up @@ -208,7 +220,7 @@ def take_snapshots():
def send_stats(raise_on_empty = False):
try:
when, stats, conn = snaps.get(timeout = 0.1)
for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name):
for message in messages(stats, when, conn._hostname, conn._plugin_inst, conn._plugin_name, conn._send_interval):
sock.sendto(message, conn._collectd_addr)
except Empty:
if raise_on_empty:
Expand All @@ -232,7 +244,7 @@ def wrapped():
t.start()

single_start = Semaphore()
def start_threads():
def start_threads(send_interval=10):
assert single_start.acquire(blocking = False)
daemonize(take_snapshots, sleep_for = SEND_INTERVAL)
daemonize(take_snapshots, sleep_for = send_interval)
daemonize(send_stats)