Skip to content

Commit

Permalink
Merge pull request #700 from DanCech/lint
Browse files Browse the repository at this point in the history
Fix lint issues, run lint checks in travis
  • Loading branch information
DanCech authored Nov 25, 2017
2 parents 35811d3 + 60e1cff commit 710b7f9
Show file tree
Hide file tree
Showing 33 changed files with 213 additions and 203 deletions.
2 changes: 0 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ python:
- pypy

install:
- pip install -r requirements.txt
- python setup.py install --prefix=$VIRTUAL_ENV --install-lib=$VIRTUAL_ENV/lib/python$TRAVIS_PYTHON_VERSION/site-packages
- pip install tox-travis

script:
Expand Down
13 changes: 7 additions & 6 deletions lib/carbon/aggregator/buffers.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from carbon import log


class BufferManager:
class _BufferManager:
def __init__(self):
self.buffers = {}

Expand Down Expand Up @@ -59,9 +59,10 @@ def configure_aggregation(self, frequency, func):
self.configured = True

def compute_value(self):
now = int( time.time() )
now = int(time.time())
current_interval = now - (now % self.aggregation_frequency)
age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
age_threshold = current_interval - (
settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)

for buffer in list(self.interval_buffers.values()):
if buffer.active:
Expand Down Expand Up @@ -96,15 +97,15 @@ def __init__(self, interval):
self.active = True

def input(self, datapoint):
    """Record one incoming datapoint's value in this interval buffer.

    Only the value (datapoint[1]) is kept; the timestamp (datapoint[0])
    is implied by the buffer's interval. NOTE(review): the diff render
    duplicated the append line (old + new versions) — only one append
    belongs in the committed code.
    """
    self.values.append(datapoint[1])
    # Re-activate the buffer so aggregation keeps running for it
    # (compute_value skips/expires buffers whose active flag is False).
    self.active = True

def mark_inactive(self):
    """Flag this buffer as idle (no datapoints since the last interval).

    compute_value consults this flag (``if buffer.active:``) to decide
    whether the buffer still needs aggregating or can be aged out.
    """
    self.active = False


# Shared importable singleton: the class was renamed to _BufferManager in
# this commit precisely so the module-level name BufferManager can be the
# single shared instance. (The diff render also showed the pre-rename
# assignment; only this line is the committed code.)
BufferManager = _BufferManager()

# Avoid import circularity
from carbon import state
from carbon import state # NOQA
3 changes: 2 additions & 1 deletion lib/carbon/aggregator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,6 @@ def process(self, metric, datapoint):

if settings.FORWARD_ALL and metric not in aggregate_metrics:
if settings.LOG_AGGREGATOR_MISSES and len(aggregate_metrics) == 0:
log.msg("Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
log.msg(
"Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
yield (metric, datapoint)
24 changes: 12 additions & 12 deletions lib/carbon/aggregator/rules.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import time
import re

from os.path import exists, getmtime
Expand Down Expand Up @@ -71,7 +70,7 @@ def parse_definition(self, line):
left_side, right_side = line.split('=', 1)
output_pattern, frequency = left_side.split()
method, input_pattern = right_side.split()
frequency = int( frequency.lstrip('(').rstrip(')') )
frequency = int(frequency.lstrip('(').rstrip(')'))
return AggregationRule(input_pattern, output_pattern, method, frequency)

except ValueError:
Expand Down Expand Up @@ -125,17 +124,17 @@ def build_regex(self):
i = input_part.find('<<')
j = input_part.find('>>')
pre = input_part[:i]
post = input_part[j+2:]
field_name = input_part[i+2:j]
post = input_part[j + 2:]
field_name = input_part[i + 2:j]
regex_part = '%s(?P<%s>.+)%s' % (pre, field_name, post)

else:
i = input_part.find('<')
j = input_part.find('>')
if i > -1 and j > i:
pre = input_part[:i]
post = input_part[j+1:]
field_name = input_part[i+1:j]
post = input_part[j + 1:]
field_name = input_part[i + 1:j]
regex_part = '%s(?P<%s>[^.]+)%s' % (pre, field_name, post)
elif input_part == '*':
regex_part = '[^.]+'
Expand All @@ -153,18 +152,19 @@ def build_template(self):

def avg(values):
    """Return the arithmetic mean of *values*, or None for an empty input.

    Returning None (implicitly, by falling through) signals "no value to
    emit" for an empty buffer, matching count() below. float() keeps the
    division true under Python 2, which this codebase still supports.
    """
    # The diff render duplicated this return (old + new spacing); the
    # committed code has exactly one return statement.
    if values:
        return float(sum(values)) / len(values)


def count(values):
    """Return the number of collected values, or None when there are none.

    None (rather than 0) for an empty buffer means "nothing to emit",
    consistent with avg() above.
    """
    return len(values) if values else None

# Maps the method name used in aggregation rule definitions to the callable
# applied to a buffer's list of values. The diff render interleaved the old
# (spaced-colon) and new entries, producing duplicate keys; this is the
# committed, deduplicated mapping.
AGGREGATION_METHODS = {
  'sum': sum,
  'avg': avg,
  'min': min,
  'max': max,
  'count': count,
}

# Importable singleton
Expand Down
25 changes: 13 additions & 12 deletions lib/carbon/amqp_listener.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,6 @@
"""
import sys

# txamqp is currently not ported to py3
if sys.version_info >= (3, 0):
raise ImportError

import os
import socket
from optparse import OptionParser
Expand All @@ -44,9 +40,14 @@
from twisted.internet import reactor
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.application.internet import TCPClient
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec

# txamqp is currently not ported to py3
try:
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
except:
raise ImportError

try:
import carbon
Expand All @@ -55,10 +56,10 @@
LIB_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, LIB_DIR)

import carbon.protocols #satisfy import order requirements
import carbon.protocols # NOQA satisfy import order requirements
from carbon.protocols import CarbonServerProtocol
from carbon.conf import settings
from carbon import log, events, instrumentation
from carbon import log, events


HOSTNAME = socket.gethostname().split('.')[0]
Expand Down Expand Up @@ -122,13 +123,14 @@ def setup(self):

# bind each configured metric pattern
for bind_pattern in settings.BIND_PATTERNS:
log.listener("binding exchange '%s' to queue '%s' with pattern %s" \
log.listener("binding exchange '%s' to queue '%s' with pattern %s"
% (exchange, my_queue, bind_pattern))
yield chan.queue_bind(exchange=exchange, queue=my_queue,
routing_key=bind_pattern)

yield chan.basic_consume(queue=my_queue, no_ack=True,
consumer_tag=self.consumer_tag)

@inlineCallbacks
def receive_loop(self):
queue = yield self.queue(self.consumer_tag)
Expand All @@ -154,7 +156,7 @@ def processMessage(self, message):
metric, value, timestamp = line.split()
else:
value, timestamp = line.split()
datapoint = ( float(timestamp), float(value) )
datapoint = (float(timestamp), float(value))
if datapoint[1] != datapoint[1]: # filter out NaN values
continue
except ValueError:
Expand Down Expand Up @@ -253,7 +255,6 @@ def main():

(options, args) = parser.parse_args()


startReceiver(options.host, options.port, options.username,
options.password, vhost=options.vhost,
exchange_name=options.exchange, verbose=options.verbose)
Expand Down
4 changes: 2 additions & 2 deletions lib/carbon/amqp_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from optparse import OptionParser

from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor, task
from twisted.internet import reactor
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
Expand Down Expand Up @@ -54,7 +54,7 @@ def writeMetric(metric_path, value, timestamp, host, port, username, password,
yield channel.exchange_declare(exchange=exchange, type="topic",
durable=True, auto_delete=False)

message = Content( "%f %d" % (value, timestamp) )
message = Content("%f %d" % (value, timestamp))
message["delivery mode"] = 2

channel.basic_publish(exchange=exchange, content=message, routing_key=metric_path)
Expand Down
4 changes: 2 additions & 2 deletions lib/carbon/cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class MaxStrategy(DrainStrategy):
that infrequently or irregularly updated metrics may not be written
until shutdown """
def choose_item(self):
metric_name, size = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x)))
metric_name, _ = max(self.cache.items(), key=lambda x: len(itemgetter(1)(x)))
return metric_name


Expand Down Expand Up @@ -245,4 +245,4 @@ def MetricCache():


# Avoid import circularities
from carbon import state
from carbon import state # NOQA
34 changes: 16 additions & 18 deletions lib/carbon/carbon_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 2 additions & 7 deletions lib/carbon/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ def sendQueued(self):
instrumentation.prior_stats.get('metricsReceived', 0)))

self.sendDatapointsNow(self.factory.takeSomeFromQueue())
if (self.factory.queueFull.called and
queueSize < SEND_QUEUE_LOW_WATERMARK):
if (self.factory.queueFull.called and queueSize < SEND_QUEUE_LOW_WATERMARK):
if not self.factory.queueHasSpace.called:
self.factory.queueHasSpace.callback(queueSize)
if self.factory.hasQueuedDatapoints():
self.factory.scheduleSend()


def connectionQualityMonitor(self):
"""Checks to see if the connection for this factory appears to
be delivering stats at a speed close to what we're receiving
Expand All @@ -137,7 +135,6 @@ def connectionQualityMonitor(self):
True means that the total received is less than settings.MIN_RESET_STAT_FLOW
False means that quality is bad
"""
destination_sent = float(instrumentation.prior_stats.get(self.sent, 0))
total_received = float(instrumentation.prior_stats.get('metricsReceived', 0))
Expand Down Expand Up @@ -256,7 +253,7 @@ def takeSomeFromQueue(self):
queue.
"""
def yield_max_datapoints():
for count in range(settings.MAX_DATAPOINTS_PER_MESSAGE):
for _ in range(settings.MAX_DATAPOINTS_PER_MESSAGE):
try:
yield self.queue.popleft()
except IndexError:
Expand Down Expand Up @@ -470,8 +467,6 @@ def __init__(self, router):
state.events.resumeReceivingMetrics.addHandler(fake_factory.reinjectDatapoints)

def createFactory(self, destination):
from carbon.conf import settings

factory_name = settings["DESTINATION_PROTOCOL"]
factory_class = CarbonClientFactory.plugins.get(factory_name)

Expand Down
Loading

0 comments on commit 710b7f9

Please sign in to comment.