@@ -65,6 +65,12 @@ func resourceCDC() *schema.Resource {
6565 Required : true ,
6666 ForceNew : true ,
6767 },
68+ "pulsar_cluster" : {
69+ Description : "Pulsar cluster name" ,
70+ Type : schema .TypeString ,
71+ Optional : true ,
72+ ForceNew : true ,
73+ },
6874 "tenant_name" : {
6975 Description : "Streaming tenant name" ,
7076 Type : schema .TypeString ,
@@ -96,6 +102,7 @@ func resourceCDCDelete(ctx context.Context, resourceData *schema.ResourceData, m
96102 token := meta .(astraClients ).token
97103
98104 id := resourceData .Id ()
105+ pulsarClusterFromConfig := resourceData .Get ("pulsar_cluster_name" ).(string )
99106
100107 databaseId , keyspace , table , tenantName , err := parseCDCID (id )
101108 if err != nil {
@@ -114,7 +121,7 @@ func resourceCDCDelete(ctx context.Context, resourceData *schema.ResourceData, m
114121 return diag .FromErr (fmt .Errorf ("failed to read current organization: %w" , err ))
115122 }
116123
117- pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , org , streamingClient , tenantName )
124+ pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , org , streamingClient , tenantName , pulsarClusterFromConfig )
118125 if err != nil {
119126 diag .FromErr (err )
120127 }
@@ -190,6 +197,7 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
190197 token := meta .(astraClients ).token
191198
192199 id := resourceData .Id ()
200+ pulsarClusterFromConfig := resourceData .Get ("pulsar_cluster_name" ).(string )
193201
194202 databaseId , keyspace , table , tenantName , err := parseCDCID (id )
195203 if err != nil {
@@ -206,7 +214,7 @@ func resourceCDCRead(ctx context.Context, resourceData *schema.ResourceData, met
206214 return diag .FromErr (fmt .Errorf ("failed to read organization: %w" , err ))
207215 }
208216
209- pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , orgId , streamingClient , tenantName )
217+ pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , orgId , streamingClient , tenantName , pulsarClusterFromConfig )
210218 if err != nil {
211219 diag .FromErr (err )
212220 }
@@ -291,6 +299,7 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m
291299 databaseId := resourceData .Get ("database_id" ).(string )
292300 databaseName := resourceData .Get ("database_name" ).(string )
293301 topicPartitions := resourceData .Get ("topic_partitions" ).(int )
302+ pulsarClusterFromConfig := resourceData .Get ("pulsar_cluster_name" ).(string )
294303 tenantName := resourceData .Get ("tenant_name" ).(string )
295304
296305 orgBody , _ := client .GetCurrentOrganization (ctx )
@@ -309,7 +318,7 @@ func resourceCDCCreate(ctx context.Context, resourceData *schema.ResourceData, m
309318 TopicPartitions : topicPartitions ,
310319 }
311320
312- pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , org , streamingClient , tenantName )
321+ pulsarCluster , pulsarToken , err := prepCDC (ctx , client , databaseId , token , org , streamingClient , tenantName , pulsarClusterFromConfig )
313322 if err != nil {
314323 return diag .FromErr (err )
315324 }
@@ -386,7 +395,10 @@ func getTableCDCStatus(databaseID, keyspace, table string, cdcStatuses CDCStatus
386395 return nil
387396}
388397
389- func prepCDC (ctx context.Context , client * astra.ClientWithResponses , databaseId string , token string , org OrgId , streamingClient * astrastreaming.ClientWithResponses , tenantName string ) (string , string , error ) {
398+ // prepCDC get the pulsar cluster name (if it's not set) and the pulsar token
399+ func prepCDC (ctx context.Context , client * astra.ClientWithResponses , databaseId string , token string , org OrgId ,
400+ streamingClient * astrastreaming.ClientWithResponses , tenantName string , pulsarCluster string ) (string , string , error ) {
401+
390402 databaseResourceData := schema.ResourceData {}
391403 db , err := getDatabase (ctx , & databaseResourceData , client , databaseId )
392404 if err != nil {
@@ -395,9 +407,8 @@ func prepCDC(ctx context.Context, client *astra.ClientWithResponses, databaseId
395407
396408 // In most astra APIs there are dashes in region names depending on the cloud provider, this seems not to be the case for streaming
397409 cloudProvider := string (* db .Info .CloudProvider )
398- fmt . Printf ( "%s" , cloudProvider )
410+ pulsarCluster = getPulsarCluster ( pulsarCluster , cloudProvider , * db . Info . Region , "" )
399411
400- pulsarCluster := getPulsarCluster ("" , cloudProvider , * db .Info .Region , "" )
401412 pulsarToken , err := getPulsarToken (ctx , pulsarCluster , token , org , streamingClient , tenantName )
402413 return pulsarCluster , pulsarToken , err
403414}
@@ -455,6 +466,7 @@ func setCDCData(resourceData *schema.ResourceData, cdc CDCStatus) diag.Diagnosti
455466 return nil
456467}
457468
469+ // parseCDCID expects an ID in the format "databaseId/keyspace/table/tenantName"
458470func parseCDCID (id string ) (string , string , string , string , error ) {
459471 idParts := strings .Split (strings .ToLower (id ), "/" )
460472 if len (idParts ) != 4 {
0 commit comments