diff --git a/libs/agentc_core/agentc_core/util/ddl.py b/libs/agentc_core/agentc_core/util/ddl.py index f67c761b..2871a299 100644 --- a/libs/agentc_core/agentc_core/util/ddl.py +++ b/libs/agentc_core/agentc_core/util/ddl.py @@ -16,59 +16,48 @@ logger = logging.getLogger(__name__) -def get_index_info(json_response: dict = None, index_to_create: str = "") -> tuple[bool | dict | None, None]: - """Helper function for is_index_present()""" - if json_response["status"] == "ok": - if json_response["indexDefs"] is None: - return False, None - created_indexes = [el for el in json_response["indexDefs"]["indexDefs"]] - if index_to_create not in created_indexes: - return False, None - else: - index_def = json_response["indexDefs"]["indexDefs"][index_to_create] - return index_def, None - - def is_index_present( - bucket: str = "", index_to_create: str = "", conn: CouchbaseConnect = "" + bucket: str = "", index_to_create: str = "", conn: CouchbaseConnect = None, fts_nodes_hostname=None ) -> tuple[bool | dict | None, Exception | None]: """Checks for existence of index_to_create in the given keyspace""" - find_index_https_url = ( - f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index" - ) - find_index_http_url = ( - f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index" - ) - auth = (conn.username, conn.password) - - # Make request to FTS - try: - # REST call to get list of indexes, decide HTTP or HTTPS based on certificate path - if conn.certificate is not None: - response = requests.request("GET", find_index_https_url, auth=auth, verify=conn.certificate) - else: - response = requests.request("GET", find_index_http_url, auth=auth) - - json_response = json.loads(response.text) - - return get_index_info(json_response, index_to_create) + if fts_nodes_hostname is None: + fts_nodes_hostname = [] - except Exception as e: - return False, e + auth = (conn.username, conn.password) + # Make request to FTS till you find live node + for fts_node_hostname in fts_nodes_hostname: + find_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index" + find_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index" + try: + # REST call to get list of indexes, decide HTTP or HTTPS based on certificate path + if conn.certificate is not None: + response = requests.request("GET", find_index_https_url, auth=auth, verify=conn.certificate) + else: + response = requests.request("GET", find_index_http_url, auth=auth) + + json_response = json.loads(response.text) + + if json_response["status"] == "ok": + if json_response["indexDefs"] is None: + return False, None + created_indexes = [el for el in json_response["indexDefs"]["indexDefs"]] + if index_to_create not in created_indexes: + return False, None + else: + index_def = json_response["indexDefs"]["indexDefs"][index_to_create] + return index_def, None + else: + raise RuntimeError("Couldn't check for the existing vector indexes!") + except Exception: + continue -def get_nodes_num(json_response: dict = None) -> tuple[int, None]: - """Helper function for get_no_of_fts_nodes()""" - if json_response["name"] == "default": - no_of_fts_nodes = 0 - for node in json_response["nodes"]: - if "fts" in node["services"]: - no_of_fts_nodes += 1 - return no_of_fts_nodes, None + # if there is exception in all nodes then no nodes are alive + return False, RuntimeError("Couldn't make request to any of the nodes with 'search' service!") -def get_no_of_fts_nodes(conn: CouchbaseConnect = None) -> tuple[int | None, Exception | None]: - """Find the number of nodes with fts support for index partition creation in create_vector_index()""" +def get_fts_nodes_hostname(conn: CouchbaseConnect = None) -> tuple[list[str] | None, Exception | None]: + """Find the hostname of nodes with fts support for index partition creation in create_vector_index()""" node_info_url_http = f"http://{conn.host}:{DEFAULT_HTTP_CLUSTER_ADMIN_PORT_NUMBER}/pools/default" node_info_url_https = f"https://{conn.host}:{DEFAULT_HTTPS_CLUSTER_ADMIN_PORT_NUMBER}/pools/default" @@ -84,20 +73,19 @@ def get_no_of_fts_nodes(conn: CouchbaseConnect = None) -> tuple[int | None, Exce json_response = json.loads(response.text) # If api call was successful - return get_nodes_num(json_response) + if json_response["name"] == "default": + fts_nodes = [] + for node in json_response["nodes"]: + if "fts" in node["services"]: + fts_nodes.append(node["hostname"].split(":")[0]) + return fts_nodes, None + else: + return None, RuntimeError("Couldn't check for the existing fts nodes!") + except Exception as e: return None, e -def update_vector_index(response: requests.Response) -> tuple[str, None] | Exception: - """Helper function fot create_vector_index()""" - if json.loads(response.text)["status"] == "ok": - logger.info("Updated vector index!!") - return "Success", None - elif json.loads(response.text)["status"] == "fail": - raise Exception(json.loads(response.text)["error"]) - - def create_vector_index( bucket: str = "", kind: str = "tool", @@ -110,11 +98,14 @@ def create_vector_index( qualified_index_name = f"{bucket}.{DEFAULT_CATALOG_SCOPE}.{non_qualified_index_name}" # decide on plan params - (no_of_fts_nodes, err) = get_no_of_fts_nodes(conn) - if no_of_fts_nodes == 0: + (fts_nodes_hostname, err) = get_fts_nodes_hostname(conn) + num_fts_nodes = len(fts_nodes_hostname) + + if num_fts_nodes == 0: raise ValueError( "No node with fts service found, cannot create vector index! Please ensure fts service is included in at least one node." ) + max_partition = ( os.getenv("AGENT_CATALOG_MAX_SOURCE_PARTITION") if os.getenv("AGENT_CATALOG_MAX_SOURCE_PARTITION") is not None @@ -123,17 +114,14 @@ def create_vector_index( index_partition = ( os.getenv("AGENT_CATALOG_INDEX_PARTITION") if os.getenv("AGENT_CATALOG_INDEX_PARTITION") is not None - else 2 * no_of_fts_nodes + else 2 * num_fts_nodes ) - (index_present, err) = is_index_present(bucket, qualified_index_name, conn) + (index_present, err) = is_index_present(bucket, qualified_index_name, conn, fts_nodes_hostname) if err is not None: return None, err elif isinstance(index_present, bool) and not index_present: # Create the index for the first time - create_vector_index_https_url = f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" - create_vector_index_http_url = f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" - headers = { "Content-Type": "application/json", } @@ -194,29 +182,36 @@ def create_vector_index( } ) - try: - # REST call to create the index - if conn.certificate is not None: - response = requests.request( - "PUT", - create_vector_index_https_url, - headers=headers, - auth=auth, - data=payload, - verify=conn.certificate, - ) - else: - response = requests.request( - "PUT", create_vector_index_http_url, headers=headers, auth=auth, data=payload - ) - - if json.loads(response.text)["status"] == "ok": - logger.info("Created vector index!!") - return qualified_index_name, None - elif json.loads(response.text)["status"] == "fail": - raise Exception(json.loads(response.text)["error"]) - except Exception as e: - return None, e + # keeping making requests in a loop till you find the alive fts node + for fts_node_hostname in fts_nodes_hostname: + create_vector_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" + create_vector_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" + try: + # REST call to create the index + if conn.certificate is not None: + response = requests.request( + "PUT", + create_vector_index_https_url, + headers=headers, + auth=auth, + data=payload, + verify=conn.certificate, + ) + else: + response = requests.request( + "PUT", create_vector_index_http_url, headers=headers, auth=auth, data=payload + ) + + if json.loads(response.text)["status"] == "ok": + logger.info("Created vector index!!") + return qualified_index_name, None + elif json.loads(response.text)["status"] == "fail": + raise Exception(json.loads(response.text)["error"]) + except Exception: + continue + + # if there is exception in all nodes then no nodes are alive + return None, RuntimeError("Couldn't make request to any of the nodes with 'search' service!") elif isinstance(index_present, dict): # Check if no. of fts nodes has changes since last update @@ -252,8 +247,6 @@ def create_vector_index( "embedding" ]["fields"] = field_mappings - update_vector_index_https_url = f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" - update_vector_index_http_url = f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" headers = { "Content-Type": "application/json", } @@ -261,32 +254,43 @@ def create_vector_index( payload = json.dumps(index_present) - try: - # REST call to update the index - if conn.certificate is not None: - response = requests.request( - "PUT", - update_vector_index_https_url, - headers=headers, - auth=auth, - data=payload, - verify=conn.certificate, - ) - else: - response = requests.request( - "PUT", update_vector_index_http_url, headers=headers, auth=auth, data=payload - ) - - if json.loads(response.text)["status"] == "ok": - logger.info("Updated vector index!!") - return "Success", None - elif json.loads(response.text)["status"] == "fail": - raise Exception(json.loads(response.text)["error"]) - - return update_vector_index(response) - - except Exception as e: - return None, e + # keeping making requests in a loop till you find the alive fts node + for fts_node_hostname in fts_nodes_hostname: + update_vector_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" + update_vector_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}" + try: + # REST call to update the index + if conn.certificate is not None: + response = requests.request( + "PUT", + update_vector_index_https_url, + headers=headers, + auth=auth, + data=payload, + verify=conn.certificate, + ) + else: + response = requests.request( + "PUT", update_vector_index_http_url, headers=headers, auth=auth, data=payload + ) + + if json.loads(response.text)["status"] == "ok": + logger.info("Updated vector index!!") + return "Success", None + elif json.loads(response.text)["status"] == "fail": + raise Exception(json.loads(response.text)["error"]) + + if json.loads(response.text)["status"] == "ok": + logger.info("Updated vector index!!") + return "Success", None + elif json.loads(response.text)["status"] == "fail": + raise Exception(json.loads(response.text)["error"]) + + except Exception: + continue + + # if there is exception in all nodes then no nodes are alive + return None, RuntimeError("Couldn't make request to any of the nodes with 'search' service!") else: return qualified_index_name, None