From c5f26cb9684d896658e5fef5c5d28c30c497842a Mon Sep 17 00:00:00 2001 From: aherlihy Date: Tue, 16 Feb 2016 16:49:56 +0100 Subject: [PATCH] Remove elastic_doc_manager (now packaged separately) --- .travis.yml | 5 - README.rst | 8 + mongo_connector/connector.py | 6 +- .../doc_managers/elastic_doc_manager.py | 305 ------------------ .../test_utils.py | 79 ++++- setup.py | 6 +- tests/__init__.py | 19 +- tests/test_command_replication.py | 3 +- tests/test_config.py | 4 +- tests/test_connector_sharded.py | 23 +- tests/test_elastic.py | 267 --------------- tests/test_elastic_doc_manager.py | 235 -------------- tests/test_formatters.py | 14 + tests/test_gridfs_file.py | 34 +- tests/test_mongo.py | 10 +- tests/test_mongo_connector.py | 4 +- tests/test_mongo_doc_manager.py | 4 +- tests/test_oplog_manager.py | 3 +- tests/test_oplog_manager_sharded.py | 8 +- tests/test_rollbacks.py | 27 +- tests/test_solr.py | 15 +- tests/test_solr_doc_manager.py | 6 +- tests/test_synchronizer.py | 6 +- tests/util.py | 47 --- 24 files changed, 184 insertions(+), 954 deletions(-) delete mode 100755 mongo_connector/doc_managers/elastic_doc_manager.py rename tests/setup_cluster.py => mongo_connector/test_utils.py (73%) delete mode 100644 tests/test_elastic.py delete mode 100644 tests/test_elastic_doc_manager.py delete mode 100644 tests/util.py diff --git a/.travis.yml b/.travis.yml index 4fbecdcf..6a90855d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,19 +6,14 @@ python: - 3.4 - 3.5 -services: - - elasticsearch - install: - pip install pymongo==2.9 - pip install mongo-orchestration==0.3.1 - curl -O http://archive.apache.org/dist/lucene/solr/4.9.1/solr-4.9.1.tgz - tar -zxf solr-4.9.1.tgz - cp mongo_connector/doc_managers/schema.xml solr-4.9.1/example/solr/collection1/conf/ - - sudo /usr/share/elasticsearch/bin/plugin install elasticsearch/elasticsearch-mapper-attachments/2.4.3 before_script: - - sudo service elasticsearch restart - cd solr-4.9.1/example/ && java -Djetty.port=8983 -Dsolr.solr.home=solr -jar start.jar > /dev/null 2>&1 & - mongo-orchestration start diff --git a/README.rst b/README.rst index 47070f5b..3c991166 100644 --- a/README.rst +++ b/README.rst @@ -72,6 +72,14 @@ If you want to jump-start into using mongo-connector with a another particular s - `Usage with Elasticsearch `__ - `Usage with MongoDB `__ +Doc Managers +~~~~~~~~~~~~~~~~~~~~~ + +Elastic 1.x doc manager: https://github.com/mongodb-labs/elastic_doc_manager +Elastic 2.x doc manager: https://github.com/mongodb-labs/elastic2_doc_manager + +The Solr doc manager and the MongoDB doc manager come packaged with the mongo-connector project. + Troubleshooting/Questions ------------------------- diff --git a/mongo_connector/connector.py b/mongo_connector/connector.py index 4cdc22f7..66c09f03 100644 --- a/mongo_connector/connector.py +++ b/mongo_connector/connector.py @@ -807,7 +807,11 @@ def import_dm_by_name(name): return module except ImportError: raise errors.InvalidConfiguration( - "Could not import %s." % full_name) + "Could not import %s. It could be that this doc manager ha" + "s been moved out of this project and is maintained elsewh" + "ere. Make sure that you have the doc manager installed al" + "ongside mongo-connector. Check the README for a list of a" + "vailable doc managers." % full_name) sys.exit(1) except (AttributeError, TypeError): raise errors.InvalidConfiguration( diff --git a/mongo_connector/doc_managers/elastic_doc_manager.py b/mongo_connector/doc_managers/elastic_doc_manager.py deleted file mode 100755 index a1bce0a7..00000000 --- a/mongo_connector/doc_managers/elastic_doc_manager.py +++ /dev/null @@ -1,305 +0,0 @@ -# Copyright 2013-2014 MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Elasticsearch implementation of the DocManager interface. - -Receives documents from an OplogThread and takes the appropriate actions on -Elasticsearch. -""" -import base64 -import logging - -from threading import Timer - -import bson.json_util - -from elasticsearch import Elasticsearch, exceptions as es_exceptions -from elasticsearch.helpers import scan, streaming_bulk - -from mongo_connector import errors -from mongo_connector.compat import u -from mongo_connector.constants import (DEFAULT_COMMIT_INTERVAL, - DEFAULT_MAX_BULK) -from mongo_connector.util import exception_wrapper, retry_until_ok -from mongo_connector.doc_managers.doc_manager_base import DocManagerBase -from mongo_connector.doc_managers.formatters import DefaultDocumentFormatter - -wrap_exceptions = exception_wrapper({ - es_exceptions.ConnectionError: errors.ConnectionFailed, - es_exceptions.TransportError: errors.OperationFailed, - es_exceptions.NotFoundError: errors.OperationFailed, - es_exceptions.RequestError: errors.OperationFailed}) - -LOG = logging.getLogger(__name__) - - -class DocManager(DocManagerBase): - """Elasticsearch implementation of the DocManager interface. - - Receives documents from an OplogThread and takes the appropriate actions on - Elasticsearch. - """ - - def __init__(self, url, auto_commit_interval=DEFAULT_COMMIT_INTERVAL, - unique_key='_id', chunk_size=DEFAULT_MAX_BULK, - meta_index_name="mongodb_meta", meta_type="mongodb_meta", - attachment_field="content", **kwargs): - self.elastic = Elasticsearch( - hosts=[url], **kwargs.get('clientOptions', {})) - self.auto_commit_interval = auto_commit_interval - self.meta_index_name = meta_index_name - self.meta_type = meta_type - self.unique_key = unique_key - self.chunk_size = chunk_size - if self.auto_commit_interval not in [None, 0]: - self.run_auto_commit() - self._formatter = DefaultDocumentFormatter() - - self.has_attachment_mapping = False - self.attachment_field = attachment_field - - def _index_and_mapping(self, namespace): - """Helper method for getting the index and type from a namespace.""" - index, doc_type = namespace.split('.', 1) - return index.lower(), doc_type - - def stop(self): - """Stop the auto-commit thread.""" - self.auto_commit_interval = None - - def apply_update(self, doc, update_spec): - if "$set" not in update_spec and "$unset" not in update_spec: - # Don't try to add ns and _ts fields back in from doc - return update_spec - return super(DocManager, self).apply_update(doc, update_spec) - - @wrap_exceptions - def handle_command(self, doc, namespace, timestamp): - db = namespace.split('.', 1)[0] - if doc.get('dropDatabase'): - dbs = self.command_helper.map_db(db) - for _db in dbs: - self.elastic.indices.delete(index=_db.lower()) - - if doc.get('renameCollection'): - raise errors.OperationFailed( - "elastic_doc_manager does not support renaming a mapping.") - - if doc.get('create'): - db, coll = self.command_helper.map_collection(db, doc['create']) - if db and coll: - self.elastic.indices.put_mapping( - index=db.lower(), doc_type=coll, - body={ - "_source": {"enabled": True} - }) - - if doc.get('drop'): - db, coll = self.command_helper.map_collection(db, doc['drop']) - if db and coll: - self.elastic.indices.delete_mapping(index=db.lower(), - doc_type=coll) - - @wrap_exceptions - def update(self, document_id, update_spec, namespace, timestamp): - """Apply updates given in update_spec to the document whose id - matches that of doc. - """ - self.commit() - index, doc_type = self._index_and_mapping(namespace) - document = self.elastic.get(index=index, doc_type=doc_type, - id=u(document_id)) - updated = self.apply_update(document['_source'], update_spec) - # _id is immutable in MongoDB, so won't have changed in update - updated['_id'] = document['_id'] - self.upsert(updated, namespace, timestamp) - # upsert() strips metadata, so only _id + fields in _source still here - return updated - - @wrap_exceptions - def upsert(self, doc, namespace, timestamp): - """Insert a document into Elasticsearch.""" - index, doc_type = self._index_and_mapping(namespace) - # No need to duplicate '_id' in source document - doc_id = u(doc.pop("_id")) - metadata = { - "ns": namespace, - "_ts": timestamp - } - # Index the source document, using lowercase namespace as index name. - self.elastic.index(index=index, doc_type=doc_type, - body=self._formatter.format_document(doc), id=doc_id, - refresh=(self.auto_commit_interval == 0)) - # Index document metadata with original namespace (mixed upper/lower). - self.elastic.index(index=self.meta_index_name, doc_type=self.meta_type, - body=bson.json_util.dumps(metadata), id=doc_id, - refresh=(self.auto_commit_interval == 0)) - # Leave _id, since it's part of the original document - doc['_id'] = doc_id - - @wrap_exceptions - def bulk_upsert(self, docs, namespace, timestamp): - """Insert multiple documents into Elasticsearch.""" - def docs_to_upsert(): - doc = None - for doc in docs: - # Remove metadata and redundant _id - index, doc_type = self._index_and_mapping(namespace) - doc_id = u(doc.pop("_id")) - document_action = { - "_index": index, - "_type": doc_type, - "_id": doc_id, - "_source": self._formatter.format_document(doc) - } - document_meta = { - "_index": self.meta_index_name, - "_type": self.meta_type, - "_id": doc_id, - "_source": { - "ns": index, - "_ts": timestamp - } - } - yield document_action - yield document_meta - if doc is None: - raise errors.EmptyDocsError( - "Cannot upsert an empty sequence of " - "documents into Elastic Search") - try: - kw = {} - if self.chunk_size > 0: - kw['chunk_size'] = self.chunk_size - - responses = streaming_bulk(client=self.elastic, - actions=docs_to_upsert(), - **kw) - - for ok, resp in responses: - if not ok: - LOG.error( - "Could not bulk-upsert document " - "into ElasticSearch: %r" % resp) - if self.auto_commit_interval == 0: - self.commit() - except errors.EmptyDocsError: - # This can happen when mongo-connector starts up, there is no - # config file, but nothing to dump - pass - - @wrap_exceptions - def insert_file(self, f, namespace, timestamp): - doc = f.get_metadata() - doc_id = str(doc.pop('_id')) - index, doc_type = self._index_and_mapping(namespace) - - # make sure that elasticsearch treats it like a file - if not self.has_attachment_mapping: - body = { - "properties": { - self.attachment_field: {"type": "attachment"} - } - } - self.elastic.indices.put_mapping(index=index, - doc_type=doc_type, - body=body) - self.has_attachment_mapping = True - - metadata = { - 'ns': namespace, - '_ts': timestamp, - } - - doc = self._formatter.format_document(doc) - doc[self.attachment_field] = base64.b64encode(f.read()).decode() - - self.elastic.index(index=index, doc_type=doc_type, - body=doc, id=doc_id, - refresh=(self.auto_commit_interval == 0)) - self.elastic.index(index=self.meta_index_name, doc_type=self.meta_type, - body=bson.json_util.dumps(metadata), id=doc_id, - refresh=(self.auto_commit_interval == 0)) - - @wrap_exceptions - def remove(self, document_id, namespace, timestamp): - """Remove a document from Elasticsearch.""" - index, doc_type = self._index_and_mapping(namespace) - self.elastic.delete(index=index, doc_type=doc_type, - id=u(document_id), - refresh=(self.auto_commit_interval == 0)) - self.elastic.delete(index=self.meta_index_name, doc_type=self.meta_type, - id=u(document_id), - refresh=(self.auto_commit_interval == 0)) - - @wrap_exceptions - def _stream_search(self, *args, **kwargs): - """Helper method for iterating over ES search results.""" - for hit in scan(self.elastic, query=kwargs.pop('body', None), - scroll='10m', **kwargs): - hit['_source']['_id'] = hit['_id'] - yield hit['_source'] - - def search(self, start_ts, end_ts): - """Query Elasticsearch for documents in a time range. - - This method is used to find documents that may be in conflict during - a rollback event in MongoDB. - """ - return self._stream_search( - index=self.meta_index_name, - body={ - "query": { - "filtered": { - "filter": { - "range": { - "_ts": {"gte": start_ts, "lte": end_ts} - } - } - } - } - }) - - def commit(self): - """Refresh all Elasticsearch indexes.""" - retry_until_ok(self.elastic.indices.refresh, index="") - - def run_auto_commit(self): - """Periodically commit to the Elastic server.""" - self.elastic.indices.refresh() - if self.auto_commit_interval not in [None, 0]: - Timer(self.auto_commit_interval, self.run_auto_commit).start() - - @wrap_exceptions - def get_last_doc(self): - """Get the most recently modified document from Elasticsearch. - - This method is used to help define a time window within which documents - may be in conflict after a MongoDB rollback. - """ - try: - result = self.elastic.search( - index=self.meta_index_name, - body={ - "query": {"match_all": {}}, - "sort": [{"_ts": "desc"}], - }, - size=1 - )["hits"]["hits"] - for r in result: - r['_source']['_id'] = r['_id'] - return r['_source'] - except es_exceptions.RequestError: - # no documents so ES returns 400 because of undefined _ts mapping - return None diff --git a/tests/setup_cluster.py b/mongo_connector/test_utils.py similarity index 73% rename from tests/setup_cluster.py rename to mongo_connector/test_utils.py index ca56622f..b9153ee4 100644 --- a/tests/setup_cluster.py +++ b/mongo_connector/test_utils.py @@ -14,12 +14,31 @@ import atexit import itertools +import time import os +import sys import pymongo import requests -from tests import db_user, db_password + +if sys.version_info[0] == 3: + unicode = str + +# Configurable hosts and ports used in the tests +solr_url = unicode(os.environ.get('SOLR_URL', 'http://localhost:8983/solr')) +db_user = unicode(os.environ.get("DB_USER", "")) +db_password = unicode(os.environ.get("DB_PASSWORD", "")) +# Extra keyword options to provide to Connector. +connector_opts = {} +if db_user: + connector_opts = {'auth_username': db_user, 'auth_key': db_password} + +# Document count for stress tests +STRESS_COUNT = 100 + +# Test namespace, timestamp arguments +TESTARGS = ('test.test', 1) _mo_address = os.environ.get("MO_ADDRESS", "localhost:8889") _mongo_start_port = int(os.environ.get("MONGO_PORT", 27017)) @@ -180,3 +199,61 @@ def start(self): self.shards = [ReplicaSet()._init_from_response(resp) for resp in (shard1, shard2)] return self + + +class MockGridFSFile: + def __init__(self, doc, data): + self._id = doc['_id'] + self.filename = doc['filename'] + self.upload_date = doc['upload_date'] + self.md5 = doc['md5'] + self.data = data + self.length = len(self.data) + self.pos = 0 + + def get_metadata(self): + return { + '_id': self._id, + 'filename': self.filename, + 'upload_date': self.upload_date, + 'md5': self.md5 + } + + def __len__(self): + return self.length + + def read(self, n=-1): + if n < 0 or self.pos + n > self.length: + n = self.length - self.pos + s = self.data[self.pos:self.pos+n] + self.pos += n + return s + + +def wait_for(condition, max_tries=60): + """Wait for a condition to be true up to a maximum number of tries + """ + cond = False + while not cond and max_tries > 1: + try: + cond = condition() + except Exception: + pass + time.sleep(1) + max_tries -= 1 + return condition() + + +def assert_soon(condition, message=None, max_tries=60): + """Assert that a condition eventually evaluates to True after at most + max_tries number of attempts + + """ + if not wait_for(condition, max_tries=max_tries): + raise AssertionError(message or "") + +def close_client(client): + if hasattr(type(client), '_process_kill_cursors_queue'): + client._process_kill_cursors_queue() + time.sleep(1) # Wait for queue to clear. + client.close() diff --git a/setup.py b/setup.py index 66424494..556244fb 100644 --- a/setup.py +++ b/setup.py @@ -42,7 +42,8 @@ from setuptools import setup extra_opts = {"test_suite": "tests", - "tests_require": ["mongo-orchestration>= 0.2, < 0.4", "requests>=2.5.1"]} + "tests_require": ["mongo-orchestration>= 0.2, < 0.4", + "requests>=2.5.1"]} if sys.version_info[:2] == (2, 6): # Need unittest2 to run unittests in Python 2.6 @@ -123,8 +124,7 @@ def run(self): platforms=["any"], classifiers=filter(None, classifiers.split("\n")), install_requires=['pymongo >= 2.9', - 'pysolr >= 3.1.0', - 'elasticsearch >= 1.2, < 2.0.0'], + 'pysolr >= 3.1.0'], packages=["mongo_connector", "mongo_connector.doc_managers"], package_data={ 'mongo_connector.doc_managers': ['schema.xml'] diff --git a/tests/__init__.py b/tests/__init__.py index f9fb5aeb..893a3e14 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -11,8 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import logging -import os import sys logging.basicConfig(stream=sys.stdout) @@ -27,20 +27,3 @@ import unittest from unittest.case import SkipTest -# Configurable hosts and ports used in the tests -elastic_host = unicode(os.environ.get("ES_HOST", 'localhost')) -elastic_port = unicode(os.environ.get("ES_PORT", 9200)) -elastic_pair = '%s:%s' % (elastic_host, elastic_port) -solr_url = unicode(os.environ.get('SOLR_URL', 'http://localhost:8983/solr')) -db_user = unicode(os.environ.get("DB_USER", "")) -db_password = unicode(os.environ.get("DB_PASSWORD", "")) -# Extra keyword options to provide to Connector. -connector_opts = {} -if db_user: - connector_opts = {'auth_username': db_user, 'auth_key': db_password} - -# Document count for stress tests -STRESS_COUNT = 100 - -# Test namespace, timestamp arguments -TESTARGS = ('test.test', 1) diff --git a/tests/test_command_replication.py b/tests/test_command_replication.py index ffb25445..5be228cc 100644 --- a/tests/test_command_replication.py +++ b/tests/test_command_replication.py @@ -26,9 +26,8 @@ from mongo_connector.doc_managers.doc_manager_base import DocManagerBase from mongo_connector.locking_dict import LockingDict from mongo_connector.oplog_manager import OplogThread +from mongo_connector.test_utils import ReplicaSet, assert_soon, close_client from tests import unittest -from tests.setup_cluster import ReplicaSet -from tests.util import assert_soon, close_client class CommandLoggerDocManager(DocManagerBase): diff --git a/tests/test_config.py b/tests/test_config.py index 0edd0359..6444a5bb 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -23,8 +23,8 @@ from mongo_connector import config, errors, connector from mongo_connector.connector import get_config_options, setup_logging from mongo_connector.doc_managers import doc_manager_simulator -from tests import unittest, solr_url -from tests.setup_cluster import ReplicaSet +from mongo_connector.test_utils import ReplicaSet, solr_url +from tests import unittest from_here = lambda *paths: os.path.join( os.path.abspath(os.path.dirname(__file__)), *paths) diff --git a/tests/test_connector_sharded.py b/tests/test_connector_sharded.py index 431a8837..d6ca76bf 100644 --- a/tests/test_connector_sharded.py +++ b/tests/test_connector_sharded.py @@ -1,11 +1,26 @@ +# Copyright 2013-2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import os from mongo_connector.connector import Connector from mongo_connector.doc_managers.doc_manager_simulator import DocManager - -from tests import unittest, SkipTest, db_user, db_password -from tests.setup_cluster import ShardedCluster -from tests.util import assert_soon +from mongo_connector.test_utils import (ShardedCluster, + db_user, + db_password, + assert_soon) +from tests import unittest, SkipTest class TestConnectorSharded(unittest.TestCase): diff --git a/tests/test_elastic.py b/tests/test_elastic.py deleted file mode 100644 index 01d59834..00000000 --- a/tests/test_elastic.py +++ /dev/null @@ -1,267 +0,0 @@ -# Copyright 2013-2014 MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Integration tests for mongo-connector + Elasticsearch.""" -import base64 -import os -import sys -import time - -from bson import SON -from elasticsearch import Elasticsearch -from gridfs import GridFS - -sys.path[0:0] = [""] - -from tests import elastic_pair -from tests.setup_cluster import ReplicaSet -from mongo_connector.doc_managers.elastic_doc_manager import DocManager -from mongo_connector.connector import Connector -from mongo_connector.util import retry_until_ok -from tests.util import assert_soon, close_client -from tests import unittest - - -class ElasticsearchTestCase(unittest.TestCase): - """Base class for all ES TestCases.""" - - @classmethod - def setUpClass(cls): - cls.elastic_conn = Elasticsearch(hosts=[elastic_pair]) - cls.elastic_doc = DocManager(elastic_pair, - auto_commit_interval=0) - - def setUp(self): - # Create target index in elasticsearch - self.elastic_conn.indices.create(index='test', ignore=400) - self.elastic_conn.cluster.health(wait_for_status='yellow', - index='test') - - def tearDown(self): - self.elastic_conn.indices.delete(index='test', ignore=404) - - def _search(self, query=None): - query = query or {"match_all": {}} - return self.elastic_doc._stream_search( - index="test", doc_type='test', - body={"query": query} - ) - - def _count(self): - return self.elastic_conn.count(index='test')['count'] - - def _remove(self): - self.elastic_conn.indices.delete_mapping( - index="test", doc_type='test' - ) - self.elastic_conn.indices.refresh(index="test") - - def _mappings(self, index='_all'): - mappings = self.elastic_conn.indices.get_mapping(index=index) - if index in mappings: - return list(mappings[index]['mappings'].keys()) - return [] - - def _indices(self): - return list(self.elastic_conn.indices.stats()['indices'].keys()) - - -class TestElastic(ElasticsearchTestCase): - """Integration tests for mongo-connector + Elasticsearch.""" - - @classmethod - def setUpClass(cls): - """Start the cluster.""" - super(TestElastic, cls).setUpClass() - cls.repl_set = ReplicaSet().start() - cls.conn = cls.repl_set.client() - - @classmethod - def tearDownClass(cls): - """Kill the cluster.""" - close_client(cls.conn) - cls.repl_set.stop() - - def tearDown(self): - """Stop the Connector thread.""" - super(TestElastic, self).tearDown() - self.connector.join() - - def setUp(self): - """Start a new Connector for each test.""" - super(TestElastic, self).setUp() - try: - os.unlink("oplog.timestamp") - except OSError: - pass - docman = DocManager(elastic_pair) - self.connector = Connector( - mongo_address=self.repl_set.uri, - ns_set=['test.test'], - doc_managers=(docman,), - gridfs_set=['test.test'] - ) - - self.conn.test.test.drop() - self.conn.test.test.files.drop() - self.conn.test.test.chunks.drop() - - self.connector.start() - assert_soon(lambda: len(self.connector.shard_set) > 0) - assert_soon(lambda: self._count() == 0) - - def test_insert(self): - """Test insert operations.""" - self.conn['test']['test'].insert_one({'name': 'paulie'}) - assert_soon(lambda: self._count() > 0) - result_set_1 = list(self._search()) - self.assertEqual(len(result_set_1), 1) - result_set_2 = self.conn['test']['test'].find_one() - for item in result_set_1: - self.assertEqual(item['_id'], str(result_set_2['_id'])) - self.assertEqual(item['name'], result_set_2['name']) - - def test_remove(self): - """Tests remove operations.""" - self.conn['test']['test'].insert_one({'name': 'paulie'}) - assert_soon(lambda: self._count() == 1) - self.conn['test']['test'].delete_one({'name': 'paulie'}) - assert_soon(lambda: self._count() != 1) - self.assertEqual(self._count(), 0) - - def test_insert_file(self): - """Tests inserting a gridfs file - """ - fs = GridFS(self.conn['test'], 'test') - test_data = b"test_insert_file test file" - id = fs.put(test_data, filename="test.txt", encoding='utf8') - assert_soon(lambda: self._count() > 0) - - query = {"match": {"_all": "test_insert_file"}} - res = list(self._search(query)) - self.assertEqual(len(res), 1) - doc = res[0] - self.assertEqual(doc['filename'], 'test.txt') - self.assertEqual(doc['_id'], str(id)) - self.assertEqual(base64.b64decode(doc['content']), test_data) - - def test_remove_file(self): - fs = GridFS(self.conn['test'], 'test') - id = fs.put("test file", filename="test.txt", encoding='utf8') - assert_soon(lambda: self._count() == 1) - fs.delete(id) - assert_soon(lambda: self._count() == 0) - - def test_update(self): - """Test update operations.""" - # Insert - self.conn.test.test.insert_one({"a": 0}) - assert_soon(lambda: sum(1 for _ in self._search()) == 1) - - def check_update(update_spec): - updated = self.conn.test.command( - SON([('findAndModify', 'test'), - ('query', {"a": 0}), - ('update', update_spec), - ('new', True)]))['value'] - # Stringify _id to match what will be retrieved from ES - updated['_id'] = str(updated['_id']) - assert_soon(lambda: next(self._search()) == updated) - - # Update by adding a field. Note that ES can't mix types within an array - check_update({"$set": {"b": [{"c": 10}, {"d": 11}]}}) - - # Update by setting an attribute of a sub-document beyond end of array. - check_update({"$set": {"b.10.c": 42}}) - - # Update by changing a value within a sub-document (contains array) - check_update({"$inc": {"b.0.c": 1}}) - - # Update by changing the value within an array - check_update({"$inc": {"b.1.f": 12}}) - - # Update by adding new bucket to list - check_update({"$push": {"b": {"e": 12}}}) - - # Update by changing an entire sub-document - check_update({"$set": {"b.0": {"e": 4}}}) - - # Update by adding a sub-document - check_update({"$set": {"b": {"0": {"c": 100}}}}) - - # Update whole document - check_update({"a": 0, "b": {"1": {"d": 10000}}}) - - def test_rollback(self): - """Test behavior during a MongoDB rollback. - - We force a rollback by adding a doc, killing the primary, - adding another doc, killing the new primary, and then - restarting both. - """ - primary_conn = self.repl_set.primary.client() - - self.conn['test']['test'].insert_one({'name': 'paul'}) - condition1 = lambda: self.conn['test']['test'].find( - {'name': 'paul'}).count() == 1 - condition2 = lambda: self._count() == 1 - assert_soon(condition1) - assert_soon(condition2) - - self.repl_set.primary.stop(destroy=False) - - new_primary_conn = self.repl_set.secondary.client() - - admin = new_primary_conn['admin'] - assert_soon(lambda: admin.command("isMaster")['ismaster']) - time.sleep(5) - retry_until_ok(self.conn.test.test.insert_one, - {'name': 'pauline'}) - assert_soon(lambda: self._count() == 2) - result_set_1 = list(self._search()) - result_set_2 = self.conn['test']['test'].find_one({'name': 'pauline'}) - self.assertEqual(len(result_set_1), 2) - #make sure pauline is there - for item in result_set_1: - if item['name'] == 'pauline': - self.assertEqual(item['_id'], str(result_set_2['_id'])) - self.repl_set.secondary.stop(destroy=False) - - self.repl_set.primary.start() - while primary_conn['admin'].command("isMaster")['ismaster'] is False: - time.sleep(1) - - self.repl_set.secondary.start() - - time.sleep(2) - result_set_1 = list(self._search()) - self.assertEqual(len(result_set_1), 1) - - for item in result_set_1: - self.assertEqual(item['name'], 'paul') - find_cursor = retry_until_ok(self.conn['test']['test'].find) - self.assertEqual(retry_until_ok(find_cursor.count), 1) - - def test_bad_int_value(self): - self.conn.test.test.insert_one({ - 'inf': float('inf'), 'nan': float('nan'), - 'still_exists': True}) - assert_soon(lambda: self._count() > 0) - for doc in self._search(): - self.assertNotIn('inf', doc) - self.assertNotIn('nan', doc) - self.assertTrue(doc['still_exists']) - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_elastic_doc_manager.py b/tests/test_elastic_doc_manager.py deleted file mode 100644 index ab935cde..00000000 --- a/tests/test_elastic_doc_manager.py +++ /dev/null @@ -1,235 +0,0 @@ -# Copyright 2013-2014 MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Unit tests for the Elastic DocManager.""" -import base64 -import sys -import time - -sys.path[0:0] = [""] - -from tests import elastic_pair, unittest, TESTARGS -from tests.test_elastic import ElasticsearchTestCase -from tests.test_gridfs_file import MockGridFSFile - -from mongo_connector.command_helper import CommandHelper -from mongo_connector.doc_managers.elastic_doc_manager import DocManager - - -class TestElasticDocManager(ElasticsearchTestCase): - """Unit tests for the Elastic DocManager.""" - - def test_update(self): - """Test the update method.""" - doc_id = 1 - doc = {"_id": doc_id, "a": 1, "b": 2} - self.elastic_doc.upsert(doc, *TESTARGS) - # $set only - update_spec = {"$set": {"a": 1, "b": 2}} - doc = self.elastic_doc.update(doc_id, update_spec, *TESTARGS) - self.assertEqual(doc, {"_id": '1', "a": 1, "b": 2}) - # $unset only - update_spec = {"$unset": {"a": True}} - doc = self.elastic_doc.update(doc_id, update_spec, *TESTARGS) - self.assertEqual(doc, {"_id": '1', "b": 2}) - # mixed $set/$unset - update_spec = {"$unset": {"b": True}, "$set": {"c": 3}} - doc = self.elastic_doc.update(doc_id, update_spec, *TESTARGS) - self.assertEqual(doc, {"_id": '1', "c": 3}) - - def test_upsert(self): - """Test the upsert method.""" - docc = {'_id': '1', 'name': 'John'} - self.elastic_doc.upsert(docc, *TESTARGS) - res = self.elastic_conn.search( - index="test", doc_type='test', - body={"query": {"match_all": {}}} - )["hits"]["hits"] - for doc in res: - self.assertEqual(doc['_id'], '1') - self.assertEqual(doc['_source']['name'], 'John') - - def test_bulk_upsert(self): - """Test the bulk_upsert method.""" - self.elastic_doc.bulk_upsert([], *TESTARGS) - - docs = ({"_id": i} for i in range(1000)) - self.elastic_doc.bulk_upsert(docs, *TESTARGS) - self.elastic_doc.commit() - returned_ids = sorted(int(doc["_id"]) for doc in self._search()) - self.assertEqual(self._count(), 1000) - self.assertEqual(len(returned_ids), 1000) - for i, r in enumerate(returned_ids): - self.assertEqual(r, i) - - docs = ({"_id": i, "weight": 2*i} for i in range(1000)) - self.elastic_doc.bulk_upsert(docs, *TESTARGS) - - returned_ids = sorted( - int(doc["weight"]) for doc in self._search()) - self.assertEqual(len(returned_ids), 1000) - for i, r in enumerate(returned_ids): - self.assertEqual(r, 2*i) - - def test_remove(self): - """Test the remove method.""" - docc = {'_id': '1', 'name': 'John'} - self.elastic_doc.upsert(docc, *TESTARGS) - res = self.elastic_conn.search( - index="test", doc_type='test', - body={"query": {"match_all": {}}} - )["hits"]["hits"] - res = [x["_source"] for x in res] - self.assertEqual(len(res), 1) - - self.elastic_doc.remove(docc['_id'], *TESTARGS) - res = self.elastic_conn.search( - index="test", doc_type='test', - body={"query": {"match_all": {}}} - )["hits"]["hits"] - res = [x["_source"] for x in res] - self.assertEqual(len(res), 0) - - def test_insert_file(self): - """Ensure we can properly insert a file into ElasticSearch - """ - test_data = ' '.join(str(x) for x in range(100000)).encode('utf8') - docc = { - '_id': 'test_id', - 'filename': 'test_filename', - 'upload_date': 5, - 'md5': 'test_md5' - } - self.elastic_doc.insert_file( - MockGridFSFile(docc, test_data), *TESTARGS) - res = self._search() - for doc in res: - self.assertEqual(doc['_id'], docc['_id']) - self.assertEqual(doc['filename'], docc['filename']) - self.assertEqual(base64.b64decode(doc['content']), - test_data.strip()) - - def test_remove_file(self): - test_data = b'hello world' - docc = { - '_id': 'test_id', - '_ts': 10, - 'ns': 'test.test', - 'filename': 'test_filename', - 'upload_date': 5, - 'md5': 'test_md5' - } - - self.elastic_doc.insert_file( - MockGridFSFile(docc, test_data), *TESTARGS) - res = list(self._search()) - self.assertEqual(len(res), 1) - - self.elastic_doc.remove('test_id', *TESTARGS) - res = list(self._search()) - self.assertEqual(len(res), 0) - - def test_search(self): - """Test the search method. - - Make sure we can retrieve documents last modified within a time range. - """ - docc = {'_id': '1', 'name': 'John'} - self.elastic_doc.upsert(docc, 'test.test', 5767301236327972865) - docc2 = {'_id': '2', 'name': 'John Paul'} - self.elastic_doc.upsert(docc2, 'test.test', 5767301236327972866) - docc3 = {'_id': '3', 'name': 'Paul'} - self.elastic_doc.upsert(docc3, 'test.test', 5767301236327972870) - search = list(self.elastic_doc.search(5767301236327972865, - 5767301236327972866)) - self.assertEqual(len(search), 2) - result_ids = [result.get("_id") for result in search] - self.assertIn('1', result_ids) - self.assertIn('2', result_ids) - - def test_elastic_commit(self): - """Test the auto_commit_interval attribute.""" - docc = {'_id': '3', 'name': 'Waldo'} - docman = DocManager(elastic_pair) - # test cases: - # -1 = no autocommit - # 0 = commit immediately - # x > 0 = commit within x seconds - for autocommit_interval in [None, 0, 1, 2]: - docman.auto_commit_interval = autocommit_interval - docman.upsert(docc, *TESTARGS) - if autocommit_interval is None: - docman.commit() - else: - # Allow just a little extra time - time.sleep(autocommit_interval + 1) - results = list(self._search()) - self.assertEqual(len(results), 1, - "should commit document with " - "auto_commit_interval = %s" % str( - autocommit_interval)) - self.assertEqual(results[0]["name"], "Waldo") - self._remove() - docman.stop() - - def test_get_last_doc(self): - """Test the get_last_doc method. - - Make sure we can retrieve the document most recently modified from ES. - """ - base = self.elastic_doc.get_last_doc() - ts = base.get("_ts", 0) if base else 0 - docc = {'_id': '4', 'name': 'Hare'} - self.elastic_doc.upsert(docc, 'test.test', ts + 3) - docc = {'_id': '5', 'name': 'Tortoise'} - self.elastic_doc.upsert(docc, 'test.test', ts + 2) - docc = {'_id': '6', 'name': 'Mr T.'} - self.elastic_doc.upsert(docc, 'test.test', ts + 1) - - self.assertEqual( - self.elastic_doc.elastic.count(index="test")['count'], 3) - doc = self.elastic_doc.get_last_doc() - self.assertEqual(doc['_id'], '4') - - docc = {'_id': '6', 'name': 'HareTwin'} - self.elastic_doc.upsert(docc, 'test.test', ts + 4) - doc = self.elastic_doc.get_last_doc() - self.assertEqual(doc['_id'], '6') - self.assertEqual( - self.elastic_doc.elastic.count(index="test")['count'], 3) - - def test_commands(self): - cmd_args = ('test.$cmd', 1) - self.elastic_doc.command_helper = CommandHelper() - - self.elastic_doc.handle_command({'create': 'test2'}, *cmd_args) - time.sleep(1) - self.assertIn('test2', self._mappings('test')) - - self.elastic_doc.handle_command({'drop': 'test2'}, *cmd_args) - time.sleep(1) - self.assertNotIn('test2', self._mappings('test')) - - self.elastic_doc.handle_command({'create': 'test2'}, *cmd_args) - self.elastic_doc.handle_command({'create': 'test3'}, *cmd_args) - time.sleep(1) - self.elastic_doc.handle_command({'dropDatabase': 1}, *cmd_args) - time.sleep(1) - self.assertNotIn('test', self._indices()) - self.assertNotIn('test2', self._mappings()) - self.assertNotIn('test3', self._mappings()) - - -if __name__ == '__main__': - unittest.main() diff --git a/tests/test_formatters.py b/tests/test_formatters.py index df45c214..5cf6ba0f 100644 --- a/tests/test_formatters.py +++ b/tests/test_formatters.py @@ -1,3 +1,17 @@ +# Copyright 2013-2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + import datetime import re import sys diff --git a/tests/test_gridfs_file.py b/tests/test_gridfs_file.py index 4b5f4111..b486b65c 100644 --- a/tests/test_gridfs_file.py +++ b/tests/test_gridfs_file.py @@ -18,40 +18,10 @@ sys.path[0:0] = [""] -from mongo_connector.gridfs_file import GridFSFile from mongo_connector import errors +from mongo_connector.gridfs_file import GridFSFile +from mongo_connector.test_utils import ReplicaSet, close_client from tests import unittest -from tests.setup_cluster import ReplicaSet -from tests.util import close_client - - -class MockGridFSFile: - def __init__(self, doc, data): - self._id = doc['_id'] - self.filename = doc['filename'] - self.upload_date = doc['upload_date'] - self.md5 = doc['md5'] - self.data = data - self.length = len(self.data) - self.pos = 0 - - def get_metadata(self): - return { - '_id': self._id, - 'filename': self.filename, - 'upload_date': self.upload_date, - 'md5': self.md5 - } - - def __len__(self): - return self.length - - def read(self, n=-1): - if n < 0 or self.pos + n > self.length: - n = self.length - self.pos - s = self.data[self.pos:self.pos+n] - self.pos += n - return s class TestGridFSFile(unittest.TestCase): diff --git a/tests/test_mongo.py b/tests/test_mongo.py index 0960c783..541a9abb 100644 --- a/tests/test_mongo.py +++ b/tests/test_mongo.py @@ -15,6 +15,7 @@ """Test mongo using the synchronizer, i.e. as it would be used by an user """ + import os import sys import time @@ -24,12 +25,15 @@ sys.path[0:0] = [""] -from tests.setup_cluster import ReplicaSet, Server from mongo_connector.doc_managers.mongo_doc_manager import DocManager from mongo_connector.connector import Connector from mongo_connector.util import retry_until_ok -from tests import unittest, connector_opts -from tests.util import assert_soon, close_client +from mongo_connector.test_utils import (ReplicaSet, + Server, + connector_opts, + assert_soon, + close_client) +from tests import unittest class MongoTestCase(unittest.TestCase): diff --git a/tests/test_mongo_connector.py b/tests/test_mongo_connector.py index 1e4226bf..a1961a00 100755 --- a/tests/test_mongo_connector.py +++ b/tests/test_mongo_connector.py @@ -25,9 +25,9 @@ sys.path[0:0] = [""] from mongo_connector.connector import Connector -from tests import unittest, connector_opts -from tests.setup_cluster import ReplicaSet +from mongo_connector.test_utils import ReplicaSet, connector_opts from mongo_connector.util import long_to_bson_ts +from tests import unittest class TestMongoConnector(unittest.TestCase): diff --git a/tests/test_mongo_doc_manager.py b/tests/test_mongo_doc_manager.py index 796c057e..7aa91ede 100644 --- a/tests/test_mongo_doc_manager.py +++ b/tests/test_mongo_doc_manager.py @@ -21,8 +21,8 @@ from mongo_connector.command_helper import CommandHelper from mongo_connector.doc_managers.mongo_doc_manager import DocManager -from tests import unittest, TESTARGS -from tests.test_gridfs_file import MockGridFSFile +from mongo_connector.test_utils import TESTARGS, MockGridFSFile +from tests import unittest from tests.test_mongo import MongoTestCase diff --git a/tests/test_oplog_manager.py b/tests/test_oplog_manager.py index b5519e1b..4f848a3c 100755 --- a/tests/test_oplog_manager.py +++ b/tests/test_oplog_manager.py @@ -28,9 +28,8 @@ from mongo_connector.doc_managers.doc_manager_simulator import DocManager from mongo_connector.locking_dict import LockingDict from mongo_connector.oplog_manager import OplogThread +from mongo_connector.test_utils import ReplicaSet, assert_soon, close_client from tests import unittest -from tests.setup_cluster import ReplicaSet -from tests.util import assert_soon, close_client class TestOplogManager(unittest.TestCase): diff --git a/tests/test_oplog_manager_sharded.py b/tests/test_oplog_manager_sharded.py index 51bd9ac9..78b3264e 100644 --- a/tests/test_oplog_manager_sharded.py +++ b/tests/test_oplog_manager_sharded.py @@ -20,7 +20,6 @@ import bson import pymongo - from pymongo.read_preferences import ReadPreference from pymongo.write_concern import WriteConcern @@ -29,10 +28,11 @@ from mongo_connector.doc_managers.doc_manager_simulator import DocManager from mongo_connector.locking_dict import LockingDict from mongo_connector.oplog_manager import OplogThread +from mongo_connector.test_utils import (ShardedCluster, + assert_soon, + close_client) from mongo_connector.util import retry_until_ok -from tests import SkipTest, unittest -from tests.setup_cluster import ShardedCluster -from tests.util import assert_soon, close_client +from tests import unittest, SkipTest class TestOplogManagerSharded(unittest.TestCase): diff --git a/tests/test_rollbacks.py b/tests/test_rollbacks.py index e2681e46..f25a7ee8 100644 --- a/tests/test_rollbacks.py +++ b/tests/test_rollbacks.py @@ -1,3 +1,17 @@ +# Copyright 2013-2016 MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + """Test Mongo Connector's behavior when its source MongoDB system is experiencing a rollback. @@ -12,14 +26,15 @@ sys.path[0:0] = [""] -from mongo_connector.util import retry_until_ok -from mongo_connector.locking_dict import LockingDict from mongo_connector.doc_managers.doc_manager_simulator import DocManager +from mongo_connector.locking_dict import LockingDict from mongo_connector.oplog_manager import OplogThread - -from tests import unittest, STRESS_COUNT -from tests.util import assert_soon, close_client -from tests.setup_cluster import ReplicaSet +from mongo_connector.test_utils import (ReplicaSet, + STRESS_COUNT, + assert_soon, + close_client) +from mongo_connector.util import retry_until_ok +from tests import unittest class TestRollbacks(unittest.TestCase): diff --git a/tests/test_solr.py b/tests/test_solr.py index a5b94014..b0e7bcca 100644 --- a/tests/test_solr.py +++ b/tests/test_solr.py @@ -14,25 +14,26 @@ # limitations under the License. """Test Solr search using the synchronizer, i.e. as it would be used by an user - """ +""" + import logging import os import sys import time +from pysolr import Solr, SolrError + from bson import SON from gridfs import GridFS -from pysolr import Solr, SolrError sys.path[0:0] = [""] -from tests import solr_url, unittest -from tests.setup_cluster import ReplicaSet -from tests.util import assert_soon from mongo_connector.compat import u from mongo_connector.connector import Connector from mongo_connector.doc_managers.solr_doc_manager import DocManager +from mongo_connector.test_utils import ReplicaSet, solr_url, assert_soon from mongo_connector.util import retry_until_ok +from tests import unittest class SolrTestCase(unittest.TestCase): @@ -159,7 +160,7 @@ def check_update(update_spec): ('new', True)]))['value'] # Stringify _id to match what will be retrieved from Solr - updated['_id'] = u(updated['_id']) + updated[u('_id')] = u(updated['_id']) # Flatten the MongoDB document to match Solr updated = docman._clean_doc(updated, 'dummy.namespace', 0) # Allow some time for update to propagate @@ -175,6 +176,8 @@ def check_update(update_spec): # Remove field added by Solr replicated.pop("_version_") + print("REPLICATED", replicated) + print("UPDATED", updated) self.assertEqual(replicated, updated) # Update by adding a field. diff --git a/tests/test_solr_doc_manager.py b/tests/test_solr_doc_manager.py index e8c6af00..bbab63f0 100755 --- a/tests/test_solr_doc_manager.py +++ b/tests/test_solr_doc_manager.py @@ -20,12 +20,10 @@ from mongo_connector.command_helper import CommandHelper from mongo_connector.doc_managers.solr_doc_manager import DocManager - -from tests import unittest, TESTARGS, solr_url -from tests.test_gridfs_file import MockGridFSFile +from mongo_connector.test_utils import MockGridFSFile, TESTARGS, solr_url +from tests import unittest from tests.test_solr import SolrTestCase - class TestSolrDocManager(SolrTestCase): """Test class for SolrDocManager """ diff --git a/tests/test_synchronizer.py b/tests/test_synchronizer.py index d894bba3..3b215515 100644 --- a/tests/test_synchronizer.py +++ b/tests/test_synchronizer.py @@ -14,6 +14,7 @@ """Test synchronizer using DocManagerSimulator """ + import os import sys import time @@ -21,9 +22,8 @@ sys.path[0:0] = [""] from mongo_connector.connector import Connector -from tests import unittest, connector_opts -from tests.setup_cluster import ReplicaSet -from tests.util import assert_soon +from mongo_connector.test_utils import ReplicaSet, connector_opts, assert_soon +from tests import unittest class TestSynchronizer(unittest.TestCase): diff --git a/tests/util.py b/tests/util.py deleted file mode 100644 index 628d8792..00000000 --- a/tests/util.py +++ /dev/null @@ -1,47 +0,0 @@ -# Copyright 2013-2014 MongoDB, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Utilities for mongo-connector tests. There are no actual tests in here. -""" - -import time - - -def wait_for(condition, max_tries=60): - """Wait for a condition to be true up to a maximum number of tries - """ - cond = False - while not cond and max_tries > 1: - try: - cond = condition() - except Exception: - pass - time.sleep(1) - max_tries -= 1 - return condition() - - -def assert_soon(condition, message=None, max_tries=60): - """Assert that a condition eventually evaluates to True after at most - max_tries number of attempts - - """ - if not wait_for(condition, max_tries=max_tries): - raise AssertionError(message or "") - -def close_client(client): - if hasattr(type(client), '_process_kill_cursors_queue'): - client._process_kill_cursors_queue() - time.sleep(1) # Wait for queue to clear. - client.close()