Skip to content

Commit

Permalink
add multinode support
Browse files Browse the repository at this point in the history
  • Loading branch information
ThejasNU committed Nov 13, 2024
1 parent ebbf4a1 commit 1a19ff8
Showing 1 changed file with 117 additions and 113 deletions.
230 changes: 117 additions & 113 deletions libs/agentc_core/agentc_core/util/ddl.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,59 +16,48 @@
logger = logging.getLogger(__name__)


def get_index_info(json_response: dict = None, index_to_create: str = "") -> tuple[bool | dict | None, None]:
"""Helper function for is_index_present()"""
if json_response["status"] == "ok":
if json_response["indexDefs"] is None:
return False, None
created_indexes = [el for el in json_response["indexDefs"]["indexDefs"]]
if index_to_create not in created_indexes:
return False, None
else:
index_def = json_response["indexDefs"]["indexDefs"][index_to_create]
return index_def, None


def is_index_present(
bucket: str = "", index_to_create: str = "", conn: CouchbaseConnect = ""
bucket: str = "", index_to_create: str = "", conn: CouchbaseConnect = None, fts_nodes_hostname=None
) -> tuple[bool | dict | None, Exception | None]:
"""Checks for existence of index_to_create in the given keyspace"""
find_index_https_url = (
f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index"
)
find_index_http_url = (
f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index"
)
auth = (conn.username, conn.password)

# Make request to FTS
try:
# REST call to get list of indexes, decide HTTP or HTTPS based on certificate path
if conn.certificate is not None:
response = requests.request("GET", find_index_https_url, auth=auth, verify=conn.certificate)
else:
response = requests.request("GET", find_index_http_url, auth=auth)

json_response = json.loads(response.text)

return get_index_info(json_response, index_to_create)
if fts_nodes_hostname is None:
fts_nodes_hostname = []

except Exception as e:
return False, e
auth = (conn.username, conn.password)

# Make request to FTS till you find live node
for fts_node_hostname in fts_nodes_hostname:
find_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index"
find_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index"
try:
# REST call to get list of indexes, decide HTTP or HTTPS based on certificate path
if conn.certificate is not None:
response = requests.request("GET", find_index_https_url, auth=auth, verify=conn.certificate)
else:
response = requests.request("GET", find_index_http_url, auth=auth)

json_response = json.loads(response.text)

if json_response["status"] == "ok":
if json_response["indexDefs"] is None:
return False, None
created_indexes = [el for el in json_response["indexDefs"]["indexDefs"]]
if index_to_create not in created_indexes:
return False, None
else:
index_def = json_response["indexDefs"]["indexDefs"][index_to_create]
return index_def, None
else:
raise RuntimeError("Couldn't check for the existing vector indexes!")
except Exception:
continue

def get_nodes_num(json_response: dict = None) -> tuple[int, None]:
"""Helper function for get_no_of_fts_nodes()"""
if json_response["name"] == "default":
no_of_fts_nodes = 0
for node in json_response["nodes"]:
if "fts" in node["services"]:
no_of_fts_nodes += 1
return no_of_fts_nodes, None
# if there is exception in all nodes then no nodes are alive
return False, RuntimeError("Couldn't make request to any of the nodes with 'search' service!")


def get_no_of_fts_nodes(conn: CouchbaseConnect = None) -> tuple[int | None, Exception | None]:
"""Find the number of nodes with fts support for index partition creation in create_vector_index()"""
def get_fts_nodes_hostname(conn: CouchbaseConnect = None) -> tuple[list[str] | None, Exception | None]:
"""Find the hostname of nodes with fts support for index partition creation in create_vector_index()"""

node_info_url_http = f"http://{conn.host}:{DEFAULT_HTTP_CLUSTER_ADMIN_PORT_NUMBER}/pools/default"
node_info_url_https = f"https://{conn.host}:{DEFAULT_HTTPS_CLUSTER_ADMIN_PORT_NUMBER}/pools/default"
Expand All @@ -84,20 +73,19 @@ def get_no_of_fts_nodes(conn: CouchbaseConnect = None) -> tuple[int | None, Exce

json_response = json.loads(response.text)
# If api call was successful
return get_nodes_num(json_response)
if json_response["name"] == "default":
fts_nodes = []
for node in json_response["nodes"]:
if "fts" in node["services"]:
fts_nodes.append(node["hostname"].split(":")[0])
return fts_nodes, None
else:
return None, RuntimeError("Couldn't check for the existing fts nodes!")

except Exception as e:
return None, e


def update_vector_index(response: requests.Response) -> tuple[str, None] | Exception:
"""Helper function fot create_vector_index()"""
if json.loads(response.text)["status"] == "ok":
logger.info("Updated vector index!!")
return "Success", None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])


def create_vector_index(
bucket: str = "",
kind: str = "tool",
Expand All @@ -110,11 +98,14 @@ def create_vector_index(
qualified_index_name = f"{bucket}.{DEFAULT_CATALOG_SCOPE}.{non_qualified_index_name}"

# decide on plan params
(no_of_fts_nodes, err) = get_no_of_fts_nodes(conn)
if no_of_fts_nodes == 0:
(fts_nodes_hostname, err) = get_fts_nodes_hostname(conn)
num_fts_nodes = len(fts_nodes_hostname)

if num_fts_nodes == 0:
raise ValueError(
"No node with fts service found, cannot create vector index! Please ensure fts service is included in at least one node."
)

max_partition = (
os.getenv("AGENT_CATALOG_MAX_SOURCE_PARTITION")
if os.getenv("AGENT_CATALOG_MAX_SOURCE_PARTITION") is not None
Expand All @@ -123,17 +114,14 @@ def create_vector_index(
index_partition = (
os.getenv("AGENT_CATALOG_INDEX_PARTITION")
if os.getenv("AGENT_CATALOG_INDEX_PARTITION") is not None
else 2 * no_of_fts_nodes
else 2 * num_fts_nodes
)

(index_present, err) = is_index_present(bucket, qualified_index_name, conn)
(index_present, err) = is_index_present(bucket, qualified_index_name, conn, fts_nodes_hostname)
if err is not None:
return None, err
elif isinstance(index_present, bool) and not index_present:
# Create the index for the first time
create_vector_index_https_url = f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
create_vector_index_http_url = f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"

headers = {
"Content-Type": "application/json",
}
Expand Down Expand Up @@ -194,29 +182,36 @@ def create_vector_index(
}
)

try:
# REST call to create the index
if conn.certificate is not None:
response = requests.request(
"PUT",
create_vector_index_https_url,
headers=headers,
auth=auth,
data=payload,
verify=conn.certificate,
)
else:
response = requests.request(
"PUT", create_vector_index_http_url, headers=headers, auth=auth, data=payload
)

if json.loads(response.text)["status"] == "ok":
logger.info("Created vector index!!")
return qualified_index_name, None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])
except Exception as e:
return None, e
# keeping making requests in a loop till you find the alive fts node
for fts_node_hostname in fts_nodes_hostname:
create_vector_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
create_vector_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
try:
# REST call to create the index
if conn.certificate is not None:
response = requests.request(
"PUT",
create_vector_index_https_url,
headers=headers,
auth=auth,
data=payload,
verify=conn.certificate,
)
else:
response = requests.request(
"PUT", create_vector_index_http_url, headers=headers, auth=auth, data=payload
)

if json.loads(response.text)["status"] == "ok":
logger.info("Created vector index!!")
return qualified_index_name, None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])
except Exception:
continue

# if there is exception in all nodes then no nodes are alive
return None, RuntimeError("Couldn't make request to any of the nodes with 'search' service!")

elif isinstance(index_present, dict):
# Check if no. of fts nodes has changes since last update
Expand Down Expand Up @@ -252,41 +247,50 @@ def create_vector_index(
"embedding"
]["fields"] = field_mappings

update_vector_index_https_url = f"https://{conn.host}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
update_vector_index_http_url = f"http://{conn.host}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
headers = {
"Content-Type": "application/json",
}
auth = (conn.username, conn.password)

payload = json.dumps(index_present)

try:
# REST call to update the index
if conn.certificate is not None:
response = requests.request(
"PUT",
update_vector_index_https_url,
headers=headers,
auth=auth,
data=payload,
verify=conn.certificate,
)
else:
response = requests.request(
"PUT", update_vector_index_http_url, headers=headers, auth=auth, data=payload
)

if json.loads(response.text)["status"] == "ok":
logger.info("Updated vector index!!")
return "Success", None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])

return update_vector_index(response)

except Exception as e:
return None, e
# keeping making requests in a loop till you find the alive fts node
for fts_node_hostname in fts_nodes_hostname:
update_vector_index_https_url = f"https://{fts_node_hostname}:{DEFAULT_HTTPS_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
update_vector_index_http_url = f"http://{fts_node_hostname}:{DEFAULT_HTTP_FTS_PORT_NUMBER}/api/bucket/{bucket}/scope/{DEFAULT_CATALOG_SCOPE}/index/{non_qualified_index_name}"
try:
# REST call to update the index
if conn.certificate is not None:
response = requests.request(
"PUT",
update_vector_index_https_url,
headers=headers,
auth=auth,
data=payload,
verify=conn.certificate,
)
else:
response = requests.request(
"PUT", update_vector_index_http_url, headers=headers, auth=auth, data=payload
)

if json.loads(response.text)["status"] == "ok":
logger.info("Updated vector index!!")
return "Success", None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])

if json.loads(response.text)["status"] == "ok":
logger.info("Updated vector index!!")
return "Success", None
elif json.loads(response.text)["status"] == "fail":
raise Exception(json.loads(response.text)["error"])

except Exception:
continue

# if there is exception in all nodes then no nodes are alive
return None, RuntimeError("Couldn't make request to any of the nodes with 'search' service!")

else:
return qualified_index_name, None
Expand Down

0 comments on commit 1a19ff8

Please sign in to comment.