Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@
import com.linkedin.venice.exceptions.VeniceResourceAccessException;
import com.linkedin.venice.exceptions.VeniceTimeoutException;
import com.linkedin.venice.hadoop.exceptions.VeniceInvalidInputException;
import com.linkedin.venice.hadoop.exceptions.VeniceSchemaFieldNotFoundException;
import com.linkedin.venice.hadoop.exceptions.VeniceSchemaMismatchException;
import com.linkedin.venice.hadoop.input.kafka.KafkaInputDictTrainer;
import com.linkedin.venice.hadoop.mapreduce.datawriter.jobs.DataWriterMRJob;
import com.linkedin.venice.hadoop.mapreduce.engine.DefaultJobClientWrapper;
Expand Down Expand Up @@ -888,6 +890,8 @@ public void run() {
* data path contains no data as well in the avro flow.
*/
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.INVALID_INPUT_FILE);
} else if (e instanceof VeniceSchemaFieldNotFoundException || e instanceof VeniceSchemaMismatchException) {
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.INPUT_DATA_SCHEMA_VALIDATION_FAILED);
}
pushJobDetails.overallStatus.add(getPushJobDetailsStatusTuple(PushJobDetailsStatus.ERROR.getValue()));
pushJobDetails.failureDetails = e.toString();
Expand Down Expand Up @@ -1934,7 +1938,7 @@ void validateKeySchema(PushJobSetting setting) {
+ "\n\t\tSchema defined in Venice: \t%s";
String errorMessage =
String.format(errorMessageFormat, setting.storeName, pushJobSetting.keySchemaString, serverSchema.toString());
throw new VeniceException(errorMessage);
throw new VeniceSchemaMismatchException(errorMessage);
}
}

Expand Down Expand Up @@ -2034,7 +2038,7 @@ void validateAndRetrieveValueSchemas(
throw new VeniceException("Superset schema not found for store: " + setting.storeName);
}
if (!validateSubsetValueSchema(pushJobSetting.valueSchema, supersetSchema.getSchemaStr())) {
throw new VeniceException(
throw new VeniceSchemaMismatchException(
"Input value schema is not subset of superset schema. Input value schema: " + pushJobSetting.valueSchema
+ " , superset schema: " + supersetSchema.getSchemaStr());
}
Expand Down Expand Up @@ -2378,10 +2382,7 @@ void createNewStoreVersion(
pushJobSetting.repushSourceVersion,
setting.pushToSeparateRealtimeTopicEnabled));
if (versionCreationResponse.isError()) {
if (ErrorType.CONCURRENT_BATCH_PUSH.equals(versionCreationResponse.getErrorType())) {
LOGGER.error("Unable to run this job since another batch push is running. See the error message for details.");
updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.CONCURRENT_BATCH_PUSH);
}
handleVersionCreationError(versionCreationResponse);
throw new VeniceException(
"Failed to create new store version with urls: " + setting.veniceControllerUrl + ", error: "
+ versionCreationResponse.getError());
Expand Down Expand Up @@ -2447,6 +2448,24 @@ void createNewStoreVersion(
}
}

/**
 * Categorizes a failed version-creation response before the caller propagates the error.
 * Recording a checkpoint here is what lets push-job reporting distinguish user errors
 * (ACL problems, concurrent pushes) from platform errors.
 */
@VisibleForTesting
void handleVersionCreationError(VersionCreationResponse versionCreationResponse) {
  // Enum identity comparison (==) is null-safe, matching the null-tolerant equals() form.
  ErrorType errorType = versionCreationResponse.getErrorType();
  if (errorType == ErrorType.CONCURRENT_BATCH_PUSH) {
    LOGGER.error("Unable to run this job since another batch push is running. See the error message for details.");
    updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.CONCURRENT_BATCH_PUSH);
  } else if (errorType == ErrorType.ACL_ERROR) {
    // Reusing WRITE_ACL_FAILED checkpoint for all types of ACL errors. Ideally rename this to
    // READ_WRITE_ACL_FAILED or a more generic one.
    LOGGER.error("Push job failed due to : {}", versionCreationResponse.getError());
    updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.WRITE_ACL_FAILED);
  }
}

synchronized VeniceWriter<KafkaKey, byte[], byte[]> getVeniceWriter(PushJobSetting pushJobSetting) {
if (veniceWriter == null) {
VeniceWriterFactory veniceWriterFactory = new VeniceWriterFactory(getVeniceWriterProperties(pushJobSetting));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.linkedin.venice.hadoop.exceptions;

import com.linkedin.venice.exceptions.VeniceException;


/**
 * Customized exception for schema mismatch error during Venice push job. The exception captures the
 * scenarios where the key or value schema of the input data does not match the expected schema on the
 * Venice server.
 */
public class VeniceSchemaMismatchException extends VeniceException {
  public VeniceSchemaMismatchException(String message) {
    super(message);
  }

  /**
   * Cause-chaining constructor so callers wrapping a lower-level failure (e.g. a schema parse error)
   * do not lose the original stack trace.
   */
  public VeniceSchemaMismatchException(String message, Throwable cause) {
    super(message, cause);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,11 @@
import com.linkedin.venice.controllerapi.StoreResponse;
import com.linkedin.venice.controllerapi.VersionCreationResponse;
import com.linkedin.venice.etl.ETLValueSchemaTransformation;
import com.linkedin.venice.exceptions.ConcurrentBatchPushException;
import com.linkedin.venice.exceptions.UndefinedPropertyException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceStoreAclException;
import com.linkedin.venice.hadoop.exceptions.VeniceSchemaMismatchException;
import com.linkedin.venice.hadoop.exceptions.VeniceValidationException;
import com.linkedin.venice.hadoop.mapreduce.datawriter.jobs.DataWriterMRJob;
import com.linkedin.venice.hadoop.task.datawriter.DataWriterTaskTracker;
Expand Down Expand Up @@ -194,6 +197,26 @@ public void testCheckLastModifiedTimestamp() throws Exception {
}
}

@Test
public void testHandleVersionCreationACLError() {
  // An ACL-typed error on the version creation response must be checkpointed as WRITE_ACL_FAILED.
  VenicePushJob pushJob = getSpyVenicePushJob(new Properties(), null);
  VersionCreationResponse creationResponse = new VersionCreationResponse();
  // Keep the local typed as Throwable so the same setError overload is selected.
  Throwable aclError = new VeniceStoreAclException("ACL error");
  creationResponse.setError(aclError);
  pushJob.handleVersionCreationError(creationResponse);
  verify(pushJob).updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.WRITE_ACL_FAILED);
}

@Test
public void testHandleVersionCreationConcurrentPushError() {
  // A concurrent-batch-push error must be checkpointed as CONCURRENT_BATCH_PUSH.
  VenicePushJob pushJob = getSpyVenicePushJob(new Properties(), null);
  VersionCreationResponse creationResponse = new VersionCreationResponse();
  // Keep the local typed as Throwable so the same setError overload is selected.
  Throwable concurrentPushError = new ConcurrentBatchPushException("Another push is in progress");
  creationResponse.setError(concurrentPushError);
  pushJob.handleVersionCreationError(creationResponse);
  verify(pushJob).updatePushJobDetailsWithCheckpoint(PushJobCheckpoints.CONCURRENT_BATCH_PUSH);
}

@Test
public void testVPJCheckInputUpdateSchema() {
VenicePushJob vpj = mock(VenicePushJob.class);
Expand All @@ -202,6 +225,17 @@ public void testVPJCheckInputUpdateSchema() {
Assert.assertFalse(vpj.isUpdateSchema(NAME_RECORD_V1_SCHEMA.toString()));
}

@Test(expectedExceptions = VeniceSchemaMismatchException.class)
public void testValidateKeySchemaMismatch() {
  // The server expects an int key while the input declares a string key, so validation must
  // throw the typed mismatch exception rather than a generic VeniceException.
  VenicePushJob pushJob = getSpyVenicePushJob(new Properties(), null);
  PushJobSetting jobSetting = pushJob.getPushJobSetting();
  jobSetting.storeKeySchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation("\"int\"");
  jobSetting.keySchema = AvroSchemaParseUtils.parseSchemaFromJSONLooseValidation("\"string\"");
  pushJob.validateKeySchema(jobSetting);
}

@Test
public void testValidateAndRetrieveRmdSchemaWithNoRmdField() {
PushJobSetting setting = new PushJobSetting();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public enum ErrorType {
@JsonEnumDefaultValue
GENERAL_ERROR(ExceptionType.GENERAL_ERROR), BAD_REQUEST(ExceptionType.BAD_REQUEST),
CONCURRENT_BATCH_PUSH(ExceptionType.BAD_REQUEST), RESOURCE_STILL_EXISTS(ExceptionType.BAD_REQUEST),
PROTOCOL_ERROR(ExceptionType.BAD_REQUEST);
PROTOCOL_ERROR(ExceptionType.BAD_REQUEST), ACL_ERROR(ExceptionType.BAD_REQUEST),;

private final ExceptionType exceptionType;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.linkedin.venice.exceptions;

/**
 * Customized exception for ACL related errors in Venice store operations. A dedicated exception helps to propagate
 * the ACL error information to the VPJ layer where it categorizes the error as user related.
 */
public class VeniceStoreAclException extends VeniceException {
  public VeniceStoreAclException(String message) {
    super(message);
    // Tag the exception so downstream consumers (e.g. VPJ checkpointing) can detect ACL failures.
    super.errorType = ErrorType.ACL_ERROR;
  }

  /**
   * Cause-chaining constructor so callers wrapping an underlying failure keep the original stack trace.
   */
  public VeniceStoreAclException(String message, Throwable cause) {
    super(message, cause);
    super.errorType = ErrorType.ACL_ERROR;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.exceptions.VeniceHttpException;
import com.linkedin.venice.exceptions.VeniceNoStoreException;
import com.linkedin.venice.exceptions.VeniceStoreAclException;
import com.linkedin.venice.exceptions.VeniceUnsupportedOperationException;
import com.linkedin.venice.meta.PartitionerConfig;
import com.linkedin.venice.meta.Store;
Expand Down Expand Up @@ -447,11 +448,11 @@ public Route requestTopicForPushing(Admin admin) {
// Also allow allowList users to run this command
if (!isAllowListUser(request)) {
if (!hasWriteAccessToTopic(request)) {
return buildAclErrorResponse(request, response, true, false);
buildStoreAclErrorAndThrowException(request, response, true, false);
}

if (this.checkReadMethodForKafka && !hasReadAccessToTopic(request)) {
return buildAclErrorResponse(request, response, false, true);
buildStoreAclErrorAndThrowException(request, response, false, true);
}
}

Expand Down Expand Up @@ -482,24 +483,23 @@ public Route requestTopicForPushing(Admin admin) {
* When partners have ACL issues for their push, we should provide an accurate and informative messages that
* help partners to unblock by themselves.
*/
private String buildAclErrorResponse(
private void buildStoreAclErrorAndThrowException(
Request request,
Response response,
boolean missingWriteAccess,
boolean missingReadAccess) throws JsonProcessingException {
response.status(HttpStatus.SC_FORBIDDEN);
VersionCreationResponse responseObject = new VersionCreationResponse();
String userId = getPrincipalId(request);
String errorMessage = "Missing [%s] ACLs for user \"" + userId + "\". Please setup ACLs for your store.";
if (missingWriteAccess) {
errorMessage = String.format(errorMessage, "write");
}
if (missingReadAccess) {
} else if (missingReadAccess) {
errorMessage = String.format(errorMessage, "read");
} else {
errorMessage = String.format(errorMessage, "read and write");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont see any calls with both missingWriteAccess and missingReadAccess being false.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically yes; This is meant to be more of a guard to not have an empty ACL error message.
e.g., I still haven't gotten to the bottom of this error message I noticed

Exception type: class com.linkedin.venice.exceptions.VeniceHttpException. Detailed message: Http Status 403 - Missing [{}] ACLs for user "<redacted>". Please setup ACLs for your store.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

who could be calling that?!! anyways, since it is either read or write Acl issues, why not make it a single boolean, true means missing write, false mean missing read?

}
responseObject.setError(errorMessage);
responseObject.setErrorType(ErrorType.BAD_REQUEST);
return AdminSparkServer.OBJECT_MAPPER.writeValueAsString(responseObject);

throw new VeniceStoreAclException(errorMessage);
}

/**
Expand Down
Loading