From 366cd02c9ed6b288a8aad0a02bfea67c1ab5a0c9 Mon Sep 17 00:00:00 2001 From: Yusuke Kadowaki Date: Tue, 10 Oct 2023 10:20:16 +0900 Subject: [PATCH] Add index correction helm templates and E2E (#2200) * implement the initial framework * add corrector configuration * add corrector logic * add build make command for index correction binary * add Dockerfile for index correction * add Docker image for index job correction * add timer * fix tag align * tmp * fix log * temporarily implement two versions of correct function * set eg limit from config * add stream list concurrency config * implement index id caching * add config to use cache or not * style: Format code with prettier and gofumpt * refactor availableAddrs * add kvs range duration * add leftAgentAddrs for performance * Revert "add kvs range duration" This reverts commit 5b647be6ccc0f9be7e78e38c89ea8897fa3ee574. * refactor * fix without cache bug * enable observability * refactor * SIGTERM after complete * add metrics server * add pcache * remove comment * [TEMP] use pcache * [TMP] use pcache * fix empty shard returns error * fix to use local map * [TMP] add prestop for pcache * [TEMP] add pcache config * style: Format code with prettier and gofumpt * [TEMP] add pcache log * fix map alloc size * [TMP] Add bbolt cache * update bbolt * fix bbolt bug * add bbolt test * [TEMP] use bbolt as persistent cache * style: Format code with prettier and gofumpt * add SetBatch to bbolt * use batch to write map to disk * style: Format code with prettier and gofumpt * delete the map elements on finalize * manually call GC after the map shrink * add limit to SetBatch goroutine number * stop unnecessary GC * increase eg limit to the MaxBatchSize * use ch to set batch bbolt * fix servers shutdown properly * use internal/kvs/bbolt * refactor * always use bbolt cache for correction * update sample.yaml for correction * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in 319ec8b according to 
the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2152 * use go std slices pkg * refactor * add comment * remove valdsync * use vald errgroup * refactor * Define ErrNoAvailableAgentToInsert * update comment in English * Apply new actions yaml format * Disable godox * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in c860ddc according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * remove comment * Apply format * Add type check for type assertion * use const to specify filemode * Add bbolt concurrency as config * fix var style * Suppress linter * fix comment * add test template * Refactor parameters for index correction * Refactor config * Add corrector test * style: format code with Prettier and Gofumpt This commit fixes the style issues introduced in 004bf81 according to the output from Prettier and Gofumpt. Details: https://github.com/vdaas/vald/pull/2194 * Add timestamp check * Apply format * fix schema type * Fix DeepSource errors * Fix misspell * Add type check * Remove unused config * Fix DeepSource error * Add required go:build e2e tag * Remove memo * Refactor comment * Add index job correction helm templates * Add more fields * Add index correction job E2E test * Add e2e action for job * [REVERT THIS] Temporally change version * Fix name and command * Apply format * update crd * Revert "[REVERT THIS] Temporally change version" This reverts commit 1801a63b2bb8826960c3596f42637933f0eab6e6. 
* Remove unused pkg * Remove experimental file * remove old workflow * Fix cron job name to new one * Update sample.yaml * fix build path * Fix corrector name * add e2e-jobs to slack notification * Update crds --------- Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com> --- .github/helm/values/values-lb.yaml | 5 + .github/workflows/e2e.yml | 37 + Makefile.d/build.mk | 2 +- Makefile.d/e2e.mk | 6 +- Makefile.d/k8s.mk | 14 +- .../vald-helm-operator/crds/valdrelease.yaml | 965 ++++++++++++++++++ .../vald/templates/agent/networkpolicy.yaml | 7 + .../templates/discoverer/networkpolicy.yaml | 7 + .../index/job/correction/configmap.yaml | 71 ++ .../index/job/correction/cronjob.yaml | 62 ++ charts/vald/values.yaml | 62 +- cmd/index/job/correction/sample.yaml | 5 +- tests/e2e/crud/crud_test.go | 36 + tests/e2e/operation/job.go | 121 +++ tests/e2e/operation/operation.go | 30 +- 15 files changed, 1410 insertions(+), 20 deletions(-) create mode 100644 charts/vald/templates/index/job/correction/configmap.yaml create mode 100644 charts/vald/templates/index/job/correction/cronjob.yaml create mode 100644 tests/e2e/operation/job.go diff --git a/.github/helm/values/values-lb.yaml b/.github/helm/values/values-lb.yaml index 591a269672..bc0043b6a9 100644 --- a/.github/helm/values/values-lb.yaml +++ b/.github/helm/values/values-lb.yaml @@ -69,3 +69,8 @@ manager: auto_index_duration_limit: 2m auto_index_check_duration: 30s auto_index_length: 1000 + corrector: + enabled: true + # suspend because you do not want corrector to start automatically in CI + # instead run it manually + suspend: true diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 924865531f..20a9e60c8c 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -227,6 +227,42 @@ jobs: env: POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} + e2e-jobs: + name: "E2E test (Jobs)" + needs: [dump-contexts-to-log] + runs-on: ubuntu-latest + 
timeout-minutes: 60 + steps: + - uses: actions/checkout@v3 + + - name: Set Git config + run: | + git config --global --add safe.directory ${GITHUB_WORKSPACE} + + - name: Setup E2E environment + id: setup_e2e + uses: ./.github/actions/setup-e2e + + - name: Deploy Vald + id: deploy_vald + uses: ./.github/actions/e2e-deploy-vald + with: + helm_extra_options: ${{ steps.setup_e2e.outputs.HELM_EXTRA_OPTIONS }} + values: .github/helm/values/values-lb.yaml + wait_for_selector: app=vald-lb-gateway + + - name: Run E2E Jobs + run: | + make hack/benchmark/assets/dataset/${{ env.DATASET }} + make E2E_BIND_PORT=8081 \ + E2E_INSERT_COUNT=10000\ + E2E_WAIT_FOR_CREATE_INDEX_DURATION=3m \ + E2E_TARGET_POD_NAME=${POD_NAME} \ + E2E_TARGET_NAMESPACE=default \ + e2e/index/job/correction + env: + POD_NAME: ${{ steps.deploy_vald.outputs.POD_NAME }} + e2e-agent-and-sidecar: name: "E2E Agent & Sidecar test" needs: [dump-contexts-to-log] @@ -278,6 +314,7 @@ jobs: - e2e-stream-crud - e2e-stream-crud-for-operator - e2e-stream-crud-skip-exist-check + - e2e-jobs runs-on: ubuntu-latest steps: - uses: actions/checkout@v3 diff --git a/Makefile.d/build.mk b/Makefile.d/build.mk index 5f0d2089e1..f2f8275b46 100644 --- a/Makefile.d/build.mk +++ b/Makefile.d/build.mk @@ -209,7 +209,7 @@ cmd/manager/index/index: \ cmd/index/job/correction/index-correction: \ $(GO_SOURCES_INTERNAL) \ $(PBGOS) \ - $(shell find $(ROOTDIR)/cmd/index/job/correction/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ + $(shell find $(ROOTDIR)/cmd/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') \ $(shell find $(ROOTDIR)/pkg/index/job/correction -type f -name '*.go' -not -name '*_test.go' -not -name 'doc.go') $(eval CGO_ENABLED = 0) CGO_ENABLED=$(CGO_ENABLED) \ diff --git a/Makefile.d/e2e.mk b/Makefile.d/e2e.mk index 8cd6e43800..f830343788 100644 --- a/Makefile.d/e2e.mk +++ b/Makefile.d/e2e.mk @@ -69,6 +69,11 @@ e2e/remove/timestamp: e2e/insert/search: $(call 
run-e2e-crud-test,-run TestE2EInsertAndSearch) +.PHONY: e2e/index/job/correction +## run index correction job e2e +e2e/index/job/correction: + $(call run-e2e-crud-test,-run TestE2EIndexJobCorrection) + .PHONY: e2e/maxdim ## run e2e/maxdim e2e/maxdim: @@ -79,4 +84,3 @@ e2e/maxdim: e2e/sidecar: $(call run-e2e-sidecar-test,-run TestE2EForSidecar) - diff --git a/Makefile.d/k8s.mk b/Makefile.d/k8s.mk index 0962b95407..a3b0868056 100644 --- a/Makefile.d/k8s.mk +++ b/Makefile.d/k8s.mk @@ -23,22 +23,26 @@ k8s/manifest/clean: k8s/agent \ k8s/discoverer \ k8s/gateway \ - k8s/manager + k8s/manager \ + k8s/index .PHONY: k8s/manifest/update ## update k8s manifests using helm templates k8s/manifest/update: \ k8s/manifest/clean helm template \ - --values $(HELM_VALUES) \ - --output-dir $(TEMP_DIR) \ - charts/vald + --values $(HELM_VALUES) \ + --set defaults.image.tag=$(VERSION) \ + --output-dir $(TEMP_DIR) \ + charts/vald mkdir -p k8s/gateway mkdir -p k8s/manager + mkdir -p k8s/index/job mv $(TEMP_DIR)/vald/templates/agent k8s/agent mv $(TEMP_DIR)/vald/templates/discoverer k8s/discoverer mv $(TEMP_DIR)/vald/templates/gateway/lb k8s/gateway/lb mv $(TEMP_DIR)/vald/templates/manager/index k8s/manager/index + mv $(TEMP_DIR)/vald/templates/index/job/correction k8s/index/job/correction rm -rf $(TEMP_DIR) .PHONY: k8s/manifest/helm-operator/clean @@ -80,6 +84,7 @@ k8s/vald/deploy: kubectl apply -f $(TEMP_DIR)/vald/templates/agent || true kubectl apply -f $(TEMP_DIR)/vald/templates/discoverer || true kubectl apply -f $(TEMP_DIR)/vald/templates/gateway/lb || true + kubectl apply -f $(TEMP_DIR)/vald/templates/index/job/correction || true rm -rf $(TEMP_DIR) kubectl get pods -o jsonpath="{.items[*].spec.containers[*].image}" | tr " " "\n" @@ -97,6 +102,7 @@ k8s/vald/delete: --set manager.index.image.repository=$(CRORG)/$(MANAGER_INDEX_IMAGE) \ --output-dir $(TEMP_DIR) \ charts/vald + kubectl delete -f $(TEMP_DIR)/vald/templates/index/job/correction kubectl delete -f 
$(TEMP_DIR)/vald/templates/gateway/lb kubectl delete -f $(TEMP_DIR)/vald/templates/manager/index kubectl delete -f $(TEMP_DIR)/vald/templates/discoverer diff --git a/charts/vald-helm-operator/crds/valdrelease.yaml b/charts/vald-helm-operator/crds/valdrelease.yaml index 46c151ab94..3854881577 100644 --- a/charts/vald-helm-operator/crds/valdrelease.yaml +++ b/charts/vald-helm-operator/crds/valdrelease.yaml @@ -6068,6 +6068,971 @@ spec: annotations: type: object x-kubernetes-preserve-unknown-fields: true + corrector: + type: object + properties: + agent_namespace: + type: string + bbolt_async_write_concurrency: + type: integer + minimum: 1 + discoverer: + type: object + properties: + agent_client_options: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - 
TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + client: + type: object + properties: + addrs: + type: array + items: + type: string + backoff: + type: object + properties: + backoff_factor: + type: number + backoff_time_limit: + type: string + enable_error_log: + type: boolean + initial_duration: + type: string + jitter_limit: + type: string + maximum_duration: + type: string + retry_count: + type: integer + call_option: + type: object + x-kubernetes-preserve-unknown-fields: true + circuit_breaker: + type: object + properties: + 
closed_error_rate: + type: number + closed_refresh_timeout: + type: string + half_open_error_rate: + type: number + min_samples: + type: integer + open_timeout: + type: string + connection_pool: + type: object + properties: + enable_dns_resolver: + type: boolean + enable_rebalance: + type: boolean + old_conn_close_duration: + type: string + rebalance_duration: + type: string + size: + type: integer + dial_option: + type: object + properties: + backoff_base_delay: + type: string + backoff_jitter: + type: number + backoff_max_delay: + type: string + backoff_multiplier: + type: number + enable_backoff: + type: boolean + initial_connection_window_size: + type: integer + initial_window_size: + type: integer + insecure: + type: boolean + interceptors: + type: array + items: + type: string + enum: + - TraceInterceptor + keepalive: + type: object + properties: + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_msg_size: + type: integer + min_connection_timeout: + type: string + net: + type: object + properties: + dialer: + type: object + properties: + dual_stack_enabled: + type: boolean + keepalive: + type: string + timeout: + type: string + dns: + type: object + properties: + cache_enabled: + type: boolean + cache_expiration: + type: string + refresh_duration: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + read_buffer_size: + type: integer + timeout: + type: string + write_buffer_size: + type: integer + health_check_duration: + type: string + 
max_recv_msg_size: + type: integer + max_retry_rpc_buffer_size: + type: integer + max_send_msg_size: + type: integer + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + wait_for_ready: + type: boolean + duration: + type: string + enabled: + type: boolean + env: + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + image: + type: object + properties: + pullPolicy: + type: string + enum: + - Always + - Never + - IfNotPresent + repository: + type: string + tag: + type: string + name: + type: string + node_name: + type: string + observability: + type: object + properties: + enabled: + type: boolean + metrics: + type: object + properties: + enable_cgo: + type: boolean + enable_goroutine: + type: boolean + enable_memory: + type: boolean + enable_version_info: + type: boolean + version_info_labels: + type: array + items: + type: string + enum: + - vald_version + - server_name + - git_commit + - build_time + - go_version + - go_os + - go_arch + - cgo_enabled + - ngt_version + - build_cpu_info_flags + otlp: + type: object + properties: + attribute: + type: object + properties: + namespace: + type: string + node_name: + type: string + pod_name: + type: string + service_name: + type: string + collector_endpoint: + type: string + metrics_export_interval: + type: string + metrics_export_timeout: + type: string + trace_batch_timeout: + type: string + trace_export_timeout: + type: string + trace_max_export_batch_size: + type: integer + trace_max_queue_size: + type: integer + trace: + type: object + properties: + enabled: + type: boolean + schedule: + type: string + server_config: + type: object + properties: + full_shutdown_duration: + type: string + healths: + type: object + properties: + liveness: + type: object + properties: + enabled: + type: boolean + host: + type: string + livenessProbe: + type: object + properties: + 
failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + readiness: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + readinessProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string 
+ mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + startup: + type: object + properties: + enabled: + type: boolean + port: + type: integer + maximum: 65535 + minimum: 0 + startupProbe: + type: object + properties: + failureThreshold: + type: integer + httpGet: + type: object + properties: + path: + type: string + port: + type: string + scheme: + type: string + initialDelaySeconds: + type: integer + periodSeconds: + type: integer + successThreshold: + type: integer + timeoutSeconds: + type: integer + metrics: + type: object + properties: + pprof: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean 
+ tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + servers: + type: object + properties: + grpc: + type: object + properties: + enabled: + type: boolean + host: + type: string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + grpc: + type: object + properties: + bidirectional_stream_concurrency: + type: integer + connection_timeout: + type: string + enable_reflection: + type: boolean + header_table_size: + type: integer + initial_conn_window_size: + type: integer + initial_window_size: + type: integer + interceptors: + type: array + items: + type: string + enum: + - RecoverInterceptor + - AccessLogInterceptor + - TraceInterceptor + - MetricInterceptor + keepalive: + type: object + properties: + max_conn_age: + type: string + max_conn_age_grace: + type: string + max_conn_idle: + type: string + min_time: + type: string + permit_without_stream: + type: boolean + time: + type: string + timeout: + type: string + max_header_list_size: + type: integer + max_receive_message_size: + type: integer + max_send_message_size: + type: integer + read_buffer_size: + type: integer + write_buffer_size: + type: integer + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + restart: + type: boolean + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + rest: + type: object + properties: + enabled: + type: boolean + host: + type: 
string + port: + type: integer + maximum: 65535 + minimum: 0 + server: + type: object + properties: + http: + type: object + properties: + handler_timeout: + type: string + idle_timeout: + type: string + read_header_timeout: + type: string + read_timeout: + type: string + shutdown_duration: + type: string + write_timeout: + type: string + mode: + type: string + network: + type: string + enum: + - tcp + - tcp4 + - tcp6 + - udp + - udp4 + - udp6 + - unix + - unixgram + - unixpacket + probe_wait_time: + type: string + socket_option: + type: object + properties: + ip_recover_destination_addr: + type: boolean + ip_transparent: + type: boolean + reuse_addr: + type: boolean + reuse_port: + type: boolean + tcp_cork: + type: boolean + tcp_defer_accept: + type: boolean + tcp_fast_open: + type: boolean + tcp_no_delay: + type: boolean + tcp_quick_ack: + type: boolean + socket_path: + type: string + servicePort: + type: integer + maximum: 65535 + minimum: 0 + tls: + type: object + properties: + ca: + type: string + cert: + type: string + enabled: + type: boolean + insecure_skip_verify: + type: boolean + key: + type: string + startingDeadlineSeconds: + type: integer + stream_list_concurrency: + type: integer + minimum: 1 + suspend: + type: boolean + ttlSecondsAfterFinished: + type: integer + version: + type: string + pattern: ^v[0-9]+\.[0-9]+\.[0-9]$ enabled: type: boolean env: diff --git a/charts/vald/templates/agent/networkpolicy.yaml b/charts/vald/templates/agent/networkpolicy.yaml index e6e1ef90bb..ef51ea5439 100644 --- a/charts/vald/templates/agent/networkpolicy.yaml +++ b/charts/vald/templates/agent/networkpolicy.yaml @@ -17,6 +17,7 @@ {{- $agent := .Values.agent -}} {{- $lb := .Values.gateway.lb -}} {{- $index := .Values.manager.index -}} +{{- $corrector := .Values.manager.index.corrector -}} {{- if .Values.defaults.networkPolicy.enabled }} apiVersion: networking.k8s.io/v1 kind: NetworkPolicy @@ -44,6 +45,12 @@ spec: podSelector: matchLabels: app: {{ $index.name }} + - 
namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: {{ .Release.Namespace }} + podSelector: + matchLabels: + app: {{ $corrector.name }} {{- if .Values.defaults.networkPolicy.custom.ingress }} {{- toYaml .Values.defaults.networkPolicy.custom.ingress | nindent 4 }} {{- end }} diff --git a/charts/vald/templates/discoverer/networkpolicy.yaml b/charts/vald/templates/discoverer/networkpolicy.yaml index 065cc97b59..41803702dd 100644 --- a/charts/vald/templates/discoverer/networkpolicy.yaml +++ b/charts/vald/templates/discoverer/networkpolicy.yaml @@ -17,6 +17,7 @@ {{- $discoverer := .Values.discoverer -}} {{- $lb := .Values.gateway.lb -}} {{- $index := .Values.manager.index -}} +{{- $corrector := .Values.manager.index.corrector -}} {{- if .Values.defaults.networkPolicy.enabled }} apiVersion: networking.k8s.io/v1 kind: NetworkPolicy @@ -46,6 +47,12 @@ spec: podSelector: matchLabels: app: {{ $index.name }} + - namespaceSelector: + matchLabels: + kubernetes.io/metadata.name: {{ .Release.Namespace }} + podSelector: + matchLabels: + app: {{ $corrector.name }} {{- if .Values.defaults.networkPolicy.custom.ingress }} {{- toYaml .Values.defaults.networkPolicy.custom.ingress | nindent 4 }} {{- end }} diff --git a/charts/vald/templates/index/job/correction/configmap.yaml b/charts/vald/templates/index/job/correction/configmap.yaml new file mode 100644 index 0000000000..77db8328e2 --- /dev/null +++ b/charts/vald/templates/index/job/correction/configmap.yaml @@ -0,0 +1,71 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +{{- $corrector := .Values.manager.index.corrector -}} +{{- $gateway := .Values.gateway.lb -}} +{{- $index := .Values.manager.index -}} +{{- $agent := .Values.agent -}} +{{- $discoverer := .Values.discoverer -}} +{{- if $corrector.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ $corrector.name }}-config + labels: + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} + app.kubernetes.io/component: {{ $corrector.name }} +data: + config.yaml: | + --- + version: {{ $corrector.version }} + time_zone: {{ default .Values.defaults.time_zone $corrector.time_zone }} + logging: + {{- $logging := dict "Values" $corrector.logging "default" .Values.defaults.logging }} + {{- include "vald.logging" $logging | nindent 6 }} + server_config: + {{- $servers := dict "Values" $corrector.server_config "default" .Values.defaults.server_config }} + {{- include "vald.servers" $servers | nindent 6 }} + observability: + {{- $observability := dict "Values" $corrector.observability "default" .Values.defaults.observability }} + {{- include "vald.observability" $observability | nindent 6 }} + corrector: + agent_port: {{ default .Values.defaults.server_config.servers.grpc.port $agent.server_config.servers.grpc.port }} + agent_name: {{ $agent.name | quote }} + agent_dns: {{ $agent.name }}.{{ .Release.Namespace }}.svc.cluster.local + agent_namespace: {{ $index.indexer.agent_namespace | quote }} + node_name: {{ $index.indexer.node_name | quote }} + stream_list_concurrency: {{ $corrector.stream_list_concurrency }} + bbolt_async_write_concurrency: {{ $corrector.bbolt_async_write_concurrency }} + index_replica: {{ $gateway.gateway_config.index_replica }} + discoverer: + duration: {{ 
$corrector.discoverer.duration }} + client: + {{- $discovererClient := $index.indexer.discoverer.client }} + {{- $discovererServerPort := $discoverer.server_config.servers.grpc.port }} + {{- $defaultDiscovererHost := printf "%s.%s.svc.cluster.local" $discoverer.name .Release.Namespace }} + {{- $defaultDiscovererPort := default .Values.defaults.server_config.servers.grpc.port $discovererServerPort }} + {{- $defaultDiscovererAddr := (list (printf "%s:%d" $defaultDiscovererHost (int64 $defaultDiscovererPort))) }} + {{- $discovererAddrs := dict "Values" $discovererClient.addrs "default" $defaultDiscovererAddr }} + {{- include "vald.grpc.client.addrs" $discovererAddrs | nindent 10 }} + {{- $discovererGRPCclient := dict "Values" $discovererClient "default" .Values.defaults.grpc.client }} + {{- include "vald.grpc.client" $discovererGRPCclient | nindent 10 }} + agent_client_options: + {{- include "vald.grpc.client.addrs" (dict "Values" $corrector.discoverer.agent_client_options.addrs) | nindent 10 }} + {{- include "vald.grpc.client" (dict "Values" $corrector.discoverer.agent_client_options "default" .Values.defaults.grpc.client) | nindent 10 }} +{{- end }} diff --git a/charts/vald/templates/index/job/correction/cronjob.yaml b/charts/vald/templates/index/job/correction/cronjob.yaml new file mode 100644 index 0000000000..70c3f408f5 --- /dev/null +++ b/charts/vald/templates/index/job/correction/cronjob.yaml @@ -0,0 +1,62 @@ +# +# Copyright (C) 2019-2023 vdaas.org vald team +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# You may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# + +{{- $corrector := .Values.manager.index.corrector -}} +{{- if $corrector.enabled }} +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ $corrector.name }} + labels: + app: {{ $corrector.name }} + app.kubernetes.io/name: {{ include "vald.name" . }} + helm.sh/chart: {{ include "vald.chart" . }} + app.kubernetes.io/managed-by: {{ .Release.Service }} + app.kubernetes.io/instance: {{ .Release.Name }} + app.kubernetes.io/version: {{ .Chart.Version }} +spec: + schedule: {{ $corrector.schedule }} + concurrencyPolicy: Forbid + suspend: {{ $corrector.suspend }} + startingDeadlineSeconds: {{ $corrector.startingDeadlineSeconds }} + jobTemplate: + spec: + ttlSecondsAfterFinished: {{ $corrector.ttlSecondsAfterFinished }} + template: + metadata: + labels: + app: {{ $corrector.name }} + spec: + containers: + - name: {{ $corrector.name }} + image: "{{ $corrector.image.repository }}:{{ default .Values.defaults.image.tag $corrector.image.tag }}" + imagePullPolicy: {{ $corrector.image.pullPolicy }} + volumeMounts: + - name: {{ $corrector.name }}-config + mountPath: /etc/server/ + {{- $servers := dict "Values" $corrector.server_config "default" .Values.defaults.server_config -}} + {{- include "vald.containerPorts" $servers | trim | nindent 14 }} + {{- if $corrector.env }} + env: + {{- toYaml $corrector.env | nindent 16 }} + {{- end }} + restartPolicy: OnFailure + volumes: + - name: {{ $corrector.name }}-config + configMap: + defaultMode: 420 + name: {{ $corrector.name }}-config +{{- end }} diff --git a/charts/vald/values.yaml b/charts/vald/values.yaml index 0cc8c02bbb..f095a921bf 100644 --- a/charts/vald/values.yaml +++ b/charts/vald/values.yaml @@ -2633,12 +2633,68 @@ manager: keepalive: 15m #indexer fetches uncommitted index length, which includes huge payload so we need to set keepalive longer than usual # @schema {"name": "manager.index.corrector", "type": "object"} 
 corrector: + # @schema {"name": "manager.index.corrector.name", "type": "string"} + # manager.index.corrector.name -- name of index correction job + name: vald-index-correction + # @schema {"name": "manager.index.corrector.image", "alias": "image"} + image: + # manager.index.corrector.image.repository -- image repository + repository: vdaas/vald-index-correction + # manager.index.corrector.image.tag -- image tag (overrides defaults.image.tag) + tag: "" + # manager.index.corrector.image.pullPolicy -- image pull policy + pullPolicy: Always + # @schema {"name": "manager.index.corrector.server_config", "alias": "server_config"} + # manager.index.corrector.server_config -- server config (overrides defaults.server_config) + server_config: + servers: + rest: {} + grpc: {} + healths: + liveness: {} + readiness: {} + startup: {} + metrics: + pprof: {} + # @schema {"name": "manager.index.corrector.env", "alias": "env"} + # manager.index.corrector.env -- environment variables + env: + - name: MY_NODE_NAME + valueFrom: + fieldRef: + fieldPath: spec.nodeName + - name: MY_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: MY_POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + # @schema {"name": "manager.index.corrector.observability", "alias": "observability"} + # manager.index.corrector.observability -- observability config (overrides defaults.observability) + observability: + otlp: + attribute: + service_name: vald-manager-index # @schema {"name": "manager.index.corrector.enabled", "type": "boolean"} # manager.index.corrector.enabled -- enable index correction CronJob enabled: false - # @schema {"name": "manager.index.corrector.check_duration", "type": "string"} - # manager.index.corrector.enabled -- check duration of index correction CronJob - check_duration: 24h + # @schema {"name": "manager.index.corrector.schedule", "type": "string"} + # manager.index.corrector.schedule -- CronJob schedule setting for index correction + schedule: "5 * * * *"
+ # @schema {"name": "manager.index.corrector.suspend", "type": "boolean"} + # manager.index.corrector.suspend -- CronJob suspend setting for index correction + suspend: false + # @schema {"name": "manager.index.corrector.ttlSecondsAfterFinished", "type": "integer"} + # manager.index.corrector.ttlSecondsAfterFinished -- ttl setting for K8s completed jobs + ttlSecondsAfterFinished: 86400 + # @schema {"name": "manager.index.corrector.startingDeadlineSeconds", "type": "integer"} + # manager.index.corrector.startingDeadlineSeconds -- CronJob startingDeadlineSeconds setting for index correction + startingDeadlineSeconds: 86400 + # @schema {"name": "manager.index.corrector.version", "alias": "version"} + # manager.index.corrector.version -- version of index manager config + version: v0.0.0 # @schema {"name": "manager.index.corrector.stream_list_concurrency", "type": "integer", "minimum": 1} # manager.index.corrector.stream_list_concurrency -- concurrency for stream list object rpc stream_list_concurrency: 200 diff --git a/cmd/index/job/correction/sample.yaml b/cmd/index/job/correction/sample.yaml index 09ad7dc5ca..bc3f144e41 100644 --- a/cmd/index/job/correction/sample.yaml +++ b/cmd/index/job/correction/sample.yaml @@ -70,8 +70,6 @@ server_config: cert: /path/to/cert enabled: false key: /path/to/key -gateway: - index_replica: 3 corrector: agent_port: 8081 agent_name: "vald-agent-ngt" @@ -80,6 +78,7 @@ corrector: node_name: "" stream_list_concurrency: 200 bbolt_async_write_concurrency: 2048 + index_replica: 3 discoverer: duration: 500ms client: @@ -215,7 +214,7 @@ observability: namespace: "_MY_POD_NAMESPACE_" pod_name: "_MY_POD_NAME_" node_name: "_MY_NODE_NAME_" - service_name: "vald-index-job-correction" + service_name: "vald-index-correction" metrics: enable_cgo: true enable_goroutine: true diff --git a/tests/e2e/crud/crud_test.go b/tests/e2e/crud/crud_test.go index 97e3428e08..f78a99c6aa 100644 --- a/tests/e2e/crud/crud_test.go +++ b/tests/e2e/crud/crud_test.go @@ -738,3
+738,39 @@ func TestE2ECRUDWithSkipStrictExistCheck(t *testing.T) { t.Fatalf("an error occurred on #13: %s", err) } } + +// TestE2EIndexJobCorrection tests the index correction job. +// It inserts vectors, runs the index correction job, and then removes the vectors. +// TODO: Add index replica count check after implementing StreamListObject in LB +func TestE2EIndexJobCorrection(t *testing.T) { + t.Cleanup(teardown) + ctx := context.Background() + + op, err := operation.New(host, port) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + train := ds.Train[insertFrom : insertFrom+insertNum] + err = op.Insert(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + sleep(t, waitAfterInsertDuration) + + exe := operation.NewCronJobExecutor("vald-index-correction") + err = exe.CreateAndWait(t, ctx, "correction-test") + if err != nil { + t.Fatalf("an error occurred: %s", err) + } + + err = op.Remove(t, ctx, operation.Dataset{ + Train: train, + }) + if err != nil { + t.Fatalf("an error occurred: %s", err) + } +} diff --git a/tests/e2e/operation/job.go b/tests/e2e/operation/job.go new file mode 100644 index 0000000000..9c3567929d --- /dev/null +++ b/tests/e2e/operation/job.go @@ -0,0 +1,121 @@ +//go:build e2e + +// Copyright (C) 2019-2023 vdaas.org vald team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// You may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
// JobExecutor creates a Kubernetes Job and waits for it to finish.
type JobExecutor interface {
	// CreateAndWait creates a Job named jobName and blocks until the Job
	// completes, fails, or ctx is done.
	CreateAndWait(t *testing.T, ctx context.Context, jobName string) error
}

// cronJobExecute is a JobExecutor that creates Jobs from an existing CronJob
// by invoking kubectl.
type cronJobExecute struct {
	cronJob string // name of the CronJob used as the Job source
}

var _ JobExecutor = (*cronJobExecute)(nil)

// NewCronJobExecutor returns a JobExecutor that creates Jobs from the CronJob
// named cronJob.
func NewCronJobExecutor(cronJob string) JobExecutor {
	return &cronJobExecute{
		cronJob: cronJob,
	}
}

// CreateAndWait creates the Job jobName from the executor's CronJob, waits for
// it to complete or fail, and deletes the Job afterwards.
//
// It returns the creation error if the Job could not be created (nothing is
// deleted in that case); otherwise it returns the result of waiting. A failed
// deletion is reported through t.Errorf and does not change the return value.
func (j *cronJobExecute) CreateAndWait(t *testing.T, ctx context.Context, jobName string) error {
	t.Helper()
	if err := createJob(t, jobName, j.cronJob); err != nil {
		return err
	}

	// Clean up the created Job whether waiting succeeded or not.
	defer func() {
		if err := deleteJob(t, jobName); err != nil {
			t.Errorf("failed to delete job: %s", err)
		}
	}()

	return waitJob(t, ctx, jobName)
}

// createJob runs `kubectl create job <jobName> --from=cronjob/<cronJobName>`.
func createJob(t *testing.T, jobName, cronJobName string) error {
	t.Helper()
	t.Logf("creating job: %s from CronJob %s", jobName, cronJobName)
	// Arguments are passed as an argv so that names are never re-parsed by a shell.
	cmd := exec.Command("kubectl", "create", "job", jobName, "--from=cronjob/"+cronJobName)
	return execCmd(t, cmd)
}

// deleteJob runs `kubectl delete job <jobName>`.
func deleteJob(t *testing.T, jobName string) error {
	t.Helper()
	t.Logf("deleting job: %s", jobName)
	cmd := exec.Command("kubectl", "delete", "job", jobName)
	return execCmd(t, cmd)
}

// waitJob blocks until the Job jobName reports condition "complete" (returns
// nil) or condition "failed" (returns an error), until one of the underlying
// kubectl commands errors, or until ctx is done (returns ctx.Err()).
//
// Two `kubectl wait` processes run concurrently, one per condition. Whichever
// reports first decides the result; the other is killed through the derived
// context when waitJob returns.
func waitJob(t *testing.T, ctx context.Context, jobName string) error {
	t.Helper()
	t.Logf("waiting for job %s to complete or fail", jobName)

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// The channels are buffered so that the goroutine whose kubectl process is
	// cancelled after waitJob has returned can still deliver its result and
	// exit instead of blocking forever on a send nobody receives.
	complete := make(chan struct{}, 1)
	failed := make(chan struct{}, 1)
	ech := make(chan error, 2)

	go func() {
		cmd := exec.CommandContext(ctx, "kubectl", "wait", "--timeout=-1s", "job/"+jobName, "--for=condition=complete")
		if err := execCmd(t, cmd); err != nil {
			ech <- err
			return
		}
		complete <- struct{}{}
	}()

	go func() {
		cmd := exec.CommandContext(ctx, "kubectl", "wait", "--timeout=-1s", "job/"+jobName, "--for=condition=failed")
		if err := execCmd(t, cmd); err != nil {
			ech <- err
			return
		}

		// Best-effort diagnostics: a failure to dump must not mask the job failure.
		t.Logf("%s failed. dumping status", jobName)
		if err := execCmd(t, exec.Command("kubectl", "get", "job", jobName, "-o", "yaml")); err != nil {
			t.Log("failed to dump status")
		}
		failed <- struct{}{}
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-complete:
		return nil
	case <-failed:
		return fmt.Errorf("job %s failed", jobName)
	case err := <-ech:
		return err
	}
}

// execCmd runs cmd, logs its standard output on success, and returns nil.
//
// When the command exits unsuccessfully the returned error carries the
// captured stdout and stderr and wraps the *exec.ExitError. Any other failure
// (for example the executable cannot be started) is wrapped together with the
// command's argv.
func execCmd(t *testing.T, cmd *exec.Cmd) error {
	t.Helper()
	out, err := cmd.Output()
	if err != nil {
		if exitErr, ok := err.(*exec.ExitError); ok {
			return fmt.Errorf("%s, %s, %w", string(out), string(exitErr.Stderr), err)
		}
		return fmt.Errorf("unexpected error on executing %q: %w", cmd.Args, err)
	}
	t.Log(string(out))
	return nil
}