-
Notifications
You must be signed in to change notification settings - Fork 137
/
Copy pathtest_search.py
339 lines (272 loc) · 13.7 KB
/
test_search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
import json
import os
import time
import unittest
from six import iterkeys
from urllib3 import disable_warnings
import cloudinary
from cloudinary import uploader, SearchFolders, Search
from test.helper_test import SUFFIX, TEST_IMAGE, TEST_TAG, UNIQUE_TAG, TEST_FOLDER, UNIQUE_TEST_FOLDER, \
retry_assertion, cleanup_test_resources_by_tag, URLLIB3_REQUEST, get_json_body, get_uri, patch
from test.test_api import MOCK_RESPONSE, NEXT_CURSOR
from test.test_config import CLOUD_NAME, API_KEY, API_SECRET
# Prefix the shared helper tags with "search_" and rebind them, so every use
# of TEST_TAG / UNIQUE_TAG below refers to the search-specific value.
TEST_TAG = 'search_{}'.format(TEST_TAG)
UNIQUE_TAG = 'search_{}'.format(UNIQUE_TAG)

# Number of images uploaded as fixtures by SearchTest.setUpClass.
TEST_IMAGES_COUNT = 3
# Upper bound on the one-second polling rounds setUpClass spends waiting for
# the uploaded fixtures to show up in search results.
MAX_INDEX_RETRIES = 10

# One public ID per fixture image: "<unique folder>/search_test<index>_<suffix>".
# All three arguments are referenced so that SUFFIX is actually part of the ID.
public_ids = [
    "{0}/search_test{1}_{2}".format(UNIQUE_TEST_FOLDER, i, SUFFIX)
    for i in range(TEST_IMAGES_COUNT)
]

# Upload responses are appended by SearchTest.setUpClass. Index 0 holds the
# "++" placeholder, so the first real upload response lives at index 1.
# NOTE(review): the reason for the placeholder is not visible here - confirm
# before removing it, since tests read upload_results[1].
upload_results = ["++"]

FOLDERS_SEARCH_EXPRESSION = "path:{}*".format(TEST_FOLDER)

# Silence urllib3 warnings for the whole test run.
disable_warnings()
class SearchTest(unittest.TestCase):
    """Tests for the ``Search`` and ``SearchFolders`` query builders.

    Three flavours of test live here:

    * pure builder tests that only inspect ``as_dict()`` / ``to_url()``;
    * mocked tests that patch ``URLLIB3_REQUEST`` and inspect the request;
    * live tests that query resources uploaded in ``setUpClass`` (these are
      additionally gated on the ``RUN_SEARCH_TESTS`` environment variable).
    """

    @classmethod
    def setUpClass(cls):
        # Upload the fixture images, then poll the search endpoint until all
        # of them are returned or MAX_INDEX_RETRIES attempts have been made.
        cls.ready = False
        cloudinary.reset_config()
        if not cloudinary.config().api_secret:
            # NOTE(review): without credentials `ready` stays False, so setUp
            # fails every test in this class, including the mocked ones.
            return

        for public_id in public_ids:
            upload_response = uploader.upload(
                TEST_IMAGE,
                public_id=public_id,
                tags=[TEST_TAG, UNIQUE_TAG],
                context="stage=value",
            )
            upload_results.append(upload_response)

        for _ in range(MAX_INDEX_RETRIES):
            time.sleep(1)
            results = Search().expression("tags={0}".format(UNIQUE_TAG)).execute()
            if len(results['resources']) == len(public_ids):
                cls.ready = True
                break

    def setUp(self):
        # Fail fast when setUpClass did not mark the fixtures as ready.
        if not self.ready:
            self.fail("Failed indexing test resources")

    @classmethod
    def tearDownClass(cls):
        # Remove everything that was uploaded with this module's unique tag.
        cleanup_test_resources_by_tag([(UNIQUE_TAG,)])

    def tearDown(self):
        # Individual tests may alter the global configuration; restore it.
        cloudinary.reset_config()

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_create_empty_json(self):
        # A freshly built query has no parameters at all.
        query_hash = Search().as_dict()

        self.assertEqual(query_hash, {})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_expression_as_dict(self):
        query = Search().expression('format:jpg').as_dict()

        self.assertEqual(query, {"expression": 'format:jpg'})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_sort_by_as_dict(self):
        # Each sort_by() call contributes one {field: direction} entry, in call order.
        query = Search().sort_by('created_at', 'asc').sort_by('updated_at', 'desc').as_dict()

        self.assertEqual(query, {"sort_by": [{'created_at': 'asc'}, {'updated_at': 'desc'}]})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_max_results_as_dict(self):
        # The value is expected back exactly as it was passed in (a string here).
        query = Search().max_results('10').as_dict()

        self.assertEqual(query, {"max_results": '10'})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_next_cursor_as_dict(self):
        cursor = 'ec471a97ba510904ab57460b3ba3150ec29b6f8563eb1c10f6925ed0c6813f33cfa62ec6cf5ad96be6d6fa3ac3a76ccb'

        query = Search().next_cursor(cursor).as_dict()

        self.assertEqual(query, {"next_cursor": cursor})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_aggregations_arguments_as_array_as_dict(self):
        query = Search().aggregate('format').aggregate('size_category').as_dict()

        self.assertEqual(query, {"aggregate": ["format", "size_category"]})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_add_with_field_as_dict(self):
        query = Search().with_field('context').with_field('tags').as_dict()

        self.assertEqual(query, {"with_field": ["context", "tags"]})

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_return_all_images_tagged(self):
        # Every fixture image carries UNIQUE_TAG, so all of them must match.
        results = Search().expression("tags={0}".format(UNIQUE_TAG)).execute()

        self.assertEqual(len(results['resources']), TEST_IMAGES_COUNT)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_return_resource(self):
        results = Search().expression("public_id={0}".format(public_ids[0])).execute()

        self.assertEqual(len(results['resources']), 1)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_return_resource_by_asset_id_equals(self):
        # upload_results[0] is the "++" placeholder; [1] is the first upload response.
        asset_id = upload_results[1]["asset_id"]

        results = Search().expression("asset_id={0}".format(asset_id)).execute()

        self.assertEqual(len(results['resources']), 1)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_return_resource_by_asset_id_colon(self):
        # Same lookup as the "equals" variant, using the ":" operator instead of "=".
        asset_id = upload_results[1]["asset_id"]

        results = Search().expression("asset_id:{0}".format(asset_id)).execute()

        self.assertEqual(len(results['resources']), 1)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_paginate_resources_limited_by_tag_and_ordered_by_ascending_public_id(self):
        # Seed the loop with an empty cursor; every later request continues
        # from the cursor returned by the previous page.
        results = {'next_cursor': ''}

        for i, expected_public_id in enumerate(public_ids):  # get one resource at a time
            results = Search() \
                .max_results(1) \
                .expression("tags={0}".format(UNIQUE_TAG)) \
                .sort_by('public_id', 'asc') \
                .next_cursor(results['next_cursor']) \
                .execute()

            self.assertEqual(len(results['resources']), 1)

            found_public_id = results['resources'][0]['public_id']
            self.assertEqual(
                found_public_id,
                expected_public_id,
                "{0} found public_id {1} instead of {2} ".format(
                    i, found_public_id, expected_public_id))
            self.assertEqual(results['total_count'], TEST_IMAGES_COUNT)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_include_context(self):
        results = Search().expression("tags={0}".format(UNIQUE_TAG)).with_field('context').execute()

        self.assertEqual(len(results['resources']), TEST_IMAGES_COUNT)
        for res in results['resources']:
            # The fixtures were uploaded with context "stage=value".
            self.assertEqual(list(iterkeys(res['context'])), [u'stage'])

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    @unittest.skipIf(not os.environ.get('RUN_SEARCH_TESTS', False),
                     "For this test to work, 'Advanced search' should be enabled for your cloud. " +
                     "Use env variable RUN_SEARCH_TESTS=1 if you really want to test it.")
    @retry_assertion()
    def test_should_include_context_tags_and_image_metadata(self):
        results = Search().expression("tags={0}".format(UNIQUE_TAG)). \
            with_field('context').with_field('tags'). \
            with_field('image_metadata').execute()

        self.assertEqual(len(results['resources']), TEST_IMAGES_COUNT)
        for res in results['resources']:
            self.assertEqual(list(iterkeys(res['context'])), [u'stage'])
            self.assertIn('image_metadata', res)
            # Each fixture was uploaded with exactly TEST_TAG and UNIQUE_TAG.
            self.assertEqual(len(res['tags']), 2)

    @patch(URLLIB3_REQUEST)
    def test_should_not_duplicate_values(self, mocker):
        # Repeated builder calls must not produce duplicate entries in the
        # request body. For sort_by the expected body keeps the field's first
        # position but carries the direction of the last call ('desc' when the
        # direction argument is omitted).
        mocker.return_value = MOCK_RESPONSE

        Search() \
            .sort_by('created_at', 'asc') \
            .sort_by('public_id', 'asc') \
            .sort_by('created_at') \
            .aggregate('format') \
            .aggregate('format') \
            .aggregate(['resource_type', 'type']) \
            .with_field('context') \
            .with_field('context') \
            .with_field('tags') \
            .fields(('tags', 'context')) \
            .fields('metadata') \
            .fields('tags') \
            .execute()

        _, request_kwargs = mocker.call_args
        result = json.loads(request_kwargs['body'])

        self.assertEqual(result, {
            'sort_by': [
                {'created_at': 'desc'},
                {'public_id': 'asc'},
            ],
            'aggregate': ['format', 'resource_type', 'type'],
            'with_field': ['context', 'tags'],
            'fields': ['tags', 'context', 'metadata'],
        })

    def test_should_build_search_url(self):
        # Uses fixed test credentials so the expected signatures are stable.
        cloudinary.config(cloud_name=CLOUD_NAME, api_key=API_KEY, api_secret=API_SECRET, secure=True)

        search = Search() \
            .expression("resource_type:image AND tags=kitten AND uploaded_at>1d AND bytes>1m") \
            .sort_by("public_id", "desc") \
            .max_results(30)

        b64query = (
            "eyJleHByZXNzaW9uIjoicmVzb3VyY2VfdHlwZTppbWFnZSBBTkQgdGFncz1raXR0ZW4gQU5EIHVwbG9hZGVkX2F0"
            "PjFkIEFORCBieXRlcz4xbSIsIm1heF9yZXN1bHRzIjozMCwic29ydF9ieSI6W3sicHVibGljX2lkIjoiZGVzYyJ9XX0="
        )

        ttl300_sig = "431454b74cefa342e2f03e2d589b2e901babb8db6e6b149abf25bc0dd7ab20b7"
        ttl1000_sig = "25b91426a37d4f633a9b34383c63889ff8952e7ffecef29a17d600eeb3db0db7"

        shared_url = "https://res.cloudinary.com/{cloud}/search/{sig}/{ttl}/{query}"
        shared_url_with_cursor = "https://res.cloudinary.com/{cloud}/search/{sig}/{ttl}/{query}/{cursor}"
        private_url = "https://{cloud}-res.cloudinary.com/search/{sig}/{ttl}/{query}"

        # default usage
        self.assertEqual(
            shared_url.format(cloud=CLOUD_NAME, sig=ttl300_sig, ttl=300, query=b64query),
            search.to_url()
        )

        # same signature with next cursor
        self.assertEqual(
            shared_url_with_cursor.format(
                cloud=CLOUD_NAME, sig=ttl300_sig, ttl=300, query=b64query, cursor=NEXT_CURSOR),
            search.to_url(next_cursor=NEXT_CURSOR)
        )

        # with custom ttl and next cursor
        self.assertEqual(
            shared_url_with_cursor.format(
                cloud=CLOUD_NAME, sig=ttl1000_sig, ttl=1000, query=b64query, cursor=NEXT_CURSOR),
            search.to_url(ttl=1000, next_cursor=NEXT_CURSOR)
        )

        # ttl and cursor are set from the class
        # (this mutates `search`, hence the explicit ttl/next_cursor overrides below)
        self.assertEqual(
            shared_url_with_cursor.format(
                cloud=CLOUD_NAME, sig=ttl1000_sig, ttl=1000, query=b64query, cursor=NEXT_CURSOR),
            search.ttl(1000).next_cursor(NEXT_CURSOR).to_url()
        )

        # private cdn
        self.assertEqual(
            private_url.format(cloud=CLOUD_NAME, sig=ttl300_sig, ttl=300, query=b64query),
            search.to_url(ttl=300, next_cursor="", private_cdn=True)
        )

        # private cdn from config
        cloudinary.config(private_cdn=True)
        self.assertEqual(
            private_url.format(cloud=CLOUD_NAME, sig=ttl300_sig, ttl=300, query=b64query),
            search.to_url(ttl=300, next_cursor="")
        )

    @patch(URLLIB3_REQUEST)
    def test_should_search_folders_endpoint(self, mocker):
        # SearchFolders must hit the "folders/search" endpoint with the
        # expression as the only body parameter.
        mocker.return_value = MOCK_RESPONSE

        SearchFolders() \
            .expression(FOLDERS_SEARCH_EXPRESSION) \
            .execute()

        result = get_json_body(mocker)

        self.assertTrue(get_uri(mocker).endswith('folders/search'))
        self.assertEqual({'expression': FOLDERS_SEARCH_EXPRESSION}, result)

    @unittest.skipUnless(cloudinary.config().api_secret, "requires api_key/api_secret")
    def test_should_search_folders(self):
        # Live call: with max_results(1) exactly one folder is expected back.
        results = SearchFolders() \
            .max_results(1) \
            .execute()

        self.assertEqual(1, len(results['folders']))
# Run this module's tests when the file is executed directly as a script.
if __name__ == '__main__':
    unittest.main()