From 3bb7f33936a1ec5345e206d3670b6a17abdd038b Mon Sep 17 00:00:00 2001 From: Maciej Urbanski Date: Wed, 8 May 2024 20:13:11 +0200 Subject: [PATCH] skip remaining bucket notifications rules integration tests --- test/integration/test_raw_api.py | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/test/integration/test_raw_api.py b/test/integration/test_raw_api.py index cbf5e873..07d36d8b 100644 --- a/test/integration/test_raw_api.py +++ b/test/integration/test_raw_api.py @@ -583,6 +583,25 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): raw_api.delete_file_version(api_url, account_auth_token, file_id, file_name, True) print('b2_get_bucket_notification_rules & b2_set_bucket_notification_rules') + try: + _subtest_bucket_notification_rules( + raw_api, auth_dict, api_url, account_auth_token, bucket_id + ) + except pytest.skip.Exception as e: + print(e) + + # Clean up this test. + _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id) + + if should_cleanup_old_buckets: + # Clean up from old tests. Empty and delete any buckets more than an hour old. + _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict) + + +def _subtest_bucket_notification_rules(raw_api, auth_dict, api_url, account_auth_token, bucket_id): + if 'writeBucketNotifications' not in auth_dict['allowed']['capabilities']: + pytest.skip('Test account does not have writeBucketNotifications capability') + notification_rule = { 'eventTypes': ['b2:ObjectCreated:Copy'], 'isEnabled': False, @@ -620,13 +639,6 @@ def raw_api_test_helper(raw_api, should_cleanup_old_buckets): assert raw_api.set_bucket_notification_rules(api_url, account_auth_token, bucket_id, []) == [] assert raw_api.get_bucket_notification_rules(api_url, account_auth_token, bucket_id) == [] - # Clean up this test. - _clean_and_delete_bucket(raw_api, api_url, account_auth_token, account_id, bucket_id) - - if should_cleanup_old_buckets: - # Clean up from old tests. Empty and delete any buckets more than an hour old. - _cleanup_old_buckets(raw_api, auth_dict, bucket_list_dict) - def cleanup_old_buckets(): raw_api = B2RawHTTPApi(B2Http())