Skip to content

Commit

Permalink
Optimize rollout group start (eclipse-hawkbit#318)
Browse files Browse the repository at this point in the history
Signed-off-by: kaizimmerm <[email protected]>
  • Loading branch information
kaizimmerm authored Oct 20, 2016
1 parent 21d8636 commit 3f49567
Show file tree
Hide file tree
Showing 55 changed files with 194 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ void createScheduledAction(@NotEmpty Collection<Target> targets, @NotNull Distri
* rollout group in a specific status
*/
@PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET)
List<Action> findActionsByRolloutGroupParentAndStatus(@NotNull Rollout rollout,
List<Long> findActionsByRolloutGroupParentAndStatus(@NotNull Rollout rollout,
@NotNull RolloutGroup rolloutGroupParent, @NotNull Action.Status actionStatus);

/**
Expand Down Expand Up @@ -505,12 +505,12 @@ List<Action> findActionsByRolloutGroupParentAndStatus(@NotNull Rollout rollout,
* Starting an action which is scheduled, e.g. in case of roll out a
* scheduled action must be started now.
*
* @param action
* the action to start now.
* @param actionId
* the the ID of the action to start now.
* @return the action which has been started
*/
@PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET)
Action startScheduledAction(@NotNull Action action);
Action startScheduledAction(@NotNull Long actionId);

/**
* All {@link ActionStatus} entries in the repository.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.springframework.data.jpa.repository.Modifying;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.hateoas.Identifiable;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

Expand Down Expand Up @@ -146,7 +147,7 @@ Page<Action> findByTargetAndDistributionSet(final Pageable pageable, @Param("tar
* @return a list of actions according to the searched target
*/
@Query("Select a from JpaAction a where a.target = :target order by a.id")
List<Action> findByTarget(@Param("target") final JpaTarget target);
List<Action> findByTarget(@Param("target") JpaTarget target);

/**
* Retrieves all {@link Action}s of a specific target and given active flag
Expand Down Expand Up @@ -347,8 +348,8 @@ Long countByRolloutAndRolloutGroupAndStatusNotAndStatusNotAndStatusNot(JpaRollou
* @return the actions referring a specific rollout and a specific parent
* rolloutgroup in a specific status
*/
List<Action> findByRolloutAndRolloutGroupParentAndStatus(JpaRollout rollout, JpaRolloutGroup rolloutGroupParent,
Status actionStatus);
List<Identifiable<Long>> findByRolloutAndRolloutGroupParentAndStatus(JpaRollout rollout,
JpaRolloutGroup rolloutGroupParent, Status actionStatus);

/**
* Retrieves all actions for a specific rollout and in a specific status.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import javax.validation.constraints.NotNull;

import org.eclipse.hawkbit.repository.ControllerManagement;
import org.eclipse.hawkbit.repository.RepositoryConstants;
Expand Down Expand Up @@ -235,7 +234,7 @@ public TargetInfo updateTargetStatus(final TargetInfo targetInfo, final TargetUp

@Override
@Modifying
@Transactional(isolation = Isolation.READ_UNCOMMITTED)
@Transactional(isolation = Isolation.READ_COMMITTED)
public Action addCancelActionStatus(final ActionStatus actionStatus) {
final JpaAction action = (JpaAction) actionStatus.getAction();

Expand Down Expand Up @@ -274,26 +273,27 @@ private void handleFinishedCancelation(final ActionStatus actionStatus, final Jp

@Override
@Modifying
@Transactional(isolation = Isolation.READ_UNCOMMITTED)
public Action addUpdateActionStatus(@NotNull final ActionStatus actionStatus) {
final JpaAction action = (JpaAction) actionStatus.getAction();
@Transactional(isolation = Isolation.READ_COMMITTED)
public Action addUpdateActionStatus(final ActionStatus actionStatus) {
final Action action = actionStatus.getAction();
final Long actionId = action.getId();

// if action is already closed we accept further status updates if
// permitted so by configuration. This is especially useful if the
// action status feedback channel order from the device cannot be
// guaranteed. However, if an action is closed we do not accept further
// close messages.
if (actionIsNotActiveButIntermediateFeedbackStillAllowed(actionStatus, action)) {
if (actionIsNotActiveButIntermediateFeedbackStillAllowed(actionStatus, action.isActive())) {
LOG.debug("Update of actionStatus {} for action {} not possible since action not active anymore.",
actionStatus.getId(), action.getId());
actionStatus.getStatus(), actionId);
return action;
}
return handleAddUpdateActionStatus((JpaActionStatus) actionStatus, action);
return handleAddUpdateActionStatus((JpaActionStatus) actionStatus, actionId);
}

private boolean actionIsNotActiveButIntermediateFeedbackStillAllowed(final ActionStatus actionStatus,
final JpaAction action) {
return !action.isActive() && (repositoryProperties.isRejectActionStatusForClosedAction()
final boolean actionActive) {
return !actionActive && (repositoryProperties.isRejectActionStatusForClosedAction()
|| (Status.ERROR.equals(actionStatus.getStatus()) || Status.FINISHED.equals(actionStatus.getStatus())));
}

Expand All @@ -304,22 +304,22 @@ private boolean actionIsNotActiveButIntermediateFeedbackStillAllowed(final Actio
* @param action
* @return
*/
private Action handleAddUpdateActionStatus(final JpaActionStatus actionStatus, final JpaAction action) {
LOG.debug("addUpdateActionStatus for action {}", action.getId());
private Action handleAddUpdateActionStatus(final JpaActionStatus actionStatus, final Long actionId) {
LOG.debug("addUpdateActionStatus for action {}", actionId);

final JpaAction mergedAction = entityManager.merge(action);
JpaTarget mergedTarget = (JpaTarget) mergedAction.getTarget();
final JpaAction action = actionRepository.findById(actionId);
JpaTarget target = (JpaTarget) action.getTarget();
// check for a potential DOS attack
checkForToManyStatusEntries(action);

switch (actionStatus.getStatus()) {
case ERROR:
mergedTarget = DeploymentHelper.updateTargetInfo(mergedTarget, TargetUpdateStatus.ERROR, false,
targetInfoRepository, entityManager);
handleErrorOnAction(mergedAction, mergedTarget);
target = DeploymentHelper.updateTargetInfo(target, TargetUpdateStatus.ERROR, false, targetInfoRepository,
entityManager);
handleErrorOnAction(action, target);
break;
case FINISHED:
handleFinishedAndStoreInTargetStatus(mergedTarget, mergedAction);
handleFinishedAndStoreInTargetStatus(target, action);
break;
case CANCELED:
case WARNING:
Expand All @@ -330,9 +330,9 @@ private Action handleAddUpdateActionStatus(final JpaActionStatus actionStatus, f

actionStatusRepository.save(actionStatus);

LOG.debug("addUpdateActionStatus {} for target {} is finished.", action.getId(), mergedTarget.getId());
LOG.debug("addUpdateActionStatus {} for target {} is finished.", action, target.getId());

return actionRepository.save(mergedAction);
return actionRepository.save(action);
}

private void handleErrorOnAction(final JpaAction mergedAction, final JpaTarget mergedTarget) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.eclipse.hawkbit.repository.ActionFields;
import org.eclipse.hawkbit.repository.DeploymentManagement;
import org.eclipse.hawkbit.repository.DistributionSetAssignmentResult;
import org.eclipse.hawkbit.repository.RepositoryConstants;
import org.eclipse.hawkbit.repository.TargetManagement;
import org.eclipse.hawkbit.repository.eventbus.event.CancelTargetAssignmentEvent;
import org.eclipse.hawkbit.repository.eventbus.event.TargetAssignDistributionSetEvent;
Expand Down Expand Up @@ -188,8 +189,7 @@ public DistributionSetAssignmentResult assignDistributionSet(final Long dsID,
@Transactional(isolation = Isolation.READ_COMMITTED)
@CacheEvict(value = { "distributionUsageAssigned" }, allEntries = true)
public DistributionSetAssignmentResult assignDistributionSet(final Long dsID,
final Collection<TargetWithActionType> targets,
final String actionMessage) {
final Collection<TargetWithActionType> targets, final String actionMessage) {
final JpaDistributionSet set = distributoinSetRepository.findOne(dsID);
if (set == null) {
throw new EntityNotFoundException(
Expand Down Expand Up @@ -304,16 +304,7 @@ private DistributionSetAssignmentResult assignDistributionSetToTargets(@NotNull
// the initial running status because we will change the status
// of the action itself and with this action status we have a nicer
// action history.
targetIdsToActions.values().forEach(action -> {
final JpaActionStatus actionStatus = new JpaActionStatus();
actionStatus.setAction(action);
actionStatus.setOccurredAt(action.getCreatedAt());
actionStatus.setStatus(Status.RUNNING);
if(actionMessage != null) {
actionStatus.addMessage(actionMessage);
}
actionStatusRepository.save(actionStatus);
});
targetIdsToActions.values().forEach(action -> setRunningActionStatus(action, actionMessage));

// flush to get action IDs
entityManager.flush();
Expand Down Expand Up @@ -380,15 +371,12 @@ private void assignDistributionSetEvent(final JpaTarget target, final Long actio
*/
private Set<Long> overrideObsoleteUpdateActions(final List<Long> targetsIds) {

final Set<Long> cancelledTargetIds = new HashSet<>();

// Figure out if there are potential target/action combinations that
// need to be considered
// for cancelation
// need to be considered for cancellation
final List<JpaAction> activeActions = actionRepository
.findByActiveAndTargetIdInAndActionStatusNotEqualToAndDistributionSetRequiredMigrationStep(targetsIds,
Action.Status.CANCELING);
activeActions.forEach(action -> {
final Set<Long> cancelledTargetIds = activeActions.stream().map(action -> {
action.setStatus(Status.CANCELING);
// document that the status has been retrieved

Expand All @@ -397,8 +385,8 @@ private Set<Long> overrideObsoleteUpdateActions(final List<Long> targetsIds) {

cancelAssignDistributionSetEvent(action.getTarget(), action.getId());

cancelledTargetIds.add(action.getTarget().getId());
});
return action.getTarget().getId();
}).collect(Collectors.toSet());

actionRepository.save(activeActions);

Expand All @@ -410,8 +398,8 @@ private DistributionSetAssignmentResult assignDistributionSetByTargetId(@NotNull
@NotEmpty final List<String> tIDs, final ActionType actionType, final long forcedTime) {

return assignDistributionSetToTargets(set, tIDs.stream()
.map(t -> new TargetWithActionType(t, actionType, forcedTime)).collect(Collectors.toList()), null,
null, null);
.map(t -> new TargetWithActionType(t, actionType, forcedTime)).collect(Collectors.toList()), null, null,
null);
}

@Override
Expand Down Expand Up @@ -511,55 +499,72 @@ public void createScheduledAction(final Collection<Target> targets, final Distri
@Override
@Modifying
@Transactional(propagation = Propagation.REQUIRES_NEW, isolation = Isolation.READ_COMMITTED)
public Action startScheduledAction(final Action action) {
public Action startScheduledAction(final Long actionId) {

final JpaAction mergedAction = (JpaAction) entityManager.merge(action);
final JpaTarget mergedTarget = (JpaTarget) entityManager.merge(action.getTarget());
final JpaAction action = actionRepository.findById(actionId);

// check if we need to override running update actions
final Set<Long> overrideObsoleteUpdateActions = overrideObsoleteUpdateActions(
Collections.singletonList(action.getTarget().getId()));

final boolean hasDistributionSetAlreadyAssigned = targetRepository
.count(TargetSpecifications.hasControllerIdAndAssignedDistributionSetIdNot(
Collections.singletonList(mergedTarget.getControllerId()),
action.getDistributionSet().getId())) == 0;
if (hasDistributionSetAlreadyAssigned) {
if (action.getTarget().getAssignedDistributionSet() != null && action.getDistributionSet().getId()
.equals(action.getTarget().getAssignedDistributionSet().getId())) {
// the target has already the distribution set assigned, we don't
// need to start the scheduled action, just finished it.
mergedAction.setStatus(Status.FINISHED);
mergedAction.setActive(false);
return actionRepository.save(mergedAction);
// need to start the scheduled action, just finish it.
action.setStatus(Status.FINISHED);
action.setActive(false);
setSkipActionStatus(action);
return actionRepository.save(action);
}

mergedAction.setActive(true);
mergedAction.setStatus(Status.RUNNING);
final Action savedAction = actionRepository.save(mergedAction);
action.setActive(true);
action.setStatus(Status.RUNNING);
final JpaAction savedAction = actionRepository.save(action);

final JpaActionStatus actionStatus = new JpaActionStatus();
actionStatus.setAction(action);
actionStatus.setOccurredAt(action.getCreatedAt());
actionStatus.setStatus(Status.RUNNING);
actionStatusRepository.save(actionStatus);
setRunningActionStatus(savedAction, null);

final JpaTarget target = (JpaTarget) savedAction.getTarget();

mergedTarget.setAssignedDistributionSet(action.getDistributionSet());
final JpaTargetInfo targetInfo = (JpaTargetInfo) mergedTarget.getTargetInfo();
target.setAssignedDistributionSet(savedAction.getDistributionSet());
final JpaTargetInfo targetInfo = (JpaTargetInfo) target.getTargetInfo();
targetInfo.setUpdateStatus(TargetUpdateStatus.PENDING);
targetRepository.save(mergedTarget);
targetRepository.save(target);
targetInfoRepository.save(targetInfo);

// in case we canceled an action before for this target, then don't fire
// assignment event
if (!overrideObsoleteUpdateActions.contains(savedAction.getId())) {
final List<JpaSoftwareModule> softwareModules = softwareModuleRepository
.findByAssignedTo((JpaDistributionSet) action.getDistributionSet());
.findByAssignedTo((JpaDistributionSet) savedAction.getDistributionSet());
// send distribution set assignment event

assignDistributionSetEvent((JpaTarget) mergedAction.getTarget(), mergedAction.getId(), softwareModules);
assignDistributionSetEvent((JpaTarget) savedAction.getTarget(), savedAction.getId(), softwareModules);
}
return savedAction;
}

private void setRunningActionStatus(final JpaAction action, final String actionMessage) {
final JpaActionStatus actionStatus = new JpaActionStatus();
actionStatus.setAction(action);
actionStatus.setOccurredAt(action.getCreatedAt());
actionStatus.setStatus(Status.RUNNING);
if (actionMessage != null) {
actionStatus.addMessage(actionMessage);
}

actionStatusRepository.save(actionStatus);
}

private void setSkipActionStatus(final JpaAction action) {
final JpaActionStatus actionStatus = new JpaActionStatus();
actionStatus.setAction(action);
actionStatus.setOccurredAt(action.getCreatedAt());
actionStatus.setStatus(Status.RUNNING);
actionStatus.addMessage(RepositoryConstants.SERVER_MESSAGE_PREFIX
+ "Distribution Set is already assigned. Skipping this action.");
actionStatusRepository.save(actionStatus);
}

@Override
public Action findAction(final Long actionId) {
return actionRepository.findOne(actionId);
Expand Down Expand Up @@ -679,10 +684,11 @@ public Page<ActionStatus> findActionStatusByActionWithMessages(final Pageable pa
}

@Override
public List<Action> findActionsByRolloutGroupParentAndStatus(final Rollout rollout,
public List<Long> findActionsByRolloutGroupParentAndStatus(final Rollout rollout,
final RolloutGroup rolloutGroupParent, final Action.Status actionStatus) {
return actionRepository.findByRolloutAndRolloutGroupParentAndStatus((JpaRollout) rollout,
(JpaRolloutGroup) rolloutGroupParent, actionStatus);
(JpaRolloutGroup) rolloutGroupParent, actionStatus).stream().map(ident -> ident.getId())
.collect(Collectors.toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.Collections;
import java.util.List;

import javax.validation.constraints.NotNull;

import org.eclipse.hawkbit.repository.TargetFields;
import org.eclipse.hawkbit.repository.TargetFilterQueryFields;
import org.eclipse.hawkbit.repository.TargetFilterQueryManagement;
Expand All @@ -37,8 +39,6 @@

import com.google.common.base.Strings;

import javax.validation.constraints.NotNull;

/**
* JPA implementation of {@link TargetFilterQueryManagement}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.List;
import java.util.stream.Collectors;

import javax.persistence.PersistenceException;

import org.eclipse.hawkbit.exception.AbstractServerRtException;
import org.eclipse.hawkbit.repository.DeploymentManagement;
import org.eclipse.hawkbit.repository.TargetFilterQueryManagement;
Expand All @@ -32,8 +34,6 @@
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionTemplate;

import javax.persistence.PersistenceException;

/**
* Checks if targets need a new distribution set (DS) based on the target filter
* queries and assigns the new DS when necessary. First all target filter
Expand Down
Loading

0 comments on commit 3f49567

Please sign in to comment.