From f4ec8c1824d97fca03e559cd46ecad8dd3e05182 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 13:11:57 +0900 Subject: [PATCH 01/16] add index save implementation Signed-off-by: hlts2 --- .github/workflows/dockers-index-save.yml | 78 ++++++++ Makefile | 1 + Makefile.d/build.mk | 29 +++ Makefile.d/docker.mk | 14 ++ cmd/index/job/save/main.go | 59 ++++++ cmd/index/job/save/sample.yaml | 231 +++++++++++++++++++++++ dockers/index/job/save/Dockerfile | 93 +++++++++ internal/config/index_save.go | 54 ++++++ internal/config/index_save_test.go | 132 +++++++++++++ pkg/index/job/save/config/config.go | 71 +++++++ pkg/index/job/save/service/indexer.go | 219 +++++++++++++++++++++ pkg/index/job/save/service/mock_test.go | 57 ++++++ pkg/index/job/save/service/options.go | 58 ++++++ pkg/index/job/save/usecase/save.go | 213 +++++++++++++++++++++ 14 files changed, 1309 insertions(+) create mode 100644 .github/workflows/dockers-index-save.yml create mode 100644 cmd/index/job/save/main.go create mode 100644 cmd/index/job/save/sample.yaml create mode 100644 dockers/index/job/save/Dockerfile create mode 100644 internal/config/index_save.go create mode 100644 internal/config/index_save_test.go create mode 100644 pkg/index/job/save/config/config.go create mode 100644 pkg/index/job/save/service/indexer.go create mode 100644 pkg/index/job/save/service/mock_test.go create mode 100644 pkg/index/job/save/service/options.go create mode 100644 pkg/index/job/save/usecase/save.go diff --git a/.github/workflows/dockers-index-save.yml b/.github/workflows/dockers-index-save.yml new file mode 100644 index 0000000000..a6570e4bdf --- /dev/null +++ b/.github/workflows/dockers-index-save.yml @@ -0,0 +1,78 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +name: "Build docker image: index-save" +on: + push: + branches: + - main + tags: + - "*.*.*" + - "v*.*.*" + - "*.*.*-*" + - "v*.*.*-*" + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/dockers-index-save.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/save/**" + - "cmd/index/job/save/**" + - "dockers/index/job/save/Dockerfile" + - "versions/GO_VERSION" + pull_request: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" + - ".github/workflows/dockers-index-save.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/save/**" + - "cmd/index/job/save/**" + - "dockers/index/job/save/Dockerfile" + - "versions/GO_VERSION" + pull_request_target: + paths: + - ".github/actions/docker-build/actions.yaml" + - ".github/workflows/_docker-image.yaml" + - ".github/workflows/dockers-index-save.yml" + - "go.mod" + - "go.sum" + - "internal/**" + - "!internal/**/*_test.go" + - "!internal/db/**" + - "!internal/k8s/**" + - "apis/grpc/**" + - "pkg/index/job/save/**" + - "cmd/index/job/save/**" + - "dockers/index/job/save/Dockerfile" + - "versions/GO_VERSION" + +jobs: + build: + uses: ./.github/workflows/_docker-image.yaml + with: + target: index-save + secrets: inherit diff --git a/Makefile b/Makefile index 6770023c93..4a82f747b6 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,7 @@ LB_GATEWAY_IMAGE = $(NAME)-lb-gateway LOADTEST_IMAGE = $(NAME)-loadtest INDEX_CORRECTION_IMAGE = $(NAME)-index-correction INDEX_CREATION_IMAGE = $(NAME)-index-creation +INDEX_SAVE_IMAGE = $(NAME)-index-save MANAGER_INDEX_IMAGE = $(NAME)-manager-index MAINTAINER = "$(ORG).org $(NAME) team <$(NAME)@$(ORG).org>" diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index aa6a58a999..8bc4fbb444 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -264,6 +264,35 @@ cmd/index/job/creation/index-creation: \ $(dir $@)main.go $@ -version +cmd/index/job/save/index-save: \ + $(GO_SOURCES_INTERNAL) \ + $(PBGOS) \ + $(shell find $(ROOTDIR)/cmd/index/job/save -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ + $(shell find $(ROOTDIR)/pkg/index/job/save -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') + $(eval CGO_ENABLED = 0) + CGO_ENABLED=$(CGO_ENABLED) \ + GO111MODULE=on \ + GOPRIVATE=$(GOPRIVATE) \ + go build \ + --ldflags "-w -extldflags=-static \ + -X '$(GOPKG)/internal/info.Version=$(VERSION)' \ + -X '$(GOPKG)/internal/info.GitCommit=$(GIT_COMMIT)' \ + -X '$(GOPKG)/internal/info.BuildTime=$(DATETIME)' \ + -X '$(GOPKG)/internal/info.GoVersion=$(GO_VERSION)' \ + -X '$(GOPKG)/internal/info.GoOS=$(GOOS)' \ + -X '$(GOPKG)/internal/info.GoArch=$(GOARCH)' \ + -X '$(GOPKG)/internal/info.CGOEnabled=$(CGO_ENABLED)' \ + -X '$(GOPKG)/internal/info.BuildCPUInfoFlags=$(CPU_INFO_FLAGS)' \ + -buildid=" \ + -mod=readonly \ + -modcacherw \ + -a \ + -tags "osusergo netgo static_build" \ + -trimpath \ + -o $@ \ + $(dir $@)main.go + $@ -version + .PHONY: binary/build/zip ## build all binaries and zip them binary/build/zip: \ diff --git a/Makefile.d/docker.mk b/Makefile.d/docker.mk index 0d0876c121..7c1bb74e7c 100644 --- a/Makefile.d/docker.mk +++ b/Makefile.d/docker.mk @@ -216,3 +216,17 @@ docker/build/index-creation: -t $(ORG)/$(INDEX_CREATION_IMAGE):$(TAG) . \ --build-arg MAINTAINER=$(MAINTAINER) \ --build-arg GO_VERSION=$(GO_VERSION) + +.PHONY: docker/name/index-save +docker/name/index-save: + @echo "$(ORG)/$(INDEX_SAVE_IMAGE)" + +.PHONY: docker/build/index-save +## build index-save image +docker/build/index-save: + $(DOCKER) build \ + $(DOCKER_OPTS) \ + -f dockers/index/job/save/Dockerfile \ + -t $(ORG)/$(INDEX_SAVE_IMAGE):$(TAG) . \ + --build-arg MAINTAINER=$(MAINTAINER) \ + --build-arg GO_VERSION=$(GO_VERSION) diff --git a/cmd/index/job/save/main.go b/cmd/index/job/save/main.go new file mode 100644 index 0000000000..2ad5f73221 --- /dev/null +++ b/cmd/index/job/save/main.go @@ -0,0 +1,59 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package main + +import ( + "context" + + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/info" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/pkg/index/job/save/config" + "github.com/vdaas/vald/pkg/index/job/save/usecase" +) + +const ( + maxVersion = "v0.0.10" + minVersion = "v0.0.0" + name = "index save job" +) + +func main() { + if err := safety.RecoverFunc(func() error { + return runner.Do( + context.Background(), + runner.WithName(name), + runner.WithVersion(info.Version, maxVersion, minVersion), + runner.WithConfigLoader(func(path string) (interface{}, *config.GlobalConfig, error) { + cfg, err := config.NewConfig(path) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to load "+name+"'s configuration") + } + return cfg, &cfg.GlobalConfig, nil + }), + runner.WithDaemonInitializer(func(cfg interface{}) (runner.Runner, error) { + c, ok := cfg.(*config.Data) + if !ok { + return nil, errors.ErrInvalidConfig + } + return usecase.New(c) + }), + ) + })(); err != nil { + log.Fatal(err, info.Get()) + return + } +} diff --git a/cmd/index/job/save/sample.yaml b/cmd/index/job/save/sample.yaml new file mode 100644 index 0000000000..c83105c86b --- /dev/null +++ b/cmd/index/job/save/sample.yaml @@ -0,0 +1,231 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +--- +version: v0.0.0 +time_zone: JST +logging: + format: raw + level: info + logger: glg +server_config: + servers: + - name: grpc + host: 0.0.0.0 + port: 8081 + grpc: + bidirectional_stream_concurrency: 20 + connection_timeout: "" + header_table_size: 0 + initial_conn_window_size: 0 + initial_window_size: 0 + interceptors: [] + keepalive: + max_conn_age: "" + max_conn_age_grace: "" + max_conn_idle: "" + time: "" + timeout: "" + max_header_list_size: 0 + max_receive_message_size: 0 + max_send_message_size: 0 + read_buffer_size: 0 + write_buffer_size: 0 + mode: GRPC + probe_wait_time: 3s + restart: true + health_check_servers: + - name: readiness + host: 0.0.0.0 + port: 3001 + http: + handler_timeout: "" + idle_timeout: "" + read_header_timeout: "" + read_timeout: "" + shutdown_duration: 0s + write_timeout: "" + mode: "" + probe_wait_time: 3s + metrics_servers: + startup_strategy: + - grpc + - readiness + full_shutdown_duration: 600s + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +creator: + agent_port: 8081 + agent_name: "vald-agent-ngt" + agent_dns: vald-agent-ngt.default.svc.cluster.local + agent_namespace: "default" + node_name: "" + concurrency: 1 + discoverer: + duration: 500ms + client: + addrs: + - vald-discoverer.default.svc.cluster.local:8081 + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + backoff_base_delay: 1s + backoff_jitter: 0.2 + backoff_max_delay: 120s + backoff_multiplier: 1.6 + enable_backoff: false + initial_connection_window_size: 0 + initial_window_size: 0 + insecure: true + keepalive: + permit_without_stream: false + time: "" + timeout: "" + max_msg_size: 0 + min_connection_timeout: 20s + read_buffer_size: 0 + tcp: + dialer: + dual_stack_enabled: true + keepalive: "" + timeout: "" + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + timeout: "" + write_buffer_size: 0 + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + agent_client_options: + addrs: [] + health_check_duration: "1s" + connection_pool: + enable_dns_resolver: true + enable_rebalance: true + old_conn_close_duration: 3s + rebalance_duration: 30m + size: 3 + backoff: + backoff_factor: 1.1 + backoff_time_limit: 5s + enable_error_log: true + initial_duration: 5ms + jitter_limit: 100ms + maximum_duration: 5s + retry_count: 100 + call_option: + max_recv_msg_size: 0 + max_retry_rpc_buffer_size: 0 + max_send_msg_size: 0 + wait_for_ready: true + dial_option: + write_buffer_size: 0 + read_buffer_size: 0 + initial_window_size: 0 + initial_connection_window_size: 0 + max_msg_size: 0 + backoff_max_delay: "120s" + backoff_base_delay: "1s" + backoff_multiplier: 1.6 + backoff_jitter: 0.2 + min_connection_timeout: "20s" + enable_backoff: false + insecure: true + timeout: "" + tcp: + dns: + cache_enabled: true + cache_expiration: 1h + refresh_duration: 30m + dialer: + timeout: "" + keepalive: "15m" + dual_stack_enabled: true + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key + keepalive: + permit_without_stream: false + time: "" + timeout: "" + tls: + ca: /path/to/ca + cert: /path/to/cert + enabled: false + key: /path/to/key +observability: + enabled: false + otlp: + collector_endpoint: "otel-collector.monitoring.svc.cluster.local:4317" + trace_batch_timeout: "1s" + trace_export_timeout: "1m" + trace_max_export_batch_size: 1024 + trace_max_queue_size: 256 + metrics_export_interval: "1s" + metrics_export_timeout: "1m" + attribute: + namespace: "_MY_POD_NAMESPACE_" + pod_name: "_MY_POD_NAME_" + node_name: "_MY_NODE_NAME_" + service_name: "vald-index-save" + metrics: + enable_cgo: true + enable_goroutine: true + enable_memory: true + enable_version_info: true + version_info_labels: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - ngt_version + trace: + enabled: true diff --git a/dockers/index/job/save/Dockerfile b/dockers/index/job/save/Dockerfile new file mode 100644 index 0000000000..9e7361d93b --- /dev/null +++ b/dockers/index/job/save/Dockerfile @@ -0,0 +1,93 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +ARG GO_VERSION=latest +ARG DISTROLESS_IMAGE=gcr.io/distroless/static +ARG DISTROLESS_IMAGE_TAG=nonroot +ARG MAINTAINER="vdaas.org vald team " + +FROM golang:${GO_VERSION} AS golang + +FROM ubuntu:devel AS builder + +ENV GO111MODULE on +ENV DEBIAN_FRONTEND noninteractive +ENV INITRD No +ENV LANG en_US.UTF-8 +ENV GOROOT /opt/go +ENV GOPATH /go +ENV PATH ${PATH}:${GOROOT}/bin:${GOPATH}/bin +ENV ORG vdaas +ENV REPO vald +ENV PKG index/job/save +ENV APP_NAME index-save + +# skipcq: DOK-DL3008 +RUN apt-get update && apt-get install -y --no-install-recommends \ + ca-certificates \ + build-essential \ + curl \ + upx \ + git \ + && apt-get clean \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=golang /usr/local/go $GOROOT +RUN mkdir -p "$GOPATH/src" + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/Makefile.d +COPY Makefile.d . +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} +COPY Makefile . +COPY .git . +COPY go.mod . +COPY go.sum . + +RUN make go/download + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/internal +COPY internal . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/apis/grpc +COPY apis/grpc . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/pkg/${PKG} +COPY pkg/${PKG} . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} +COPY cmd/${PKG} . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/versions +COPY versions . + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO} +RUN make REPO=${ORG} NAME=${REPO} cmd/${PKG}/${APP_NAME} \ + && mv "cmd/${PKG}/${APP_NAME}" "/usr/bin/${APP_NAME}" + +WORKDIR ${GOPATH}/src/github.com/${ORG}/${REPO}/cmd/${PKG} +RUN cp sample.yaml /tmp/config.yaml + +FROM ${DISTROLESS_IMAGE}:${DISTROLESS_IMAGE_TAG} +LABEL maintainer="${MAINTAINER}" + +ENV APP_NAME index-save + +COPY --from=builder /usr/bin/${APP_NAME} /go/bin/${APP_NAME} +COPY --from=builder /tmp/config.yaml /etc/server/config.yaml + +USER nonroot:nonroot + +ENTRYPOINT ["/go/bin/index-save"] diff --git a/internal/config/index_save.go b/internal/config/index_save.go new file mode 100644 index 0000000000..290909e34a --- /dev/null +++ b/internal/config/index_save.go @@ -0,0 +1,54 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// IndexSave represents the configurations for index save. +type IndexSave struct { + // AgentPort represent agent port number. + AgentPort int `json:"agent_port" yaml:"agent_port"` + + // AgentName represent agents meta_name for service discovery. + AgentName string `json:"agent_name" yaml:"agent_name"` + + // AgentNamespace represent agent namespace location. + AgentNamespace string `json:"agent_namespace" yaml:"agent_namespace"` + + // AgentDNS represent agents dns A record for service discovery. + AgentDNS string `json:"agent_dns" yaml:"agent_dns"` + + // NodeName represents node name. + NodeName string `json:"node_name" yaml:"node_name"` + + // Concurrency represents indexing concurrency. + Concurrency int `json:"concurrency" yaml:"concurrency"` + + // TargetAddrs represents indexing target addresses. + TargetAddrs []string `json:"target_addrs" yaml:"target_addrs"` + + // Discoverer represents agent discoverer service configuration. + Discoverer *DiscovererClient `json:"discoverer" yaml:"discoverer"` +} + +func (is *IndexSave) Bind() *IndexSave { + is.AgentName = GetActualValue(is.AgentName) + is.AgentNamespace = GetActualValue(is.AgentNamespace) + is.AgentDNS = GetActualValue(is.AgentDNS) + is.NodeName = GetActualValue(is.NodeName) + is.TargetAddrs = GetActualValues(is.TargetAddrs) + + if is.Discoverer != nil { + is.Discoverer.Bind() + } + return is +} diff --git a/internal/config/index_save_test.go b/internal/config/index_save_test.go new file mode 100644 index 0000000000..5040a4dc51 --- /dev/null +++ b/internal/config/index_save_test.go @@ -0,0 +1,132 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// NOT IMPLEMENTED BELOW +// +// func TestIndexSave_Bind(t *testing.T) { +// type fields struct { +// AgentPort int +// AgentName string +// AgentNamespace string +// AgentDNS string +// NodeName string +// Concurrency int +// TargetAddrs []string +// Discoverer *DiscovererClient +// } +// type want struct { +// want *IndexCreation +// } +// type test struct { +// name string +// fields fields +// want want +// checkFunc func(want, *IndexCreation) error +// beforeFunc func(*testing.T) +// afterFunc func(*testing.T) +// } +// defaultCheckFunc := func(w want, got *IndexCreation) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// NodeName:"", +// Concurrency:0, +// TargetAddrs:nil, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// fields: fields { +// AgentPort:0, +// AgentName:"", +// AgentNamespace:"", +// AgentDNS:"", +// NodeName:"", +// Concurrency:0, +// TargetAddrs:nil, +// Discoverer:DiscovererClient{}, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T,) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T,) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// is := &IndexSave{ +// AgentPort: test.fields.AgentPort, +// AgentName: test.fields.AgentName, +// AgentNamespace: test.fields.AgentNamespace, +// AgentDNS: test.fields.AgentDNS, +// NodeName: test.fields.NodeName, +// Concurrency: test.fields.Concurrency, +// TargetAddrs: test.fields.TargetAddrs, +// Discoverer: test.fields.Discoverer, +// } +// +// got := is.Bind() +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// }) +// } +// } diff --git a/pkg/index/job/save/config/config.go b/pkg/index/job/save/config/config.go new file mode 100644 index 0000000000..6810d80241 --- /dev/null +++ b/pkg/index/job/save/config/config.go @@ -0,0 +1,71 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +import ( + "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" +) + +// GlobalConfig is a type alias of config.GlobalConfig representing application base configurations. +type GlobalConfig = config.GlobalConfig + +// Data represents the application configurations. +type Data struct { + // GlobalConfig represents application base configurations. + config.GlobalConfig `json:",inline" yaml:",inline"` + + // Server represent all server configurations + Server *config.Servers `json:"server_config" yaml:"server_config"` + + // Observability represents observability configurations. + Observability *config.Observability `json:"observability" yaml:"observability"` + + // Save represents auto save indexing service configurations. + Save *config.IndexSave `json:"savior" yaml:"savior"` +} + +// NewConfig load configurations from file path. +func NewConfig(path string) (cfg *Data, err error) { + cfg = new(Data) + + if err = config.Read(path, &cfg); err != nil { + return nil, err + } + + if cfg != nil { + _ = cfg.GlobalConfig.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Server != nil { + _ = cfg.Server.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + + if cfg.Observability != nil { + _ = cfg.Observability.Bind() + } else { + cfg.Observability = new(config.Observability).Bind() + } + + if cfg.Save != nil { + _ = cfg.Save.Bind() + } else { + return nil, errors.ErrInvalidConfig + } + return cfg, nil +} diff --git a/pkg/index/job/save/service/indexer.go b/pkg/index/job/save/service/indexer.go new file mode 100644 index 0000000000..860c29c37a --- /dev/null +++ b/pkg/index/job/save/service/indexer.go @@ -0,0 +1,219 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "context" + "reflect" + + agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" + "github.com/vdaas/vald/apis/grpc/v1/payload" + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/observability/trace" + "github.com/vdaas/vald/internal/strings" + "github.com/vdaas/vald/internal/sync" +) + +const ( + apiName = "vald/index/job/save" + grpcMethodName = "core.v1.Agent/" + agent.SaveIndexRPCName +) + +// Indexer represents an interface for indexing. +type Indexer interface { + PreStart(ctx context.Context) (<-chan error, error) + Start(ctx context.Context) error +} + +type index struct { + client discoverer.Client + targetAddrs []string + targetAddrList map[string]bool + + concurrency int +} + +// New returns Indexer object if no error occurs. +func New(opts ...Option) (Indexer, error) { + idx := new(index) + for _, opt := range append(defaultOpts, opts...) { + if err := opt(idx); err != nil { + oerr := errors.ErrOptionFailed(err, reflect.ValueOf(opt)) + e := &errors.ErrCriticalOption{} + if errors.As(oerr, &e) { + log.Error(err) + return nil, oerr + } + log.Warn(oerr) + } + } + idx.targetAddrList = make(map[string]bool, len(idx.targetAddrs)) + for _, addr := range idx.targetAddrs { + idx.targetAddrList[addr] = true + } + return idx, nil +} + +// PreStart starts the preparation process. +func (idx *index) PreStart(ctx context.Context) (<-chan error, error) { + return idx.client.Start(ctx) +} + +// Start starts indexing process. +func (idx *index) Start(ctx context.Context) error { + ctx, span := trace.StartSpan(ctx, apiName+"/service/index.Start") + defer func() { + if span != nil { + span.End() + } + }() + + err := idx.doSaveIndex(ctx, + func(ctx context.Context, ac agent.AgentClient, copts ...grpc.CallOption) (*payload.Empty, error) { + return ac.SaveIndex(ctx, &payload.Empty{}, copts...) + }, + ) + if err != nil { + var attrs trace.Attributes + switch { + case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): + err = status.WrapWithInternal( + agent.SaveIndexRPCName+" API connection not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + case errors.Is(err, errors.ErrGRPCTargetAddrNotFound): + err = status.WrapWithInternal( + agent.SaveIndexRPCName+" API connection target address \""+strings.Join(idx.targetAddrs, ",")+"\" not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + default: + var ( + st *status.Status + msg string + ) + st, msg, err = status.ParseError(err, codes.Internal, + "failed to parse "+agent.SaveIndexRPCName+" gRPC error response", + ) + attrs = trace.FromGRPCStatus(st.Code(), msg) + } + log.Warn(err) + if span != nil { + span.RecordError(err) + span.SetAttributes(attrs...) + span.SetStatus(trace.StatusError, err.Error()) + } + return err + } + return nil +} + +func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ agent.AgentClient, _ ...grpc.CallOption) (*payload.Empty, error)) (errs error) { + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, grpcMethodName), apiName+"/service/index.doSaveIndex") + defer func() { + if span != nil { + span.End() + } + }() + + targetAddrs := idx.client.GetAddrs(ctx) + if len(idx.targetAddrs) != 0 { + targetAddrs = idx.extractTargetAddrs(targetAddrs) + + // If targetAddrs is empty, an invalid target addresses may be registered in targetAddrList. + if len(targetAddrs) == 0 { + return errors.ErrGRPCTargetAddrNotFound + } + } + log.Infof("target agent addrs: %v", targetAddrs) + + var emu sync.Mutex + err := idx.client.GetClient().OrderedRangeConcurrent(ctx, targetAddrs, idx.concurrency, + func(ctx context.Context, target string, conn *grpc.ClientConn, copts ...grpc.CallOption) error { + ctx, span := trace.StartSpan(grpc.WrapGRPCMethod(ctx, "OrderedRangeConcurrent/"+target), agent.SaveIndexRPCName+"/"+target) + defer func() { + if span != nil { + span.End() + } + }() + _, err := fn(ctx, agent.NewAgentClient(conn), copts...) + if err != nil { + var attrs trace.Attributes + switch { + case errors.Is(err, context.Canceled): + err = status.WrapWithCanceled( + agent.SaveIndexRPCName+" API canceld", err, + ) + attrs = trace.StatusCodeCancelled(err.Error()) + case errors.Is(err, context.DeadlineExceeded): + err = status.WrapWithCanceled( + agent.SaveIndexRPCName+" API deadline exceeded", err, + ) + attrs = trace.StatusCodeDeadlineExceeded(err.Error()) + case errors.Is(err, errors.ErrGRPCClientConnNotFound("*")): + err = status.WrapWithInternal( + agent.SaveIndexRPCName+" API connection not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + case errors.Is(err, errors.ErrTargetNotFound): + err = status.WrapWithInvalidArgument( + agent.SaveIndexRPCName+" API target not found", err, + ) + attrs = trace.StatusCodeInternal(err.Error()) + default: + var ( + st *status.Status + msg string + ) + st, msg, err = status.ParseError(err, codes.Internal, + "failed to parse "+agent.SaveIndexRPCName+" gRPC error response", + ) + if st != nil && err != nil && st.Code() == codes.FailedPrecondition { + log.Warnf("SaveIndex of %s skipped, message: %s, err: %v", target, st.Message(), errors.Join(st.Err(), err)) + return nil + } + attrs = trace.FromGRPCStatus(st.Code(), msg) + } + log.Warnf("an error occurred in (%s) during indexing: %v", target, err) + if span != nil { + span.RecordError(err) + span.SetAttributes(attrs...) + span.SetStatus(trace.StatusError, err.Error()) + } + emu.Lock() + errs = errors.Join(errs, err) + emu.Unlock() + } + return err + }, + ) + return errors.Join(err, errs) +} + +// extractTargetAddresses filters and extracts target addresses registered in targetAddrList from the given address list. +func (idx *index) extractTargetAddrs(addrs []string) []string { + res := make([]string, 0, len(addrs)) + for _, addr := range addrs { + if !idx.targetAddrList[addr] { + log.Warnf("the gRPC target address not found: %s", addr) + } else { + res = append(res, addr) + } + } + return res +} diff --git a/pkg/index/job/save/service/mock_test.go b/pkg/index/job/save/service/mock_test.go new file mode 100644 index 0000000000..89c0f19aa5 --- /dev/null +++ b/pkg/index/job/save/service/mock_test.go @@ -0,0 +1,57 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "context" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/net/grpc" +) + +type mockDiscovererClient struct { + discoverer.Client + GetAddrsFunc func(ctx context.Context) []string + GetClientFunc func() grpc.Client +} + +func (mc *mockDiscovererClient) GetAddrs(ctx context.Context) []string { + return mc.GetAddrsFunc(ctx) +} + +func (mc *mockDiscovererClient) GetClient() grpc.Client { + return mc.GetClientFunc() +} + +type mockGrpcClient struct { + grpc.Client + OrderedRangeConcurrentFunc func(ctx context.Context, + order []string, + concurrency int, + f func(ctx context.Context, + addr string, + conn *grpc.ClientConn, + copts ...grpc.CallOption) error) error +} + +func (mc *mockGrpcClient) OrderedRangeConcurrent(ctx context.Context, + order []string, + concurrency int, + f func(ctx context.Context, + addr string, + conn *grpc.ClientConn, + copts ...grpc.CallOption) error, +) error { + return mc.OrderedRangeConcurrentFunc(ctx, order, concurrency, f) +} diff --git a/pkg/index/job/save/service/options.go b/pkg/index/job/save/service/options.go new file mode 100644 index 0000000000..e015dd363b --- /dev/null +++ b/pkg/index/job/save/service/options.go @@ -0,0 +1,58 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +import ( + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" +) + +// Option represents the functional option for index. +type Option func(_ *index) error + +var defaultOpts = []Option{ + WithIndexingConcurrency(1), +} + +// WithDiscoverer returns Option that sets discoverer client. +func WithDiscoverer(client discoverer.Client) Option { + return func(idx *index) error { + if client == nil { + return errors.NewErrCriticalOption("discoverer", client) + } + idx.client = client + return nil + } +} + +// WithIndexingConcurrency returns Option that sets indexing concurrency. +func WithIndexingConcurrency(num int) Option { + return func(idx *index) error { + if num <= 0 { + return errors.NewErrInvalidOption("indexingConcurrency", num) + } + idx.concurrency = num + return nil + } +} + +// WithTargetAddrs returns Option that sets indexing target addresses. +func WithTargetAddrs(addrs ...string) Option { + return func(idx *index) error { + if len(addrs) != 0 { + idx.targetAddrs = append(idx.targetAddrs, addrs...) + } + return nil + } +} diff --git a/pkg/index/job/save/usecase/save.go b/pkg/index/job/save/usecase/save.go new file mode 100644 index 0000000000..a230bb5a11 --- /dev/null +++ b/pkg/index/job/save/usecase/save.go @@ -0,0 +1,213 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package usecase + +import ( + "context" + "os" + "syscall" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + iconfig "github.com/vdaas/vald/internal/config" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/log" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/interceptor/server/recover" + "github.com/vdaas/vald/internal/observability" + "github.com/vdaas/vald/internal/runner" + "github.com/vdaas/vald/internal/safety" + "github.com/vdaas/vald/internal/servers/server" + "github.com/vdaas/vald/internal/servers/starter" + "github.com/vdaas/vald/internal/sync/errgroup" + "github.com/vdaas/vald/pkg/index/job/save/config" + "github.com/vdaas/vald/pkg/index/job/save/service" +) + +type run struct { + eg errgroup.Group + cfg *config.Data + observability observability.Observability + server starter.Server + indexer service.Indexer +} + +// New returns Runner instance. +func New(cfg *config.Data) (_ runner.Runner, err error) { + eg := errgroup.Get() + + dOpts, err := cfg.Save.Discoverer.Client.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + dOpts = append(dOpts, grpc.WithErrGroup(eg)) + + acOpts, err := cfg.Save.Discoverer.AgentClientOptions.Opts() + if err != nil { + return nil, err + } + // skipcq: CRT-D0001 + acOpts = append(acOpts, grpc.WithErrGroup(eg)) + + discoverer, err := discoverer.New( + discoverer.WithAutoConnect(true), + discoverer.WithName(cfg.Save.AgentName), + discoverer.WithNamespace(cfg.Save.AgentNamespace), + discoverer.WithPort(cfg.Save.AgentPort), + discoverer.WithServiceDNSARecord(cfg.Save.AgentDNS), + discoverer.WithDiscovererClient(grpc.New(dOpts...)), + discoverer.WithDiscoverDuration(cfg.Save.Discoverer.Duration), + discoverer.WithOptions(acOpts...), + discoverer.WithNodeName(cfg.Save.NodeName), + discoverer.WithOnDiscoverFunc(func(ctx context.Context, c discoverer.Client, addrs []string) error { + last := len(addrs) - 1 + for i := 0; i < len(addrs)/2; i++ { + addrs[i], addrs[last-i] = addrs[last-i], addrs[i] + } + return nil + }), + ) + if err != nil { + return nil, err + } + + indexer, err := service.New( + service.WithDiscoverer(discoverer), + service.WithIndexingConcurrency(cfg.Save.Concurrency), + service.WithTargetAddrs(cfg.Save.TargetAddrs...), + ) + if err != nil { + return nil, err + } + + srv, err := starter.New( + starter.WithConfig(cfg.Server), + starter.WithGRPC(func(cfg *iconfig.Server) []server.Option { + return []server.Option{ + server.WithGRPCOption( + grpc.ChainUnaryInterceptor(recover.RecoverInterceptor()), + grpc.ChainStreamInterceptor(recover.RecoverStreamInterceptor()), + ), + } + }), + ) + if err != nil { + return nil, err + } + + var obs observability.Observability + if cfg.Observability.Enabled { + obs, err = observability.NewWithConfig( + cfg.Observability, + ) + if err != nil { + return nil, err + } + } + + return &run{ + eg: eg, + cfg: cfg, + observability: obs, + server: srv, + indexer: indexer, + }, nil +} + +// PreStart is a method called before execution of Start, and it invokes the PreStart method of observability. +func (r *run) PreStart(ctx context.Context) error { + if r.observability != nil { + return r.observability.PreStart(ctx) + } + return nil +} + +// Start is a method used to initiate an operation in the run, and it returns a channel for receiving errors +// during the operation and an error representing any initialization errors. +func (r *run) Start(ctx context.Context) (<-chan error, error) { + ech := make(chan error, 4) + var sech, oech <-chan error + if r.observability != nil { + oech = r.observability.Start(ctx) + } + sech = r.server.ListenAndServe(ctx) + ipech, err := r.indexer.PreStart(ctx) + if err != nil { + close(ech) + return nil, err + } + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer func() { + p, err := os.FindProcess(os.Getpid()) + if err != nil { + // using Fatal to avoid this process to be zombie + // skipcq: RVV-A0003 + log.Fatalf("failed to find my pid to kill %v", err) + return + } + log.Info("sending SIGTERM to myself to stop this job") + if err := p.Signal(syscall.SIGTERM); err != nil { + log.Error(err) + } + }() + return r.indexer.Start(ctx) + })) + + r.eg.Go(safety.RecoverFunc(func() (err error) { + defer close(ech) + for { + select { + case <-ctx.Done(): + return ctx.Err() + case err = <-oech: + case err = <-sech: + case err = <-ipech: + } + if err != nil { + select { + case <-ctx.Done(): + return errors.Join(ctx.Err(), err) + case ech <- err: + } + } + } + })) + return ech, nil +} + +// PreStop is a method called before execution of Stop. +func (*run) PreStop(_ context.Context) error { + return nil +} + +// Stop is a method used to stop an operation in the run. +func (r *run) Stop(ctx context.Context) (errs error) { + if r.observability != nil { + if err := r.observability.Stop(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + if r.server != nil { + if err := r.server.Shutdown(ctx); err != nil { + errs = errors.Join(errs, err) + } + } + return errs +} + +// PtopStop is a method called after execution of Stop. +func (*run) PostStop(_ context.Context) error { + return nil +} From acd8b0c15651c1b32369d6ee70729cd4af147c3a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 13:21:03 +0900 Subject: [PATCH 02/16] make gotest/gen and make format Signed-off-by: hlts2 --- internal/config/index_save_test.go | 39 +-- pkg/index/job/save/config/config_test.go | 106 +++++++ pkg/index/job/save/service/indexer_test.go | 332 +++++++++++++++++++++ pkg/index/job/save/service/options_test.go | 274 +++++++++++++++++ pkg/index/job/save/usecase/save_test.go | 106 +++++++ 5 files changed, 838 insertions(+), 19 deletions(-) create mode 100644 pkg/index/job/save/config/config_test.go create mode 100644 pkg/index/job/save/service/indexer_test.go create mode 100644 pkg/index/job/save/service/options_test.go create mode 100644 pkg/index/job/save/usecase/save_test.go diff --git a/internal/config/index_save_test.go b/internal/config/index_save_test.go index 5040a4dc51..f363c77b0e 100644 --- a/internal/config/index_save_test.go +++ b/internal/config/index_save_test.go @@ -17,27 +17,27 @@ package config // // func TestIndexSave_Bind(t *testing.T) { // type fields struct { -// AgentPort int -// AgentName string -// AgentNamespace string -// AgentDNS string -// NodeName string -// Concurrency int -// TargetAddrs []string -// Discoverer *DiscovererClient +// AgentPort int +// AgentName string +// AgentNamespace string +// AgentDNS string +// NodeName string +// Concurrency int +// TargetAddrs []string +// Discoverer *DiscovererClient // } // type want struct { -// want *IndexCreation +// want *IndexSave // } // type test struct { // name string // fields fields // want want -// checkFunc func(want, *IndexCreation) error +// checkFunc func(want, *IndexSave) error // beforeFunc func(*testing.T) // afterFunc func(*testing.T) // } -// defaultCheckFunc := func(w want, got *IndexCreation) error { +// defaultCheckFunc := func(w want, got *IndexSave) error { // if !reflect.DeepEqual(got, w.want) { // return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) // } @@ -113,20 +113,21 @@ package config // checkFunc = defaultCheckFunc // } // is := &IndexSave{ -// AgentPort: test.fields.AgentPort, -// AgentName: test.fields.AgentName, -// AgentNamespace: test.fields.AgentNamespace, -// AgentDNS: test.fields.AgentDNS, -// NodeName: test.fields.NodeName, -// Concurrency: test.fields.Concurrency, -// TargetAddrs: test.fields.TargetAddrs, -// Discoverer: test.fields.Discoverer, +// AgentPort: test.fields.AgentPort, +// AgentName: test.fields.AgentName, +// AgentNamespace: test.fields.AgentNamespace, +// AgentDNS: test.fields.AgentDNS, +// NodeName: test.fields.NodeName, +// Concurrency: test.fields.Concurrency, +// TargetAddrs: test.fields.TargetAddrs, +// Discoverer: test.fields.Discoverer, // } // // got := is.Bind() // if err := checkFunc(test.want, got); err != nil { // tt.Errorf("error = %v", err) // } +// // }) // } // } diff --git a/pkg/index/job/save/config/config_test.go b/pkg/index/job/save/config/config_test.go new file mode 100644 index 0000000000..0cf6858bf7 --- /dev/null +++ b/pkg/index/job/save/config/config_test.go @@ -0,0 +1,106 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package config + +// NOT IMPLEMENTED BELOW +// +// func TestNewConfig(t *testing.T) { +// type args struct { +// path string +// } +// type want struct { +// wantCfg *Data +// err error +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, *Data, error) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, gotCfg *Data, err error) error { +// if !errors.Is(err, w.err) { +// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) +// } +// if !reflect.DeepEqual(gotCfg, w.wantCfg) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", gotCfg, w.wantCfg) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// path:"", +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// path:"", +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// gotCfg, err := NewConfig(test.args.path) +// if err := checkFunc(test.want, gotCfg, err); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go new file mode 100644 index 0000000000..11f4816a28 --- /dev/null +++ b/pkg/index/job/save/service/indexer_test.go @@ -0,0 +1,332 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +// NOT IMPLEMENTED BELOW +// +// func TestNew(t *testing.T) { +// type args struct { +// opts []Option +// } +// type want struct { +// want Indexer +// err error +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, Indexer, error) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got Indexer, err error) error { +// if !errors.Is(err, w.err) { +// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) +// } +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// opts:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// opts:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// got, err := New(test.args.opts...) +// if err := checkFunc(test.want, got, err); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } +// +// func Test_index_PreStart(t *testing.T) { +// type args struct { +// ctx context.Context +// } +// type fields struct { +// client discoverer.Client +// targetAddrs []string +// targetAddrList map[string]bool +// concurrency int +// } +// type want struct { +// want <-chan error +// err error +// } +// type test struct { +// name string +// args args +// fields fields +// want want +// checkFunc func(want, <-chan error, error) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got <-chan error, err error) error { +// if !errors.Is(err, w.err) { +// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) +// } +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// ctx:nil, +// }, +// fields: fields { +// client:nil, +// targetAddrs:nil, +// targetAddrList:nil, +// concurrency:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// ctx:nil, +// }, +// fields: fields { +// client:nil, +// targetAddrs:nil, +// targetAddrList:nil, +// concurrency:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// idx := &index{ +// client: test.fields.client, +// targetAddrs: test.fields.targetAddrs, +// targetAddrList: test.fields.targetAddrList, +// concurrency: test.fields.concurrency, +// } +// +// got, err := idx.PreStart(test.args.ctx) +// if err := checkFunc(test.want, got, err); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } +// +// func Test_index_Start(t *testing.T) { +// type args struct { +// ctx context.Context +// } +// type fields struct { +// client discoverer.Client +// targetAddrs []string +// targetAddrList map[string]bool +// concurrency int +// } +// type want struct { +// err error +// } +// type test struct { +// name string +// args args +// fields fields +// want want +// checkFunc func(want, error) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, err error) error { +// if !errors.Is(err, w.err) { +// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// ctx:nil, +// }, +// fields: fields { +// client:nil, +// targetAddrs:nil, +// targetAddrList:nil, +// concurrency:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// ctx:nil, +// }, +// fields: fields { +// client:nil, +// targetAddrs:nil, +// targetAddrList:nil, +// concurrency:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// idx := &index{ +// client: test.fields.client, +// targetAddrs: test.fields.targetAddrs, +// targetAddrList: test.fields.targetAddrList, +// concurrency: test.fields.concurrency, +// } +// +// err := idx.Start(test.args.ctx) +// if err := checkFunc(test.want, err); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } diff --git a/pkg/index/job/save/service/options_test.go b/pkg/index/job/save/service/options_test.go new file mode 100644 index 0000000000..97a83b802f --- /dev/null +++ b/pkg/index/job/save/service/options_test.go @@ -0,0 +1,274 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package service + +// NOT IMPLEMENTED BELOW +// +// func TestWithDiscoverer(t *testing.T) { +// type args struct { +// client discoverer.Client +// } +// type want struct { +// want Option +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, Option) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got Option) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// client:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// client:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// got := WithDiscoverer(test.args.client) +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } +// +// func TestWithIndexingConcurrency(t *testing.T) { +// type args struct { +// num int +// } +// type want struct { +// want Option +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, Option) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got Option) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// num:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// num:0, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// got := WithIndexingConcurrency(test.args.num) +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } +// +// func TestWithTargetAddrs(t *testing.T) { +// type args struct { +// addrs []string +// } +// type want struct { +// want Option +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, Option) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got Option) error { +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// addrs:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// addrs:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// got := WithTargetAddrs(test.args.addrs...) +// if err := checkFunc(test.want, got); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } diff --git a/pkg/index/job/save/usecase/save_test.go b/pkg/index/job/save/usecase/save_test.go new file mode 100644 index 0000000000..1eab852c2d --- /dev/null +++ b/pkg/index/job/save/usecase/save_test.go @@ -0,0 +1,106 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package usecase + +// NOT IMPLEMENTED BELOW +// +// func TestNew(t *testing.T) { +// type args struct { +// cfg *config.Data +// } +// type want struct { +// want runner.Runner +// err error +// } +// type test struct { +// name string +// args args +// want want +// checkFunc func(want, runner.Runner, error) error +// beforeFunc func(*testing.T, args) +// afterFunc func(*testing.T, args) +// } +// defaultCheckFunc := func(w want, got runner.Runner, err error) error { +// if !errors.Is(err, w.err) { +// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) +// } +// if !reflect.DeepEqual(got, w.want) { +// return errors.Errorf("got: \"%#v\",\n\t\t\t\twant: \"%#v\"", got, w.want) +// } +// return nil +// } +// tests := []test{ +// // TODO test cases +// /* +// { +// name: "test_case_1", +// args: args { +// cfg:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// }, +// */ +// +// // TODO test cases +// /* +// func() test { +// return test { +// name: "test_case_2", +// args: args { +// cfg:nil, +// }, +// want: want{}, +// checkFunc: defaultCheckFunc, +// beforeFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// afterFunc: func(t *testing.T, args args) { +// t.Helper() +// }, +// } +// }(), +// */ +// } +// +// for _, tc := range tests { +// test := tc +// t.Run(test.name, func(tt *testing.T) { +// tt.Parallel() +// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) +// if test.beforeFunc != nil { +// test.beforeFunc(tt, test.args) +// } +// if test.afterFunc != nil { +// defer test.afterFunc(tt, test.args) +// } +// checkFunc := test.checkFunc +// if test.checkFunc == nil { +// checkFunc = defaultCheckFunc +// } +// +// got, err := New(test.args.cfg) +// if err := checkFunc(test.want, got, err); err != nil { +// tt.Errorf("error = %v", err) +// } +// +// }) +// } +// } From 7dbc8641640c03bd17faab39b3901a88043739a9 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 13:25:06 +0900 Subject: [PATCH 03/16] fix: sample configuration data Signed-off-by: hlts2 --- cmd/index/job/save/sample.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/index/job/save/sample.yaml b/cmd/index/job/save/sample.yaml index c83105c86b..8b263e9d73 100644 --- a/cmd/index/job/save/sample.yaml +++ b/cmd/index/job/save/sample.yaml @@ -70,7 +70,7 @@ server_config: cert: /path/to/cert enabled: false key: /path/to/key -creator: +savior: agent_port: 8081 agent_name: "vald-agent-ngt" agent_dns: vald-agent-ngt.default.svc.cluster.local From a208cc17526f0ec19e2d364f7645e59f97769032 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 14:36:01 +0900 Subject: [PATCH 04/16] feat: add save indexing test Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer_test.go | 306 +++++++++++++-------- 1 file changed, 196 insertions(+), 110 deletions(-) diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 11f4816a28..a8af4c546e 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -13,6 +13,202 @@ // limitations under the License. package service +import ( + "context" + "testing" + + agent "github.com/vdaas/vald/apis/grpc/v1/agent/core" + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/errors" + "github.com/vdaas/vald/internal/net/grpc" + "github.com/vdaas/vald/internal/net/grpc/codes" + "github.com/vdaas/vald/internal/net/grpc/status" + "github.com/vdaas/vald/internal/test/goleak" +) + +func Test_index_Start(t *testing.T) { + type args struct { + ctx context.Context + } + type fields struct { + client discoverer.Client + targetAddrs []string + targetAddrList map[string]bool + concurrency int + } + type want struct { + err error + } + type test struct { + name string + args args + fields fields + want want + checkFunc func(want, error) error + beforeFunc func(*testing.T, args) + afterFunc func(*testing.T, args) + } + defaultCheckFunc := func(w want, err error) error { + if !errors.Is(err, w.err) { + return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) + } + return nil + } + tests := []test{ + func() test { + addrs := []string{ + "127.0.0.1:8080", + } + return test{ + name: "Success: when there is no error in the indexing request process", + args: args{ + ctx: context.Background(), + }, + + fields: fields{ + client: &mockDiscovererClient{ + GetAddrsFunc: func(_ context.Context) []string { + return addrs + }, + GetClientFunc: func() grpc.Client { + return &mockGrpcClient{ + OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, + _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, + ) error { + return nil + }, + } + }, + }, + }, + } + }(), + func() test { + addrs := []string{ + "127.0.0.1:8080", + } + return test{ + name: "Fail: when there is an error wrapped with gRPC status in the indexing request process", + args: args{ + ctx: context.Background(), + }, + fields: fields{ + client: &mockDiscovererClient{ + GetAddrsFunc: func(_ context.Context) []string { + return addrs + }, + GetClientFunc: func() grpc.Client { + return &mockGrpcClient{ + OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, + _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, + ) error { + return status.WrapWithInternal( + agent.SaveIndexRPCName+" API connection not found", + errors.ErrGRPCClientConnNotFound("*"), + ) + }, + } + }, + }, + }, + want: want{ + err: status.Error(codes.Internal, + agent.SaveIndexRPCName+" API connection not found"), + }, + } + }(), + func() test { + addrs := []string{ + "127.0.0.1:8080", + } + return test{ + name: "Fail: When the OrderedRangeConcurrent method returns a gRPC client conn not found error", + args: args{ + ctx: context.Background(), + }, + + fields: fields{ + client: &mockDiscovererClient{ + GetAddrsFunc: func(_ context.Context) []string { + return addrs + }, + GetClientFunc: func() grpc.Client { + return &mockGrpcClient{ + OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, + _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, + ) error { + return errors.ErrGRPCClientConnNotFound("*") + }, + } + }, + }, + }, + want: want{ + err: status.Error(codes.Internal, + agent.SaveIndexRPCName+" API connection not found"), + }, + } + }(), + func() test { + targetAddrs := []string{ + "127.0.0.1:8080", + } + targetAddrList := map[string]bool{ + targetAddrs[0]: true, + } + return test{ + name: "Fail: when there is no address matching targetAddrList", + args: args{ + ctx: context.Background(), + }, + fields: fields{ + client: &mockDiscovererClient{ + GetAddrsFunc: func(_ context.Context) []string { + return nil + }, + }, + targetAddrs: targetAddrs, + targetAddrList: targetAddrList, + }, + want: want{ + err: status.Error(codes.Internal, + agent.SaveIndexRPCName+" API connection target address \"127.0.0.1:8080\" not found"), + }, + } + }(), + } + + for _, tc := range tests { + test := tc + t.Run(test.name, func(tt *testing.T) { + tt.Parallel() + defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) + if test.beforeFunc != nil { + test.beforeFunc(tt, test.args) + } + if test.afterFunc != nil { + defer test.afterFunc(tt, test.args) + } + checkFunc := test.checkFunc + if test.checkFunc == nil { + checkFunc = defaultCheckFunc + } + idx := &index{ + client: test.fields.client, + targetAddrs: test.fields.targetAddrs, + targetAddrList: test.fields.targetAddrList, + concurrency: test.fields.concurrency, + } + + err := idx.Start(test.args.ctx) + if err := checkFunc(test.want, err); err != nil { + tt.Errorf("error = %v", err) + } + + }) + } +} + // NOT IMPLEMENTED BELOW // // func TestNew(t *testing.T) { @@ -220,113 +416,3 @@ package service // } // } // -// func Test_index_Start(t *testing.T) { -// type args struct { -// ctx context.Context -// } -// type fields struct { -// client discoverer.Client -// targetAddrs []string -// targetAddrList map[string]bool -// concurrency int -// } -// type want struct { -// err error -// } -// type test struct { -// name string -// args args -// fields fields -// want want -// checkFunc func(want, error) error -// beforeFunc func(*testing.T, args) -// afterFunc func(*testing.T, args) -// } -// defaultCheckFunc := func(w want, err error) error { -// if !errors.Is(err, w.err) { -// return errors.Errorf("got_error: \"%#v\",\n\t\t\t\twant: \"%#v\"", err, w.err) -// } -// return nil -// } -// tests := []test{ -// // TODO test cases -// /* -// { -// name: "test_case_1", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// client:nil, -// targetAddrs:nil, -// targetAddrList:nil, -// concurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// }, -// */ -// -// // TODO test cases -// /* -// func() test { -// return test { -// name: "test_case_2", -// args: args { -// ctx:nil, -// }, -// fields: fields { -// client:nil, -// targetAddrs:nil, -// targetAddrList:nil, -// concurrency:0, -// }, -// want: want{}, -// checkFunc: defaultCheckFunc, -// beforeFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// afterFunc: func(t *testing.T, args args) { -// t.Helper() -// }, -// } -// }(), -// */ -// } -// -// for _, tc := range tests { -// test := tc -// t.Run(test.name, func(tt *testing.T) { -// tt.Parallel() -// defer goleak.VerifyNone(tt, goleak.IgnoreCurrent()) -// if test.beforeFunc != nil { -// test.beforeFunc(tt, test.args) -// } -// if test.afterFunc != nil { -// defer test.afterFunc(tt, test.args) -// } -// checkFunc := test.checkFunc -// if test.checkFunc == nil { -// checkFunc = defaultCheckFunc -// } -// idx := &index{ -// client: test.fields.client, -// targetAddrs: test.fields.targetAddrs, -// targetAddrList: test.fields.targetAddrList, -// concurrency: test.fields.concurrency, -// } -// -// err := idx.Start(test.args.ctx) -// if err := checkFunc(test.want, err); err != nil { -// tt.Errorf("error = %v", err) -// } -// -// }) -// } -// } From ba29166a71d18ac460b9fdc4ab90b2e0f91acf4b Mon Sep 17 00:00:00 2001 From: "deepsource-autofix[bot]" <62050782+deepsource-autofix[bot]@users.noreply.github.com> Date: Mon, 6 Nov 2023 05:36:32 +0000 Subject: [PATCH 05/16] style: format code with Gofumpt and Prettier This commit fixes the style issues introduced in 84ade79 according to the output from Gofumpt and Prettier. Details: https://github.com/vdaas/vald/pull/2227 --- pkg/index/job/save/service/indexer_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index a8af4c546e..274ca534c9 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -204,7 +204,6 @@ func Test_index_Start(t *testing.T) { if err := checkFunc(test.want, err); err != nil { tt.Errorf("error = %v", err) } - }) } } From 484bb7c9ebb0211bf1102397e2730299db5ce8f6 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 14:58:23 +0900 Subject: [PATCH 06/16] fix: deleted unnecessary code Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/index/job/save/service/indexer.go b/pkg/index/job/save/service/indexer.go index 860c29c37a..ce06496a77 100644 --- a/pkg/index/job/save/service/indexer.go +++ b/pkg/index/job/save/service/indexer.go @@ -183,10 +183,6 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ st, msg, err = status.ParseError(err, codes.Internal, "failed to parse "+agent.SaveIndexRPCName+" gRPC error response", ) - if st != nil && err != nil && st.Code() == codes.FailedPrecondition { - log.Warnf("SaveIndex of %s skipped, message: %s, err: %v", target, st.Message(), errors.Join(st.Err(), err)) - return nil - } attrs = trace.FromGRPCStatus(st.Code(), msg) } log.Warnf("an error occurred in (%s) during indexing: %v", target, err) From ca83439132bb16eb13b340feafd65ac916b29737 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 15:04:20 +0900 Subject: [PATCH 07/16] fix: update debug error message Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/index/job/save/service/indexer.go b/pkg/index/job/save/service/indexer.go index ce06496a77..5c2bab14d7 100644 --- a/pkg/index/job/save/service/indexer.go +++ b/pkg/index/job/save/service/indexer.go @@ -185,7 +185,7 @@ func (idx *index) doSaveIndex(ctx context.Context, fn func(_ context.Context, _ ) attrs = trace.FromGRPCStatus(st.Code(), msg) } - log.Warnf("an error occurred in (%s) during indexing: %v", target, err) + log.Warnf("an error occurred in (%s) during save indexing: %v", target, err) if span != nil { span.RecordError(err) span.SetAttributes(attrs...) From 78f349a7da17cf6b33beaa020c3fb06d92436697 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Mon, 6 Nov 2023 15:07:40 +0900 Subject: [PATCH 08/16] fix: option name Signed-off-by: hlts2 --- pkg/index/job/save/service/options.go | 8 ++++---- pkg/index/job/save/service/options_test.go | 4 ++-- pkg/index/job/save/usecase/save.go | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/index/job/save/service/options.go b/pkg/index/job/save/service/options.go index e015dd363b..bd4c997e37 100644 --- a/pkg/index/job/save/service/options.go +++ b/pkg/index/job/save/service/options.go @@ -22,7 +22,7 @@ import ( type Option func(_ *index) error var defaultOpts = []Option{ - WithIndexingConcurrency(1), + WithSaveIndexingConcurrency(1), } // WithDiscoverer returns Option that sets discoverer client. @@ -36,11 +36,11 @@ func WithDiscoverer(client discoverer.Client) Option { } } -// WithIndexingConcurrency returns Option that sets indexing concurrency. -func WithIndexingConcurrency(num int) Option { +// WithSaveIndexingConcurrency returns Option that sets save indexing concurrency. +func WithSaveIndexingConcurrency(num int) Option { return func(idx *index) error { if num <= 0 { - return errors.NewErrInvalidOption("indexingConcurrency", num) + return errors.NewErrInvalidOption("saveIndexingConcurrency", num) } idx.concurrency = num return nil diff --git a/pkg/index/job/save/service/options_test.go b/pkg/index/job/save/service/options_test.go index 97a83b802f..b8137a55e6 100644 --- a/pkg/index/job/save/service/options_test.go +++ b/pkg/index/job/save/service/options_test.go @@ -101,7 +101,7 @@ package service // } // } // -// func TestWithIndexingConcurrency(t *testing.T) { +// func TestWithSaveIndexingConcurrency(t *testing.T) { // type args struct { // num int // } @@ -178,7 +178,7 @@ package service // checkFunc = defaultCheckFunc // } // -// got := WithIndexingConcurrency(test.args.num) +// got := WithSaveIndexingConcurrency(test.args.num) // if err := checkFunc(test.want, got); err != nil { // tt.Errorf("error = %v", err) // } diff --git a/pkg/index/job/save/usecase/save.go b/pkg/index/job/save/usecase/save.go index a230bb5a11..959eadb1b8 100644 --- a/pkg/index/job/save/usecase/save.go +++ b/pkg/index/job/save/usecase/save.go @@ -84,7 +84,7 @@ func New(cfg *config.Data) (_ runner.Runner, err error) { indexer, err := service.New( service.WithDiscoverer(discoverer), - service.WithIndexingConcurrency(cfg.Save.Concurrency), + service.WithSaveIndexingConcurrency(cfg.Save.Concurrency), service.WithTargetAddrs(cfg.Save.TargetAddrs...), ) if err != nil { From 930382db88bca8443317e8b89128af2411f2b7dc Mon Sep 17 00:00:00 2001 From: hlts2 Date: Tue, 7 Nov 2023 09:50:13 +0900 Subject: [PATCH 09/16] fix: test case name Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 274ca534c9..dfd2db4c0e 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -60,7 +60,7 @@ func Test_index_Start(t *testing.T) { "127.0.0.1:8080", } return test{ - name: "Success: when there is no error in the indexing request process", + name: "Success: when there is no error in the save indexing request process", args: args{ ctx: context.Background(), }, @@ -88,7 +88,7 @@ func Test_index_Start(t *testing.T) { "127.0.0.1:8080", } return test{ - name: "Fail: when there is an error wrapped with gRPC status in the indexing request process", + name: "Fail: when there is an error wrapped with gRPC status in the save indexing request process", args: args{ ctx: context.Background(), }, From 56b63c7b5837a775525a7b74dc7a67e72e3ad46d Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 8 Nov 2023 11:44:41 +0900 Subject: [PATCH 10/16] Update cmd/index/job/save/sample.yaml Co-authored-by: Kiichiro YUKAWA --- cmd/index/job/save/sample.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/index/job/save/sample.yaml b/cmd/index/job/save/sample.yaml index 8b263e9d73..7c2cd85a79 100644 --- a/cmd/index/job/save/sample.yaml +++ b/cmd/index/job/save/sample.yaml @@ -70,7 +70,7 @@ server_config: cert: /path/to/cert enabled: false key: /path/to/key -savior: +saver: agent_port: 8081 agent_name: "vald-agent-ngt" agent_dns: vald-agent-ngt.default.svc.cluster.local From 2956e907fad12461f891ca80782423a1234a03df Mon Sep 17 00:00:00 2001 From: Hiroto Funakoshi Date: Wed, 8 Nov 2023 11:45:58 +0900 Subject: [PATCH 11/16] Update pkg/index/job/save/config/config.go Co-authored-by: Kiichiro YUKAWA --- pkg/index/job/save/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/index/job/save/config/config.go b/pkg/index/job/save/config/config.go index 6810d80241..2e939f2bf2 100644 --- a/pkg/index/job/save/config/config.go +++ b/pkg/index/job/save/config/config.go @@ -36,7 +36,7 @@ type Data struct { Save *config.IndexSave `json:"savior" yaml:"savior"` } -// NewConfig load configurations from file path. +// NewConfig loads configurations from the file path. func NewConfig(path string) (cfg *Data, err error) { cfg = new(Data) From fc44305036bdd13ee29d2c676d73e490ed7cefa3 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 8 Nov 2023 12:19:18 +0900 Subject: [PATCH 12/16] fix: add comment for test case Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index dfd2db4c0e..9407b8566c 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -164,6 +164,7 @@ func Test_index_Start(t *testing.T) { fields: fields{ client: &mockDiscovererClient{ GetAddrsFunc: func(_ context.Context) []string { + // NOTE: This function returns nil, meaning that the targetAddrs stored in the field are invalid values. return nil }, }, From 8f49b2c8c6ce0dbb76b210e769e47fb5104b8b5c Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 8 Nov 2023 12:23:32 +0900 Subject: [PATCH 13/16] fix: fix method naming Signed-off-by: hlts2 --- pkg/index/job/save/service/indexer.go | 6 +++--- pkg/index/job/save/service/indexer_test.go | 4 ++-- pkg/index/job/save/usecase/save.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/index/job/save/service/indexer.go b/pkg/index/job/save/service/indexer.go index 5c2bab14d7..b437756061 100644 --- a/pkg/index/job/save/service/indexer.go +++ b/pkg/index/job/save/service/indexer.go @@ -37,7 +37,7 @@ const ( // Indexer represents an interface for indexing. type Indexer interface { - PreStart(ctx context.Context) (<-chan error, error) + StartClient(ctx context.Context) (<-chan error, error) Start(ctx context.Context) error } @@ -70,8 +70,8 @@ func New(opts ...Option) (Indexer, error) { return idx, nil } -// PreStart starts the preparation process. -func (idx *index) PreStart(ctx context.Context) (<-chan error, error) { +// StartClient starts the gRPC client. +func (idx *index) StartClient(ctx context.Context) (<-chan error, error) { return idx.client.Start(ctx) } diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 9407b8566c..c31224ff9d 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -301,7 +301,7 @@ func Test_index_Start(t *testing.T) { // } // } // -// func Test_index_PreStart(t *testing.T) { +// func Test_index_StartClient(t *testing.T) { // type args struct { // ctx context.Context // } @@ -407,7 +407,7 @@ func Test_index_Start(t *testing.T) { // concurrency: test.fields.concurrency, // } // -// got, err := idx.PreStart(test.args.ctx) +// got, err := idx.StartClient(test.args.ctx) // if err := checkFunc(test.want, got, err); err != nil { // tt.Errorf("error = %v", err) // } diff --git a/pkg/index/job/save/usecase/save.go b/pkg/index/job/save/usecase/save.go index 959eadb1b8..38327e2f1d 100644 --- a/pkg/index/job/save/usecase/save.go +++ b/pkg/index/job/save/usecase/save.go @@ -142,7 +142,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { oech = r.observability.Start(ctx) } sech = r.server.ListenAndServe(ctx) - ipech, err := r.indexer.PreStart(ctx) + cech, err := r.indexer.StartClient(ctx) if err != nil { close(ech) return nil, err @@ -173,7 +173,7 @@ func (r *run) Start(ctx context.Context) (<-chan error, error) { return ctx.Err() case err = <-oech: case err = <-sech: - case err = <-ipech: + case err = <-cech: } if err != nil { select { From f2dc953c4ccb552287e173e23b15753d531cb3e3 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 8 Nov 2023 12:38:50 +0900 Subject: [PATCH 14/16] feat: add mock for discoverer client and grpc client Signed-off-by: hlts2 --- internal/test/mock/discoverer_client_mock.go | 38 +++++++++++++++++++ .../test/mock/grpc_client_mock.go | 25 +++--------- pkg/index/job/save/service/indexer_test.go | 15 ++++---- 3 files changed, 52 insertions(+), 26 deletions(-) create mode 100644 internal/test/mock/discoverer_client_mock.go rename pkg/index/job/save/service/mock_test.go => internal/test/mock/grpc_client_mock.go (65%) diff --git a/internal/test/mock/discoverer_client_mock.go b/internal/test/mock/discoverer_client_mock.go new file mode 100644 index 0000000000..09df7f6a4a --- /dev/null +++ b/internal/test/mock/discoverer_client_mock.go @@ -0,0 +1,38 @@ +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package mock + +import ( + "context" + + "github.com/vdaas/vald/internal/client/v1/client/discoverer" + "github.com/vdaas/vald/internal/net/grpc" +) + +// DiscovererClientMock is the mock for discoverer client. +type DiscovererClientMock struct { + discoverer.Client + GetAddrsFunc func(ctx context.Context) []string + GetClientFunc func() grpc.Client +} + +// GetAddrs calls the GetAddrsFunc object. +func (dc *DiscovererClientMock) GetAddrs(ctx context.Context) []string { + return dc.GetAddrsFunc(ctx) +} + +// GetClient calls GetClientFunc object. +func (dc *DiscovererClientMock) GetClient() grpc.Client { + return dc.GetClientFunc() +} diff --git a/pkg/index/job/save/service/mock_test.go b/internal/test/mock/grpc_client_mock.go similarity index 65% rename from pkg/index/job/save/service/mock_test.go rename to internal/test/mock/grpc_client_mock.go index 89c0f19aa5..d8fa64cd36 100644 --- a/pkg/index/job/save/service/mock_test.go +++ b/internal/test/mock/grpc_client_mock.go @@ -11,30 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package service +package mock import ( "context" - "github.com/vdaas/vald/internal/client/v1/client/discoverer" "github.com/vdaas/vald/internal/net/grpc" ) -type mockDiscovererClient struct { - discoverer.Client - GetAddrsFunc func(ctx context.Context) []string - GetClientFunc func() grpc.Client -} - -func (mc *mockDiscovererClient) GetAddrs(ctx context.Context) []string { - return mc.GetAddrsFunc(ctx) -} - -func (mc *mockDiscovererClient) GetClient() grpc.Client { - return mc.GetClientFunc() -} - -type mockGrpcClient struct { +// GRPCClientMock is the mock for gRPC client. +type GRPCClientMock struct { grpc.Client OrderedRangeConcurrentFunc func(ctx context.Context, order []string, @@ -45,7 +31,8 @@ type mockGrpcClient struct { copts ...grpc.CallOption) error) error } -func (mc *mockGrpcClient) OrderedRangeConcurrent(ctx context.Context, +// OrderedRangeConcurrent calls the OrderedRangeConcurrentFunc object. +func (gc *GRPCClientMock) OrderedRangeConcurrent(ctx context.Context, order []string, concurrency int, f func(ctx context.Context, @@ -53,5 +40,5 @@ func (mc *mockGrpcClient) OrderedRangeConcurrent(ctx context.Context, conn *grpc.ClientConn, copts ...grpc.CallOption) error, ) error { - return mc.OrderedRangeConcurrentFunc(ctx, order, concurrency, f) + return gc.OrderedRangeConcurrentFunc(ctx, order, concurrency, f) } diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index c31224ff9d..69e381ca31 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -24,6 +24,7 @@ import ( "github.com/vdaas/vald/internal/net/grpc/codes" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/test/goleak" + "github.com/vdaas/vald/internal/test/mock" ) func Test_index_Start(t *testing.T) { @@ -66,12 +67,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &mockDiscovererClient{ + client: &mock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ + return &mock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -93,12 +94,12 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &mockDiscovererClient{ + client: &mock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ + return &mock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -128,12 +129,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &mockDiscovererClient{ + client: &mock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mockGrpcClient{ + return &mock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -162,7 +163,7 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &mockDiscovererClient{ + client: &mock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { // NOTE: This function returns nil, meaning that the targetAddrs stored in the field are invalid values. return nil From 66976e9c7fb7dad0c3eb07a10c1fb0cbefcb0d63 Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 8 Nov 2023 15:09:21 +0900 Subject: [PATCH 15/16] fix: struct tag for config Signed-off-by: hlts2 --- pkg/index/job/save/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/index/job/save/config/config.go b/pkg/index/job/save/config/config.go index 2e939f2bf2..975b871c9c 100644 --- a/pkg/index/job/save/config/config.go +++ b/pkg/index/job/save/config/config.go @@ -33,7 +33,7 @@ type Data struct { Observability *config.Observability `json:"observability" yaml:"observability"` // Save represents auto save indexing service configurations. - Save *config.IndexSave `json:"savior" yaml:"savior"` + Save *config.IndexSave `json:"saver" yaml:"saver"` } // NewConfig loads configurations from the file path. From 90cea8893bf8b7d72e6a5a4a7b2de865bca3be6a Mon Sep 17 00:00:00 2001 From: hlts2 Date: Wed, 8 Nov 2023 16:22:11 +0900 Subject: [PATCH 16/16] fix: cyclic import for testing Signed-off-by: hlts2 --- .../mock/{ => client}/discoverer_client_mock.go | 2 +- .../test/mock/{ => grpc}/grpc_client_mock.go | 2 +- pkg/index/job/save/service/indexer_test.go | 17 +++++++++-------- 3 files changed, 11 insertions(+), 10 deletions(-) rename internal/test/mock/{ => client}/discoverer_client_mock.go (98%) rename internal/test/mock/{ => grpc}/grpc_client_mock.go (99%) diff --git a/internal/test/mock/discoverer_client_mock.go b/internal/test/mock/client/discoverer_client_mock.go similarity index 98% rename from internal/test/mock/discoverer_client_mock.go rename to internal/test/mock/client/discoverer_client_mock.go index 09df7f6a4a..3b368eacbe 100644 --- a/internal/test/mock/discoverer_client_mock.go +++ b/internal/test/mock/client/discoverer_client_mock.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package mock +package client import ( "context" diff --git a/internal/test/mock/grpc_client_mock.go b/internal/test/mock/grpc/grpc_client_mock.go similarity index 99% rename from internal/test/mock/grpc_client_mock.go rename to internal/test/mock/grpc/grpc_client_mock.go index d8fa64cd36..926bbf8c61 100644 --- a/internal/test/mock/grpc_client_mock.go +++ b/internal/test/mock/grpc/grpc_client_mock.go @@ -11,7 +11,7 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. -package mock +package grpc import ( "context" diff --git a/pkg/index/job/save/service/indexer_test.go b/pkg/index/job/save/service/indexer_test.go index 69e381ca31..d2341d43b4 100644 --- a/pkg/index/job/save/service/indexer_test.go +++ b/pkg/index/job/save/service/indexer_test.go @@ -24,7 +24,8 @@ import ( "github.com/vdaas/vald/internal/net/grpc/codes" "github.com/vdaas/vald/internal/net/grpc/status" "github.com/vdaas/vald/internal/test/goleak" - "github.com/vdaas/vald/internal/test/mock" + clientmock "github.com/vdaas/vald/internal/test/mock/client" + grpcmock "github.com/vdaas/vald/internal/test/mock/grpc" ) func Test_index_Start(t *testing.T) { @@ -67,12 +68,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &mock.DiscovererClientMock{ + client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mock.GRPCClientMock{ + return &grpcmock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -94,12 +95,12 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &mock.DiscovererClientMock{ + client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mock.GRPCClientMock{ + return &grpcmock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -129,12 +130,12 @@ func Test_index_Start(t *testing.T) { }, fields: fields{ - client: &mock.DiscovererClientMock{ + client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { return addrs }, GetClientFunc: func() grpc.Client { - return &mock.GRPCClientMock{ + return &grpcmock.GRPCClientMock{ OrderedRangeConcurrentFunc: func(_ context.Context, _ []string, _ int, _ func(_ context.Context, _ string, _ *grpc.ClientConn, _ ...grpc.CallOption) error, ) error { @@ -163,7 +164,7 @@ func Test_index_Start(t *testing.T) { ctx: context.Background(), }, fields: fields{ - client: &mock.DiscovererClientMock{ + client: &clientmock.DiscovererClientMock{ GetAddrsFunc: func(_ context.Context) []string { // NOTE: This function returns nil, meaning that the targetAddrs stored in the field are invalid values. return nil