Skip to content

Commit

Permalink
opensearch: add working opensearch manager
Browse files Browse the repository at this point in the history
Add OpenSearch cluster manager.
Adjust initial opensearch test to use pytest fixture.
Add conftest with opensearch cluster fixture.
Adjust opensearch dependency installation.
  • Loading branch information
QuerthDP committed Feb 25, 2025
1 parent bef716b commit a6f0df7
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 78 deletions.
55 changes: 27 additions & 28 deletions install-dependencies.sh
Original file line number Diff line number Diff line change
Expand Up @@ -259,37 +259,35 @@ minio_download_jobs() {
rm -f ${cfile}
}

# Print the architecture tag used in OpenSearch bundle file names
# ("x64" or "arm64") for the machine reported by `arch`.
# Prints an empty line when the architecture is not listed.
opensearch_arch() {
    local -A OPENSEARCH_ARCH=(
        ["x86_64"]=x64
        ["aarch64"]=arm64
    )
    echo "${OPENSEARCH_ARCH["$(arch)"]}"
}

# OpenSearch release that is downloaded and unpacked below.
OPENSEARCH_VERSION=2.19.0
# SHA-256 of the release tarball, keyed by the output of `arch`.
declare -A OPENSEARCH_CHECKSUM=(
    ["x86_64"]=455b900182ef2a7193d443d96aaea55f534ec0f91b1bad6d2eacdbaea601f16e
    ["aarch64"]="" # TODO: Add the checksum for the "aarch64" architecture.
)
# Directory that receives both the downloaded tarball and the extracted tree.
OPENSEARCH_DIR=/opt/scylladb/dependencies

# Print the file name of the release tarball for this version and architecture.
opensearch_filename() {
    echo "opensearch-$OPENSEARCH_VERSION-linux-$(opensearch_arch).tar.gz"
}

# Print the local path where the release tarball is stored.
opensearch_fullpath() {
    echo "$OPENSEARCH_DIR/$(opensearch_filename)"
}

# Succeed only when the downloaded tarball matches the SHA-256 recorded for
# this architecture in OPENSEARCH_CHECKSUM.
# Fails when no checksum is recorded, and also when sha256sum produces no
# output (e.g. the file is missing) instead of reporting success by accident.
opensearch_checksum() {
    local expected actual
    expected="${OPENSEARCH_CHECKSUM["$(arch)"]}"
    [[ -n "$expected" ]] || return 1
    # sha256sum prints "<digest>  <file>"; keep only the digest.
    actual="$(sha256sum "$(opensearch_fullpath)" | cut -d' ' -f1)"
    [[ "$actual" == "$expected" ]]
}

# Print the download URL of the release tarball.
opensearch_url() {
    echo "https://artifacts.opensearch.org/releases/bundle/opensearch/$OPENSEARCH_VERSION/$(opensearch_filename)"
}

print_usage() {
Expand Down Expand Up @@ -401,23 +399,24 @@ elif [ "$ID" = "fedora" ]; then
fi
fi

# Download and unpack OpenSearch unless a verified tarball is already present.
# An unreachable download server is tolerated; a checksum mismatch is fatal.
if [ -f "$(opensearch_fullpath)" ] && opensearch_checksum; then
    echo "$(opensearch_filename) already exists, skipping download"
else
    mkdir -p "$OPENSEARCH_DIR"
    if curl -fSL -o "$(opensearch_fullpath)" "$(opensearch_url)"; then
        if ! opensearch_checksum; then
            echo "$(opensearch_filename) download failed"
            exit 1
        else
            tar -C "$OPENSEARCH_DIR" -xvf "$(opensearch_fullpath)"
            # Let unprivileged users run the bundled JDK and read the stock config.
            chmod -R a+rx "$OPENSEARCH_DIR/opensearch-$OPENSEARCH_VERSION/jdk"
            chmod -R a+rx "$OPENSEARCH_DIR/opensearch-$OPENSEARCH_VERSION/config"
            chmod -R a+rw "$OPENSEARCH_DIR/opensearch-$OPENSEARCH_VERSION/logs" # TODO: change it to work without log permissions
        fi
    else
        echo "$(opensearch_url) is unreachable, skipping"
    fi
fi
elif [ "$ID" = "centos" ]; then
Expand Down
4 changes: 1 addition & 3 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1958,9 +1958,7 @@ async def reap(done, pending, signaled):
await proxy_s3_server.start()
TestSuite.artifacts.add_exit_artifact(None, proxy_s3_server.stop)

# TODO: one opensearch instance for every test.py run, is it ok?
# TODO: make it work :)
os_cluster = OpenSearchCluster()
os_cluster = OpenSearchCluster(options.tmpdir, '127.0.0.1', LogPrefixAdapter(logging.getLogger('opensearch'), {'prefix': 'opensearch'}))
await os_cluster.start()
TestSuite.artifacts.add_exit_artifact(None, os_cluster.stop)

Expand Down
26 changes: 26 additions & 0 deletions test/opensearch/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (C) 2024-present ScyllaDB
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
# This file configures pytest for all tests in this directory, and also
# defines common test fixtures for all of them to use.
from opensearchpy import OpenSearch
import pytest
import os

@pytest.fixture(scope="function")
def opensearch():
    """Yield an OpenSearch client for the server announced in the environment.

    The server location is read from the OPENSEARCH_ADDRESS and
    OPENSEARCH_PORT environment variables. The client is closed once the
    test that requested the fixture has finished.

    Raises:
        RuntimeError: if either environment variable is missing or empty.
    """
    host = os.environ.get('OPENSEARCH_ADDRESS')
    port = os.environ.get('OPENSEARCH_PORT')
    if not host or not port:
        raise RuntimeError(
            'OPENSEARCH_ADDRESS and OPENSEARCH_PORT must be set; '
            'is the OpenSearch cluster running?'
        )

    client = OpenSearch(
        # Environment values are strings; hand the client a numeric port.
        hosts=[{'host': host, 'port': int(port)}],
        http_compress=True,
        use_ssl=False,
        verify_certs=False,
        ssl_assert_hostname=False,
        ssl_show_warn=False,
    )
    try:
        yield client
    finally:
        client.close()
24 changes: 3 additions & 21 deletions test/opensearch/test_opensearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,10 @@
#
# SPDX-License-Identifier: LicenseRef-ScyllaDB-Source-Available-1.0
#
from opensearchpy import OpenSearch
import pytest

@pytest.mark.asyncio
async def test_opensearch_basic():
host = 'localhost'
port = 9200

# Create the client with SSL/TLS and hostname verification disabled.
client = OpenSearch(
hosts = [{'host': host, 'port': port}],
http_compress = True, # enables gzip compression for request bodies
use_ssl = False,
verify_certs = False,
ssl_assert_hostname = False,
ssl_show_warn = False
)

# Cluster should be in a clean state here, so we can create the index (without an error that the index already exists)
async def test_opensearch_basic(opensearch):

index_name = 'python-test-index'
index_body = {
Expand All @@ -32,12 +17,9 @@ async def test_opensearch_basic():
}
}

response = client.indices.create(index_name, body=index_body)
response = opensearch.indices.create(index_name, body=index_body)
print(f"Index creation response: {response}")

response = client.cat.indices(format='json')
response = opensearch.cat.indices(format='json')
for index in response:
print(index['index'])

response = client.indices.delete(index_name)
print(f"Index deletion response: {response}")
185 changes: 159 additions & 26 deletions test/pylib/opensearch_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,180 @@
Provides helpers to setup and manage OpenSearch cluster for testing.
"""
from opensearchpy import OpenSearch
import asyncio
import os
import argparse
import asyncio
from asyncio.subprocess import Process
from typing import Generator, Optional
import logging
import pathlib
import random
import shutil
import time
import tempfile
import socket
import yaml
from io import BufferedWriter

class OpenSearchCluster:
    """Run a single OpenSearch server process for the tests.

    The launcher script and bundled JDK are taken from the directory that
    install-dependencies.sh unpacks OpenSearch into. Every instance owns a
    fresh temporary directory that holds a copy of the stock configuration,
    the data directory and the server log.

    While the server is running, its address and port are published through
    the OPENSEARCH_ADDRESS / OPENSEARCH_PORT environment variables (together
    with the variables the launcher itself reads), so that child processes
    such as the test fixtures can find it. stop() restores the previous
    values of those variables.
    """

    OPENSEARCH_VERSION = '2.19.0'
    OPENSEARCH_HOME = pathlib.Path('/opt/scylladb/dependencies')
    STARTUP_TIMEOUT = 30  # seconds to wait for the port to accept connections
    START_RETRIES = 42  # number of random ports tried before giving up

    ENV_JAVA_HOME = 'OPENSEARCH_JAVA_HOME'
    ENV_CONF_PATH = 'OPENSEARCH_PATH_CONF'
    ENV_ADDRESS = 'OPENSEARCH_ADDRESS'
    ENV_PORT = 'OPENSEARCH_PORT'

    def __init__(self, tempdir_base, address, logger):
        """Prepare paths for a new server; nothing is started yet.

        Args:
            tempdir_base: directory in which the per-instance temporary
                directory is created.
            address: address the server binds to and clients connect to.
            logger: logger (or logger adapter) used for progress messages.
        """
        self.opensearch_dir = self.OPENSEARCH_HOME / f'opensearch-{self.OPENSEARCH_VERSION}'
        # None when the launcher is not installed; start() reports that.
        self.os_exe = shutil.which('opensearch', path=str(self.opensearch_dir / 'bin'))
        self.address = address
        self.port = None
        self.tempdir = pathlib.Path(tempfile.mkdtemp(dir=tempdir_base, prefix="opensearch-"))
        self.rootdir = self.tempdir / 'opensearch_data'
        self.config_dir = self.tempdir / 'config'
        self.config_file = self.config_dir / 'opensearch.yml'
        self.logger = logger
        self.cmd: Optional[Process] = None
        self.log_filename = (self.tempdir / 'opensearch').with_suffix(".log")
        self.log_file: Optional[BufferedWriter] = None
        # Values of the managed environment variables before we touched them.
        self.old_env = dict()
        self._env_saved = False

    def __repr__(self):
        return f"[opensearch] {self.address}:{self.port}"

    def check_server(self, port):
        """Return True if a TCP connection to ``self.address:port`` succeeds."""
        with socket.socket() as sock:
            # Do not block the caller for long on an unresponsive address.
            sock.settimeout(1)
            try:
                sock.connect((self.address, port))
            except OSError:
                return False
            return True

    def _get_local_ports(self, num_ports: int) -> Generator[int, None, None]:
        """Yield ``num_ports`` random ports from the kernel's local port range.

        The same port may be yielded more than once.
        """
        with open('/proc/sys/net/ipv4/ip_local_port_range', encoding='ascii') as port_range:
            min_port, max_port = map(int, port_range.read().split())
        for _ in range(num_ports):
            yield random.randint(min_port, max_port)

    def create_conf_file(self, address: str, port: int, path: str):
        """Write an opensearch.yml to ``path`` for the given address and port.

        The security plugin is disabled and data is stored in ``self.rootdir``.
        """
        with open(path, 'w', encoding='ascii') as config_file:
            config = {
                'network.host': address,
                'http.port': port,
                'plugins.security.disabled': True,
                'path.data': str(self.rootdir),
            }
            yaml.dump(config, config_file)

    @staticmethod
    async def _kill(cmd):
        """Kill ``cmd`` and wait for it to exit; a process already gone is fine."""
        try:
            cmd.kill()
        except ProcessLookupError:
            return
        await cmd.wait()

    async def _run_cluster(self, port):
        """Launch the server and wait until ``port`` accepts connections.

        Returns:
            The running process.

        Raises:
            RuntimeError: the process exited before the port became reachable.
            TimeoutError: the port did not become reachable within
                STARTUP_TIMEOUT seconds; the process is killed first.
        """
        self.logger.info('Starting OpenSearch cluster at %s:%s', self.address, port)
        cmd = await asyncio.create_subprocess_exec(
            self.os_exe,
            stdout=self.log_file,
            stderr=self.log_file,
            start_new_session=True,
        )
        # Monotonic clock: wall-clock adjustments must not stretch the wait.
        deadline = time.monotonic() + self.STARTUP_TIMEOUT
        while time.monotonic() < deadline:
            if cmd.returncode is not None:
                self.logger.info('OpenSearch exited with %s', cmd.returncode)
                raise RuntimeError("Failed to start OpenSearch cluster")
            if self.check_server(port):
                self.logger.info('Opensearch is up and running')
                return cmd
            await asyncio.sleep(0.1)

        # Never report a server that is not reachable as started.
        await self._kill(cmd)
        raise TimeoutError("Timed out waiting for OpenSearch cluster to start")

    def _set_environ(self):
        """Export the variables describing this server into ``os.environ``.

        The previous values are remembered only on the first call, so that
        repeated calls (one per port attempt) cannot overwrite them with
        values this class set itself.
        """
        if not self._env_saved:
            self.old_env = {
                key: os.environ[key]
                for key in self._get_environs()
                if key in os.environ
            }
            self._env_saved = True
        os.environ[self.ENV_JAVA_HOME] = f'{self.opensearch_dir}/jdk'
        os.environ[self.ENV_CONF_PATH] = f'{self.config_dir}'
        os.environ[self.ENV_ADDRESS] = f'{self.address}'
        os.environ[self.ENV_PORT] = f'{self.port}'

    def _get_environs(self):
        """Return the names of the environment variables managed by this class."""
        return [self.ENV_JAVA_HOME, self.ENV_CONF_PATH, self.ENV_ADDRESS, self.ENV_PORT]

    def _unset_environ(self):
        """Restore the managed variables to their values before _set_environ().

        Does nothing when _set_environ() has not been called since the last
        restore. Variables that did not exist before are removed; variables
        that existed (even with an empty value) get their old value back.
        """
        if not self._env_saved:
            return
        for key in self._get_environs():
            if key in self.old_env:
                os.environ[key] = self.old_env[key]
            else:
                os.environ.pop(key, None)
        self._env_saved = False

    def print_environ(self):
        """Print ``export NAME=value`` lines for the managed variables that are set."""
        lines = [
            f'export {key}={os.environ[key]}'
            for key in self._get_environs()
            if key in os.environ
        ]
        if lines:
            print('\n'.join(lines))

    def _cleanup(self, remove_tempdir=True):
        """Release everything start() acquired; safe to call repeatedly."""
        self._unset_environ()
        if self.log_file is not None:
            self.log_file.close()
            self.log_file = None
        if remove_tempdir:
            shutil.rmtree(self.tempdir, ignore_errors=True)

    async def start(self):
        """Start the server on a random free port.

        Failures are logged rather than raised: when OpenSearch is not
        installed, or no attempt produced a reachable server, ``self.cmd``
        stays None. A process that exits early is retried on another port; a
        startup timeout ends the attempts.
        """
        if self.os_exe is None:
            self.logger.error("OpenSearch not installed")
            self._cleanup()
            return

        try:
            self.log_file = self.log_filename.open("wb")
            os.mkdir(self.rootdir)
            shutil.copytree(self.opensearch_dir / 'config', self.config_dir)
        except OSError:
            self._cleanup()
            raise

        for port in self._get_local_ports(self.START_RETRIES):
            self.port = port
            self.create_conf_file(self.address, port, self.config_file)
            self._set_environ()
            try:
                self.cmd = await self._run_cluster(port)
            except TimeoutError:
                # Another port will not make a slow or stuck server faster.
                break
            except RuntimeError:
                continue
            else:
                return

        self.logger.error("Failed to start OpenSearch cluster")
        # Keep the temporary directory (and the server log in it) for
        # diagnosis until stop() is called.
        self._cleanup(remove_tempdir=False)

    async def stop(self):
        """Kill the server if it is running and release all resources."""
        self.logger.info('Killing OpenSearch cluster')
        try:
            if self.cmd is not None:
                await self._kill(self.cmd)
                self.logger.info('Killed OpenSearch cluster')
        finally:
            self.cmd = None
            self._cleanup()


async def main():
    """Run an OpenSearch server interactively until the user asks to stop.

    Command-line options:
        --tempdir: directory in which the server's working directory is
            created (a system default is used when omitted).
        --host: address the server binds to (default 127.0.0.1).
    """
    parser = argparse.ArgumentParser(description="Start a OpenSearch cluster")
    parser.add_argument('--tempdir')
    parser.add_argument('--host', default='127.0.0.1')
    args = parser.parse_args()
    with tempfile.TemporaryDirectory(suffix='-opensearch', dir=args.tempdir) as tempdir:
        if args.tempdir is None:
            print(f'{tempdir=}')
        server = OpenSearchCluster(tempdir, args.host, logging.getLogger('opensearch'))
        # start() is inside the try so that stop() runs even if it raises.
        try:
            await server.start()
            if server.cmd is None:
                # start() has already logged why the server is not running.
                return
            server.print_environ()
            _ = input('server started. press any key to stop: ')
        except (KeyboardInterrupt, EOFError):
            pass
        finally:
            await server.stop()

# Script entry point: run the interactive helper when executed directly.
if __name__ == '__main__':
    asyncio.run(main())

0 comments on commit a6f0df7

Please sign in to comment.