Skip to content

Commit f3ae364

Browse files
committed
feat: avoiding throttling & improve performance
1 parent f204436 commit f3ae364

File tree

9 files changed

+554
-186
lines changed

9 files changed

+554
-186
lines changed

go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ require (
3939
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/postgresql/armpostgresql v1.2.0
4040
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/postgresql/armpostgresqlflexibleservers v1.1.0
4141
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/redis/armredis v1.0.0
42-
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.9.0
4342
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0
4443
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/servicebus/armservicebus v1.2.0
4544
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/signalr/armsignalr v1.2.0

go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,6 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/postgresql/armpostgresqlfl
7878
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/postgresql/armpostgresqlflexibleservers v1.1.0/go.mod h1:nKcJObAisSPDrO9lMuuCBoYY7Ki7ADt8p6XmBhpKNTk=
7979
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/redis/armredis v1.0.0 h1:nmpTBgRg1HynngFYICRhceC7s5dmbKN9fJ/XQz/UQ2I=
8080
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/redis/armredis v1.0.0/go.mod h1:3yjiOtnkVociBTlF7UZrwAGfJrGaOCsvtVS4HzNajxQ=
81-
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.9.0 h1:zLzoX5+W2l95UJoVwiyNS4dX8vHyQ6x2xRLoBBL9wMk=
82-
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph v0.9.0/go.mod h1:wVEOJfGTj0oPAUGA1JuRAvz/lxXQsWW16axmHPP47Bk=
8381
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM=
8482
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE=
8583
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/servicebus/armservicebus v1.2.0 h1:jngSeKBnzC7qIk3rvbWHsLI7eeasEucORHWr2CHX0Yg=

internal/graph/graph.go

Lines changed: 182 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -4,71 +4,137 @@
44
package graph
55

66
import (
7+
"bytes"
78
"context"
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"strconv"
814
"time"
915

1016
"github.com/Azure/azqr/internal/to"
1117
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
12-
arg "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resourcegraph/armresourcegraph"
18+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
1319
"github.com/rs/zerolog/log"
1420
)
1521

16-
type (
17-
GraphQuery struct {
18-
client *arg.Client
19-
}
22+
// GraphQueryClient provides methods to query Azure Resource Graph using HTTP client.
23+
type GraphQueryClient struct {
24+
httpClient *http.Client // HTTP client for making requests
25+
endpoint string // Resource Graph endpoint URL
26+
accessToken string // Bearer token for authentication
27+
}
28+
29+
// GraphResult holds the data returned from a Resource Graph query.
30+
type GraphResult struct {
31+
Data []interface{} // Query result data
32+
}
2033

21-
GraphResult struct {
22-
Data []interface{}
34+
// QueryRequestOptions represents options for the Resource Graph query.
35+
type QueryRequestOptions struct {
36+
ResultFormat string `json:"resultFormat,omitempty"` // Format of the result
37+
Top *int32 `json:"top,omitempty"` // Max number of results
38+
SkipToken *string `json:"skipToken,omitempty"` // Token for pagination
39+
}
40+
41+
// QueryRequest represents the payload for a Resource Graph query.
42+
type QueryRequest struct {
43+
Subscriptions []string `json:"subscriptions"` // List of subscription IDs
44+
Query string `json:"query"` // Kusto query string
45+
Options *QueryRequestOptions `json:"options"` // Query options
46+
}
47+
48+
// QueryResponse represents the response from the Resource Graph API.
49+
type QueryResponse struct {
50+
Data []interface{} `json:"data"` // Query result data
51+
SkipToken *string `json:"skipToken,omitempty"`
52+
Quota int // Value of x-ms-user-quota-remaining header as int
53+
RetryAfter time.Duration // Value of x-ms-user-quota-resets-after header as timespan
54+
}
55+
56+
// NewGraphQuery creates a new GraphQuery using the provided TokenCredential.
57+
func NewGraphQuery(cred azcore.TokenCredential) *GraphQueryClient {
58+
// Create a new HTTP client with a timeout
59+
httpClient := &http.Client{
60+
Timeout: 60 * time.Second,
2361
}
24-
)
2562

26-
func NewGraphQuery(cred azcore.TokenCredential) *GraphQuery {
27-
client, err := arg.NewClient(cred, nil)
63+
// Acquire an access token using the provided credential
64+
token, err := cred.GetToken(context.Background(), policy.TokenRequestOptions{
65+
Scopes: []string{"https://management.azure.com/.default"},
66+
})
2867
if err != nil {
29-
log.Fatal().Err(err).Msg("Failed to create Resource Graph client")
30-
return nil
68+
log.Fatal().Err(err).Msg("Failed to acquire Azure access token")
3169
}
32-
return &GraphQuery{
33-
client: client,
70+
71+
// Set the access token string
72+
accessToken := token.Token
73+
74+
return &GraphQueryClient{
75+
httpClient: httpClient,
76+
endpoint: "https://management.azure.com/providers/Microsoft.ResourceGraph/resources?api-version=2021-03-01",
77+
accessToken: accessToken,
3478
}
3579
}
3680

37-
func (q *GraphQuery) Query(ctx context.Context, query string, subscriptions []*string) *GraphResult {
81+
// Query executes a Resource Graph query for the given subscriptions and query string.
82+
// It handles batching and pagination.
83+
func (q *GraphQueryClient) Query(ctx context.Context, query string, subscriptions []*string) *GraphResult {
3884
result := GraphResult{
3985
Data: make([]interface{}, 0),
4086
}
4187

88+
// Convert []*string to []string for serialization
89+
subscriptionIDs := make([]string, len(subscriptions))
90+
for i, s := range subscriptions {
91+
if s != nil {
92+
subscriptionIDs[i] = *s
93+
}
94+
}
95+
4296
// Run the query in batches of 300 subscriptions
4397
batchSize := 300
44-
for i := 0; i < len(subscriptions); i += batchSize {
98+
for i := 0; i < len(subscriptionIDs); i += batchSize {
4599
j := i + batchSize
46-
if j > len(subscriptions) {
47-
j = len(subscriptions)
100+
if j > len(subscriptionIDs) {
101+
j = len(subscriptionIDs)
48102
}
49103

50-
format := arg.ResultFormatObjectArray
51-
request := arg.QueryRequest{
52-
Subscriptions: subscriptions[i:j],
53-
Query: &query,
54-
Options: &arg.QueryRequestOptions{
55-
ResultFormat: &format,
56-
Top: to.Ptr(int32(1000)),
57-
},
58-
}
59-
60-
if q.client == nil {
61-
log.Fatal().Msg("Resource Graph client not initialized")
104+
format := "objectArray"
105+
options := &QueryRequestOptions{
106+
ResultFormat: format,
107+
Top: to.Ptr(int32(1000)),
62108
}
63109

64110
var skipToken *string = nil
65111
for ok := true; ok; ok = skipToken != nil {
66-
request.Options.SkipToken = skipToken
67-
// Run the query and get the results
68-
results, err := q.retry(ctx, 3, 10*time.Second, request)
112+
options.SkipToken = skipToken
113+
request := QueryRequest{
114+
Subscriptions: subscriptionIDs[i:j],
115+
Query: query,
116+
Options: options,
117+
}
118+
119+
startTime := time.Now()
120+
resp, err := q.retry(ctx, 3, 10*time.Second, request)
121+
elapsed := time.Since(startTime).Milliseconds()
69122
if err == nil {
70-
result.Data = append(result.Data, results.Data.([]interface{})...)
71-
skipToken = results.SkipToken
123+
result.Data = append(result.Data, resp.Data...)
124+
skipToken = resp.SkipToken
125+
// If the response contains a skip token, it means there are more results to fetch
126+
// so if the request took less than 333ms, we wait to avoid throttling
127+
if skipToken != nil && elapsed < 333 {
128+
// If the query took less than 333ms, wait to avoid throttling
129+
log.Debug().Msgf("Graph query took %d ms, waiting to avoid throttling", elapsed)
130+
time.Sleep(time.Duration(400-elapsed) * time.Millisecond)
131+
}
132+
// Quota limit reached, sleep for the duration specified in the response header
133+
if resp.Quota == 0 {
134+
duration := resp.RetryAfter
135+
log.Debug().Msgf("Graph query quota limit reached. Sleeping for %s", duration)
136+
time.Sleep(duration)
137+
}
72138
} else {
73139
log.Fatal().Err(err).Msgf("Failed to run Resource Graph query: %s", query)
74140
return nil
@@ -78,18 +144,16 @@ func (q *GraphQuery) Query(ctx context.Context, query string, subscriptions []*s
78144
return &result
79145
}
80146

81-
func (q *GraphQuery) retry(ctx context.Context, attempts int, sleep time.Duration, request arg.QueryRequest) (arg.ClientResourcesResponse, error) {
147+
// retry executes the Resource Graph query with retries and exponential backoff.
148+
// Returns the QueryResponse or error.
149+
func (q *GraphQueryClient) retry(ctx context.Context, attempts int, sleep time.Duration, request QueryRequest) (*QueryResponse, error) {
82150
var err error
83151
for i := 0; ; i++ {
84-
res, err := q.client.Resources(ctx, request, nil)
152+
resp, err := q.doRequest(ctx, request)
85153
if err == nil {
86-
return res, nil
154+
return resp, nil
87155
}
88156

89-
// if shouldSkipError(err) {
90-
// return []azqr.AzureServiceResult{}, nil
91-
// }
92-
93157
errAsString := err.Error()
94158

95159
if i >= (attempts - 1) {
@@ -102,5 +166,80 @@ func (q *GraphQuery) retry(ctx context.Context, attempts int, sleep time.Duratio
102166
time.Sleep(sleep)
103167
sleep *= 2
104168
}
105-
return arg.ClientResourcesResponse{}, err
169+
return nil, err
170+
}
171+
172+
// doRequest sends the HTTP request to the Resource Graph API and returns the response.
173+
func (q *GraphQueryClient) doRequest(ctx context.Context, request QueryRequest) (*QueryResponse, error) {
174+
// Serialize request to JSON
175+
body, err := json.Marshal(request)
176+
if err != nil {
177+
return nil, fmt.Errorf("failed to marshal request: %w", err)
178+
}
179+
180+
// Create HTTP request
181+
req, err := http.NewRequestWithContext(ctx, http.MethodPost, q.endpoint, bytes.NewReader(body))
182+
if err != nil {
183+
return nil, fmt.Errorf("failed to create HTTP request: %w", err)
184+
}
185+
186+
// Set headers
187+
req.Header.Set("Content-Type", "application/json")
188+
req.Header.Set("Authorization", "Bearer "+q.accessToken)
189+
190+
// Send request
191+
resp, err := q.httpClient.Do(req)
192+
if err != nil {
193+
return nil, fmt.Errorf("failed to send HTTP request: %w", err)
194+
}
195+
defer func() {
196+
if err := resp.Body.Close(); err != nil {
197+
log.Fatal().Err(err).Msg("Failed to close response body")
198+
}
199+
}()
200+
201+
// Read response body
202+
respBody, err := io.ReadAll(resp.Body)
203+
if err != nil {
204+
return nil, fmt.Errorf("failed to read response body: %w", err)
205+
}
206+
207+
// Check for non-200 status codes
208+
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
209+
return nil, fmt.Errorf("received non-2xx status code: %d, body: %s", resp.StatusCode, string(respBody))
210+
}
211+
212+
// Parse response JSON
213+
var queryResp QueryResponse
214+
if err := json.Unmarshal(respBody, &queryResp); err != nil {
215+
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
216+
}
217+
218+
// Extract quota headers and set them in the QueryResponse struct
219+
220+
// Parse x-ms-user-quota-remaining as int
221+
quotaStr := resp.Header.Get("x-ms-user-quota-remaining")
222+
if quotaStr != "" {
223+
quota, err := strconv.Atoi(quotaStr)
224+
if err != nil {
225+
return nil, fmt.Errorf("failed to parse quota header: %w", err)
226+
}
227+
queryResp.Quota = quota
228+
}
229+
230+
// Parse x-ms-user-quota-resets-after as timespan in "hh:mm:ss" format
231+
retryAfterStr := resp.Header.Get("x-ms-user-quota-resets-after")
232+
if retryAfterStr != "" {
233+
// If time.ParseDuration fails, fallback to manual parsing
234+
var h, m, s int
235+
_, scanErr := fmt.Sscanf(retryAfterStr, "%d:%d:%d", &h, &m, &s)
236+
if scanErr != nil {
237+
return nil, fmt.Errorf("failed to parse retry-after header: %w", scanErr)
238+
}
239+
queryResp.RetryAfter = time.Duration(h)*time.Hour + time.Duration(m)*time.Minute + time.Duration(s)*time.Second
240+
}
241+
242+
log.Debug().Msgf("Graph query quota remaining: %d, Retry after: %s", queryResp.Quota, queryResp.RetryAfter)
243+
244+
return &queryResp, nil
106245
}

0 commit comments

Comments
 (0)