From 6a0c8966ff0e39ae55e729ff2729fa4dba7d8490 Mon Sep 17 00:00:00 2001 From: Sola Ajayi Date: Wed, 7 Feb 2018 01:21:39 +0100 Subject: [PATCH] Applying PR: Added ability to access mongo oplog via a proxy #751 --- mongo_connector/connector.py | 52 +++++++++++++++++++++++++++--------- 1 file changed, 40 insertions(+), 12 deletions(-) diff --git a/mongo_connector/connector.py b/mongo_connector/connector.py index 7465b6d4..3a067250 100644 --- a/mongo_connector/connector.py +++ b/mongo_connector/connector.py @@ -43,7 +43,7 @@ # Monkey patch logging to add Logger.always -ALWAYS = logging.CRITICAL + 10 +ALWAYS = logging.CRITICAL 10 logging.addLevelName(ALWAYS, 'ALWAYS') @@ -124,6 +124,9 @@ def __init__(self, mongo_address, doc_managers=None, **kwargs): # Timezone awareness self.tz_aware = kwargs.get('tz_aware', False) + # If oplog access is via the proxy + self.is_oplog_proxy= kwargs.get('is_oplog_proxy', False) + # SSL keyword arguments to MongoClient. ssl_certfile = kwargs.pop('ssl_certfile', None) ssl_ca_certs = kwargs.pop('ssl_ca_certs', None) @@ -215,7 +218,8 @@ def from_config(cls, config): ssl_keyfile=config['ssl.sslKeyfile'], ssl_ca_certs=config['ssl.sslCACerts'], ssl_cert_reqs=config['ssl.sslCertificatePolicy'], - tz_aware=config['timezoneAware'] + tz_aware=config['timezoneAware'], + is_oplog_proxy=config['isOplogProxy'] ) return connector @@ -242,7 +246,7 @@ def write_oplog_progress(self): return # write to temp file - backup_file = self.oplog_checkpoint + '.backup' + backup_file = self.oplog_checkpoint '.backup' os.rename(self.oplog_checkpoint, backup_file) # for each of the threads write to file @@ -314,9 +318,9 @@ def copy_uri_options(hosts, mongodb_uri): options = mongodb_uri.split('?', 1)[1] else: options = None - uri = 'mongodb://' + hosts + uri = 'mongodb://' hosts if options: - uri += '/?' + options + uri= '/?' options return uri def create_authed_client(self, hosts=None, **kwargs): @@ -374,12 +378,25 @@ def run(self): ) return - # Establish a connection to the replica set as a whole - self.main_conn.close() - self.main_conn = self.create_authed_client( - replicaSet=is_master['setName']) - - self.update_version_from_client(self.main_conn) + # # Establish a connection to the replica set as a whole + # self.main_conn.close() + # self.main_conn = self.create_authed_client( + # replicaSet=is_master['setName']) + + # self.update_version_from_client(self.main_conn) + if not self.is_oplog_proxy: + # Establish a connection to the replica set as a whole + self.main_conn.close() + self.main_conn = self.create_authed_client( + replicaSet=is_master['setName']) + self.update_version_from_client(self.main_conn) + else: + try: + # Check if local.oplog.rs is readable + self.main_conn.local.oplog.rs.find_one() + except pymongo.errors.OperationFailure: + LOG.error('Could not read local.oplog.rs!') + sys.exit(1) # non sharded configuration oplog = OplogThread( @@ -486,6 +503,17 @@ def add_option(*args, **kwargs): " would be a valid argument to `-m`. Don't use" " quotes around the address.") + is_oplog_proxy = add_option( + config_key="isOplogProxy", + default=False, + type=bool) + + is_oplog_proxy.add_cli( + "--is_oplog_proxy", dest="is_oplog_proxy", help= + "True if passed uri is a proxy with access to mongo" + "oplog.rs.") + + oplog_file = add_option( config_key="oplogFile", default="oplog.timestamp", @@ -859,7 +887,7 @@ def apply_old_namespace_options(option, cli_values): dest_mapping = option.value['mapping'] valid_names = set(['include', 'exclude', 'gridfs', 'mapping']) - valid_names |= set('__' + name for name in valid_names) + valid_names |= set('__' name for name in valid_names) valid_names.add('__comment__') for key in option.value: if key not in valid_names: