Skip to content

Commit 2874079

Browse files
authored
[COP-1751] more red panda flakyness fixes (#2153)
* retry proto registration on EOF, require 3 consecutive 2xx responses from Red Panda in readiness check, force ipv4
1 parent b5dbefd commit 2874079

File tree

3 files changed

+130
-10
lines changed

3 files changed

+130
-10
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
- Red Panda Schema Registry: require 3x consecutive responses with 2xx status code, always use ipv4, when registering protos, retry on `EOF` response

framework/components/dockercompose/chip_ingress_set/chip_ingress.go

Lines changed: 86 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ import (
44
"context"
55
"fmt"
66
"io"
7+
"net"
78
"net/http"
9+
"net/url"
810
"os"
911
"strings"
1012
"time"
@@ -124,10 +126,7 @@ func New(in *Input) (*Output, error) {
124126
).WaitForService(DEFAULT_RED_PANDA_SERVICE_NAME,
125127
wait.ForAll(
126128
wait.ForListeningPort(DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT).WithPollInterval(100*time.Millisecond),
127-
wait.ForListeningPort(DEFAULT_RED_PANDA_KAFKA_PORT).WithPollInterval(100*time.Millisecond),
128129
wait.NewHostPortStrategy(DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT).WithPollInterval(100*time.Millisecond),
129-
wait.NewHostPortStrategy(DEFAULT_RED_PANDA_KAFKA_PORT).WithPollInterval(100*time.Millisecond),
130-
wait.ForHTTP("/v1/status/ready").WithPort("9644"), // admin API port
131130
wait.ForHTTP("/status/ready").WithPort(DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT).WithPollInterval(100*time.Millisecond),
132131
).WithDeadline(2*time.Minute),
133132
).WaitForService(DEFAULT_RED_PANDA_CONSOLE_SERVICE_NAME,
@@ -221,7 +220,7 @@ func New(in *Input) (*Output, error) {
221220
in.UseCache = true
222221
framework.L.Info().Msg("Chip Ingress stack started")
223222

224-
return output, nil
223+
return output, checkSchemaRegistryReadiness(2*time.Minute, 300*time.Millisecond, output.RedPanda.SchemaRegistryExternalURL, 3)
225224
}
226225

227226
func composeFilePath(rawFilePath string) (string, error) {
@@ -282,3 +281,86 @@ func connectNetwork(connCtx context.Context, timeout time.Duration, dockerClient
282281
}
283282
}
284283
}
284+
285+
// checkSchemaRegistryReadiness verifies that the Schema Registry answers 2xx on GET /subjects
286+
// for minSuccessCount *consecutive* attempts, polling every `interval`, with an overall `timeout`.
287+
func checkSchemaRegistryReadiness(timeout, interval time.Duration, registryURL string, minSuccessCount int) error {
288+
if minSuccessCount < 1 {
289+
minSuccessCount = 1
290+
}
291+
u, uErr := url.Parse(registryURL)
292+
if uErr != nil {
293+
return fmt.Errorf("parse registry URL: %w", uErr)
294+
}
295+
var pErr error
296+
u.Path, pErr = url.JoinPath(u.Path, "/subjects") // keeps existing base path, adds /subjects
297+
if pErr != nil {
298+
return fmt.Errorf("join /subjects path: %w", pErr)
299+
}
300+
301+
// Fresh connection per request; prefer IPv4 to avoid ::1 races.
302+
tr := &http.Transport{
303+
DisableKeepAlives: true,
304+
DialContext: func(ctx context.Context, _, addr string) (net.Conn, error) {
305+
d := &net.Dialer{Timeout: 10 * time.Second, KeepAlive: 30 * time.Second}
306+
return d.DialContext(ctx, "tcp4", addr)
307+
},
308+
ForceAttemptHTTP2: false, // optional; stick to HTTP/1.1 for simplicity
309+
}
310+
client := &http.Client{
311+
Transport: tr,
312+
Timeout: 10 * time.Second, // per-request timeout
313+
}
314+
315+
ctx, cancel := context.WithTimeout(context.Background(), timeout)
316+
defer cancel()
317+
318+
t := time.NewTicker(interval)
319+
defer t.Stop()
320+
321+
consecutive := 0
322+
var lastErr error
323+
324+
for {
325+
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
326+
// small belt-and-suspenders to ensure no reuse even if transport changes
327+
req.Close = true
328+
329+
resp, err := client.Do(req)
330+
if err == nil {
331+
// Always drain & close.
332+
io.Copy(io.Discard, resp.Body)
333+
resp.Body.Close()
334+
}
335+
336+
if err == nil && resp.StatusCode/100 == 2 {
337+
framework.L.Debug().Msgf("schema registry ready check succeeded with status %d (%d/%d)", resp.StatusCode, consecutive+1, minSuccessCount)
338+
consecutive++
339+
if consecutive >= minSuccessCount {
340+
framework.L.Debug().Msg("schema registry is ready")
341+
return nil
342+
}
343+
} else {
344+
consecutive = 0
345+
if err != nil {
346+
framework.L.Debug().Msgf("schema registry ready check failed with error %v (need %d/%d consecutive successes)", err, consecutive, minSuccessCount)
347+
lastErr = fmt.Errorf("GET /subjects failed: %w", err)
348+
} else {
349+
framework.L.Debug().Msgf("schema registry ready check failed with error %v and status code %d (need %d/%d consecutive successes)", err, resp.StatusCode, consecutive, minSuccessCount)
350+
lastErr = fmt.Errorf("GET /subjects status %d", resp.StatusCode)
351+
}
352+
}
353+
354+
select {
355+
case <-ctx.Done():
356+
if lastErr == nil {
357+
lastErr = ctx.Err()
358+
}
359+
return fmt.Errorf("schema registry not ready after %s; needed %d consecutive successes (got %d): %w",
360+
timeout, minSuccessCount, consecutive, lastErr)
361+
case <-t.C:
362+
framework.L.Debug().Msg("schema registry not ready yet, retrying...")
363+
// poll again
364+
}
365+
}
366+
}

framework/components/dockercompose/chip_ingress_set/protos.go

Lines changed: 43 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66
"encoding/json"
77
"fmt"
88
"io"
9+
"net"
910
"net/http"
11+
"net/url"
1012
"os"
1113
"path/filepath"
1214
"regexp"
@@ -145,7 +147,7 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
145147
delete(subjects, path)
146148
}
147149

148-
registerErr := registerAllWithTopologicalSorting(schemaRegistryURL, protoMap, subjects, protoSchemaSet.Folders)
150+
registerErr := registerAllWithTopologicalSorting(schemaRegistryURL, protoMap, subjects)
149151
if registerErr != nil {
150152
return errors.Wrapf(registerErr, "failed to register protos from %s", protoSchemaSet.URI)
151153
}
@@ -473,7 +475,6 @@ func registerAllWithTopologicalSorting(
473475
schemaRegistryURL string,
474476
protoMap map[string]string, // path -> proto source
475477
subjectMap map[string]string, // path -> subject
476-
folders []string, // folders configuration used to determine import prefix transformations
477478
) error {
478479
framework.L.Info().Msgf("Registering %d protobuf schemas", len(protoMap))
479480

@@ -571,7 +572,7 @@ func checkSchemaExists(registryURL, subject string) (int, bool) {
571572
}, retry.Attempts(10), retry.Delay(100*time.Millisecond), retry.DelayType(retry.BackOffDelay), retry.OnRetry(func(n uint, err error) {
572573
framework.L.Debug().Str("attempt/max", fmt.Sprintf("%d/%d", n, maxAttempts)).Msgf("Retrying to check schema existence for %s: %v", subject, err)
573574
}), retry.RetryIf(func(err error) bool {
574-
return strings.Contains(err.Error(), "connection reset by peer")
575+
return isRetryableError(err)
575576
}))
576577

577578
if existErr != nil {
@@ -613,13 +614,31 @@ func registerSingleProto(
613614
return 0, errors.Wrap(payloadErr, "failed to marshal payload")
614615
}
615616

616-
url := fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject)
617+
parsedURL, urlErr := url.Parse(fmt.Sprintf("%s/subjects/%s/versions", registryURL, subject))
618+
if urlErr != nil {
619+
return 0, errors.Wrap(urlErr, "failed to parse schema registry URL")
620+
}
621+
617622
maxAttempts := uint(10)
623+
client := http.Client{Transport: &http.Transport{
624+
DisableKeepAlives: true,
625+
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
626+
// prefer ipv4 to avoid issues with some local setups
627+
return (&net.Dialer{Timeout: 10 * time.Second}).DialContext(ctx, "tcp4", addr)
628+
},
629+
}}
618630

619631
var resp *http.Response
620632
registerErr := retry.Do(func() error {
621633
var respErr error
622-
resp, respErr = http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(payload))
634+
resp, respErr = client.Do(&http.Request{
635+
Method: "POST",
636+
URL: parsedURL,
637+
Header: map[string][]string{
638+
"Content-Type": {"application/vnd.schemaregistry.v1+json"},
639+
},
640+
Body: io.NopCloser(bytes.NewReader(payload)),
641+
})
623642
if respErr != nil {
624643
return errors.Wrap(respErr, "failed to post to schema registry")
625644
}
@@ -638,7 +657,7 @@ func registerSingleProto(
638657
}), retry.RetryIf(func(err error) bool {
639658
// we don't want to retry all errors, because some of them are are expected (e.g. missing dependencies)
640659
// and will be handled by higher-level code
641-
return strings.Contains(err.Error(), "connection reset by peer")
660+
return isRetryableError(err)
642661
}))
643662
if registerErr != nil {
644663
return 0, errors.Wrapf(registerErr, "failed to register schema for subject %s", subject)
@@ -779,3 +798,21 @@ func topologicalSort(dependencies map[string][]string) ([]string, error) {
779798

780799
return result, nil
781800
}
801+
802+
func isRetryableError(err error) bool {
803+
if err == nil {
804+
return false
805+
}
806+
807+
retryableErrorMessages := []string{
808+
"connection reset by peer",
809+
"EOF",
810+
}
811+
812+
for _, msg := range retryableErrorMessages {
813+
if strings.Contains(err.Error(), msg) {
814+
return true
815+
}
816+
}
817+
return false
818+
}

0 commit comments

Comments
 (0)