Skip to content

Commit d3332f1

Browse files
authored
streaming_sink: use Astra token (#312)
Use Astra token directly for streaming sink operations instead of first exchanging for a Pulsar token.
1 parent a64fbfe commit d3332f1

File tree

1 file changed

+10
-31
lines changed

1 file changed

+10
-31
lines changed

internal/provider/resource_streaming_sink.go

Lines changed: 10 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -128,8 +128,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
128128
return diag.Errorf("\"deletion_protection\" must be explicitly set to \"false\" in order to destroy astra_streaming_sink")
129129
}
130130

131-
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
132-
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
131+
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
133132
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
134133

135134
tenantName := resourceData.Get("tenant_name").(string)
@@ -142,7 +141,7 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
142141

143142
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
144143

145-
orgResp, err := client.GetCurrentOrganization(ctx)
144+
orgResp, err := astraClient.GetCurrentOrganization(ctx)
146145
if err != nil {
147146
return diag.Errorf("failed to get current organization ID: %v", err)
148147
}
@@ -152,15 +151,9 @@ func resourceStreamingSinkDelete(ctx context.Context, resourceData *schema.Resou
152151
return diag.Errorf("failed to decode current organization ID: %v", err)
153152
}
154153

155-
token := meta.(astraClients).token
156-
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
157-
if err != nil {
158-
return diag.FromErr(err)
159-
}
160-
161154
deleteSinkParams := astrastreaming.DeleteSinkParams{
162155
XDataStaxPulsarCluster: pulsarCluster,
163-
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
156+
Authorization: meta.(astraClients).token,
164157
}
165158

166159
deleteSinkResponse, err := streamingClientv3.DeleteSinkWithResponse(ctx, tenantName, namespace, sinkName, &deleteSinkParams)
@@ -208,8 +201,7 @@ type SinkResponse struct {
208201
}
209202

210203
func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
211-
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
212-
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
204+
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
213205
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
214206

215207
tenantName := resourceData.Get("tenant_name").(string)
@@ -223,7 +215,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
223215

224216
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
225217

226-
orgBody, err := client.GetCurrentOrganization(ctx)
218+
orgBody, err := astraClient.GetCurrentOrganization(ctx)
227219
if err != nil {
228220
return diag.Errorf("failed to get current organization ID: %v", err)
229221
}
@@ -233,15 +225,9 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
233225
return diag.Errorf("failed to decode current organization ID: %v", err)
234226
}
235227

236-
token := meta.(astraClients).token
237-
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
238-
if err != nil {
239-
diag.FromErr(err)
240-
}
241-
242228
getSinksParams := astrastreaming.GetSinksParams{
243229
XDataStaxPulsarCluster: pulsarCluster,
244-
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
230+
Authorization: meta.(astraClients).token,
245231
}
246232

247233
getSinkResponse, err := streamingClientv3.GetSinksWithResponse(ctx, tenantName, namespace, sinkName, &getSinksParams)
@@ -262,8 +248,7 @@ func resourceStreamingSinkRead(ctx context.Context, resourceData *schema.Resourc
262248
}
263249

264250
func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.ResourceData, meta interface{}) diag.Diagnostics {
265-
streamingClient := meta.(astraClients).astraStreamingClient.(*astrastreaming.ClientWithResponses)
266-
client := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
251+
astraClient := meta.(astraClients).astraClient.(*astra.ClientWithResponses)
267252
streamingClientv3 := meta.(astraClients).astraStreamingClientv3
268253

269254
rawRegion := resourceData.Get("region").(string)
@@ -285,7 +270,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
285270
archive = fmt.Sprintf("builtin://%s", sinkName)
286271
}
287272

288-
orgResp, err := client.GetCurrentOrganization(ctx)
273+
orgResp, err := astraClient.GetCurrentOrganization(ctx)
289274
if err != nil {
290275
return diag.Errorf("failed to get current organization ID: %v", err)
291276
}
@@ -295,7 +280,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
295280
return diag.Errorf("failed to decode current organization ID: %v", err)
296281
}
297282

298-
streamingClustersResponse, err := streamingClient.GetPulsarClustersWithResponse(ctx, org.ID)
283+
streamingClustersResponse, err := streamingClientv3.GetPulsarClustersWithResponse(ctx, org.ID)
299284
if err != nil {
300285
return diag.FromErr(fmt.Errorf("failed to request pulsar clusters: %w", err))
301286
}
@@ -307,12 +292,6 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
307292

308293
pulsarCluster := getPulsarCluster("", cloudProvider, region, "")
309294

310-
token := meta.(astraClients).token
311-
pulsarToken, err := getPulsarToken(ctx, pulsarCluster, token, org, streamingClient, tenantName)
312-
if err != nil {
313-
diag.FromErr(err)
314-
}
315-
316295
var configs map[string]interface{}
317296
if err := json.Unmarshal([]byte(rawConfigs), &configs); err != nil {
318297
return diag.Errorf("failed to unmarshal sink config: %v", err)
@@ -321,7 +300,7 @@ func resourceStreamingSinkCreate(ctx context.Context, resourceData *schema.Resou
321300
createSinkParams := astrastreaming.CreateSinkJSONParams{
322301
XDataStaxPulsarCluster: pulsarCluster,
323302
XDataStaxCurrentOrg: "",
324-
Authorization: fmt.Sprintf("Bearer %s", pulsarToken),
303+
Authorization: meta.(astraClients).token,
325304
}
326305

327306
sinkInputs := []string{topic}

0 commit comments

Comments
 (0)