Skip to content

Commit

Permalink
Merge pull request #57 from atlanhq/DVX-592-Add-Atlan-Tags-Methods
Browse files Browse the repository at this point in the history
DVX:592 - Feat: Added Add, Modify, Remove and Update methods on AtlanTags
  • Loading branch information
0xquark authored Sep 12, 2024
2 parents 547e6f9 + c617a57 commit 5fa6b7a
Show file tree
Hide file tree
Showing 5 changed files with 261 additions and 29 deletions.
136 changes: 136 additions & 0 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,142 @@ func GetByGuid[T AtlanObject](guid string) (T, error) {
return newAsset, nil
}

func ModifyTags(api API,
assetType reflect.Type,
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool) error {

var atlanTags []structs.AtlanTag

for _, name := range atlanTagNames {
TagName, _ := GetAtlanTagIDForName(name)
atlanTags = append(atlanTags, structs.AtlanTag{
TypeName: &TagName,
Propagate: &propagate,
RemovePropagationsOnEntityDelete: &removePropagationOnDelete,
RestrictPropagationThroughLineage: &restrictLineagePropagation,
RestrictPropagationThroughHierarchy: &restrictPropagationThroughHierarchy,
})
}

queryParams := map[string]string{
"attr:qualifiedName": qualifiedName,
}

API, _ := api.FormatPathWithParams(assetType.Name(), "classifications")

_, err := DefaultAtlanClient.CallAPI(
API,
queryParams,
atlanTags,
)
if err != nil {
return fmt.Errorf("failed to modify tags: %w", err)
}

return nil
}

func AddAtlanTags[T AtlanObject](
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool,
) error {

var asset T
assetType := reflect.TypeOf(asset).Elem()

err := ModifyTags(
UPDATE_ENTITY_BY_ATTRIBUTE,
assetType,
qualifiedName,
atlanTagNames,
propagate,
removePropagationOnDelete,
restrictLineagePropagation,
restrictPropagationThroughHierarchy,
)
if err != nil {
return fmt.Errorf("failed to add Atlan tags: %w", err)
}
return nil
}

func UpdateAtlanTags[T AtlanObject](
qualifiedName string,
atlanTagNames []string,
propagate bool,
removePropagationOnDelete bool,
restrictLineagePropagation bool,
restrictPropagationThroughHierarchy bool,
) error {

var asset T
assetType := reflect.TypeOf(asset).Elem()

err := ModifyTags(
PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE,
assetType,
qualifiedName,
atlanTagNames,
propagate,
removePropagationOnDelete,
restrictLineagePropagation,
restrictPropagationThroughHierarchy,
)
if err != nil {
return fmt.Errorf("failed to modify Atlan tags: %w", err)
}
return nil
}

func RemoveAtlanTag[T AtlanObject](
qualifiedName string,
atlanTagName string,
) error {

var api API

api = DELETE_ENTITY_BY_ATTRIBUTE
var asset T
assetType := reflect.TypeOf(asset).Elem()

// Get the internal ID for the tag name
classificationID, err := GetAtlanTagIDForName(atlanTagName)
if err != nil {
return fmt.Errorf("failed to get Atlan tag ID for name %s: %w", atlanTagName, err)
}

// If classification ID is not found, return an error
if classificationID == "" {
return fmt.Errorf("Atlan tag not found: %s", atlanTagName)
}

// Set query params with the qualified name
queryParams := map[string]string{
"attr:qualifiedName": qualifiedName,
}

// Construct the API path for deleting the tag
API, _ := api.FormatPathWithParams(assetType.Name(), "classification", classificationID)

// Call the Atlan API to remove the tag
_, err = DefaultAtlanClient.CallAPI(API, queryParams, nil)
if err != nil {
return fmt.Errorf("failed to remove Atlan tag: %w", err)
}

return nil
}

// GetByQualifiedName retrieves an asset by guid
func GetByQualifiedName[T AtlanObject](qualifiedName string) (T, error) {

var asset T
Expand Down
6 changes: 5 additions & 1 deletion atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,11 @@ func (ac *AtlanClient) CallAPI(api *API, queryParams interface{}, requestObj int
switch v := queryParams.(type) {
case map[string]string:
for key, value := range v {
query.Add(key, value)
if key == "attr:qualifiedName" {
path += fmt.Sprintf("?%s=%s", key, url.QueryEscape(value))
} else {
query.Add(key, value)
}
}
case map[string][]string:
for key, values := range v {
Expand Down
60 changes: 60 additions & 0 deletions atlan/assets/constants.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package assets

import (
"fmt"
"net/http"
"net/url"
"path"
)

const (
Expand All @@ -27,6 +30,8 @@ type API struct {
Method string
Status int
Endpoint Endpoint
Consumes string
Produces string
}

type Endpoint struct {
Expand Down Expand Up @@ -147,6 +152,27 @@ var (
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

UPDATE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodPost,
Status: http.StatusNoContent,
Endpoint: AtlasEndpoint,
}

PARTIAL_UPDATE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodPut,
Status: http.StatusOK,
Endpoint: AtlasEndpoint,
}

DELETE_ENTITY_BY_ATTRIBUTE = API{
Path: ENTITY_API + "uniqueAttribute/type/",
Method: http.MethodDelete,
Status: http.StatusNoContent,
Endpoint: AtlasEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down Expand Up @@ -188,3 +214,37 @@ const (
UPDATE_TIME_AS_DATE = "__modificationTimestamp.date"
USER_DESCRIPTION = "userDescription"
)

// FormatPathWithParams returns a new API object with the path formatted by joining the provided parameters.
func (api *API) FormatPathWithParams(params ...string) (*API, error) {
// Join the base path with the additional params
requestPath, err := MultipartURLJoin(api.Path, params...)
if err != nil {
return nil, fmt.Errorf("failed to join URL parts: %w", err)
}

// Return a new API object with the formatted path
return &API{
Path: requestPath,
Method: api.Method,
Status: api.Status,
Endpoint: api.Endpoint,
Consumes: api.Consumes,
Produces: api.Produces,
}, nil
}

// MultipartURLJoin joins the base path with the provided segments.
func MultipartURLJoin(basePath string, params ...string) (string, error) {
// Parse the base path as a URL
u, err := url.Parse(basePath)
if err != nil {
return "", fmt.Errorf("invalid base path: %w", err)
}

// Join additional path segments
u.Path = path.Join(u.Path, path.Join(params...))

// Return the final formatted URL
return u.String(), nil
}
14 changes: 7 additions & 7 deletions atlan/model/structs/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,13 +439,13 @@ type Attributes struct {

// AtlanTag represents a tag in Atlan.
type AtlanTag struct {
TypeName string `json:"typeName"`
EntityGuid string `json:"entityGuid"`
EntityStatus string `json:"entityStatus"`
Propagate bool `json:"propagate"`
RemovePropagationsOnEntityDelete bool `json:"removePropagationsOnEntityDelete"`
RestrictPropagationThroughLineage bool `json:"restrictPropagationThroughLineage"`
RestrictPropagationThroughHierarchy bool `json:"restrictPropagationThroughHierarchy"`
TypeName *string `json:"typeName"`
EntityGuid *string `json:"entityGuid,omitempty"`
EntityStatus *string `json:"entityStatus,omitempty"`
Propagate *bool `json:"propagate,omitempty"`
RemovePropagationsOnEntityDelete *bool `json:"removePropagationsOnEntityDelete,omitempty"`
RestrictPropagationThroughLineage *bool `json:"restrictPropagationThroughLineage,omitempty"`
RestrictPropagationThroughHierarchy *bool `json:"restrictPropagationThroughHierarchy,omitempty"`
}

type Link struct {
Expand Down
74 changes: 53 additions & 21 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,69 @@ func main() {

ctx.SetLogger(true, "debug")

t := &assets.Table{} // create a new Table instance
//t := &assets.Table{} // create a new Table instance

// Define the Atlan tag details
qualifiedName := "default/snowflake/1725896074/ANALYTICS/WIDE_WORLD_IMPORTERS/FCT_STOCK_ITEM_HOLDINGS"
//atlanTagNames := []string{"Daily", "Hourly"} // List of tags to add

err := assets.RemoveAtlanTag[*assets.Table](qualifiedName, "Confidential")
/*
// Set the propagation options
propagate := true
removePropagationOnDelete := true
restrictLineagePropagation := false
restrictPropagationThroughHierarchy := false
// Call the AddAtlanTags function
err := assets.UpdateAtlanTags[*assets.Table](
qualifiedName, // The qualified name of the asset
atlanTagNames, // The list of Atlan tags to add
propagate, // Whether to propagate the tags or not
removePropagationOnDelete, // Remove propagation on delete
restrictLineagePropagation, // Restrict lineage propagation
restrictPropagationThroughHierarchy, // Restrict propagation through hierarchy
)
*/
if err != nil {
fmt.Printf("Failed to add Atlan tags: %v\n", err)
} else {
fmt.Println("Atlan tags added successfully.")
}

//schemaName := "WIDEWORLDIMPORTERS_PURCHASING"
//dataBaseName := "RAW"
//dataBaseQualifiedName := "default/snowflake/1723642516/RAW"
//connectionQualifiedName := "default/snowflake/1723642516"

t.Creator("TestTable6", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING")
response, err := assets.Save(t) // save the table
if err != nil {
fmt.Println("Error:", err)
} else {
for _, entity := range response.MutatedEntities.CREATE {
//fmt.Println("Response:", entity)
fmt.Printf("Entity ID: %s, Display Text: %s\n", entity.Guid, entity.DisplayText)
/*
t.Creator("TestTable6", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING")
response, err := assets.Save(t) // save the table
if err != nil {
fmt.Println("Error:", err)
} else {
for _, entity := range response.MutatedEntities.CREATE {
//fmt.Println("Response:", entity)
fmt.Printf("Entity ID: %s, Display Text: %s\n", entity.Guid, entity.DisplayText)
}
}
}
t1 := &assets.Table{} // create a new Table instance
t1 := &assets.Table{} // create a new Table instance
t1.Updater("TestTable7", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING/TestTable4")
DisplayName := "TestTableModified"
t1.Name = &DisplayName
response2, err := assets.Save(t1)
if err != nil {
} else {
for _, entity := range response2.MutatedEntities.UPDATE {
println("Response:", entity)
println("Entity ID:", entity.Guid, "Display Text:", entity.DisplayText)
t1.Updater("TestTable7", "default/snowflake/1723642516/RAW/WIDEWORLDIMPORTERS_PURCHASING/TestTable4")
DisplayName := "TestTableModified"
t1.Name = &DisplayName
response2, err := assets.Save(t1)
if err != nil {
} else {
for _, entity := range response2.MutatedEntities.UPDATE {
println("Response:", entity)
println("Entity ID:", entity.Guid, "Display Text:", entity.DisplayText)
}
}
}
*/
/*
qualifiedname := "default/snowflake/1715371897/RAW/WIDEWORLDIMPORTERS_SALESFORCE/FIVETRAN_API_CALL"
Expand Down

0 comments on commit 5fa6b7a

Please sign in to comment.