Skip to content

Commit efe4cfc

Browse files
committed
nit: change the nomenclature to distributed tasks
1 parent bd43177 commit efe4cfc

File tree

5 files changed

+18
-25
lines changed

5 files changed

+18
-25
lines changed

intg/src/main/java/org/apache/atlas/AtlasConfiguration.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public enum AtlasConfiguration {
4040
NOTIFICATION_HOOK_TOPIC_NAME("atlas.notification.hook.topic.name", "ATLAS_HOOK"),
4141
NOTIFICATION_ENTITIES_TOPIC_NAME("atlas.notification.entities.topic.name", "ATLAS_ENTITIES"),
4242
NOTIFICATION_RELATIONSHIPS_TOPIC_NAME("atlas.notification.relationships.topic.name", "ATLAS_RELATIONSHIPS"),
43-
NOTIFICATION_ATLAS_DISTRIBUTED_TASKS_TOPIC_NAME("atlas.notification.atlas.tasks.topic.name", "ATLAS_TASKS"),
43+
NOTIFICATION_ATLAS_DISTRIBUTED_TASKS_TOPIC_NAME("atlas.notification.distributed.tasks.topic.name", "ATLAS_DISTRIBUTED_TASKS"),
4444

4545
NOTIFICATION_HOOK_CONSUMER_TOPIC_NAMES("atlas.notification.hook.consumer.topic.names", "ATLAS_HOOK"), // a comma separated list of topic names
4646
NOTIFICATION_ENTITIES_CONSUMER_TOPIC_NAMES("atlas.notification.entities.consumer.topic.names", "ATLAS_ENTITIES"), // a comma separated list of topic names

notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
7575
put(NotificationType.HOOK, ATLAS_HOOK_TOPIC);
7676
put(NotificationType.ENTITIES, ATLAS_ENTITIES_TOPIC);
7777
put(NotificationType.RELATIONSHIPS, ATLAS_RELATIONSHIPS_TOPIC);
78-
put(NotificationType.ATLAS_TASKS, ATLAS_DISTRIBUTED_TASKS_TOPIC);
78+
put(NotificationType.ATLAS_DISTRIBUTED_TASKS, ATLAS_DISTRIBUTED_TASKS_TOPIC);
7979

8080
}
8181
};

notification/src/main/java/org/apache/atlas/notification/NotificationInterface.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ enum NotificationType {
5252

5353
RELATIONSHIPS(new EntityMessageDeserializer()),
5454

55-
ATLAS_TASKS(new AtlasDistributedTaskMessageDeserializer());
55+
ATLAS_DISTRIBUTED_TASKS(new AtlasDistributedTaskMessageDeserializer());
5656

5757
private final AtlasNotificationMessageDeserializer deserializer;
5858

notification/src/main/java/org/apache/atlas/notification/task/AtlasTaskNotificationSender.java notification/src/main/java/org/apache/atlas/notification/task/AtlasDistributedTaskNotificationSender.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
1-
package org.apache.atlas.notification;
1+
package org.apache.atlas.notification.task;
22

33
import org.apache.atlas.model.notification.AtlasDistributedTaskNotification;
4+
import org.apache.atlas.notification.NotificationException;
5+
import org.apache.atlas.notification.NotificationInterface;
46
import org.slf4j.Logger;
57
import org.slf4j.LoggerFactory;
68
import org.springframework.beans.factory.annotation.Autowired;
@@ -11,15 +13,15 @@
1113
import java.util.Map;
1214

1315
@Component
14-
public class AtlasTaskNotificationSender {
15-
private static final Logger LOG = LoggerFactory.getLogger(AtlasTaskNotificationSender.class);
16+
public class AtlasDistributedTaskNotificationSender {
17+
private static final Logger LOG = LoggerFactory.getLogger(AtlasDistributedTaskNotificationSender.class);
1618

1719
private final NotificationInterface notificationInterface;
1820

1921
private final static Long batchSize = 40000L;
2022

2123
@Autowired
22-
public AtlasTaskNotificationSender(NotificationInterface notificationInterface) {
24+
public AtlasDistributedTaskNotificationSender(NotificationInterface notificationInterface) {
2325
this.notificationInterface = notificationInterface;
2426
}
2527

@@ -35,15 +37,15 @@ public AtlasDistributedTaskNotification createRelationshipCleanUpTask(String ver
3537

3638
public void send(AtlasDistributedTaskNotification notification) {
3739
try {
38-
notificationInterface.send(NotificationInterface.NotificationType.ATLAS_TASKS, notification);
40+
notificationInterface.send(NotificationInterface.NotificationType.ATLAS_DISTRIBUTED_TASKS, notification);
3941
} catch (NotificationException e) {
4042
LOG.error("Failed to send notification for task: {}", notification, e);
4143
}
4244
}
4345

4446
public void send(List<AtlasDistributedTaskNotification> notifications) {
4547
try {
46-
notificationInterface.send(NotificationInterface.NotificationType.ATLAS_TASKS, notifications);
48+
notificationInterface.send(NotificationInterface.NotificationType.ATLAS_DISTRIBUTED_TASKS, notifications);
4749
} catch (NotificationException e) {
4850
LOG.error("Failed to send notifications for tasks: {}", notifications, e);
4951
}

repository/src/main/java/org/apache/atlas/repository/store/graph/v2/AtlasEntityStoreV2.java

+7-16
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
import org.apache.atlas.model.notification.AtlasDistributedTaskNotification;
3838
import org.apache.atlas.model.tasks.AtlasTask;
3939
import org.apache.atlas.model.typedef.AtlasBaseTypeDef;
40-
import org.apache.atlas.notification.AtlasTaskNotificationSender;
40+
import org.apache.atlas.notification.task.AtlasDistributedTaskNotificationSender;
4141
import org.apache.atlas.repository.Constants;
4242
import org.apache.atlas.repository.RepositoryException;
4343
import org.apache.atlas.repository.graph.GraphHelper;
@@ -143,13 +143,13 @@ public class AtlasEntityStoreV2 implements AtlasEntityStore {
143143

144144
private final ESAliasStore esAliasStore;
145145
private final IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier;
146-
private final AtlasTaskNotificationSender taskNotificationSender;
146+
private final AtlasDistributedTaskNotificationSender taskNotificationSender;
147147

148148
@Inject
149149
public AtlasEntityStoreV2(AtlasGraph graph, DeleteHandlerDelegate deleteDelegate, RestoreHandlerV1 restoreHandlerV1, AtlasTypeRegistry typeRegistry,
150150
IAtlasEntityChangeNotifier entityChangeNotifier, EntityGraphMapper entityGraphMapper, TaskManagement taskManagement,
151151
AtlasRelationshipStore atlasRelationshipStore, FeatureFlagStore featureFlagStore,
152-
IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier, AtlasTaskNotificationSender taskNotificationSender) {
152+
IAtlasMinimalChangeNotifier atlasAlternateChangeNotifier, AtlasDistributedTaskNotificationSender taskNotificationSender) {
153153

154154
this.graph = graph;
155155
this.deleteDelegate = deleteDelegate;
@@ -591,10 +591,6 @@ public EntityMutationResponse deleteById(final String guid) throws AtlasBaseExce
591591

592592
AtlasAuthorizationUtils.verifyDeleteEntityAccess(typeRegistry, entityHeader, "delete entity: guid=" + guid);
593593

594-
if (AtlasConfiguration.NOTIFICATION_ATLAS_DISTRIBUTED_TASKS_TOPIC_NAME.getBoolean()) {
595-
checkAndCreateProcessRelationshipsCleanupTaskNotification(typeRegistry.getEntityTypeByName(entityHeader.getTypeName()), vertex);
596-
}
597-
598594
deletionCandidates.add(vertex);
599595
} else {
600596
if (LOG.isDebugEnabled()) {
@@ -641,10 +637,6 @@ public EntityMutationResponse deleteByIds(final List<String> guids) throws Atlas
641637

642638
AtlasAuthorizationUtils.verifyDeleteEntityAccess(typeRegistry, entityHeader, "delete entity: guid=" + guid);
643639

644-
if (AtlasConfiguration.NOTIFICATION_ATLAS_DISTRIBUTED_TASKS_TOPIC_NAME.getBoolean()) {
645-
checkAndCreateProcessRelationshipsCleanupTaskNotification(typeRegistry.getEntityTypeByName(entityHeader.getTypeName()), vertex);
646-
}
647-
648640
deletionCandidates.add(vertex);
649641
}
650642

@@ -755,10 +747,6 @@ public EntityMutationResponse deleteByUniqueAttributes(AtlasEntityType entityTyp
755747
AtlasAuthorizationUtils.verifyDeleteEntityAccess(typeRegistry, entityHeader,
756748
"delete entity: typeName=" + entityType.getTypeName() + ", uniqueAttributes=" + uniqAttributes);
757749

758-
if (AtlasConfiguration.NOTIFICATION_ATLAS_DISTRIBUTED_TASKS_TOPIC_NAME.getBoolean()) {
759-
checkAndCreateProcessRelationshipsCleanupTaskNotification(typeRegistry.getEntityTypeByName(entityHeader.getTypeName()), vertex);
760-
}
761-
762750
deletionCandidates.add(vertex);
763751
} else {
764752
if (LOG.isDebugEnabled()) {
@@ -1744,7 +1732,6 @@ private EntityMutationContext preCreateOrUpdate(EntityStream entityStream, Entit
17441732

17451733
if (entity != null) { // entity would be null if guid is not in the stream but referenced by an entity in the stream
17461734
AtlasEntityType entityType = typeRegistry.getEntityTypeByName(entity.getTypeName());
1747-
// If typeName is Process or ColumnProcess we send task notification
17481735

17491736
if (entityType == null) {
17501737
throw new AtlasBaseException(element.getValue(), AtlasErrorCode.TYPE_NAME_INVALID, TypeCategory.ENTITY.name(), entity.getTypeName());
@@ -2125,6 +2112,10 @@ private EntityMutationResponse deleteVertices(Collection<AtlasVertex> deletionCa
21252112

21262113
String typeName = getTypeName(vertex);
21272114

2115+
if(ATLAS_DISTRIBUTED_TASK_ENABLED.getBoolean()) {
2116+
checkAndCreateProcessRelationshipsCleanupTaskNotification(typeRegistry.getEntityTypeByName(typeName), vertex);
2117+
}
2118+
21282119
List<PreProcessor> preProcessors = getPreProcessor(typeName);
21292120
for(PreProcessor processor : preProcessors){
21302121
processor.processDelete(vertex);

0 commit comments

Comments
 (0)