diff --git a/mixpanel/__init__.py b/mixpanel/__init__.py index 9d3c17a..af35f85 100644 --- a/mixpanel/__init__.py +++ b/mixpanel/__init__.py @@ -15,15 +15,16 @@ callers to customize the IO characteristics of their tracking. """ from __future__ import absolute_import, unicode_literals -import base64 import datetime import json import time +import uuid import six -from six.moves import urllib +from six.moves import range +import urllib3 -__version__ = '4.6.0' +__version__ = '4.7.0' VERSION = __version__ # TODO: remove when bumping major version. @@ -64,6 +65,9 @@ def __init__(self, token, consumer=None, serializer=DatetimeSerializer): def _now(self): return time.time() + def _make_insert_id(self): + return uuid.uuid4().hex + def track(self, distinct_id, event_name, properties=None, meta=None): """Record an event. @@ -81,6 +85,7 @@ def track(self, distinct_id, event_name, properties=None, meta=None): 'token': self._token, 'distinct_id': distinct_id, 'time': int(self._now()), + '$insert_id': self._make_insert_id(), 'mp_lib': 'python', '$lib_version': __version__, } @@ -116,6 +121,7 @@ def import_data(self, api_key, distinct_id, event_name, timestamp, 'token': self._token, 'distinct_id': distinct_id, 'time': int(timestamp), + '$insert_id': self._make_insert_id(), 'mp_lib': 'python', '$lib_version': __version__, } @@ -351,7 +357,6 @@ def people_update(self, message, meta=None): record.update(meta) self._consumer.send('people', json_dumps(record, cls=self._serializer)) - def group_set(self, group_key, group_id, properties, meta=None): """Set properties of a group profile. @@ -490,26 +495,41 @@ class Consumer(object): :param str groups_url: override the default groups API endpoint :param str api_host: the Mixpanel API domain where all requests should be issued (unless overridden by above URLs). + :param int retry_limit: number of times to retry each request in case of + connection or HTTP 5xx error; 0 to fail after first attempt. 
+ :param float retry_backoff_factor: In case of retries, controls sleep time. e.g., + sleep_seconds = backoff_factor * (2 ** (num_total_retries - 1)). .. versionadded:: 4.6.0 The *api_host* parameter. """ def __init__(self, events_url=None, people_url=None, import_url=None, - request_timeout=None, groups_url=None, api_host="api.mixpanel.com"): + request_timeout=None, groups_url=None, api_host="api.mixpanel.com", + retry_limit=4, retry_backoff_factor=0.25): + # TODO: With next major version, make the above args kwarg-only, and reorder them. self._endpoints = { 'events': events_url or 'https://{}/track'.format(api_host), 'people': people_url or 'https://{}/engage'.format(api_host), 'groups': groups_url or 'https://{}/groups'.format(api_host), 'imports': import_url or 'https://{}/import'.format(api_host), } - self._request_timeout = request_timeout + retry_config = urllib3.Retry( + total=retry_limit, + backoff_factor=retry_backoff_factor, + method_whitelist={'POST'}, + status_forcelist=set(range(500, 600)), + ) + self._http = urllib3.PoolManager( + retries=retry_config, + timeout=urllib3.Timeout(request_timeout), + ) def send(self, endpoint, json_message, api_key=None): """Immediately record an event or a profile update. 
:param endpoint: the Mixpanel API endpoint appropriate for the message - :type endpoint: "events" | "people" | "imports" + :type endpoint: "events" | "people" | "groups" | "imports" :param str json_message: a JSON message formatted for the endpoint :param str api_key: your Mixpanel project's API key :raises MixpanelException: if the endpoint doesn't exist, the server is @@ -522,34 +542,32 @@ def send(self, endpoint, json_message, api_key=None): def _write_request(self, request_url, json_message, api_key=None): data = { - 'data': base64.b64encode(json_message.encode('utf8')), + 'data': json_message, 'verbose': 1, 'ip': 0, } if api_key: data.update({'api_key': api_key}) - encoded_data = urllib.parse.urlencode(data).encode('utf8') + try: - request = urllib.request.Request(request_url, encoded_data) - - # Note: We don't send timeout=None here, because the timeout in urllib2 defaults to - # an internal socket timeout, not None. - if self._request_timeout is not None: - response = urllib.request.urlopen(request, timeout=self._request_timeout).read() - else: - response = urllib.request.urlopen(request).read() - except urllib.error.URLError as e: + response = self._http.request( + 'POST', + request_url, + fields=data, + encode_multipart=False, # URL-encode payload in POST body. + ) + except Exception as e: six.raise_from(MixpanelException(e), e) try: - response = json.loads(response.decode('utf8')) + response_dict = json.loads(response.data.decode('utf-8')) except ValueError: - raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response)) + raise MixpanelException('Cannot interpret Mixpanel server response: {0}'.format(response.data)) - if response['status'] != 1: - raise MixpanelException('Mixpanel error: {0}'.format(response['error'])) + if response_dict['status'] != 1: + raise MixpanelException('Mixpanel error: {0}'.format(response_dict['error'])) - return True + return True # <- TODO: remove return val with major release. 
class BufferedConsumer(object): @@ -567,6 +585,10 @@ class BufferedConsumer(object): :param str groups_url: override the default groups API endpoint :param str api_host: the Mixpanel API domain where all requests should be issued (unless overridden by above URLs). + :param int retry_limit: number of times to retry each request in case of + connection or HTTP 5xx error; 0 to fail after first attempt. + :param float retry_backoff_factor: In case of retries, controls sleep time. e.g., + sleep_seconds = backoff_factor * (2 ** (num_total_retries - 1)). .. versionadded:: 4.6.0 The *api_host* parameter. @@ -578,8 +600,10 @@ class BufferedConsumer(object): remaining unsent events being held by the instance. """ def __init__(self, max_size=50, events_url=None, people_url=None, import_url=None, - request_timeout=None, groups_url=None, api_host="api.mixpanel.com"): - self._consumer = Consumer(events_url, people_url, import_url, request_timeout, groups_url, api_host) + request_timeout=None, groups_url=None, api_host="api.mixpanel.com", + retry_limit=4, retry_backoff_factor=0.25): + self._consumer = Consumer(events_url, people_url, import_url, request_timeout, + groups_url, api_host, retry_limit, retry_backoff_factor) self._buffers = { 'events': [], 'people': [], @@ -598,7 +622,7 @@ def send(self, endpoint, json_message, api_key=None): :meth:`~.send`. 
:param endpoint: the Mixpanel API endpoint appropriate for the message - :type endpoint: "events" | "people" | "imports" + :type endpoint: "events" | "people" | "groups" | "imports" :param str json_message: a JSON message formatted for the endpoint :param str api_key: your Mixpanel project's API key :raises MixpanelException: if the endpoint doesn't exist, the server is diff --git a/setup.py b/setup.py index 3984009..c89af8a 100644 --- a/setup.py +++ b/setup.py @@ -24,7 +24,10 @@ def find_version(*paths): author='Mixpanel, Inc.', author_email='dev@mixpanel.com', license='Apache', - install_requires=['six >= 1.9.0'], + install_requires=[ + 'six >= 1.9.0', + 'urllib3 >= 1.21.1', + ], classifiers=[ 'License :: OSI Approved :: Apache Software License', diff --git a/test_mixpanel.py b/test_mixpanel.py index 125aaa6..142fec4 100644 --- a/test_mixpanel.py +++ b/test_mixpanel.py @@ -9,7 +9,8 @@ from mock import Mock, patch import pytest import six -from six.moves import range, urllib +from six.moves import range +import urllib3 import mixpanel @@ -26,28 +27,17 @@ def send(self, endpoint, event, api_key=None): self.log.append((endpoint, json.loads(event))) -# Convert a query string with base64 data into a dict for safe comparison. 
-def qs(s): - if isinstance(s, six.binary_type): - s = s.decode('utf8') - blob = urllib.parse.parse_qs(s) - if len(blob['data']) != 1: - pytest.fail('found multi-item data: %s' % blob['data']) - json_bytes = base64.b64decode(blob['data'][0]) - blob['data'] = json.loads(json_bytes.decode('utf8')) - return blob - - class TestMixpanel: TOKEN = '12345' def setup_method(self, method): self.consumer = LogConsumer() - self.mp = mixpanel.Mixpanel('12345', consumer=self.consumer) + self.mp = mixpanel.Mixpanel(self.TOKEN, consumer=self.consumer) self.mp._now = lambda: 1000.1 + self.mp._make_insert_id = lambda: "abcdefg" def test_track(self): - self.mp.track('ID', 'button press', {'size': 'big', 'color': 'blue'}) + self.mp.track('ID', 'button press', {'size': 'big', 'color': 'blue', '$insert_id': 'abc123'}) assert self.consumer.log == [( 'events', { 'event': 'button press', @@ -57,15 +47,39 @@ def test_track(self): 'color': 'blue', 'distinct_id': 'ID', 'time': int(self.mp._now()), + '$insert_id': 'abc123', 'mp_lib': 'python', '$lib_version': mixpanel.__version__, } } )] + def test_track_makes_insert_id(self): + self.mp.track('ID', 'button press', {'size': 'big'}) + props = self.consumer.log[0][1]["properties"] + assert "$insert_id" in props + assert isinstance(props["$insert_id"], six.text_type) + assert len(props["$insert_id"]) > 0 + + def test_track_empty(self): + self.mp.track('person_xyz', 'login', {}) + assert self.consumer.log == [( + 'events', { + 'event': 'login', + 'properties': { + 'token': self.TOKEN, + 'distinct_id': 'person_xyz', + 'time': int(self.mp._now()), + '$insert_id': self.mp._make_insert_id(), + 'mp_lib': 'python', + '$lib_version': mixpanel.__version__, + }, + }, + )] + def test_import_data(self): timestamp = time.time() - self.mp.import_data('MY_API_KEY', 'ID', 'button press', timestamp, {'size': 'big', 'color': 'blue'}) + self.mp.import_data('MY_API_KEY', 'ID', 'button press', timestamp, {'size': 'big', 'color': 'blue', '$insert_id': 'abc123'}) assert 
self.consumer.log == [( 'imports', { 'event': 'button press', @@ -75,6 +89,7 @@ def test_import_data(self): 'color': 'blue', 'distinct_id': 'ID', 'time': int(timestamp), + '$insert_id': 'abc123', 'mp_lib': 'python', '$lib_version': mixpanel.__version__, }, @@ -83,7 +98,7 @@ def test_import_data(self): )] def test_track_meta(self): - self.mp.track('ID', 'button press', {'size': 'big', 'color': 'blue'}, + self.mp.track('ID', 'button press', {'size': 'big', 'color': 'blue', '$insert_id': 'abc123'}, meta={'ip': 0}) assert self.consumer.log == [( 'events', { @@ -94,6 +109,7 @@ def test_track_meta(self): 'color': 'blue', 'distinct_id': 'ID', 'time': int(self.mp._now()), + '$insert_id': 'abc123', 'mp_lib': 'python', '$lib_version': mixpanel.__version__, }, @@ -266,16 +282,17 @@ def test_people_set_created_date_datetime(self): def test_alias(self): # More complicated since alias() forces a synchronous call. mock_response = Mock() - mock_response.read.return_value = six.b('{"status":1, "error": null}') - with patch('six.moves.urllib.request.urlopen', return_value=mock_response) as urlopen: + mock_response.data = six.b('{"status": 1, "error": null}') + with patch('mixpanel.urllib3.PoolManager.request', return_value=mock_response) as req: self.mp.alias('ALIAS', 'ORIGINAL ID') assert self.consumer.log == [] - assert urlopen.call_count == 1 - ((request,), _) = urlopen.call_args + assert req.call_count == 1 + ((method, url), kwargs) = req.call_args - assert request.get_full_url() == 'https://api.mixpanel.com/track' - assert qs(request.data) == \ - qs('ip=0&data=eyJldmVudCI6IiRjcmVhdGVfYWxpYXMiLCJwcm9wZXJ0aWVzIjp7ImFsaWFzIjoiQUxJQVMiLCJ0b2tlbiI6IjEyMzQ1IiwiZGlzdGluY3RfaWQiOiJPUklHSU5BTCBJRCJ9fQ%3D%3D&verbose=1') + assert method == 'POST' + assert url == 'https://api.mixpanel.com/track' + expected_data = {"event":"$create_alias","properties":{"alias":"ALIAS","token":"12345","distinct_id":"ORIGINAL ID"}} + assert json.loads(kwargs["fields"]["data"]) == expected_data def 
test_merge(self): self.mp.merge('my_good_api_key', 'd1', 'd2') @@ -389,7 +406,7 @@ def default(self, obj): return obj.to_eng_string() self.mp._serializer = CustomSerializer - self.mp.track('ID', 'button press', {'size': decimal.Decimal(decimal_string)}) + self.mp.track('ID', 'button press', {'size': decimal.Decimal(decimal_string), '$insert_id': 'abc123'}) assert self.consumer.log == [( 'events', { 'event': 'button press', @@ -398,6 +415,7 @@ def default(self, obj): 'size': decimal_string, 'distinct_id': 'ID', 'time': int(self.mp._now()), + '$insert_id': 'abc123', 'mp_lib': 'python', '$lib_version': mixpanel.__version__, } @@ -406,7 +424,6 @@ def default(self, obj): class TestConsumer: - @classmethod def setup_class(cls): cls.consumer = mixpanel.Consumer(request_timeout=30) @@ -417,34 +434,31 @@ def _assertSends(self, expect_url, expect_data, consumer=None): consumer = self.consumer mock_response = Mock() - mock_response.read.return_value = six.b('{"status":1, "error": null}') - with patch('six.moves.urllib.request.urlopen', return_value=mock_response) as urlopen: + mock_response.data = six.b('{"status": 1, "error": null}') + with patch('mixpanel.urllib3.PoolManager.request', return_value=mock_response) as req: yield - assert urlopen.call_count == 1 - - (call_args, kwargs) = urlopen.call_args - (request,) = call_args - timeout = kwargs.get('timeout', None) - - assert request.get_full_url() == expect_url - assert qs(request.data) == qs(expect_data) - assert timeout == consumer._request_timeout + assert req.call_count == 1 + (call_args, kwargs) = req.call_args + (method, url) = call_args + assert method == 'POST' + assert url == expect_url + assert kwargs["fields"] == expect_data def test_send_events(self): - with self._assertSends('https://api.mixpanel.com/track', 'ip=0&data=IkV2ZW50Ig%3D%3D&verbose=1'): - self.consumer.send('events', '"Event"') + with self._assertSends('https://api.mixpanel.com/track', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}): + 
self.consumer.send('events', '{"foo":"bar"}') def test_send_people(self): - with self._assertSends('https://api.mixpanel.com/engage', 'ip=0&data=IlBlb3BsZSI%3D&verbose=1'): - self.consumer.send('people', '"People"') + with self._assertSends('https://api.mixpanel.com/engage', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}): + self.consumer.send('people', '{"foo":"bar"}') def test_consumer_override_api_host(self): consumer = mixpanel.Consumer(api_host="api-eu.mixpanel.com") - with self._assertSends('https://api-eu.mixpanel.com/track', 'ip=0&data=IkV2ZW50Ig%3D%3D&verbose=1', consumer=consumer): - consumer.send('events', '"Event"') - with self._assertSends('https://api-eu.mixpanel.com/engage', 'ip=0&data=IlBlb3BsZSI%3D&verbose=1', consumer=consumer): - consumer.send('people', '"People"') + with self._assertSends('https://api-eu.mixpanel.com/track', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}, consumer=consumer): + consumer.send('events', '{"foo":"bar"}') + with self._assertSends('https://api-eu.mixpanel.com/engage', {"ip": 0, "verbose": 1, "data": '{"foo":"bar"}'}, consumer=consumer): + consumer.send('people', '{"foo":"bar"}') def test_unknown_endpoint(self): with pytest.raises(mixpanel.MixpanelException): @@ -452,7 +466,6 @@ def test_unknown_endpoint(self): class TestBufferedConsumer: - @classmethod def setup_class(cls): cls.MAX_LENGTH = 10 @@ -488,10 +501,10 @@ def test_unknown_endpoint_raises_on_send(self): def test_useful_reraise_in_flush_endpoint(self): error_mock = Mock() - error_mock.read.return_value = six.b('{"status": 0, "error": "arbitrary error"}') + error_mock.data = six.b('{"status": 0, "error": "arbitrary error"}') broken_json = '{broken JSON' consumer = mixpanel.BufferedConsumer(2) - with patch('six.moves.urllib.request.urlopen', return_value=error_mock): + with patch('mixpanel.urllib3.PoolManager.request', return_value=error_mock): consumer.send('events', broken_json) with pytest.raises(mixpanel.MixpanelException) as excinfo: consumer.flush() @@ 
-506,7 +519,6 @@ def test_send_remembers_api_key(self): class TestFunctional: - @classmethod def setup_class(cls): cls.TOKEN = '12345' @@ -515,25 +527,23 @@ def setup_class(cls): @contextlib.contextmanager def _assertRequested(self, expect_url, expect_data): - mock_response = Mock() - mock_response.read.return_value = six.b('{"status":1, "error": null}') - with patch('six.moves.urllib.request.urlopen', return_value=mock_response) as urlopen: + res = Mock() + res.data = six.b('{"status": 1, "error": null}') + with patch('mixpanel.urllib3.PoolManager.request', return_value=res) as req: yield - assert urlopen.call_count == 1 - ((request,), _) = urlopen.call_args - assert request.get_full_url() == expect_url - data = urllib.parse.parse_qs(request.data.decode('utf8')) - assert len(data['data']) == 1 - payload_encoded = data['data'][0] - payload_json = base64.b64decode(payload_encoded).decode('utf8') - payload = json.loads(payload_json) + assert req.call_count == 1 + ((method, url,), data) = req.call_args + data = data["fields"]["data"] + assert method == 'POST' + assert url == expect_url + payload = json.loads(data) assert payload == expect_data def test_track_functional(self): - expect_data = {'event': {'color': 'blue', 'size': 'big'}, 'properties': {'mp_lib': 'python', 'token': '12345', 'distinct_id': 'button press', '$lib_version': mixpanel.__version__, 'time': 1000}} + expect_data = {'event': 'button_press', 'properties': {'size': 'big', 'color': 'blue', 'mp_lib': 'python', 'token': '12345', 'distinct_id': 'player1', '$lib_version': mixpanel.__version__, 'time': 1000, '$insert_id': 'xyz1200'}} with self._assertRequested('https://api.mixpanel.com/track', expect_data): - self.mp.track('button press', {'size': 'big', 'color': 'blue'}) + self.mp.track('player1', 'button_press', {'size': 'big', 'color': 'blue', '$insert_id': 'xyz1200'}) def test_people_set_functional(self): expect_data = {'$distinct_id': 'amq', '$set': {'birth month': 'october', 'favorite color': 
'purple'}, '$time': 1000, '$token': '12345'}