Skip to content

Commit 9ac5745

Browse files
authored
Merge pull request #978 from vespa-engine/thomasht86/wip-version-arg-to-deploy
(feat) support `version`-parameter in `VespaCloud().deploy`
2 parents b9def47 + 1af6509 commit 9ac5745

File tree

2 files changed

+125
-53
lines changed

2 files changed

+125
-53
lines changed

docs/sphinx/source/examples/mother-of-all-embedding-models-cloud.ipynb

+1-1
Original file line numberDiff line numberDiff line change
@@ -666,4 +666,4 @@
666666
},
667667
"nbformat": 4,
668668
"nbformat_minor": 5
669-
}
669+
}

vespa/deployment.py

+124-52
Original file line numberDiff line numberDiff line change
@@ -609,14 +609,16 @@ def deploy(
609609
self,
610610
instance: Optional[str] = "default",
611611
disk_folder: Optional[str] = None,
612-
max_wait: int = 300,
612+
version: Optional[str] = None,
613+
max_wait: int = 1800,
613614
) -> Vespa:
614615
"""
615616
Deploy the given application package as the given instance in the Vespa Cloud dev environment.
616617
617618
:param instance: Name of this instance of the application, in the Vespa Cloud.
618619
:param disk_folder: Disk folder to save the required Vespa config files. Default to application name
619620
folder within user's current working directory.
621+
:param version: Vespa version to use for deployment. Default is None, which means the latest version. Should only be set on instructions from Vespa team. Must be a valid Vespa version, e.g. "8.435.13".
620622
:param max_wait: Seconds to wait for the deployment.
621623
622624
:return: a Vespa connection instance. Returns a connection to the mtls endpoint. To connect to the token endpoint, use :func:`VespaCloud.get_application(endpoint_type="token")`.
@@ -627,8 +629,14 @@ def deploy(
627629

628630
region = self.get_dev_region()
629631
job = "dev-" + region
630-
run = self._start_deployment(instance, job, disk_folder, None)
631-
self._follow_deployment(instance, job, run)
632+
run = self._start_deployment(
633+
instance=instance,
634+
job=job,
635+
disk_folder=disk_folder,
636+
application_zip_bytes=None,
637+
version=version,
638+
)
639+
self._follow_deployment(instance, job, run, max_wait)
632640
app: Vespa = self.get_application(
633641
instance=instance, environment="dev", endpoint_type="mtls"
634642
)
@@ -854,7 +862,11 @@ def wait_for_prod_deployment(
854862
raise TimeoutError(f"Deployment did not finish within {max_wait} seconds. ")
855863

856864
def deploy_from_disk(
857-
self, instance: str, application_root: Path, max_wait: int = 300
865+
self,
866+
instance: str,
867+
application_root: Path,
868+
max_wait: int = 300,
869+
version: Optional[str] = None,
858870
) -> Vespa:
859871
"""
860872
Deploy to dev from a directory tree.
@@ -864,6 +876,7 @@ def deploy_from_disk(
864876
:param instance: Name of the instance where the application is to be run
865877
:param application_root: Application package directory root
866878
:param max_wait: Seconds to wait for the deployment.
879+
:param version: Vespa version to use for deployment. Default is None, which means the latest version. Must be a valid Vespa version, e.g. "8.435.13".
867880
:return: a Vespa connection instance. Returns a connection to the mtls endpoint. To connect to the token endpoint, use :func:`VespaCloud.get_application(endpoint_type="token")`.
868881
"""
869882
data = BytesIO(self.read_app_package_from_disk(application_root))
@@ -873,11 +886,13 @@ def deploy_from_disk(
873886
region = self.get_dev_region()
874887
job = "dev-" + region
875888
run = self._start_deployment(
876-
instance, job, disk_folder, application_zip_bytes=data
889+
instance=instance,
890+
job=job,
891+
disk_folder=disk_folder,
892+
application_zip_bytes=data,
893+
version=version,
877894
)
878895
self._follow_deployment(instance, job, run)
879-
run = self._start_deployment(instance, job, disk_folder, None)
880-
self._follow_deployment(instance, job, run)
881896
app: Vespa = self.get_application(
882897
instance=instance, environment="dev", endpoint_type="mtls"
883898
)
@@ -1208,37 +1223,63 @@ def _try_get_access_token(self) -> str:
12081223

12091224
return auth["providers"]["auth0"]["systems"]["public"]["access_token"]
12101225

1211-
def _request_with_access_token(
1226+
def _handle_response(
12121227
self,
1213-
method: str,
1214-
path: str,
1215-
body: BytesIO = BytesIO(),
1216-
headers={},
1217-
return_raw_response=False,
1228+
response: httpx.Response,
1229+
return_raw_response: bool = False,
1230+
path: str = "",
12181231
) -> Union[dict, httpx.Response]:
1219-
if not self.control_plane_access_token:
1220-
raise ValueError("Access token not set.")
1221-
body.seek(0)
1222-
headers = {
1223-
"Authorization": "Bearer " + self.control_plane_access_token,
1224-
**headers,
1225-
}
1226-
response = self.get_connection_response_with_retry(method, path, body, headers)
1232+
"""Common response handling logic"""
12271233
if return_raw_response:
12281234
return response
1235+
12291236
try:
12301237
parsed = json.load(response)
12311238
except json.JSONDecodeError:
12321239
parsed = response.read()
1240+
12331241
if response.status_code != 200:
12341242
print(parsed)
12351243
raise HTTPError(
12361244
f"HTTP {response.status_code} error: {response.reason_phrase} for {path}"
12371245
)
12381246
return parsed
12391247

1248+
def _get_auth_headers(self, additional_headers: dict = {}) -> dict:
1249+
"""Create authorization headers"""
1250+
if not self.control_plane_access_token:
1251+
raise ValueError("Access token not set.")
1252+
1253+
return {
1254+
"Authorization": f"Bearer {self.control_plane_access_token}",
1255+
**additional_headers,
1256+
}
1257+
1258+
def _request_with_access_token(
1259+
self,
1260+
method: str,
1261+
path: str,
1262+
body: Union[BytesIO, MultipartEncoder] = BytesIO(),
1263+
headers: dict = {},
1264+
return_raw_response: bool = False,
1265+
) -> Union[dict, httpx.Response]:
1266+
"""Make authenticated request with access token"""
1267+
if hasattr(body, "seek"):
1268+
body.seek(0)
1269+
1270+
auth_headers = self._get_auth_headers(headers)
1271+
response = self.get_connection_response_with_retry(
1272+
method, path, body, auth_headers
1273+
)
1274+
1275+
return self._handle_response(response, return_raw_response, path)
1276+
12401277
def _request(
1241-
self, method: str, path: str, body: BytesIO = BytesIO(), headers={}
1278+
self,
1279+
method: str,
1280+
path: str,
1281+
body: Union[BytesIO, MultipartEncoder] = BytesIO(),
1282+
headers: dict = {},
12421283
) -> Union[dict, httpx.Response]:
12431284
if self.control_plane_auth_method == "access_token":
12441285
return self._request_with_access_token(method, path, body, headers)
@@ -1253,47 +1294,54 @@ def _request_with_api_key(
12531294
self,
12541295
method: str,
12551296
path: str,
1256-
body: BytesIO = BytesIO(),
1257-
headers={},
1258-
return_raw_response=False,
1297+
body: Union[BytesIO, MultipartEncoder] = BytesIO(),
1298+
headers: dict = {},
1299+
return_raw_response: bool = False,
12591300
) -> Union[dict, httpx.Response]:
12601301
digest = hashes.Hash(hashes.SHA256(), default_backend())
1261-
body.seek(0)
1262-
digest.update(body.read())
1302+
1303+
# Handle different body types
1304+
if isinstance(body, MultipartEncoder):
1305+
# Use the encoded data for hash computation
1306+
digest = hashes.Hash(hashes.SHA256(), default_backend())
1307+
digest.update(body.to_string()) # This moves the buffer position to the end
1308+
body._buffer.seek(0) # Needs to be reset. Otherwise, no data will be sent
1309+
# Update the headers to include the Content-Type
1310+
headers.update({"Content-Type": body.content_type})
1311+
# Read the content of multipart_data into a bytes object
1312+
multipart_data_bytes: bytes = body.to_string()
1313+
headers.update({"Content-Length": str(len(multipart_data_bytes))})
1314+
# Convert multipart_data_bytes to type BytesIO
1315+
body_data: BytesIO = BytesIO(multipart_data_bytes)
1316+
else:
1317+
if hasattr(body, "seek"):
1318+
body.seek(0)
1319+
digest.update(body.read())
1320+
body_data = body
1321+
# Create signature
12631322
content_hash = standard_b64encode(digest.finalize()).decode("UTF-8")
1264-
timestamp = (
1265-
datetime.utcnow().isoformat() + "Z"
1266-
) # Java's Instant.parse requires the neutral time zone appended
1323+
timestamp = datetime.utcnow().isoformat() + "Z"
12671324
url = self.base_url + path
12681325

12691326
canonical_message = method + "\n" + url + "\n" + timestamp + "\n" + content_hash
12701327
signature = self.api_key.sign(
12711328
canonical_message.encode("UTF-8"), ec.ECDSA(hashes.SHA256())
12721329
)
1330+
signature_b64 = standard_b64encode(signature).decode("UTF-8")
12731331

12741332
headers = {
12751333
"X-Timestamp": timestamp,
12761334
"X-Content-Hash": content_hash,
1277-
"X-Key-Id": self.tenant + ":" + self.application + ":" + "default",
1335+
"X-Key-Id": f"{self.tenant}:{self.application}:default",
12781336
"X-Key": self.api_public_key_bytes,
1279-
"X-Authorization": standard_b64encode(signature),
1337+
"X-Authorization": signature_b64,
12801338
**headers,
12811339
}
12821340

1283-
body.seek(0)
1284-
response = self.get_connection_response_with_retry(method, path, body, headers)
1285-
if return_raw_response:
1286-
return response
1287-
try:
1288-
parsed = json.load(response)
1289-
except json.JSONDecodeError:
1290-
parsed = response.read()
1291-
if response.status_code != 200:
1292-
print(parsed)
1293-
raise HTTPError(
1294-
f"HTTP {response.status_code} error: {response.reason_phrase} for {url}"
1295-
)
1296-
return parsed
1341+
response = self.get_connection_response_with_retry(
1342+
method, path, body_data, headers
1343+
)
1344+
return self._handle_response(response, return_raw_response, path)
12971345

12981346
def get_all_endpoints(
12991347
self,
@@ -1538,6 +1586,7 @@ def _start_deployment(
15381586
job: str,
15391587
disk_folder: str,
15401588
application_zip_bytes: Optional[BytesIO] = None,
1589+
version: Optional[str] = None,
15411590
) -> int:
15421591
deploy_path = (
15431592
"/application/v4/tenant/{}/application/{}/instance/{}/deploy/{}".format(
@@ -1551,11 +1600,30 @@ def _start_deployment(
15511600
if not application_zip_bytes:
15521601
application_zip_bytes = self._to_application_zip(disk_folder=disk_folder)
15531602

1603+
if version is not None:
1604+
# Create multipart form data
1605+
form_data = {
1606+
"applicationZip": (
1607+
"application.zip",
1608+
application_zip_bytes,
1609+
"application/zip",
1610+
),
1611+
"deployOptions": (
1612+
"",
1613+
json.dumps({"vespaVersion": version}),
1614+
"application/json",
1615+
),
1616+
}
1617+
multipart = MultipartEncoder(fields=form_data)
1618+
headers = {"Content-Type": multipart.content_type}
1619+
payload = multipart
1620+
else:
1621+
# Use existing direct zip upload
1622+
headers = {"Content-Type": "application/zip"}
1623+
payload = application_zip_bytes
1624+
15541625
response = self._request(
1555-
"POST",
1556-
deploy_path,
1557-
application_zip_bytes,
1558-
{"Content-Type": "application/zip"},
1626+
method="POST", path=deploy_path, body=payload, headers=headers
15591627
)
15601628
message = response.get("message", "No message provided")
15611629
print(message, file=self.output)
@@ -1616,9 +1684,12 @@ def _to_application_zip(self, disk_folder: str) -> BytesIO:
16161684

16171685
return buffer
16181686

1619-
def _follow_deployment(self, instance: str, job: str, run: int) -> None:
1687+
def _follow_deployment(
1688+
self, instance: str, job: str, run: int, max_wait: int = 1800
1689+
) -> None:
16201690
last = -1
1621-
while True:
1691+
start = time.time()
1692+
while time.time() - start < max_wait:
16221693
try:
16231694
status, last = self._get_deployment_status(instance, job, run, last)
16241695
except RuntimeError:
@@ -1630,6 +1701,7 @@ def _follow_deployment(self, instance: str, job: str, run: int) -> None:
16301701
return
16311702
else:
16321703
raise RuntimeError("Unexpected status: {}".format(status))
1704+
raise TimeoutError(f"Deployment did not finish within {max_wait} seconds.")
16331705

16341706
def _get_deployment_status(
16351707
self, instance: str, job: str, run: int, last: int

0 commit comments

Comments
 (0)