Skip to content

Commit

Permalink
Add verification for index correction e2e and add clusterrole cronjob…
Browse files Browse the repository at this point in the history
…s for operator to deploy index correction (#2205)

* implement the initail framework

* add corrector configuration

* add corrector logic

* add build make command for index correction binary

* add Dockerfile for index correction

* add Docker image for index job correction

* add timer

* fix tag align

* tmp

* fix log

* temporally implement two versions of correct function

* set eg limit from config

* add stream list concurrency config

* implement index id caching

* add config to use cache or not

* style: Format code with prettier and gofumpt

* refactor availableAddrs

* add kvs range duration

* add leftAgentAddrs for performance

* Revert "add kvs range duration"

This reverts commit 5b647be.

* refactor

* fix without cache bug

* enable observability

* refactor

* SIGTERM after complete

* add metrics server

* add pcache

* remove comment

* [TEMP] use pcache

* [TMP] use pcache

* fix empty shard returns error

* fix to use local map

* [TMP] add prestop for pcache

* [TEMP] add pcache config

* style: Format code with prettier and gofumpt

* [TEMP] add pcache log

* fix map alloc size

* [TMP] Add bbolt cache

* update bbolt

* fix bbolt bug

* add bbolt test

* [TEMP] use bbolt as persistent cache

* style: Format code with prettier and gofumpt

* add SetBatch to bbolt

* use batch to write map to disk

* style: Format code with prettier and gofumpt

* delete the map elements on finalize

* manually call GC after the map shrink

* add limit to SetBatch goroutine number

* stop unnecesarry GC

* increase eg limit to the MaxBatchSize

* use ch to set batch bbolt

* fix servers shutdown properly

* use internal/kvs/bbolt

* refactor

* always use bbolt cache for correction

* update sample.yaml for correction

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in 319ec8b according to the output
from Prettier and Gofumpt.

Details: #2152

* use go std slices pkg

* refactor

* add comment

* remove valdsync

* use vald errgroup

* refactor

* Define ErrNoAvailableAgentToInsert

* update comment in English

* Apply new actions yaml format

* Disable godox

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in c860ddc according to the output
from Prettier and Gofumpt.

Details: #2194

* remove comment

* Apply format

* Add type check for type assertion

* use const to specify filemode

* Add bbolt concurrency as config

* fix var style

* Suppress linter

* fix comment

* add test template

* Refactor parameters for index correction

* Refactor config

* Add corrector test

* style: format code with Prettier and Gofumpt

This commit fixes the style issues introduced in 004bf81 according to the output
from Prettier and Gofumpt.

Details: #2194

* Add timestamp check

* Apply format

* fix schema type

* Fix DeepSource errors

* Fix misspell

* Add type check

* Remove unused config

* Fix DeepSource error

* Add required go:build e2e tag

* Remove memo

* Refactor comment

* Add index job correction helm templates

* Add more fields

* Add index correction job E2E test

* Add e2e action for job

* [REVERT THIS] Temporally change version

* Fix name and command

* Apply format

* update crd

* Revert "[REVERT THIS] Temporally change version"

This reverts commit 1801a63.

* Remove unused pkg

* Remove experimental file

* remove old workflow

* Fix cron job name to new one

* Update sample.yaml

* fix build path

* Fix corrector name

* add e2e-jobs to slack notification

* Update crds

* Add StreamListObject to LB

* Add E2E for StreamListObject

* Update error handling

* Fix StreamListObject e2e verification

* Add StreamListObject to LB

* Add E2E for StreamListObject

* Update error handling

* Fix StreamListObject e2e verification

* Update index correction e2e to verify correction result with StramListObject

* Make it possible to deploy index correction cronjob from operator

* Update operator manifests

* Make schedule field empty so that a user has to specify manually

* add default schedule of index correction

---------

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
  • Loading branch information
2 people authored and kmrmt committed Dec 12, 2023
1 parent c3e3281 commit 3a6b83d
Show file tree
Hide file tree
Showing 9 changed files with 1,050 additions and 11 deletions.
1 change: 1 addition & 0 deletions .github/helm/values/values-lb.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,4 @@ manager:
# suspend because you do not want corrector to start automatically in CI
# instead run it manually
suspend: true
schedule: "1 2 3 4 5"
4 changes: 4 additions & 0 deletions .github/valdrelease/valdrelease.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,7 @@ spec:
auto_index_duration_limit: 2m
auto_index_check_duration: 30s
auto_index_length: 1000
corrector:
enabled: true
suspend: true
schedule: "1 2 3 4 5"
13 changes: 13 additions & 0 deletions charts/vald-helm-operator/templates/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,4 +175,17 @@ rules:
- get
- patch
- update
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch

{{- end }}
2 changes: 1 addition & 1 deletion charts/vald/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2682,7 +2682,7 @@ manager:
enabled: false
# @schema {"name": "manager.index.corrector.schedule", "type": "string"}
# manager.index.corrector.schedule -- CronJob schedule setting for index correction
schedule: "5 * * * *"
schedule: "6 3 * * *"
# @schema {"name": "manager.index.corrector.suspend", "type": "boolean"}
# manager.index.corrector.suspend -- CronJob suspend setting for index correction
suspend: false
Expand Down
12 changes: 12 additions & 0 deletions k8s/operator/helm/clusterrole.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,15 @@ rules:
- get
- patch
- update
- apiGroups:
- batch
resources:
- cronjobs
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
965 changes: 965 additions & 0 deletions k8s/operator/helm/crds/valdrelease.yaml

Large diffs are not rendered by default.

58 changes: 50 additions & 8 deletions tests/e2e/crud/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"flag"
"fmt"
"os"
"os/exec"
"testing"
"time"

Expand All @@ -42,13 +43,14 @@ var (
port int
ds *hdf5.Dataset

insertNum int
searchNum int
searchByIDNum int
getObjectNum int
updateNum int
upsertNum int
removeNum int
insertNum int
correctionInsertNum int
searchNum int
searchByIDNum int
getObjectNum int
updateNum int
upsertNum int
removeNum int

insertFrom int
searchFrom int
Expand All @@ -73,6 +75,7 @@ func init() {
flag.IntVar(&port, "port", 8081, "gRPC port")

flag.IntVar(&insertNum, "insert-num", 10000, "number of id-vector pairs used for insert")
flag.IntVar(&correctionInsertNum, "correction-insert-num", 3000, "number of id-vector pairs used for insert")
flag.IntVar(&searchNum, "search-num", 10000, "number of id-vector pairs used for search")
flag.IntVar(&searchByIDNum, "search-by-id-num", 100, "number of id-vector pairs used for search-by-id")
flag.IntVar(&getObjectNum, "get-object-num", 100, "number of id-vector pairs used for get-object")
Expand Down Expand Up @@ -758,7 +761,9 @@ func TestE2EIndexJobCorrection(t *testing.T) {
t.Fatalf("an error occurred: %s", err)
}

train := ds.Train[insertFrom : insertFrom+insertNum]
// prepare train data
train := ds.Train[insertFrom : insertFrom+correctionInsertNum]

err = op.Insert(t, ctx, operation.Dataset{
Train: train,
})
Expand All @@ -768,12 +773,49 @@ func TestE2EIndexJobCorrection(t *testing.T) {

sleep(t, waitAfterInsertDuration)

t.Log("Test case 1: just execute index correction and check if replica number is correct after correction")
exe := operation.NewCronJobExecutor("vald-index-correction")
err = exe.CreateAndWait(t, ctx, "correction-test")
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

// check if replica number is correct
err = op.StreamListObject(t, ctx, operation.Dataset{
Train: train,
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

t.Log("Test case 2: execute index correction after one agent removed")
t.Log("removing vald-agent-ngt-0...")
cmd := exec.CommandContext(ctx, "sh", "-c", "kubectl delete pod vald-agent-ngt-0 && kubectl wait --for=condition=Ready pod/vald-agent-ngt-0")
out, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
t.Fatalf("%s, %s, %v", string(out), string(exitErr.Stderr), err)
} else {
t.Fatalf("unexpected error on creating job: %v", err)
}
}
t.Log(string(out))

// correct the deleted index
err = exe.CreateAndWait(t, ctx, "correction-test")
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

// check if replica number is correct
err = op.StreamListObject(t, ctx, operation.Dataset{
Train: train,
})
if err != nil {
t.Fatalf("an error occurred: %s", err)
}

t.Log("Tear down. Removing all vectors...")
err = op.Remove(t, ctx, operation.Dataset{
Train: train,
})
Expand Down
4 changes: 2 additions & 2 deletions tests/e2e/operation/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ func deleteJob(t *testing.T, jobName string) error {
func waitJob(t *testing.T, ctx context.Context, jobName string) error {
t.Helper()
t.Log("waiting for the correction job to complete or fail")
waitCompleteCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=complete", jobName)
waitFailedCmd := fmt.Sprintf("kubectl wait --timeout=-1s job/%s --for=condition=failed", jobName)
waitCompleteCmd := fmt.Sprintf("kubectl wait --timeout=10m job/%s --for=condition=complete", jobName)
waitFailedCmd := fmt.Sprintf("kubectl wait --timeout=10m job/%s --for=condition=failed", jobName)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
2 changes: 2 additions & 0 deletions tests/e2e/operation/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,5 +1224,7 @@ exit_loop:
return fmt.Errorf("the number of vectors returned is different at index id %v: got %v, want %v", k, v, replica)
}
}

t.Log("StreamListObject operation finished successfully and all vectors are returned with correct replica number")
return nil
}

0 comments on commit 3a6b83d

Please sign in to comment.