diff --git a/pbm-functional/pytest/cluster.py b/pbm-functional/pytest/cluster.py
index dadbcda3..03e4229d 100644
--- a/pbm-functional/pytest/cluster.py
+++ b/pbm-functional/pytest/cluster.py
@@ -429,7 +429,8 @@ def make_restore(self, name, **kwargs):
                 assert False, "Cannot start restore, another operation running"
             time.sleep(1)
         Cluster.log("Restore started")
-        result = n.run('timeout 240 pbm restore ' + name + ' --wait')
+        timeout = kwargs.get('timeout', 240)
+        result = n.run('timeout ' + str(timeout) + ' pbm restore ' + name + ' --wait')
         if result.rc == 124:
             # try to catch possible failures if timeout exceeded
             for host in self.mongod_hosts:
diff --git a/pbm-functional/pytest/test_PBM-1223.py b/pbm-functional/pytest/test_PBM-1223.py
index 086f5162..809a2186 100644
--- a/pbm-functional/pytest/test_PBM-1223.py
+++ b/pbm-functional/pytest/test_PBM-1223.py
@@ -5,8 +5,9 @@
 import time
 import os
 import docker
-import threading
 import concurrent.futures
+import random
+import json
 
 from datetime import datetime
 from cluster import Cluster
@@ -45,6 +46,8 @@ def start_cluster(cluster,request):
         client=pymongo.MongoClient(cluster.connection)
         client.admin.command("enableSharding", "test")
         client.admin.command("shardCollection", "test.test", key={"_id": "hashed"})
+        client.admin.command("shardCollection", "test.test1", key={"_id": "hashed"})
+        client.admin.command("shardCollection", "test.test2", key={"_id": "hashed"})
         yield True
 
     finally:
@@ -80,3 +83,83 @@ def test_disabled(start_cluster,cluster):
     assert pymongo.MongoClient(cluster.connection)["test"]["test"].count_documents({}) == 8
     assert pymongo.MongoClient(cluster.connection)["test"].command("collstats", "test").get("sharded", False)
     Cluster.log("Finished successfully\n")
+
+@pytest.mark.timeout(3600,func_only=True)
+def test_load(start_cluster,cluster):
+    # runs transactions in a loop; returns a list of tuples, each holding the
+    # oplog timestamp as a <time>.<increment> float and the resulting document count
+    def background_transaction(db,collection):
+        Cluster.log("Starting background insert to " + collection)
+        j = 0
+        result = []
+        while upsert:
+            data = random.randbytes(1024 * 1024)
+            client = pymongo.MongoClient(cluster.connection)
+            with client.start_session() as session:
+                try:
+                    with session.start_transaction():
+                        for i in range(20):
+                            client[db][collection].insert_one({str(i): data}, session=session)
+                            timeout = random.uniform(0.4,0.6)
+                            time.sleep(timeout)
+                        session.commit_transaction()
+                    j = j + 20
+                    Cluster.log(collection + ": " + str(session.cluster_time['clusterTime'].time) + "." + str(session.cluster_time['clusterTime'].inc) + " " + str(j))
+                    timestamp = float(str(session.cluster_time['clusterTime'].time) + "." + str(session.cluster_time['clusterTime'].inc))
+                    result.append((timestamp,j))
+                except Exception as e:
+                    Cluster.log(e)
+                    continue
+                finally:
+                    client.close()
+        Cluster.log("Stopping background insert to " + collection)
+        return result
+
+    cluster.check_pbm_status()
+    upsert=True
+    background_transaction1 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test1')
+    background_transaction2 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test2')
+
+    time.sleep(300)
+    backup = cluster.make_backup('logical')
+
+    upsert=False
+    upsert1_result = background_transaction1.result()
+    upsert2_result = background_transaction2.result()
+    Cluster.log("test1 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})))
+    Cluster.log("test2 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})))
+
+    # backup_meta = json.loads(cluster.exec_pbm_cli("describe-backup " + backup + " --out=json").stdout)
+    # since pbm describe-backup doesn't return the exact oplog timestamp, check the metadata on the storage instead
+    backup_meta = json.loads(testinfra.get_host("docker://rscfg01").check_output('cat /backups/' + backup + '.pbm.json'))
+    Cluster.log(json.dumps(backup_meta, indent=4))
+    last_write_ts = float(str(backup_meta["last_write_ts"]["T"]) + "." + str(backup_meta["last_write_ts"]["I"]))
+
+    # find the real count of documents inserted up to last_write_ts;
+    # given the list of (oplog timestamp, document count) tuples, e.g.
+    # [(1709134884.23, 20), (1709134886.5, 40), (1709134887.39, 60), (1709134889.6, 80), (1709134891.5, 100), (1709134955.4, 120)]
+    # and last_write_ts like 1709134954.11, the matching tuple is (1709134891.5, 100),
+    # so the returned count is t[1] of that tuple
+    def find_inserted(array_tuples,timestamp):
+        print(array_tuples)
+        print(timestamp)
+        resulted_tuples = [t for t in array_tuples if t[0] <= timestamp]
+        result = max(resulted_tuples, key=lambda t: t[0])
+        print(result)
+        return result[1]
+
+    inserted_test1 = find_inserted(upsert1_result,last_write_ts)
+    inserted_test2 = find_inserted(upsert2_result,last_write_ts)
+    Cluster.log("test1 inserted count: " + str(inserted_test1))
+    Cluster.log("test2 inserted count: " + str(inserted_test2))
+
+    cluster.make_restore(backup,check_pbm_status=True,timeout=600)
+
+    count_test1 = pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})
+    count_test2 = pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})
+    Cluster.log("test1 documents count: " + str(count_test1))
+    Cluster.log("test2 documents count: " + str(count_test2))
+
+    assert inserted_test1 == count_test1
+    assert inserted_test2 == count_test2
+    Cluster.log("Finished successfully\n")
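As a quick sanity check for reviewers, here is a minimal standalone sketch of the timestamp-matching step, using the sample progress list and last_write_ts value from the comment in the diff above. The progress name and the assert are illustrative only, not part of the PR:

# Standalone sketch (not part of the PR): reproduces the selection logic of
# find_inserted() with the sample data from the comment in test_PBM-1223.py.

def find_inserted(array_tuples, timestamp):
    # keep only progress points recorded at or before the backup's last_write_ts
    resulted_tuples = [t for t in array_tuples if t[0] <= timestamp]
    # the latest of those points carries the document count the restore must reproduce
    result = max(resulted_tuples, key=lambda t: t[0])
    return result[1]

# sample data taken verbatim from the comment in the diff
progress = [(1709134884.23, 20), (1709134886.5, 40), (1709134887.39, 60),
            (1709134889.6, 80), (1709134891.5, 100), (1709134955.4, 120)]
last_write_ts = 1709134954.11

# (1709134891.5, 100) is the latest point not after last_write_ts, so 100 is expected
assert find_inserted(progress, last_write_ts) == 100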