Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DVX: 592 - feat: Added Add, Modify, Remove and Update methods on AtlanTags #57

Merged
merged 2 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 136 additions & 0 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,142 @@ func GetByGuid[T AtlanObject](guid string) (T, error) {
return newAsset, nil
}

func ModifyTags(api API,
assetType reflect.Type,
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool) error {

var atlanTags []structs.AtlanTag

for _, name := range atlanTagNames {
TagName, _ := GetAtlanTagIDForName(name)
atlanTags = append(atlanTags, structs.AtlanTag{
TypeName: &TagName,
Propagate: &propagate,
RemovePropagationsOnEntityDelete: &removePropagationOnDelete,
RestrictPropagationThroughLineage: &restrictLineagePropagation,
RestrictPropagationThroughHierarchy: &restrictPropagationThroughHierarchy,
})
}

queryParams := map[string]string{
"attr:qualifiedName": qualifiedName,
}

API, _ := api.FormatPathWithParams(assetType.Name(), "classifications")

_, err := DefaultAtlanClient.CallAPI(
API,
queryParams,
atlanTags,
)
if err != nil {
return fmt.Errorf("failed to modify tags: %w", err)
}

return nil
}

func AddAtlanTags[T AtlanObject](
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool,
) error {

var asset T
assetType := reflect.TypeOf(asset).Elem()

err := ModifyTags(
UPDATE_ENTITY_BY_ATTRIBUTE,
assetType,
qualifiedName,
atlanTagNames,
propagate,
removePropagationOnDelete,
restrictLineagePropagation,
restrictPropagationThroughHierarchy,
)
if err != nil {
return fmt.Errorf("failed to add Atlan tags: %w", err)
}
return nil
}

func UpdateAtlanTags[T AtlanObject](
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool,
) error {

var asset T
assetType := reflect.TypeOf(asset).Elem()

err := ModifyTags(
PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE,
assetType,
qualifiedName,
atlanTagNames,
propagate,
removePropagationOnDelete,
restrictLineagePropagation,
restrictPropagationThroughHierarchy,
)
if err != nil {
return fmt.Errorf("failed to modify Atlan tags: %w", err)
}
return nil
}

func RemoveAtlanTag[T AtlanObject](
qualifiedName string,
atlanTagName string,
) error {

var api API

api = DELETE_ENTITY_BY_ATTRIBUTE
var asset T
assetType := reflect.TypeOf(asset).Elem()

// Get the internal ID for the tag name
classificationID, err := GetAtlanTagIDForName(atlanTagName)
if err != nil {
return fmt.Errorf("failed to get Atlan tag ID for name %s: %w", atlanTagName, err)
}

// If classification ID is not found, return an error
if classificationID == "" {
return fmt.Errorf("Atlan tag not found: %s", atlanTagName)
}

// Set query params with the qualified name
queryParams := map[string]string{
"attr:qualifiedName": qualifiedName,
}

// Construct the API path for deleting the tag
API, _ := api.FormatPathWithParams(assetType.Name(), "classification", classificationID)

// Call the Atlan API to remove the tag
_, err = DefaultAtlanClient.CallAPI(API, queryParams, nil)
if err != nil {
return fmt.Errorf("failed to remove Atlan tag: %w", err)
}

return nil
}

// GetByQualifiedName retrieves an asset by guid
func GetByQualifiedName[T AtlanObject](qualifiedName string) (T, error) {

var asset T
Expand Down
6 changes: 5 additions & 1 deletion atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
switch v := queryParams.(type) {
case map[string]string:
for key, value := range v {
query.Add(key, value)
if key == "attr:qualifiedName" {
path += fmt.Sprintf("?%s=%s", key, url.QueryEscape(value))
} else {
query.Add(key, value)
}
}
case map[string][]string:
for key, values := range v {
Expand Down
60 changes: 60 additions & 0 deletions atlan/assets/constants.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package assets

import (
"fmt"
"net/http"
"net/url"
"path"
)

const (
Expand All @@ -27,6 +30,8 @@ type API struct {
Method string
Status int
Endpoint Endpoint
Consumes string
Produces string
}

type Endpoint struct {
Expand Down Expand Up @@ -147,6 +152,27 @@ var (
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

UPDATE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodPost,
Status: http.StatusNoContent,
Endpoint: AtlasEndpoint,
}

PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodPut,
Status: http.StatusOK,
Endpoint: AtlasEndpoint,
}

DELETE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodDelete,
Status: http.StatusNoContent,
Endpoint: AtlasEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down Expand Up @@ -188,3 +214,37 @@ const (
UPDATE_TIME_AS_DATE = "__modificationTimestamp.date"
USER_DESCRIPTION = "userDescription"
)

// FormatPathWithParams returns a new API object with the path formatted by joining the provided parameters.
func (api *API) FormatPathWithParams(params ...string) (*API, error) {
// Join the base path with the additional params
requestPath, err := MultipartURLJoin(api.Path, params...)
if err != nil {
return nil, fmt.Errorf("failed to join URL parts: %w", err)
}

// Return a new API object with the formatted path
return &API{
Path: requestPath,
Method: api.Method,
Status: api.Status,
Endpoint: api.Endpoint,
Consumes: api.Consumes,
Produces: api.Produces,
}, nil
}

// MultipartURLJoin joins the base path with the provided segments.
func MultipartURLJoin(basePath string, params ...string) (string, error) {
// Parse the base path as a URL
u, err := url.Parse(basePath)
if err != nil {
return "", fmt.Errorf("invalid base path: %w", err)
}

// Join additional path segments
u.Path = path.Join(u.Path, path.Join(params...))

// Return the final formatted URL
return u.String(), nil
}
14 changes: 7 additions & 7 deletions atlan/model/structs/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,13 @@ type Attributes struct {

// AtlanTag represents a tag in Atlan.
type AtlanTag struct {
TypeName string `json:"typeName"`
EntityGuid string `json:"entityGuid"`
EntityStatus string `json:"entityStatus"`
Propagate bool `json:"propagate"`
RemovePropagationsOnEntityDelete bool `json:"removePropagationsOnEntityDelete"`
RestrictPropagationThroughLineage bool `json:"restrictPropagationThroughLineage"`
RestrictPropagationThroughHierarchy bool `json:"restrictPropagationThroughHierarchy"`
TypeName *string `json:"typeName"`
EntityGuid *string `json:"entityGuid,omitempty"`
EntityStatus *string `json:"entityStatus,omitempty"`
Propagate *bool `json:"propagate,omitempty"`
RemovePropagationsOnEntityDelete *bool `json:"removePropagationsOnEntityDelete,omitempty"`
RestrictPropagationThroughLineage *bool `json:"restrictPropagationThroughLineage,omitempty"`
RestrictPropagationThroughHierarchy *bool `json:"restrictPropagationThroughHierarchy,omitempty"`
}

type Link struct {
Expand Down
74 changes: 53 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,69 @@ func main() {

ctx.SetLogger(true, "debug")

t := &assets.Table{} // create a new Table instance
//t := &assets.Table{} // create a new Table instance

// Define the Atlan tag details
qualifiedName := "default/snowflake/1725896074/ANALYTICS/WIDE_WORLD_IMPORTERS/FCT_STOCK_ITEM_HOLDINGS"
//atlanTagNames := []string{"Daily", "Hourly"} // List of tags to add

err := assets.RemoveAtlanTag[*assets.Table](qualifiedName, "Confidential")
/*
// Set the propagation options
propagate := true
removePropagationOnDelete := true
restrictLineagePropagation := false
restrictPropagationThroughHierarchy := false


// Call the AddAtlanTags function
err := assets.UpdateAtlanTags[*assets.Table](
qualifiedName, // The qualified name of the asset
atlanTagNames, // The list of Atlan tags to add
propagate, // Whether to propagate the tags or not
removePropagationOnDelete, // Remove propagation on delete
restrictLineagePropagation, // Restrict lineage propagation
restrictPropagationThroughHierarchy, // Restrict propagation through hierarchy
)
*/
if err != nil {
fmt.Printf("Failed to add Atlan tags: %v\n", err)
} else {
fmt.Println("Atlan tags added successfully.")
}

//schemaName := "WIDEWORLDIMPORTERS_PURCHASING"
//dataBaseName := "RAW"
//dataBaseQualifiedName := "default/snowflake/1723642516/RAW"
//connectionQualifiedName := "default/snowflake/1723642516"

t.Creator("TestTable6", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING")
response, err := assets.Save(t) // save the table
if err != nil {
fmt.Println("Error:", err)
} else {
for _, entity := range response.MutatedEntities.CREATE {
//fmt.Println("Response:", entity)
fmt.Printf("Entity ID: %s, Display Text: %s\n", entity.Guid, entity.DisplayText)
/*
t.Creator("TestTable6", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING")
response, err := assets.Save(t) // save the table
if err != nil {
fmt.Println("Error:", err)
} else {
for _, entity := range response.MutatedEntities.CREATE {
//fmt.Println("Response:", entity)
fmt.Printf("Entity ID: %s, Display Text: %s\n", entity.Guid, entity.DisplayText)
}
}
}

t1 := &assets.Table{} // create a new Table instance
t1 := &assets.Table{} // create a new Table instance

t1.Updater("TestTable7", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING/TestTable4")
DisplayName := "TestTableModified"
t1.Name = &DisplayName
response2, err := assets.Save(t1)
if err != nil {
} else {
for _, entity := range response2.MutatedEntities.UPDATE {
println("Response:", entity)
println("Entity ID:", entity.Guid, "Display Text:", entity.DisplayText)
t1.Updater("TestTable7", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING/TestTable4")
DisplayName := "TestTableModified"
t1.Name = &DisplayName
response2, err := assets.Save(t1)
if err != nil {
} else {
for _, entity := range response2.MutatedEntities.UPDATE {
println("Response:", entity)
println("Entity ID:", entity.Guid, "Display Text:", entity.DisplayText)
}
}
}

*/
/*
qualifiedname := "default/snowflake/1715371897/RAW/WIDEWORLDIMPORTERS_SALESFORCE/FIVETRAN_API_CALL"

Expand Down
Loading