Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 45 additions & 36 deletions core/pkg/sync/http/http_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Sync struct {
AuthHeader string
Interval uint32
ready bool
eTag string
}

// Client defines the behaviour required of a http client
Expand All @@ -42,7 +43,7 @@ type Cron interface {
}

func (hs *Sync) ReSync(ctx context.Context, dataSync chan<- sync.DataSync) error {
msg, err := hs.Fetch(ctx)
msg, _, err := hs.fetchBody(ctx, true)
if err != nil {
return err
}
Expand All @@ -63,7 +64,7 @@ func (hs *Sync) IsReady() bool {

func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
// Initial fetch
fetch, err := hs.Fetch(ctx)
fetch, _, err := hs.fetchBody(ctx, true)
if err != nil {
return err
}
Expand All @@ -74,28 +75,25 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
hs.Logger.Debug(fmt.Sprintf("polling %s every %d seconds", hs.URI, hs.Interval))
_ = hs.Cron.AddFunc(fmt.Sprintf("*/%d * * * *", hs.Interval), func() {
hs.Logger.Debug(fmt.Sprintf("fetching configuration from %s", hs.URI))
body, err := hs.fetchBodyFromURL(ctx, hs.URI)
previousBodySHA := hs.LastBodySHA
body, noChange, err := hs.fetchBody(ctx, false)
if err != nil {
hs.Logger.Error(err.Error())
hs.Logger.Error(fmt.Sprintf("error fetching: %s", err.Error()))
return
}

if body == "" {
if body == "" && !noChange {
hs.Logger.Debug("configuration deleted")
return
}

currentSHA := hs.generateSha([]byte(body))

if hs.LastBodySHA == "" {
hs.Logger.Debug("new configuration created")
if previousBodySHA == "" {
hs.Logger.Debug("configuration created")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
} else if hs.LastBodySHA != currentSHA {
hs.Logger.Debug("configuration modified")
} else if previousBodySHA != hs.LastBodySHA {
hs.Logger.Debug("configuration updated")
dataSync <- sync.DataSync{FlagData: body, Source: hs.URI}
}

hs.LastBodySHA = currentSHA
})

hs.Cron.Start()
Expand All @@ -108,10 +106,14 @@ func (hs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error {
return nil
}

func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) (string, error) {
req, err := http.NewRequestWithContext(ctx, "GET", url, bytes.NewBuffer(nil))
func (hs *Sync) fetchBody(ctx context.Context, fetchAll bool) (string, bool, error) {
if hs.URI == "" {
return "", false, errors.New("no HTTP URL string set")
}

req, err := http.NewRequestWithContext(ctx, "GET", hs.URI, bytes.NewBuffer(nil))
if err != nil {
return "", fmt.Errorf("error creating request for url %s: %w", url, err)
return "", false, fmt.Errorf("error creating request for url %s: %w", hs.URI, err)
}

req.Header.Add("Accept", "application/json")
Expand All @@ -124,32 +126,50 @@ func (hs *Sync) fetchBodyFromURL(ctx context.Context, url string) (string, error
req.Header.Set("Authorization", bearer)
}

if hs.eTag != "" && !fetchAll {
req.Header.Set("If-None-Match", hs.eTag)
}

resp, err := hs.Client.Do(req)
if err != nil {
return "", fmt.Errorf("error calling endpoint %s: %w", url, err)
return "", false, fmt.Errorf("error calling endpoint %s: %w", hs.URI, err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
hs.Logger.Debug(fmt.Sprintf("error closing the response body: %s", err.Error()))
hs.Logger.Error(fmt.Sprintf("error closing the response body: %s", err.Error()))
}
}()

if resp.StatusCode == 304 {
hs.Logger.Debug("no changes detected")
return "", true, nil
}

statusOK := resp.StatusCode >= 200 && resp.StatusCode < 300
if !statusOK {
return "", fmt.Errorf("error fetching from url %s: %s", url, resp.Status)
return "", false, fmt.Errorf("error fetching from url %s: %s", hs.URI, resp.Status)
}

if resp.Header.Get("ETag") != "" {
hs.eTag = resp.Header.Get("ETag")
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("unable to read body to bytes: %w", err)
return "", false, fmt.Errorf("unable to read body to bytes: %w", err)
}

json, err := utils.ConvertToJSON(body, getFileExtensions(url), resp.Header.Get("Content-Type"))
json, err := utils.ConvertToJSON(body, getFileExtensions(hs.URI), resp.Header.Get("Content-Type"))
if err != nil {
return "", fmt.Errorf("error converting response body to json: %w", err)
return "", false, fmt.Errorf("error converting response body to json: %w", err)
}
return json, nil

if json != "" {
hs.LastBodySHA = hs.generateSha([]byte(body))
}

return json, false, nil
}

// getFileExtensions returns the file extension from the URL path
Expand All @@ -169,17 +189,6 @@ func (hs *Sync) generateSha(body []byte) string {
}

func (hs *Sync) Fetch(ctx context.Context) (string, error) {
if hs.URI == "" {
return "", errors.New("no HTTP URL string set")
}

body, err := hs.fetchBodyFromURL(ctx, hs.URI)
if err != nil {
return "", err
}
if body != "" {
hs.LastBodySHA = hs.generateSha([]byte(body))
}

return body, nil
body, _, err := hs.fetchBody(ctx, false)
return body, err
}
81 changes: 81 additions & 0 deletions core/pkg/sync/http/http_sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestHTTPSync_Fetch(t *testing.T) {
uri string
bearerToken string
authHeader string
eTagHeader string
lastBodySHA string
handleResponse func(*testing.T, Sync, string, error)
}{
Expand Down Expand Up @@ -240,6 +241,85 @@ func TestHTTPSync_Fetch(t *testing.T) {
}
},
},
"not modified response etag matched": {
setup: func(t *testing.T, client *syncmock.MockClient) {
expectedIfNoneMatch := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
client.EXPECT().Do(gomock.Any()).DoAndReturn(func(req *http.Request) (*http.Response, error) {
actualIfNoneMatch := req.Header.Get("If-None-Match")
if actualIfNoneMatch != expectedIfNoneMatch {
t.Fatalf("expected If-None-Match header to be '%s', got %s", expectedIfNoneMatch, actualIfNoneMatch)
}
return &http.Response{
Header: map[string][]string{"ETag": {expectedIfNoneMatch}},
Body: io.NopCloser(strings.NewReader("")),
StatusCode: http.StatusNotModified,
}, nil
})
},
uri: "http://localhost",
eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
}

expectedLastBodySHA := ""
expectedETag := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
if httpSync.LastBodySHA != expectedLastBodySHA {
t.Errorf(
"expected last body sha to be: '%s', got: '%s'", expectedLastBodySHA, httpSync.LastBodySHA,
)
}
if httpSync.eTag != expectedETag {
t.Errorf(
"expected last etag to be: '%s', got: '%s'", expectedETag, httpSync.eTag,
)
}
},
},
"modified response etag mismatched": {
setup: func(t *testing.T, client *syncmock.MockClient) {
expectedIfNoneMatch := `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`
client.EXPECT().Do(gomock.Any()).DoAndReturn(func(req *http.Request) (*http.Response, error) {
actualIfNoneMatch := req.Header.Get("If-None-Match")
if actualIfNoneMatch != expectedIfNoneMatch {
t.Fatalf("expected If-None-Match header to be '%s', got %s", expectedIfNoneMatch, actualIfNoneMatch)
}

newContent := "\"Hey there!\""
newETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`

return &http.Response{
Header: map[string][]string{
"Content-Type": {"application/json"},
"Etag": {newETag},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One thing worth mention here is that the http package apparently doesn't handle case insensitive well and I had to make this testing header to be Etag (small case t), even though in real world usage (non-testing), ETag works well.

Copy link
Contributor Author

@ChihweiLHBird ChihweiLHBird Jul 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess building the header with the Header().Set() function rather than passing in a raw map will fix it but I didn't have time to try at this time.

},
Body: io.NopCloser(strings.NewReader(newContent)),
StatusCode: http.StatusOK,
}, nil
})
},
uri: "http://localhost",
eTagHeader: `"1af17a664e3fa8e419b8ba05c2a173169df76162a5a286e0c405b460d478f7ef"`,
handleResponse: func(t *testing.T, httpSync Sync, _ string, err error) {
if err != nil {
t.Fatalf("fetch: %v", err)
}

expectedLastBodySHA := "wuAc5j2QEJxMf09tzql-0bsrUeNkfzbK9ay-J0E6JLs="
expectedETag := `"c2e01ce63d90109c4c7f4f6dcea97ed1bb2b51e3647f36caf5acbe27413a24bb"`
if httpSync.LastBodySHA != expectedLastBodySHA {
t.Errorf(
"expected last body sha to be: '%s', got: '%s'", expectedLastBodySHA, httpSync.LastBodySHA,
)
}
if httpSync.eTag != expectedETag {
t.Errorf(
"expected last etag to be: '%s', got: '%s'", expectedETag, httpSync.eTag,
)
}
},
},
}

for name, tt := range tests {
Expand All @@ -255,6 +335,7 @@ func TestHTTPSync_Fetch(t *testing.T) {
AuthHeader: tt.authHeader,
LastBodySHA: tt.lastBodySHA,
Logger: logger.NewLogger(nil, false),
eTag: tt.eTagHeader,
}

fetched, err := httpSync.Fetch(context.Background())
Expand Down
5 changes: 5 additions & 0 deletions docs/concepts/syncs.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ invoked with **HTTP GET** request.
The polling interval, port, TLS settings, and authentication information can be configured.
See [sync source](../reference/sync-configuration.md#source-configuration) configuration for details.

To optimize network usage, it honors the HTTP ETag protocol: if the server includes an `ETag` header in its response,
flagd will store this value and send it in the `If-None-Match` header on subsequent requests. If the flag data has
not changed, the server responds with 304 Not Modified, and flagd will skip updating its state. If the data has
changed, the server returns the new content and a new ETag, prompting flagd to update its flags.

---

### gRPC sync
Expand Down
Loading