Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class SunbirdKey {
public static final String COLLECTION = "collection";
public static final String EVENT_SET = "eventSet";
public static final String OBJECT_TYPE = "objectType";
public static final String ROOT_ORG_ID = "rootOrgId";

private SunbirdKey() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@
import org.sunbird.common.exception.ProjectCommonException;
import org.sunbird.common.models.response.Response;
import org.sunbird.common.models.util.JsonKey;
import org.sunbird.common.models.util.LoggerUtil;
import org.sunbird.common.models.util.ProjectLogger;
import org.sunbird.common.models.util.TelemetryEnvKey;
import org.sunbird.common.request.Request;
import org.sunbird.common.request.RequestContext;
Expand Down Expand Up @@ -148,6 +146,19 @@ private Map<String, Object> mapESFieldsToObject(Map<String, Object> courseBatch)
cert_template.getKey(), mapToObject(cert_template.getValue())));
courseBatch.put(CourseJsonKey.CERTIFICATE_TEMPLATES_COLUMN, certificateTemplates);
}
Map<String, Map<String, Object>> batchAttributes =
(Map<String, Map<String, Object>>)
courseBatch.getOrDefault(CourseJsonKey.BATCH_ATTRIBUTES, null);
if(MapUtils.isNotEmpty(batchAttributes)){
batchAttributes
.entrySet()
.stream()
.forEach(
batchAttribute ->
batchAttributes.put(
batchAttribute.getKey(), mapToObject(batchAttribute.getValue())));
courseBatch.put(CourseJsonKey.BATCH_ATTRIBUTES_COLUMN, batchAttributes);
}
return courseBatch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ public abstract class CourseJsonKey {
"sunbird_send_email_notifictaion_api";
public static final String NOTIFY_TEMPLATE="notifyTemplate";
public static final String CERT_TEMPLATES = "certTemplates";
public static final String ADDITIONAL_PROPS = "additionalProps";
public static final String ADDITIONAL_PROPS = "additionalProps";
public static final String BATCH_ATTRIBUTES = "batchAttributes";
public static final String BATCH_ATTRIBUTES_COLUMN = "batchattributes";
}
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ public static Map<String, Object> esCourseMapping(CourseBatch courseBatch, Strin
esCourseMap.put(key, null);
});
esCourseMap.put(CourseJsonKey.CERTIFICATE_TEMPLATES_COLUMN, courseBatch.getCertTemplates());
if (esCourseMap.containsKey(JsonKey.CURRENT_BATCH_SIZE))
esCourseMap.remove(JsonKey.CURRENT_BATCH_SIZE);
return esCourseMap;
}

Expand All @@ -210,6 +212,8 @@ public static Map<String, Object> cassandraCourseMapping(CourseBatch courseBatch
Map<String, Object> courseBatchMap = mapper.convertValue(courseBatch, Map.class);
changeInDateFormatAll.forEach(key -> {
try {
if (courseBatchMap.containsKey(JsonKey.CURRENT_BATCH_SIZE))
courseBatchMap.remove(JsonKey.CURRENT_BATCH_SIZE);
if (courseBatchMap.containsKey(key))
courseBatchMap.put(key, setEndOfDay(key, dateTimeFormat.parse(dateTimeFormat.format(courseBatchMap.get(key))), dateFormat));
} catch (ParseException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class CourseBatch implements Serializable {
private Integer status;

private Map<String, Object> certTemplates;
private Integer currentBatchSize;
private Map<String, Object> batchAttributes;

public String getCourseCreator() {
return courseCreator;
Expand Down Expand Up @@ -213,4 +215,21 @@ public String getOldUpdatedDate() {
public void setOldUpdatedDate(String updatedDate) {
this.oldUpdatedDate = updatedDate;
}

public Map<String, Object> getBatchAttributes() {
return batchAttributes;
}

public void setBatchAttributes(Map<String, Object> batchAttributes) {
this.batchAttributes = batchAttributes;
}

public Integer getCurrentBatchSize() {
return currentBatchSize;
}

public void setCurrentBatchSize(Integer currentBatchSize) {
this.currentBatchSize = currentBatchSize;
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is poor design... you have batchAttriubtes then why you need currentBatchSize as additional attribute in the object??
the request body you can add the batchSize inside the batchAttributes -- so that it is easy in future to add more parameters...


}
Original file line number Diff line number Diff line change
Expand Up @@ -97,55 +97,76 @@ private void createCourseBatch(Request actorMessage) throws Throwable {
List<Map<String, Object>> correlatedObject = new ArrayList<>();
String courseBatchId = ProjectUtil.getUniqueIdFromTimestamp(actorMessage.getEnv());
Map<String, String> headers =
(Map<String, String>) actorMessage.getContext().get(JsonKey.HEADER);
(Map<String, String>) actorMessage.getContext().get(JsonKey.HEADER);
String requestedBy = (String) actorMessage.getContext().get(JsonKey.REQUESTED_BY);

if (Util.isNotNull(request.get(JsonKey.PARTICIPANTS))) {
ProjectCommonException.throwClientErrorException(
ResponseCode.invalidRequestParameter,
ProjectUtil.formatMessage(
ResponseCode.invalidRequestParameter.getErrorMessage(), PARTICIPANTS));
ResponseCode.invalidRequestParameter,
ProjectUtil.formatMessage(
ResponseCode.invalidRequestParameter.getErrorMessage(), PARTICIPANTS));
}
CourseBatch courseBatch = JsonUtil.convert(request, CourseBatch.class);
courseBatch.setStatus(setCourseBatchStatus(actorMessage.getRequestContext(), (String) request.get(JsonKey.START_DATE)));
String courseId = (String) request.get(JsonKey.COURSE_ID);
Map<String, Object> contentDetails = getContentDetails(actorMessage.getRequestContext(),courseId, headers);
Map<String, Object> contentDetails = getContentDetails(actorMessage.getRequestContext(), courseId, headers);
courseBatch.setCreatedDate(ProjectUtil.getTimeStamp());
if(StringUtils.isBlank(courseBatch.getCreatedBy()))
courseBatch.setCreatedBy(requestedBy);
String primaryCategory = (String) contentDetails.getOrDefault(JsonKey.PRIMARYCATEGORY, "");
if (StringUtils.isBlank(courseBatch.getCreatedBy()))
courseBatch.setCreatedBy(requestedBy);
validateContentOrg(actorMessage.getRequestContext(), courseBatch.getCreatedFor());
validateMentors(courseBatch, (String) actorMessage.getContext().getOrDefault(JsonKey.X_AUTH_TOKEN, ""), actorMessage.getRequestContext());
courseBatch.setBatchId(courseBatchId);
if (primaryCategory.equalsIgnoreCase(JsonKey.PRIMARY_CATEGORY_BLENDED_PROGRAM) && courseBatch.getCurrentBatchSize() != null) {
try {
int maxUsersInBatch = Integer.parseInt(PropertiesCache.getInstance().getProperty(JsonKey.MAX_USERS_IN_BATCH));
if (maxUsersInBatch < courseBatch.getCurrentBatchSize()) {
ProjectCommonException.throwClientErrorException(
ResponseCode.invalidCurrentBatchSize, ResponseCode.invalidCurrentBatchSize.getErrorMessage());
} else {
Map<String, Object> map = new HashMap<>();
map.put(JsonKey.CURRENT_BATCH_SIZE, String.valueOf(courseBatch.getCurrentBatchSize()));
map.put(JsonKey.MAX_USERS_IN_BATCH, String.valueOf(maxUsersInBatch));
courseBatch.setBatchAttributes(map);
}
} catch (NumberFormatException e) {
ProjectCommonException.throwClientErrorException(
ResponseCode.maxBatchSizeMissing, ResponseCode.maxBatchSizeMissing.getErrorMessage());
}
} else if (primaryCategory.equalsIgnoreCase(JsonKey.PRIMARY_CATEGORY_BLENDED_PROGRAM) && courseBatch.getCurrentBatchSize() == null) {
ProjectCommonException.throwClientErrorException(
ResponseCode.currentBatchSizeMissing, ResponseCode.currentBatchSizeMissing.getErrorMessage());
}
Response result = courseBatchDao.create(actorMessage.getRequestContext(), courseBatch);
result.put(JsonKey.BATCH_ID, courseBatchId);

Map<String, Object> esCourseMap = CourseBatchUtil.esCourseMapping(courseBatch, dateFormat);
CourseBatchUtil.syncCourseBatchForeground(actorMessage.getRequestContext(),
courseBatchId, esCourseMap);
courseBatchId, esCourseMap);
sender().tell(result, self());

if (request.containsKey(JsonKey.CURRENT_BATCH_SIZE))
request.remove(JsonKey.CURRENT_BATCH_SIZE);
targetObject =
TelemetryUtil.generateTargetObject(
courseBatchId, TelemetryEnvKey.BATCH, JsonKey.CREATE, null);
TelemetryUtil.generateTargetObject(
courseBatchId, TelemetryEnvKey.BATCH, JsonKey.CREATE, null);
TelemetryUtil.generateCorrelatedObject(
(String) request.get(JsonKey.COURSE_ID), JsonKey.COURSE, null, correlatedObject);
(String) request.get(JsonKey.COURSE_ID), JsonKey.COURSE, null, correlatedObject);

Map<String, String> rollUp = new HashMap<>();
rollUp.put("l1", (String) request.get(JsonKey.COURSE_ID));
TelemetryUtil.addTargetObjectRollUp(rollUp, targetObject);
TelemetryUtil.telemetryProcessingCall(request, targetObject, correlatedObject, actorMessage.getContext());

// updateBatchCount(courseBatch);
updateCollection(actorMessage.getRequestContext(), esCourseMap, contentDetails);
// updateBatchCount(courseBatch);
updateCollection(actorMessage.getRequestContext(), esCourseMap, contentDetails);
if (courseNotificationActive()) {
batchOperationNotifier(actorMessage, courseBatch, null);
}
}

private boolean courseNotificationActive() {
return Boolean.parseBoolean(
PropertiesCache.getInstance()
.getProperty(JsonKey.SUNBIRD_COURSE_BATCH_NOTIFICATIONS_ENABLED));
PropertiesCache.getInstance()
.getProperty(JsonKey.SUNBIRD_COURSE_BATCH_NOTIFICATIONS_ENABLED));
}

private void batchOperationNotifier(Request actorMessage, CourseBatch courseBatch, Map<String, Object> participantMentorMap) {
Expand All @@ -157,13 +178,13 @@ private void batchOperationNotifier(Request actorMessage, CourseBatch courseBatc
if (participantMentorMap != null) {
batchNotificationMap.put(JsonKey.UPDATE, true);
batchNotificationMap.put(
JsonKey.ADDED_MENTORS, participantMentorMap.get(JsonKey.ADDED_MENTORS));
JsonKey.ADDED_MENTORS, participantMentorMap.get(JsonKey.ADDED_MENTORS));
batchNotificationMap.put(
JsonKey.REMOVED_MENTORS, participantMentorMap.get(JsonKey.REMOVED_MENTORS));
JsonKey.REMOVED_MENTORS, participantMentorMap.get(JsonKey.REMOVED_MENTORS));
batchNotificationMap.put(
JsonKey.ADDED_PARTICIPANTS, participantMentorMap.get(JsonKey.ADDED_PARTICIPANTS));
JsonKey.ADDED_PARTICIPANTS, participantMentorMap.get(JsonKey.ADDED_PARTICIPANTS));
batchNotificationMap.put(
JsonKey.REMOVED_PARTICIPANTS, participantMentorMap.get(JsonKey.REMOVED_PARTICIPANTS));
JsonKey.REMOVED_PARTICIPANTS, participantMentorMap.get(JsonKey.REMOVED_PARTICIPANTS));

} else {
batchNotificationMap.put(JsonKey.OPERATION_TYPE, JsonKey.ADD);
Expand All @@ -187,34 +208,54 @@ private void updateCourseBatch(Request actorMessage) throws Exception {
Map<String, Object> request = actorMessage.getRequest();
if (Util.isNotNull(request.get(JsonKey.PARTICIPANTS))) {
ProjectCommonException.throwClientErrorException(
ResponseCode.invalidRequestParameter,
ProjectUtil.formatMessage(
ResponseCode.invalidRequestParameter.getErrorMessage(), PARTICIPANTS));
ResponseCode.invalidRequestParameter,
ProjectUtil.formatMessage(
ResponseCode.invalidRequestParameter.getErrorMessage(), PARTICIPANTS));
}
String batchId =
request.containsKey(JsonKey.BATCH_ID)
? (String) request.get(JsonKey.BATCH_ID)
: (String) request.get(JsonKey.ID);
request.containsKey(JsonKey.BATCH_ID)
? (String) request.get(JsonKey.BATCH_ID)
: (String) request.get(JsonKey.ID);
CourseBatch oldBatch =
courseBatchDao.readById((String) request.get(JsonKey.COURSE_ID), batchId, actorMessage.getRequestContext());
courseBatchDao.readById((String) request.get(JsonKey.COURSE_ID), batchId, actorMessage.getRequestContext());
CourseBatch courseBatch = getUpdateCourseBatch(actorMessage.getRequestContext(), request, oldBatch);
courseBatch.setUpdatedDate(ProjectUtil.getTimeStamp());
Map<String, Object> contentDetails = getContentDetails(actorMessage.getRequestContext(),courseBatch.getCourseId(), headers);
Map<String, Object> contentDetails = getContentDetails(actorMessage.getRequestContext(), courseBatch.getCourseId(), headers);
validateUserPermission(courseBatch, requestedBy);
validateContentOrg(actorMessage.getRequestContext(), courseBatch.getCreatedFor());
validateMentors(courseBatch, (String) actorMessage.getContext().getOrDefault(JsonKey.X_AUTH_TOKEN, ""), actorMessage.getRequestContext());
String primaryCategory = (String) contentDetails.getOrDefault(JsonKey.PRIMARYCATEGORY, "");
if (primaryCategory.equalsIgnoreCase(JsonKey.PRIMARY_CATEGORY_BLENDED_PROGRAM) && courseBatch.getCurrentBatchSize() != null) {
try {
int maxUsersInBatch = Integer.parseInt(PropertiesCache.getInstance().getProperty(JsonKey.MAX_USERS_IN_BATCH));
if (maxUsersInBatch < courseBatch.getCurrentBatchSize()) {
ProjectCommonException.throwClientErrorException(
ResponseCode.invalidCurrentBatchSize, ResponseCode.invalidCurrentBatchSize.getErrorMessage());
} else {
Map<String, Object> map = new HashMap<>();
map.put(JsonKey.CURRENT_BATCH_SIZE, String.valueOf(courseBatch.getCurrentBatchSize()));
map.put(JsonKey.MAX_USERS_IN_BATCH, String.valueOf(maxUsersInBatch));
courseBatch.setBatchAttributes(map);
}
} catch (NumberFormatException e) {
ProjectCommonException.throwClientErrorException(
ResponseCode.maxBatchSizeMissing, ResponseCode.maxBatchSizeMissing.getErrorMessage());
}
}
participantsMap = getMentorLists(participantsMap, oldBatch, courseBatch);
Map<String, Object> courseBatchMap = CourseBatchUtil.cassandraCourseMapping(courseBatch, dateFormat);
Response result =
courseBatchDao.update(actorMessage.getRequestContext(), (String) request.get(JsonKey.COURSE_ID), batchId, courseBatchMap);
courseBatchDao.update(actorMessage.getRequestContext(), (String) request.get(JsonKey.COURSE_ID), batchId, courseBatchMap);
if (request.containsKey(JsonKey.CURRENT_BATCH_SIZE))
request.remove(JsonKey.CURRENT_BATCH_SIZE);
CourseBatch updatedCourseObject = mapESFieldsToObject(courseBatch);
sender().tell(result, self());
Map<String, Object> esCourseMap = CourseBatchUtil.esCourseMapping(updatedCourseObject, dateFormat);

CourseBatchUtil.syncCourseBatchForeground(actorMessage.getRequestContext(), batchId, esCourseMap);

targetObject =
TelemetryUtil.generateTargetObject(batchId, TelemetryEnvKey.BATCH, JsonKey.UPDATE, null);
TelemetryUtil.generateTargetObject(batchId, TelemetryEnvKey.BATCH, JsonKey.UPDATE, null);

Map<String, String> rollUp = new HashMap<>();
rollUp.put("l1", courseBatch.getCourseId());
Expand Down Expand Up @@ -592,7 +633,7 @@ private boolean isOrgValid(RequestContext requestContext, String orgId) {
}

private Map<String, Object> getContentDetails(RequestContext requestContext, String courseId, Map<String, String> headers) {
Map<String, Object> ekStepContent = ContentUtil.getContent(courseId, Arrays.asList("status", "batches", "leafNodesCount"));
Map<String, Object> ekStepContent = ContentUtil.getContent(courseId, Arrays.asList("status", "batches", "leafNodesCount", "primaryCategory"));
logger.info(requestContext, "CourseBatchManagementActor:getEkStepContent: courseId: " + courseId, null,
ekStepContent);
String status = (String) ((Map<String, Object>)ekStepContent.getOrDefault("content", new HashMap<>())).getOrDefault("status", "");
Expand Down Expand Up @@ -652,6 +693,17 @@ private CourseBatch mapESFieldsToObject(CourseBatch courseBatch) {
cert_template.getKey(), mapToObject((Map<String, Object>) cert_template.getValue())));
courseBatch.setCertTemplates(certificateTemplates);
}
Map<String, Object> batchAttributes = courseBatch.getBatchAttributes();
if(MapUtils.isNotEmpty(batchAttributes)) {
batchAttributes
.entrySet()
.stream()
.forEach(
batchAttribute ->
batchAttributes.put(
batchAttribute.getKey(), mapToObject((Map<String, Object>) batchAttribute.getValue())));
courseBatch.setBatchAttributes(batchAttributes);
}
return courseBatch;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ class ContentConsumptionActor @Inject() extends BaseEnrolmentActor {
val userId: String = request.getOrDefault(JsonKey.USER_ID, primaryUserId).asInstanceOf[String]
val courseId: String = request.getOrDefault(JsonKey.COURSE_ID, "").asInstanceOf[String]
val batchId: String = request.getOrDefault(JsonKey.BATCH_ID, "").asInstanceOf[String]
val rootOrgId: String = request.getOrDefault(JsonKey.ROOT_ORG_ID, "").asInstanceOf[String]
val filters = Map[String, AnyRef]("userid"-> userId, "courseid"-> courseId, "batchid"-> batchId).asJava
val result = cassandraOperation
.getRecords(request.getRequestContext, enrolmentDBInfo.getKeySpace, enrolmentDBInfo.getTableName, filters,
Expand All @@ -431,7 +432,7 @@ class ContentConsumptionActor @Inject() extends BaseEnrolmentActor {
.asInstanceOf[java.util.List[java.util.Map[String, AnyRef]]]
val response = {
if (CollectionUtils.isNotEmpty(resp)) {
pushEnrolmentSyncEvent(userId, courseId, batchId)
pushEnrolmentSyncEvent(userId, courseId, batchId, rootOrgId)
successResponse()
} else {
new ProjectCommonException(ResponseCode.invalidRequestData.getErrorCode,
Expand All @@ -441,13 +442,13 @@ class ContentConsumptionActor @Inject() extends BaseEnrolmentActor {
sender().tell(response, self)
}

def pushEnrolmentSyncEvent(userId: String, courseId: String, batchId: String) = {
def pushEnrolmentSyncEvent(userId: String, courseId: String, batchId: String, rooOrgId: String) = {
val now = System.currentTimeMillis()
val event =
s"""{"eid":"BE_JOB_REQUEST","ets":$now,"mid":"LP.$now.${UUID.randomUUID()}"
s"""{"eid":"AUDIT","ets":$now,"mid":"AUDIT.$now.${UUID.randomUUID()}"
|,"actor":{"type":"System","id":"Course Batch Updater"},"context":{"pdata":{"ver":"1.0","id":"org.sunbird.platform"}}
|,"object":{"type":"CourseBatchEnrolment","id":"${batchId}_${userId}"},"edata":{"action":"user-enrolment-sync"
|,"iteration":1,"batchId":"$batchId","userId":"$userId","courseId":"$courseId"}}""".stripMargin
|,"iteration":1,"batchId":"$batchId","userId":"$userId","courseId":"$courseId","rootOrgId":"$rooOrgId"}}""".stripMargin
.replaceAll("\n", "")
if(pushTokafkaEnabled){
val topic = ProjectUtil.getConfigValue("kafka_enrolment_sync_topic")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1078,5 +1078,11 @@ public final class JsonKey {
public static final String CURRENT_OFFSET = "currentOffSet";
public static final String SECURE_SETTINGS = "secureSettings";
public static final String X_AUTH_USER_ORG_ID = "x-authenticated-user-orgid";
private JsonKey() {}
public static final String MAX_USERS_IN_BATCH = "maxUserInBatch";
public static final String CURRENT_BATCH_SIZE = "currentBatchSize";
public static final String PRIMARYCATEGORY = "primaryCategory";
public static final String PRIMARY_CATEGORY_BLENDED_PROGRAM = "Blended Program";


private JsonKey() {}
}
Loading