Skip to content

Commit b981437

Browse files
authored
Merge pull request #3692 from atlanhq/DG-1895
DG-1895 Added a fix, incase corrupt or null classification vertices are found, we will skip them and detach them.
2 parents f699ca6 + c2d6b29 commit b981437

File tree

4 files changed

+34
-12
lines changed

4 files changed

+34
-12
lines changed

repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java

+19-4
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.atlas.model.instance.AtlasObjectId;
3535
import org.apache.atlas.model.instance.AtlasRelationship;
3636
import org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2;
37+
import org.apache.atlas.repository.store.graph.v2.TransactionInterceptHelper;
3738
import org.apache.atlas.type.AtlasArrayType;
3839
import org.apache.atlas.type.AtlasMapType;
3940
import org.apache.atlas.utils.AtlasPerfMetrics;
@@ -359,18 +360,32 @@ public static boolean getRestrictPropagationThroughHierarchy(AtlasVertex classif
359360
return getRestrictPropagation(classificationVertex,CLASSIFICATION_VERTEX_RESTRICT_PROPAGATE_THROUGH_HIERARCHY);
360361
}
361362

362-
public static AtlasVertex getClassificationVertex(AtlasVertex entityVertex, String classificationName) {
363+
public void repairTagVertex(AtlasEdge edge, AtlasVertex classificationVertex) {
364+
LOG.info("Repairing corrupt tag-vertex");
365+
removeEdge(edge);
366+
removeVertex(classificationVertex);
367+
}
368+
369+
public static AtlasVertex getClassificationVertex(GraphHelper graphHelper, AtlasVertex entityVertex, String classificationName) {
363370
AtlasVertex ret = null;
364371
Iterable edges = entityVertex.query().direction(AtlasEdgeDirection.OUT).label(CLASSIFICATION_LABEL)
365372
.has(CLASSIFICATION_EDGE_IS_PROPAGATED_PROPERTY_KEY, false)
366373
.has(CLASSIFICATION_EDGE_NAME_PROPERTY_KEY, classificationName).edges();
374+
367375
if (edges != null) {
368376
Iterator<AtlasEdge> iterator = edges.iterator();
369377

370-
if (iterator.hasNext()) {
378+
while (iterator.hasNext()) {
371379
AtlasEdge edge = iterator.next();
372-
373-
ret = (edge != null) ? edge.getInVertex() : null;
380+
if(Objects.nonNull(edge))
381+
{
382+
AtlasVertex classificationVertex = edge.getInVertex();
383+
if(Objects.nonNull(classificationVertex) && StringUtils.isNotEmpty(classificationVertex.getProperty(TYPE_NAME_PROPERTY_KEY, String.class))) {
384+
return edge.getInVertex();
385+
} else if(graphHelper != null) {
386+
graphHelper.repairTagVertex(edge, edge.getInVertex());
387+
}
388+
}
374389
}
375390
}
376391

repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -3565,8 +3565,12 @@ public void deleteClassification(String entityGuid, String classificationName) t
35653565

35663566
validateClassificationExists(traitNames, classificationName);
35673567

3568-
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
3568+
AtlasVertex classificationVertex = getClassificationVertex(graphHelper, entityVertex, classificationName);
35693569

3570+
if (Objects.isNull(classificationVertex)) {
3571+
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
3572+
return;
3573+
}
35703574
// Get in progress task to see if there already is a propagation for this particular vertex
35713575
List<AtlasTask> inProgressTasks = taskManagement.getInProgressTasks();
35723576
for (AtlasTask task : inProgressTasks) {
@@ -3578,7 +3582,8 @@ public void deleteClassification(String entityGuid, String classificationName) t
35783582
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
35793583

35803584
if (classification == null) {
3581-
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_FOUND, classificationName);
3585+
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
3586+
return;
35823587
}
35833588

35843589
// remove classification from propagated entities if propagation is turned on
@@ -3778,10 +3783,11 @@ public void updateClassifications(EntityMutationContext context, String guid, Li
37783783
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_UPDATE_FROM_PROPAGATED_ENTITY, classificationName);
37793784
}
37803785

3781-
AtlasVertex classificationVertex = getClassificationVertex(entityVertex, classificationName);
3786+
AtlasVertex classificationVertex = getClassificationVertex(graphHelper, entityVertex, classificationName);
37823787

37833788
if (classificationVertex == null) {
3784-
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, classificationName);
3789+
LOG.error(AtlasErrorCode.CLASSIFICATION_NOT_FOUND.getFormattedErrorMessage(classificationName));
3790+
continue;
37853791
}
37863792

37873793
if (LOG.isDebugEnabled()) {

repository/src/main/java/org/apache/atlas/tasks/AtlasTaskService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.*;
2727

2828
import static org.apache.atlas.repository.Constants.TASK_GUID;
29+
import static org.apache.atlas.repository.graph.GraphHelper.getClassificationVertex;
2930
import static org.apache.atlas.repository.store.graph.v2.AtlasGraphUtilsV2.setEncodedProperty;
3031
import static org.apache.atlas.repository.store.graph.v2.tasks.ClassificationTask.PARAM_CLASSIFICATION_VERTEX_ID;
3132
import static org.apache.atlas.tasks.TaskRegistry.toAtlasTask;
@@ -184,7 +185,7 @@ private String resolveAndReturnClassificationId(String classificationName, Strin
184185
throw new AtlasBaseException(AtlasErrorCode.INSTANCE_GUID_NOT_FOUND, entityGuid);
185186
}
186187

187-
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, classificationName);
188+
AtlasVertex classificationVertex = getClassificationVertex(null, entityVertex, classificationName);
188189

189190
if (classificationVertex != null) {
190191
ret = classificationVertex.getIdForDisplay();

repository/src/test/java/org/apache/atlas/repository/tagpropagation/ClassificationPropagationWithTasksTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ public void add() throws AtlasBaseException {
160160
entityStore.addClassification(Collections.singletonList(HDFS_PATH_EMPLOYEES), tagY);
161161

162162
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
163-
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_X);
163+
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME_X);
164164

165165
assertNotNull(entityVertex);
166166
assertNotNull(classificationVertex);
@@ -183,7 +183,7 @@ public void update() throws AtlasBaseException {
183183
entityStore.updateClassifications(hdfs_employees.getGuid(), Collections.singletonList(tagY));
184184

185185
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
186-
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME_Y);
186+
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME_Y);
187187

188188
assertNotNull(RequestContext.get().getQueuedTasks());
189189
assertTrue(RequestContext.get().getQueuedTasks().size() > 0, "No tasks were queued!");
@@ -204,7 +204,7 @@ public void delete() throws AtlasBaseException {
204204
tagX.setPropagate(false);
205205

206206
AtlasVertex entityVertex = AtlasGraphUtilsV2.findByGuid(hdfs_employees.getGuid());
207-
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(entityVertex, TAG_NAME);
207+
AtlasVertex classificationVertex = GraphHelper.getClassificationVertex(null, entityVertex, TAG_NAME);
208208

209209
try {
210210
entityStore.deleteClassification(HDFS_PATH_EMPLOYEES, tagX.getTypeName());

0 commit comments

Comments
 (0)