-
Notifications
You must be signed in to change notification settings - Fork 36
/
Copy pathtest_application.py
885 lines (789 loc) · 30.6 KB
/
test_application.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
# Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root.
import json
import unittest
import pytest
from unittest.mock import PropertyMock, patch
from unittest.mock import MagicMock, AsyncMock
from requests.models import HTTPError, Response
from vespa.package import ApplicationPackage, Schema, Document
from vespa.application import Vespa, raise_for_status
from vespa.exceptions import VespaError
from vespa.io import VespaQueryResponse, VespaResponse
import requests_mock
from unittest.mock import Mock
from requests import Request, Session
import gzip
from vespa.application import (
CustomHTTPAdapter,
VespaAsync,
)
import httpx
from typing import List
class TestVespaRequestsUsage(unittest.TestCase):
def test_additional_query_params(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.post("http://localhost:8080/search/", status_code=200, text="{}")
r: VespaQueryResponse = app.query(
query="this is a test", hits=10, searchChain="default"
)
self.assertEqual(
r.url,
"http://localhost:8080/search/?query=this+is+a+test&hits=10&searchChain=default",
)
def test_additional_doc_params(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
# Mock the ApplicationStatus endpoint
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.post(
"http://localhost:8080/document/v1/foo/foo/docid/0",
status_code=200,
text="{}",
)
r: VespaResponse = app.feed_data_point(
schema="foo",
data_id="0",
fields={"body": "this is a test"},
route="default",
timeout="10s",
)
self.assertEqual(
r.url,
"http://localhost:8080/document/v1/foo/foo/docid/0?route=default&timeout=10s",
)
with requests_mock.Mocker() as m:
m.put(
"http://localhost:8080/document/v1/foo/foo/docid/0",
status_code=200,
text="{}",
)
r: VespaResponse = app.update_data(
schema="foo",
data_id="0",
fields={"body": "this is a test"},
route="default",
timeout="10s",
)
self.assertEqual(
r.url,
"http://localhost:8080/document/v1/foo/foo/docid/0?create=false&route=default&timeout=10s",
)
with requests_mock.Mocker() as m:
m.delete(
"http://localhost:8080/document/v1/foo/foo/docid/0",
status_code=200,
text="{}",
)
r: VespaResponse = app.delete_data(
schema="foo", data_id="0", route="default", timeout="10s", dryRun=True
)
self.assertEqual(
r.url,
"http://localhost:8080/document/v1/foo/foo/docid/0?route=default&timeout=10s&dryRun=True",
)
def test_delete_all_docs(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.delete(
"http://localhost:8080/document/v1/foo/foo/docid/",
status_code=200,
text="{}",
)
app.delete_all_docs(
schema="foo",
namespace="foo",
content_cluster_name="content",
timeout="200s",
)
def test_visit(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.get(
"http://localhost:8080/document/v1/foo/foo/docid/",
[
{"json": {"continuation": "AAA"}, "status_code": 200},
{"json": {}, "status_code": 200},
],
)
results = []
for slice in app.visit(
schema="foo",
namespace="foo",
content_cluster_name="content",
timeout="200s",
):
for response in slice:
results.append(response)
assert len(results) == 2
urls = [response.url for response in results]
assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=1"
"&sliceId=0"
"&timeout=200s"
"&continuation=AAA"
) in urls
assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=1"
"&sliceId=0"
"&timeout=200s"
) in urls
def test_visit_slice_id(self):
app = Vespa(url="http://localhost", port=8080)
with requests_mock.Mocker() as m:
m.get(
"http://localhost:8080/ApplicationStatus",
status_code=200,
)
m.get(
"http://localhost:8080/document/v1/foo/foo/docid/",
[
{"json": {"continuation": "AAA"}, "status_code": 200},
{"json": {}, "status_code": 200},
],
)
results = []
for slice in app.visit(
schema="foo",
namespace="foo",
content_cluster_name="content",
timeout="200s",
slices=10,
slice_id=2,
):
for response in slice:
results.append(response)
assert len(results) == 2
urls = [response.url for response in results]
assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=10"
"&sliceId=2"
"&timeout=200s"
"&continuation=AAA"
) in urls
assert (
"http://localhost:8080/document/v1/foo/foo/docid/"
"?cluster=content"
"&selection=true"
"&wantedDocumentCount=500"
"&slices=10"
"&sliceId=2"
"&timeout=200s"
) in urls
class TestVespa(unittest.TestCase):
def test_end_point(self):
self.assertEqual(
Vespa(url="https://cord19.vespa.ai").end_point, "https://cord19.vespa.ai"
)
self.assertEqual(
Vespa(url="http://localhost", port=8080).end_point, "http://localhost:8080"
)
self.assertEqual(
Vespa(url="http://localhost/", port=8080).end_point, "http://localhost:8080"
)
self.assertEqual(
Vespa(url="http://localhost:8080").end_point, "http://localhost:8080"
)
def test_document_v1_format(self):
vespa = Vespa(url="http://localhost", port=8080)
self.assertEqual(
vespa.get_document_v1_path(id=0, schema="foo"),
"/document/v1/foo/foo/docid/0",
)
self.assertEqual(
vespa.get_document_v1_path(id="0", schema="foo"),
"/document/v1/foo/foo/docid/0",
)
self.assertEqual(
vespa.get_document_v1_path(id="0", schema="foo", namespace="bar"),
"/document/v1/bar/foo/docid/0",
)
self.assertEqual(
vespa.get_document_v1_path(
id="0", schema="foo", namespace="bar", group="g0"
),
"/document/v1/bar/foo/group/g0/0",
)
self.assertEqual(
vespa.get_document_v1_path(
id="0", schema="foo", namespace="bar", number="0"
),
"/document/v1/bar/foo/number/0/0",
)
self.assertEqual(
vespa.get_document_v1_path(
id="mydoc#1", schema="foo", namespace="bar", group="ab"
),
"/document/v1/bar/foo/group/ab/mydoc%231",
)
def test_query_token(self):
self.assertEqual(
Vespa(
url="https://cord19.vespa.ai",
vespa_cloud_secret_token="vespa_cloud_str_secret",
).vespa_cloud_secret_token,
"vespa_cloud_str_secret",
)
def test_query_token_from_env(self):
import os
os.environ["VESPA_CLOUD_SECRET_TOKEN"] = "vespa_cloud_str_secret"
self.assertEqual(
Vespa(
url="https://cord19.vespa.ai",
vespa_cloud_secret_token=os.getenv("VESPA_CLOUD_SECRET_TOKEN"),
).vespa_cloud_secret_token,
"vespa_cloud_str_secret",
)
def test_infer_schema(self):
#
# No application package
#
app = Vespa(url="http://localhost", port=8080)
with self.assertRaisesRegex(
ValueError,
"Application Package not available. Not possible to infer schema name.",
):
_ = app._infer_schema_name()
#
# No schema
#
app_package = ApplicationPackage(name="test")
# app = Vespa(url="http://localhost", port=8080, application_package=app_package)
# with self.assertRaisesRegex(
# ValueError,
# "Application has no schema. Not possible to infer schema name.",
# ):
# _ = app._infer_schema_name()
# More than one schema
app_package = ApplicationPackage(
name="test",
schema=[
Schema(name="x", document=Document()),
Schema(name="y", document=Document()),
],
)
app = Vespa(url="http://localhost", port=8080, application_package=app_package)
with self.assertRaisesRegex(
ValueError,
"Application has more than one schema. Not possible to infer schema name.",
):
_ = app._infer_schema_name()
# One schema
app_package = ApplicationPackage(
name="test",
schema=[
Schema(name="x", document=Document()),
],
)
app = Vespa(url="http://localhost", port=8080, application_package=app_package)
schema_name = app._infer_schema_name()
self.assertEqual("x", schema_name)
class TestRaiseForStatus(unittest.TestCase):
def test_successful_response(self):
response = Response()
response.status_code = 200
try:
raise_for_status(response)
except Exception as e:
self.fail(
f"No exceptions were expected to be raised but {type(e).__name__} occurred"
)
def test_successful_response_with_error_content(self):
with patch(
"requests.models.Response.content", new_callable=PropertyMock
) as mock_content:
response_json = {
"root": {
"errors": [
{"code": 1, "summary": "summary", "message": "message"},
],
},
}
mock_content.return_value = json.dumps(response_json).encode("utf-8")
response = Response()
response.status_code = 200
try:
raise_for_status(response)
except Exception as e:
self.fail(
f"No exceptions were expected to be raised but {type(e).__name__} occurred"
)
def test_failure_response_for_400(self):
response = Response()
response.status_code = 400
response.reason = "reason"
response.url = "http://localhost:8080"
with pytest.raises(HTTPError) as e:
raise_for_status(response)
self.assertEqual(
str(e.value), "400 Client Error: reason for url: http://localhost:8080"
)
def test_failure_response_for_500(self):
response = Response()
response.status_code = 500
response.reason = "reason"
response.url = "http://localhost:8080"
with pytest.raises(HTTPError) as e:
raise_for_status(response)
self.assertEqual(
str(e.value), "500 Server Error: reason for url: http://localhost:8080"
)
def test_failure_response_without_error_content(self):
with patch(
"requests.models.Response.content", new_callable=PropertyMock
) as mock_content:
response_json = {
"root": {
"errors": [],
},
}
mock_content.return_value = json.dumps(response_json).encode("utf-8")
response = Response()
response.status_code = 400
response.reason = "reason"
response.url = "http://localhost:8080"
with pytest.raises(HTTPError):
raise_for_status(response)
def test_failure_response_with_error_content(self):
with patch(
"requests.models.Response.content", new_callable=PropertyMock
) as mock_content:
response_json = {
"root": {
"errors": [
{"code": 1, "summary": "summary", "message": "message"},
],
},
}
mock_content.return_value = json.dumps(response_json).encode("utf-8")
response = Response()
response.status_code = 400
response.reason = "reason"
response.url = "http://localhost:8080"
with pytest.raises(VespaError):
raise_for_status(response)
def test_failure_response_with_error_content_504(self):
with patch(
"requests.models.Response.content", new_callable=PropertyMock
) as mock_content:
response_json = {
"root": {
"errors": [
{
"code": 12,
"summary": "Timed out",
"message": "No time left after waiting for 1ms to execute query",
},
],
},
}
mock_content.return_value = json.dumps(response_json).encode("utf-8")
response = Response()
response.status_code = 504
response.reason = "reason"
response.url = "http://localhost:8080"
with pytest.raises(VespaError) as e:
raise_for_status(response)
self.assertEqual(
str(e.value),
"[{'code': 12, 'summary': 'Timed out', 'message': 'No time left after waiting for 1ms to execute query'}]",
)
def test_doc_failure_response_with_error_content(self):
with patch(
"requests.models.Response.content", new_callable=PropertyMock
) as mock_content:
response_json = {
"pathId": "/document/v1/textsearch/textsearch/docid/00",
"message": "No field 'foo' in the structure of type 'textsearch'",
}
mock_content.return_value = json.dumps(response_json).encode("utf-8")
response = Response()
response.status_code = 400
response.reason = "Bad Request"
response.url = (
"http://localhost:8080/document/v1/textsearch/textsearch/docid/00"
)
with pytest.raises(VespaError) as e:
raise_for_status(response)
self.assertEqual(
str(e.value), "No field 'foo' in the structure of type 'textsearch'"
)
class TestVespaCollectData(unittest.TestCase):
def setUp(self) -> None:
self.app = Vespa(url="http://localhost", port=8080)
self.raw_vespa_result_recall = {
"root": {
"id": "toplevel",
"relevance": 1.0,
"fields": {"totalCount": 1083},
"coverage": {
"coverage": 100,
"documents": 62529,
"full": True,
"nodes": 2,
"results": 1,
"resultsFull": 1,
},
"children": [
{
"id": "id:covid-19:doc::40215",
"relevance": 30.368213170494712,
"source": "content",
"fields": {
"vespa_id_field": "abc",
"sddocname": "doc",
"body_text": "this is a body",
"title": "this is a title",
"rankfeatures": {"a": 1, "b": 2},
},
}
],
}
}
self.raw_vespa_result_additional = {
"root": {
"id": "toplevel",
"relevance": 1.0,
"fields": {"totalCount": 1083},
"coverage": {
"coverage": 100,
"documents": 62529,
"full": True,
"nodes": 2,
"results": 1,
"resultsFull": 1,
},
"children": [
{
"id": "id:covid-19:doc::40216",
"relevance": 10,
"source": "content",
"fields": {
"vespa_id_field": "def",
"sddocname": "doc",
"body_text": "this is a body 2",
"title": "this is a title 2",
"rankfeatures": {"a": 3, "b": 4},
},
},
{
"id": "id:covid-19:doc::40217",
"relevance": 8,
"source": "content",
"fields": {
"vespa_id_field": "ghi",
"sddocname": "doc",
"body_text": "this is a body 3",
"title": "this is a title 3",
"rankfeatures": {"a": 5, "b": 6},
},
},
],
}
}
class TestFeedAsyncIterable(unittest.TestCase):
def setUp(self):
self.mock_session = AsyncMock()
self.mock_asyncio_patcher = patch("vespa.application.VespaAsync")
self.mock_asyncio = self.mock_asyncio_patcher.start()
self.mock_asyncio.return_value.__aenter__.return_value = self.mock_session
self.vespa = Vespa(url="http://localhost", port=8080)
def tearDown(self):
self.mock_asyncio_patcher.stop()
def test_feed_async_iterable_happy_path(self):
# Arrange
iter_data = [
{"id": "doc1", "fields": {"title": "Document 1"}},
{"id": "doc2", "fields": {"title": "Document 2"}},
]
callback = MagicMock()
# Act
self.vespa.feed_async_iterable(
iter=iter_data,
schema="test_schema",
namespace="test_namespace",
callback=callback,
max_queue_size=2,
max_workers=2,
max_connections=2,
)
# Assert
self.mock_session.feed_data_point.assert_has_calls(
[
unittest.mock.call(
schema="test_schema",
namespace="test_namespace",
groupname=None,
data_id="doc1",
fields={"title": "Document 1"},
),
unittest.mock.call(
schema="test_schema",
namespace="test_namespace",
groupname=None,
data_id="doc2",
fields={"title": "Document 2"},
),
],
any_order=True,
)
self.assertEqual(callback.call_count, 2)
def test_feed_async_iterable_missing_id(self):
# Arrange
iter_data = [
{"fields": {"title": "Document 1"}},
]
callback = MagicMock()
# Act
self.vespa.feed_async_iterable(
iter=iter_data,
schema="test_schema",
namespace="test_namespace",
callback=callback,
max_queue_size=1,
max_workers=1,
max_connections=1,
)
# Assert
self.mock_session.feed_data_point.assert_not_called()
callback.assert_called_once_with(unittest.mock.ANY, None)
self.assertEqual(callback.call_args[0][0].status_code, 499)
self.assertEqual(
callback.call_args[0][0].json["message"], "Missing id in input dict"
)
def test_feed_async_iterable_missing_fields(self):
# Arrange
iter_data = [
{"id": "doc1"},
]
callback = MagicMock()
# Act
self.vespa.feed_async_iterable(
iter=iter_data,
schema="test_schema",
namespace="test_namespace",
callback=callback,
max_queue_size=1,
max_workers=1,
max_connections=1,
)
# Assert
self.mock_session.feed_data_point.assert_not_called()
callback.assert_called_once_with(unittest.mock.ANY, "doc1")
self.assertEqual(callback.call_args[0][0].status_code, 499)
self.assertEqual(
callback.call_args[0][0].json["message"], "Missing fields in input dict"
)
class TestQueryMany(unittest.TestCase):
def setUp(self):
self.mock_session = AsyncMock()
self.mock_asyncio_patcher = patch("vespa.application.VespaAsync")
self.mock_asyncio = self.mock_asyncio_patcher.start()
self.mock_asyncio.return_value.__aenter__.return_value = self.mock_session
self.vespa = Vespa(url="http://localhost", port=8080)
def tearDown(self):
self.mock_asyncio_patcher.stop()
def test_query_many_happy_path(self):
# Arrange
query_data = [
{"query": "this is a test", "hits": 10, "ranking": "default"},
{"query": "this is another test", "hits": 20, "ranking": "default"},
]
#
_responses: List[VespaQueryResponse] = self.vespa.query_many(
queries=query_data,
num_connections=2,
max_concurrent=100,
)
# Assert that app.query is called for each query
self.mock_session.query.assert_has_calls(
[unittest.mock.call(q) for q in query_data],
any_order=True,
)
def test_query_many_client_kwargs(self):
# Arrange
query_data = [
{"query": "this is a test", "hits": 10, "ranking": "default"},
{"query": "this is another test", "hits": 20, "ranking": "default"},
]
#
_responses: List[VespaQueryResponse] = self.vespa.query_many(
queries=query_data,
num_connections=2,
max_concurrent=100,
client_kwargs={"timeout": 10},
)
# Assert that VespaAsync is initialized once with the client_kwargs
self.mock_asyncio.assert_called_once_with(
app=self.vespa,
connections=2,
total_timeout=None,
timeout=10,
)
def test_query_many_query_kwargs(self):
# Arrange
query_data = [
{"query": "this is a test", "hits": 10, "ranking": "default"},
{"query": "this is another test", "hits": 20, "ranking": "default"},
]
#
_responses: List[VespaQueryResponse] = self.vespa.query_many(
queries=query_data,
num_connections=2,
max_concurrent=100,
query_param="custom",
)
# Assert that app.query is called for each query with the query_kwargs
self.mock_session.query.assert_has_calls(
[unittest.mock.call(q, query_param="custom") for q in query_data],
any_order=True,
)
class TestCustomHTTPAdapterCompression(unittest.TestCase):
def setUp(self):
"""Set up the CustomHTTPAdapter for testing."""
self.adapter = CustomHTTPAdapter(compress="auto")
def test_compression_auto_with_large_body(self):
"""Test auto compression with a large request body."""
request = Request(method="POST", url="http://test.com", data=b"test_data" * 300)
self.adapter.check_size = Mock(return_value=5000) # Simulate large content
prepared_request = request.prepare()
self.adapter._maybe_compress_request(prepared_request)
self.assertIn("Content-Encoding", prepared_request.headers)
self.assertEqual(prepared_request.headers["Content-Encoding"], "gzip")
def test_no_compression_auto_with_small_body(self):
"""Test no compression with a small request body."""
request = Request(method="POST", url="http://test.com", data=b"test_data")
self.adapter.check_size = Mock(return_value=10) # Simulate small content
prepared_request = request.prepare()
self.adapter._maybe_compress_request(prepared_request)
self.assertNotIn("Content-Encoding", prepared_request.headers)
def test_force_compression(self):
"""Test forced compression when compress=True."""
self.adapter = CustomHTTPAdapter(compress=True)
request = Request(method="POST", url="http://test.com", data=b"test_data")
prepared_request = request.prepare()
self.adapter._maybe_compress_request(prepared_request)
self.assertIn("Content-Encoding", prepared_request.headers)
self.assertEqual(prepared_request.headers["Content-Encoding"], "gzip")
def test_disable_compression(self):
"""Test no compression when compress=False."""
self.adapter = CustomHTTPAdapter(compress=False)
request = Request(method="POST", url="http://test.com", data=b"test_data")
prepared_request = request.prepare()
self.adapter._maybe_compress_request(prepared_request)
self.assertNotIn("Content-Encoding", prepared_request.headers)
def test_invalid_compression_value(self):
"""Test invalid compress value raises error."""
with self.assertRaises(ValueError):
CustomHTTPAdapter(compress="invalid_value")
def test_compress_request_body(self):
"""Test if request body is compressed when compress=True."""
adapter = CustomHTTPAdapter(compress=True)
session = Session()
session.mount("http://", adapter)
request = Request(method="POST", url="http://test.com", data=b"test_data")
prepared_request = session.prepare_request(request)
# Mock sending the request
with patch("requests.adapters.HTTPAdapter.send") as mock_send:
adapter.send(prepared_request)
mock_send.assert_called_once()
args, _ = mock_send.call_args
self.assertEqual(args[0].body, gzip.compress(b"test_data"))
def test_retry_on_429_status(self):
"""Test retry logic when response status is 429."""
adapter = CustomHTTPAdapter(num_retries_429=2)
session = Session()
session.mount("http://", adapter)
request = Request(method="POST", url="http://test.com", data=b"test_data")
prepared_request = session.prepare_request(request)
with (
patch.object(adapter, "_wait_with_backoff") as mock_backoff,
patch("requests.adapters.HTTPAdapter.send") as mock_send,
):
mock_response = Mock()
mock_response.status_code = 429
mock_send.side_effect = [mock_response, mock_response, mock_response]
adapter.send(prepared_request)
self.assertEqual(mock_send.call_count, 3)
self.assertEqual(mock_backoff.call_count, mock_send.call_count)
class MockVespa:
def __init__(
self,
base_headers=None,
auth_method=None,
vespa_cloud_secret_token=None,
cert=None,
key=None,
):
self.base_headers = base_headers or {}
self.auth_method = auth_method
self.vespa_cloud_secret_token = vespa_cloud_secret_token
self.cert = cert
self.key = key
# Test class
class TestVespaAsync:
def test_init_default(self):
app = MockVespa()
vespa_async = VespaAsync(app)
assert vespa_async.app == app
assert vespa_async.httpx_client is None
assert vespa_async.connections == 1
assert vespa_async.total_timeout is None
assert vespa_async.timeout == httpx.Timeout(5)
assert vespa_async.kwargs == {}
assert vespa_async.headers == app.base_headers
assert vespa_async.limits == httpx.Limits(max_keepalive_connections=1)
def test_init_total_timeout_warns(self):
app = MockVespa()
with pytest.warns(DeprecationWarning, match="total_timeout is deprecated"):
vespa_async = VespaAsync(app, total_timeout=10)
assert vespa_async.total_timeout == 10
def test_init_timeout_int(self):
app = MockVespa()
vespa_async = VespaAsync(app, timeout=10)
assert vespa_async.timeout == httpx.Timeout(10)
def test_init_timeout_timeout(self):
app = MockVespa()
timeout = httpx.Timeout(connect=5, read=10, write=15, pool=20)
vespa_async = VespaAsync(app, timeout=timeout)
assert vespa_async.timeout == timeout
def test_init_keepalive_expiry_warning(self):
app = MockVespa()
limits = httpx.Limits(keepalive_expiry=31)
with pytest.warns(
UserWarning, match="Keepalive expiry is set to more than 30 seconds"
):
_vespa_async = VespaAsync(app, limits=limits)
def test_init_no_keepalive_expiry_warning(self):
app = MockVespa()
limits = httpx.Limits(keepalive_expiry=1)
_vespa_async = VespaAsync(app, limits=limits)
if __name__ == "__main__":
unittest.main()