From 79c81d763eae44f7ea8224ab665dfe4b7e3b0671 Mon Sep 17 00:00:00 2001 From: yanghua Date: Wed, 25 Sep 2024 10:34:02 +0800 Subject: [PATCH] Support tag the access source --- poetry.lock | 2 +- pyproject.toml | 1 + tosfs/tag.py | 120 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+), 1 deletion(-) create mode 100644 tosfs/tag.py diff --git a/poetry.lock b/poetry.lock index b779fe6..ceab263 100644 --- a/poetry.lock +++ b/poetry.lock @@ -733,4 +733,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "1bb1712f54089469cbb3c278bad0114a8104a28f988d82ff8d87bf88aa5d0fa5" +content-hash = "1bb1712f54089469cbb3c278bad0114a8104a28f988d82ff8d87bf88aa5d0fa5" \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 3d59f1f..299d2d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ pytest-cov = "==5.0.0" coverage = "==7.5.0" ruff = "==0.6.0" types-requests = "==2.32.0.20240907" +volcengine= "==1.0.154" [tool.pydocstyle] convention = "numpy" diff --git a/tosfs/tag.py b/tosfs/tag.py new file mode 100644 index 0000000..020c73d --- /dev/null +++ b/tosfs/tag.py @@ -0,0 +1,120 @@ +# ByteDance Volcengine EMR, Copyright 2024. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import os +import threading + +from volcengine.ApiInfo import ApiInfo +from volcengine.base.Service import Service +from volcengine.Credentials import Credentials +from volcengine.ServiceInfo import ServiceInfo + +PUT_TAG_ACTION_NAME = "PutBucketDoubleMeterTagging" +GET_TAG_ACTION_NAME = "GetBucketTagging" +DEL_TAG_ACTION_NAME = "DeleteBucketTagging" +EMR_OPEN_API_VERSION = "2022-12-29" +OPEN_API_HOST = "open.volcengineapi.com" +ACCEPT_HEADER_KEY = "accept" +ACCEPT_HEADER_JSON_VALUE = "application/json" + + +service_info_map = { + "cn-beijing": ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr", "cn-beijing"), 60 * 5, 60 * 5, "http"), + "cn-guangzhou": ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr", "cn-guangzhou"), 60 * 5, 60 * 5, "http"), + "cn-shanghai": ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr", "cn-shanghai"), 60 * 5, 60 * 5, "http"), + "ap-southeast-1": ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr", "ap-southeast-1"), 60 * 5, 60 * 5, "http"), + "cn-beijing-qa": ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr_qa", "cn-beijing"), 60 * 5, 60 * 5, "http"), +} + +api_info = { + PUT_TAG_ACTION_NAME: ApiInfo("POST", "/", { + "Action": PUT_TAG_ACTION_NAME, "Version": EMR_OPEN_API_VERSION}, {}, {}), + GET_TAG_ACTION_NAME: ApiInfo("GET", "/", { + "Action": GET_TAG_ACTION_NAME, "Version": EMR_OPEN_API_VERSION}, {}, {}), + DEL_TAG_ACTION_NAME: ApiInfo("POST", "/", { + "Action": DEL_TAG_ACTION_NAME, "Version": EMR_OPEN_API_VERSION}, {}, {}), +} + +class BucketTagAction(Service): + _instance_lock = threading.Lock() + + def __new__(cls, *args, **kwargs): + if not hasattr(BucketTagAction, "_instance"): + with BucketTagAction._instance_lock: + if not hasattr(BucketTagAction, "_instance"): + BucketTagAction._instance = object.__new__(cls) + return BucketTagAction._instance + + def __init__(self, access_key = None, secret_key = None, region = "cn-beijing"): + if region is None: + region = "cn-beijing" + super().__init__(self.get_service_info(region), self.get_api_info()) + if access_key is not None and secret_key is not None: + self.set_ak(access_key) + self.set_sk(secret_key) + + @staticmethod + def get_api_info(): + return api_info + + @staticmethod + def get_service_info(region): + service_info = service_info_map.get(region) + if service_info: + return service_info + elif "VOLC_REGION" in os.environ: + return ServiceInfo(OPEN_API_HOST, {ACCEPT_HEADER_KEY: ACCEPT_HEADER_JSON_VALUE, }, + Credentials("", "", "emr", region), 60 * 5, 60 * 5, "http") + else: + raise Exception("do not support region %s" % region) + + def put_bucket_tag(self, bucket): + params = {"Bucket": bucket,} + + try: + res = self.json(PUT_TAG_ACTION_NAME, params, json.dumps("")) + res_json = json.loads(res) + logging.debug("Put tag for bucket %s is success. The result of put_Bucket_tag is %s.", bucket, res_json) + return (bucket, True) + except Exception as e: + logging.error("Put tag for bucket %s is failed: %s", bucket, e) + return (bucket, False) + + def get_bucket_tag(self, bucket): + params = {"Bucket": bucket,} + try: + res = self.get(GET_TAG_ACTION_NAME, params) + res_json = json.loads(res) + logging.debug("The result of get_Bucket_tag is %s", res_json) + return True + except Exception as e: + logging.error("Get tag for %s is failed: %s", bucket, e) + return False + + def del_bucket_tag(self, bucket): + params = {"Bucket": bucket,} + try: + res = self.json(DEL_TAG_ACTION_NAME, params, json.dumps("")) + res_json = json.loads(res) + logging.debug("The result of del_Bucket_tag is %s", res_json) + except Exception as e: + logging.error("Delete tag for %s is failed: %s", bucket, e) +