11
11
import pytest
12
12
from aliyunsdkcore import client
13
13
from aliyunsdksts .request .v20150401 import AssumeRoleRequest
14
+ from oss2 .credentials import EcsRamRoleCredentialsProvider
14
15
15
16
from ossfs import OSSFileSystem
16
17
@@ -25,7 +26,7 @@ def fetch_sts_token(access_key_id, access_key_secret, role_arn):
25
26
26
27
req .set_accept_format ("json" )
27
28
req .set_RoleArn (role_arn )
28
- req .set_RoleSessionName ("oss-python-sdk-example " )
29
+ req .set_RoleSessionName ("ossfs-login-test " )
29
30
30
31
body = clt .do_action_with_exception (req )
31
32
@@ -73,3 +74,29 @@ def test_env_endpoint(endpoint, test_bucket_name, monkeypatch):
73
74
def test_anonymous_login (file_in_anonymous , endpoint ):
74
75
ossfs = OSSFileSystem (endpoint = endpoint )
75
76
ossfs .cat (f"{ file_in_anonymous } " )
77
+
78
+
79
+ def test_auth_login (endpoint , test_bucket_name ):
80
+ key , secret , token = fetch_sts_token (STSAccessKeyId , STSAccessKeySecret , STSArn )
81
+ auth = oss2 .StsAuth (key , secret , token )
82
+ ossfs = OSSFileSystem (
83
+ auth = auth ,
84
+ endpoint = endpoint ,
85
+ )
86
+ ossfs .ls (test_bucket_name )
87
+
88
+
89
+ def test_ecs_ram_role_credential (endpoint , test_bucket_name , test_directory ):
90
+ key , secret , token = fetch_sts_token (STSAccessKeyId , STSAccessKeySecret , STSArn )
91
+ auth = oss2 .StsAuth (key , secret , token )
92
+ bucket = oss2 .Bucket (auth , endpoint , test_bucket_name )
93
+ path = f"{ test_directory } /test_ecs_ram_role_credential/file"
94
+ bucket .put_object (path , "test ecs ram role credential" )
95
+ auth_server_host = bucket .sign_url ("GET" , path , 10 )
96
+ credentials_provider = EcsRamRoleCredentialsProvider (auth_server_host , 3 , 10 )
97
+ provider_auth = oss2 .ProviderAuth (credentials_provider )
98
+ ossfs = OSSFileSystem (
99
+ auth = provider_auth ,
100
+ endpoint = endpoint ,
101
+ )
102
+ ossfs .ls (test_bucket_name )
0 commit comments