Skip to content

Commit

Permalink
Applying PR: Added ability to access mongo oplog via a proxy yougov#751
Browse files Browse the repository at this point in the history
  • Loading branch information
shawlz committed Feb 7, 2018
1 parent 085b5f6 commit 6a0c896
Showing 1 changed file with 40 additions and 12 deletions.
52 changes: 40 additions & 12 deletions mongo_connector/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@


# Monkey patch logging to add Logger.always
ALWAYS = logging.CRITICAL + 10
ALWAYS = logging.CRITICAL 10
logging.addLevelName(ALWAYS, 'ALWAYS')


Expand Down Expand Up @@ -124,6 +124,9 @@ def __init__(self, mongo_address, doc_managers=None, **kwargs):
# Timezone awareness
self.tz_aware = kwargs.get('tz_aware', False)

# If oplog access is via the proxy
self.is_oplog_proxy= kwargs.get('is_oplog_proxy', False)

# SSL keyword arguments to MongoClient.
ssl_certfile = kwargs.pop('ssl_certfile', None)
ssl_ca_certs = kwargs.pop('ssl_ca_certs', None)
Expand Down Expand Up @@ -215,7 +218,8 @@ def from_config(cls, config):
ssl_keyfile=config['ssl.sslKeyfile'],
ssl_ca_certs=config['ssl.sslCACerts'],
ssl_cert_reqs=config['ssl.sslCertificatePolicy'],
tz_aware=config['timezoneAware']
tz_aware=config['timezoneAware'],
is_oplog_proxy=config['isOplogProxy']
)
return connector

Expand All @@ -242,7 +246,7 @@ def write_oplog_progress(self):
return

# write to temp file
backup_file = self.oplog_checkpoint + '.backup'
backup_file = self.oplog_checkpoint '.backup'
os.rename(self.oplog_checkpoint, backup_file)

# for each of the threads write to file
Expand Down Expand Up @@ -314,9 +318,9 @@ def copy_uri_options(hosts, mongodb_uri):
options = mongodb_uri.split('?', 1)[1]
else:
options = None
uri = 'mongodb://' + hosts
uri = 'mongodb://' hosts
if options:
uri += '/?' + options
uri= '/?' options
return uri

def create_authed_client(self, hosts=None, **kwargs):
Expand Down Expand Up @@ -374,12 +378,25 @@ def run(self):
)
return

# Establish a connection to the replica set as a whole
self.main_conn.close()
self.main_conn = self.create_authed_client(
replicaSet=is_master['setName'])

self.update_version_from_client(self.main_conn)
# # Establish a connection to the replica set as a whole
# self.main_conn.close()
# self.main_conn = self.create_authed_client(
# replicaSet=is_master['setName'])

# self.update_version_from_client(self.main_conn)
if not self.is_oplog_proxy:
# Establish a connection to the replica set as a whole
self.main_conn.close()
self.main_conn = self.create_authed_client(
replicaSet=is_master['setName'])
self.update_version_from_client(self.main_conn)
else:
try:
# Check if local.oplog.rs is readable
self.main_conn.local.oplog.rs.find_one()
except pymongo.errors.OperationFailure:
LOG.error('Could not read local.oplog.rs!')
sys.exit(1)

# non sharded configuration
oplog = OplogThread(
Expand Down Expand Up @@ -486,6 +503,17 @@ def add_option(*args, **kwargs):
" would be a valid argument to `-m`. Don't use"
" quotes around the address.")

is_oplog_proxy = add_option(
config_key="isOplogProxy",
default=False,
type=bool)

is_oplog_proxy.add_cli(
"--is_oplog_proxy", dest="is_oplog_proxy", help=
"True if passed uri is a proxy with access to mongo"
"oplog.rs.")


oplog_file = add_option(
config_key="oplogFile",
default="oplog.timestamp",
Expand Down Expand Up @@ -859,7 +887,7 @@ def apply_old_namespace_options(option, cli_values):
dest_mapping = option.value['mapping']

valid_names = set(['include', 'exclude', 'gridfs', 'mapping'])
valid_names |= set('__' + name for name in valid_names)
valid_names |= set('__' name for name in valid_names)
valid_names.add('__comment__')
for key in option.value:
if key not in valid_names:
Expand Down

0 comments on commit 6a0c896

Please sign in to comment.