diff --git a/pbm-functional/pytest/cluster.py b/pbm-functional/pytest/cluster.py
index dadbcda3..03e4229d 100644
--- a/pbm-functional/pytest/cluster.py
+++ b/pbm-functional/pytest/cluster.py
@@ -429,7 +429,8 @@ def make_restore(self, name, **kwargs):
                 assert False, "Cannot start restore, another operation running"
             time.sleep(1)
         Cluster.log("Restore started")
-        result = n.run('timeout 240 pbm restore ' + name + ' --wait')
+        timeout=kwargs.get('timeout', 240)
+        result = n.run('timeout ' + str(timeout) + ' pbm restore ' + name + ' --wait')
         if result.rc == 124:
             # try to catch possible failures if timeout exceeded
             for host in self.mongod_hosts:
diff --git a/pbm-functional/pytest/test_PBM-1223.py b/pbm-functional/pytest/test_PBM-1223.py
index 086f5162..82c9987f 100644
--- a/pbm-functional/pytest/test_PBM-1223.py
+++ b/pbm-functional/pytest/test_PBM-1223.py
@@ -5,8 +5,9 @@
 import time
 import os
 import docker
-import threading
 import concurrent.futures
+import random
+import json
 
 from datetime import datetime
 from cluster import Cluster
@@ -45,6 +46,8 @@ def start_cluster(cluster,request):
         client=pymongo.MongoClient(cluster.connection)
         client.admin.command("enableSharding", "test")
         client.admin.command("shardCollection", "test.test", key={"_id": "hashed"})
+        client.admin.command("shardCollection", "test.test1", key={"_id": "hashed"})
+        client.admin.command("shardCollection", "test.test2", key={"_id": "hashed"})
         yield True
 
     finally:
@@ -80,3 +83,91 @@ def test_disabled(start_cluster,cluster):
     assert pymongo.MongoClient(cluster.connection)["test"]["test"].count_documents({}) == 8
     assert pymongo.MongoClient(cluster.connection)["test"].command("collstats", "test").get("sharded", False)
     Cluster.log("Finished successfully\n")
+
+@pytest.mark.timeout(3600,func_only=True)
+def test_load(start_cluster,cluster):
+    # runs transactions in a loop; returns an array of tuples, each containing oplog timestamp, oplog increment and the resulting documents count
+    def background_transaction(db,collection):
+        Cluster.log("Starting background insert to " + collection)
+        data = random.randbytes(1024 * 1024)
+        j = 0
+        result = []
+        while upsert:
+            client = pymongo.MongoClient(cluster.connection)
+            with client.start_session() as session:
+                try:
+                    with session.start_transaction():
+                        for i in range(20):
+                            client[db][collection].insert_one({str(i): data }, session=session)
+                            time.sleep(0.5)
+                        session.commit_transaction()
+                    j = j + 20
+                    Cluster.log(collection + ": " + str(session.cluster_time['clusterTime'].time) + " " + str(session.cluster_time['clusterTime'].inc) + " " + str(j))
+                    result.append((session.cluster_time['clusterTime'].time,session.cluster_time['clusterTime'].inc,j))
+                except Exception as e:
+                    Cluster.log(e)
+                    continue
+                finally:
+                    client.close()
+        Cluster.log("Stopping background insert to " + collection)
+        return result
+
+    cluster.check_pbm_status()
+    upsert=True
+    background_transaction1 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test1')
+    time.sleep(1)
+    background_transaction2 = concurrent.futures.ThreadPoolExecutor().submit(background_transaction, 'test', 'test2')
+
+    time.sleep(300)
+    backup=cluster.make_backup('logical')
+
+    upsert=False
+    upsert1_result = background_transaction1.result()
+    upsert2_result = background_transaction2.result()
+    Cluster.log("test1 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})))
+    Cluster.log("test2 documents count: " + str(pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})))
+
+    # backup_meta=json.loads(cluster.exec_pbm_cli("describe-backup " + backup + " --out=json").stdout)
+    # since pbm describe-backup doesn't return the exact oplog timestamp, let's check the metadata on the storage
+    backup_meta=json.loads(testinfra.get_host("docker://rscfg01").check_output('cat /backups/' + backup + '.pbm.json'))
+    Cluster.log(json.dumps(backup_meta, indent=4))
+    last_write_ts = (backup_meta["last_write_ts"]["T"],backup_meta["last_write_ts"]["I"])
+
+    # let's find the real count of documents inserted up to last_write_ts
+    # we have an array of tuples containing oplog timestamp, oplog increment and the resulting documents count, e.g.
+    # [(1709134884, 23, 20), (1709134886, 5, 40), (1709134887, 39, 60), (1709134889, 6, 80), (1709134891, 5, 100), (1709134955, 4, 120)]
+    # the second argument is last_write_ts, e.g. (1709134954, 11)
+    # the result should be (1709134891, 5, 100)
+    # first we filter the tuples where t[0] <= last_write_ts[0] and keep those with the maximum t[0] (timestamp comparison)
+    # if that maximum t[0] == last_write_ts[0] we pick from them the tuple with the maximum t[1] <= last_write_ts[1] (increment comparison)
+    # else we just pick the tuple with the maximum t[1]
+    # the result is t[2] of the selected tuple
+    def find_inserted(array_tuples,tuple):
+        print(array_tuples)
+        print(tuple)
+        filtered_first = [t for t in array_tuples if t[0] <= tuple[0]]
+        max_first_element = max(t[0] for t in filtered_first)
+        filtered_second = [t for t in filtered_first if t[0] == max_first_element]
+        if max_first_element == tuple[0]:
+            resulted_tuples = [t for t in filtered_second if t[1] <= tuple[1]]
+        else:
+            resulted_tuples = filtered_second
+        result = max(resulted_tuples, key=lambda t: t[1])
+        print(result)
+        return result[2]
+
+    inserted_test1 = find_inserted(upsert1_result,last_write_ts)
+    inserted_test2 = find_inserted(upsert2_result,last_write_ts)
+    Cluster.log("test1 inserted count: " + str(inserted_test1))
+    Cluster.log("test2 inserted count: " + str(inserted_test2))
+
+    cluster.make_restore(backup,check_pbm_status=True,timeout=600)
+
+    count_test1 = pymongo.MongoClient(cluster.connection)["test"]["test1"].count_documents({})
+    count_test2 = pymongo.MongoClient(cluster.connection)["test"]["test2"].count_documents({})
+    Cluster.log("test1 documents count: " + str(count_test1))
+    Cluster.log("test2 documents count: " + str(count_test2))
+
+    assert inserted_test1 == count_test1
+    assert inserted_test2 == count_test2
+    Cluster.log("Finished successfully\n")
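For reference, a minimal standalone sketch of the selection logic that find_inserted implements, exercised with the illustrative sample values from the comments in the diff above; the helper body mirrors the test, and the data below is the hypothetical example only, not output from a real run.

    # each tuple: (oplog timestamp, oplog increment, documents inserted so far) -- sample values from the comments
    samples = [(1709134884, 23, 20), (1709134886, 5, 40), (1709134887, 39, 60),
               (1709134889, 6, 80), (1709134891, 5, 100), (1709134955, 4, 120)]
    last_write_ts = (1709134954, 11)  # backup's last_write_ts as (T, I)

    def find_inserted(array_tuples, ts):
        # keep only transactions committed at or before the backup's last_write_ts timestamp
        filtered_first = [t for t in array_tuples if t[0] <= ts[0]]
        max_first_element = max(t[0] for t in filtered_first)
        filtered_second = [t for t in filtered_first if t[0] == max_first_element]
        # on an exact timestamp match, the oplog increment breaks the tie
        if max_first_element == ts[0]:
            resulted_tuples = [t for t in filtered_second if t[1] <= ts[1]]
        else:
            resulted_tuples = filtered_second
        # the third element of the winning tuple is the expected document count
        return max(resulted_tuples, key=lambda t: t[1])[2]

    assert find_inserted(samples, last_write_ts) == 100  # matches "the result should be (1709134891, 5, 100)"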