Skip to content

Commit

Permalink
Retry support (#83)
Browse files Browse the repository at this point in the history
* send request w/ Requests.

* Move to urllib3, drop base64 encoding, fix tests.

* Remove rest of urllib stuff.

* json.load str, not bytes.

* Stop encoding payload.

* POST urlencoded data payload as www-form-urlencoded

* Generate , add basic urllib3.Retry config.

* remove import

* Retry per request, not per PoolMgr.

* Timeouts and status code retries.

* Expose retry options in consumer initializer.

* use uuid4

* Fix str test in py2

* test empty track dict

* Retry tweaks.

* Groups doc fix
  • Loading branch information
David Grant authored Sep 12, 2020
1 parent a6ec21e commit 728769c
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 89 deletions.
76 changes: 50 additions & 26 deletions mixpanel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
callers to customize the IO characteristics of their tracking.
"""
from __future__ import absolute_import, unicode_literals
import base64
import datetime
import json
import time
import uuid

import six
from six.moves import urllib
from six.moves import range
import urllib3

__version__ = '4.6.0'
__version__ = '4.7.0'
VERSION = __version__ # TODO: remove when bumping major version.


Expand Down Expand Up @@ -64,6 +65,9 @@ def __init__(self, token, consumer=None, serializer=DatetimeSerializer):
def _now(self):
return time.time()

def _make_insert_id(self):
return uuid.uuid4().hex

def track(self, distinct_id, event_name, properties=None, meta=None):
"""Record an event.
Expand All @@ -81,6 +85,7 @@ def track(self, distinct_id, event_name, properties=None, meta=None):
'token': self._token,
'distinct_id': distinct_id,
'time': int(self._now()),
'$insert_id': self._make_insert_id(),
'mp_lib': 'python',
'$lib_version': __version__,
}
Expand Down Expand Up @@ -116,6 +121,7 @@ def import_data(self, api_key, distinct_id, event_name, timestamp,
'token': self._token,
'distinct_id': distinct_id,
'time': int(timestamp),
'$insert_id': self._make_insert_id(),
'mp_lib': 'python',
'$lib_version': __version__,
}
Expand Down Expand Up @@ -351,7 +357,6 @@ def people_update(self, message, meta=None):
record.update(meta)
self._consumer.send('people', json_dumps(record, cls=self._serializer))


def group_set(self, group_key, group_id, properties, meta=None):
"""Set properties of a group profile.
Expand Down Expand Up @@ -490,26 +495,41 @@ class Consumer(object):
:param str groups_url: override the default groups API endpoint
:param str api_host: the Mixpanel API domain where all requests should be
issued (unless overridden by above URLs).
:param int retry_limit: number of times to retry each retry in case of
connection or HTTP 5xx error; 0 to fail after first attempt.
:param int retry_backoff_factor: In case of retries, controls sleep time. e.g.,
sleep_seconds = backoff_factor * (2 ^ (num_total_retries - 1)).
.. versionadded:: 4.6.0
The *api_host* parameter.
"""

def __init__(self, events_url=None, people_url=None, import_url=None,
request_timeout=None, groups_url=None, api_host="api.mixpanel.com"):
request_timeout=None, groups_url=None, api_host="api.mixpanel.com",
retry_limit=4, retry_backoff_factor=0.25):
# TODO: With next major version, make the above args kwarg-only, and reorder them.
self._endpoints = {
'events': events_url or 'https://{}/track'.format(api_host),
'people': people_url or 'https://{}/engage'.format(api_host),
'groups': groups_url or 'https://{}/groups'.format(api_host),
'imports': import_url or 'https://{}/import'.format(api_host),
}
self._request_timeout = request_timeout
retry_config = urllib3.Retry(
total=retry_limit,
backoff_factor=retry_backoff_factor,
method_whitelist={'POST'},
status_forcelist=set(range(500, 600)),
)
self._http = urllib3.PoolManager(
retries=retry_config,
timeout=urllib3.Timeout(request_timeout),
)

def send(self, endpoint, json_message, api_key=None):
"""Immediately record an event or a profile update.
:param endpoint: the Mixpanel API endpoint appropriate for the message
:type endpoint: "events" | "people" | "imports"
:type endpoint: "events" | "people" | "groups" | "imports"
:param str json_message: a JSON message formatted for the endpoint
:param str api_key: your Mixpanel project's API key
:raises MixpanelException: if the endpoint doesn't exist, the server is
Expand All @@ -522,34 +542,32 @@ def send(self, endpoint, json_message, api_key=None):

def _write_request(self, request_url, json_message, api_key=None):
data = {
'data': base64.b64encode(json_message.encode('utf8')),
'data': json_message,
'verbose': 1,
'ip': 0,
}
if api_key:
data.update({'api_key': api_key})
encoded_data = urllib.parse.urlencode(data).encode('utf8')

try:
request = urllib.request.Request(request_url, encoded_data)

# Note: We don't send timeout=None here, because the timeout in urllib2 defaults to
# an internal socket timeout, not None.
if self._request_timeout is not None:
response = urllib.request.urlopen(request, timeout=self._request_timeout).read()
else:
response = urllib.request.urlopen(request).read()
except urllib.error.URLError as e:
response = self._http.request(
'POST',
request_url,
fields=data,
encode_multipart=False, # URL-encode payload in POST body.
)
except Exception as e:
six.raise_from(MixpanelException(e), e)

try:
response = json.loads(response.decode('utf8'))
response_dict = json.loads(response.data.decode('utf-8'))
except ValueError:
raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response))
raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.data))

if response['status'] != 1:
raise MixpanelException('Mixpanel error: {0}'.format(response['error']))
if response_dict['status'] != 1:
raise MixpanelException('Mixpanel error: {0}'.format(response_dict['error']))

return True
return True # <- TODO: remove return val with major release.


class BufferedConsumer(object):
Expand All @@ -567,6 +585,10 @@ class BufferedConsumer(object):
:param str groups_url: override the default groups API endpoint
:param str api_host: the Mixpanel API domain where all requests should be
issued (unless overridden by above URLs).
:param int retry_limit: number of times to retry each retry in case of
connection or HTTP 5xx error; 0 to fail after first attempt.
:param int retry_backoff_factor: In case of retries, controls sleep time. e.g.,
sleep_seconds = backoff_factor * (2 ^ (num_total_retries - 1)).
.. versionadded:: 4.6.0
The *api_host* parameter.
Expand All @@ -578,8 +600,10 @@ class BufferedConsumer(object):
remaining unsent events being held by the instance.
"""
def __init__(self, max_size=50, events_url=None, people_url=None, import_url=None,
request_timeout=None, groups_url=None, api_host="api.mixpanel.com"):
self._consumer = Consumer(events_url, people_url, import_url, request_timeout, groups_url, api_host)
request_timeout=None, groups_url=None, api_host="api.mixpanel.com",
retry_limit=4, retry_backoff_factor=0.25):
self._consumer = Consumer(events_url, people_url, import_url, request_timeout,
groups_url, api_host, retry_limit, retry_backoff_factor)
self._buffers = {
'events': [],
'people': [],
Expand All @@ -598,7 +622,7 @@ def send(self, endpoint, json_message, api_key=None):
:meth:`~.send`.
:param endpoint: the Mixpanel API endpoint appropriate for the message
:type endpoint: "events" | "people" | "imports"
:type endpoint: "events" | "people" | "groups" | "imports"
:param str json_message: a JSON message formatted for the endpoint
:param str api_key: your Mixpanel project's API key
:raises MixpanelException: if the endpoint doesn't exist, the server is
Expand Down
5 changes: 4 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ def find_version(*paths):
author='Mixpanel, Inc.',
author_email='[email protected]',
license='Apache',
install_requires=['six >= 1.9.0'],
install_requires=[
'six >= 1.9.0',
'urllib3 >= 1.21.1',
],

classifiers=[
'License :: OSI Approved :: Apache Software License',
Expand Down
Loading

0 comments on commit 728769c

Please sign in to comment.