Simplified JpaRolloutManagement, moved rollout handling logic to JpaRolloutHandler (#1094)

* introduce RolloutExecutor
* removed unused method

Signed-off-by: Ahmed Sayed <ahmed.sayed@bosch.io>
This commit is contained in:
Ahmed Sayed
2021-05-20 07:41:55 +02:00
committed by GitHub
parent 069d717be1
commit 8f8cfeb53a
4 changed files with 738 additions and 566 deletions

View File

@@ -0,0 +1,51 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.repository;
import static org.eclipse.hawkbit.im.authentication.SpPermission.SpringEvalExpressions;
import org.eclipse.hawkbit.repository.model.Rollout;
import org.eclipse.hawkbit.repository.model.Target;
import org.springframework.security.access.prepost.PreAuthorize;
/**
* Represents the handler service for creating, deleting, and starting a Rollout
*/
@FunctionalInterface
public interface RolloutExecutor {
/**
* Process rollout based on its current {@link Rollout#getStatus()}.
*
* For {@link RolloutStatus#CREATING} that means creating the
* {@link RolloutGroup}s with {@link Target}s and when finished switch to
* {@link RolloutStatus#READY}.
*
* For {@link RolloutStatus#READY} that means switching to
* {@link RolloutStatus#STARTING} if the {@link Rollout#getStartAt()} is set
* and time of calling this method is beyond this point in time. This auto
* start mechanism is optional. Call {@link #start(Long)} otherwise.
*
* For {@link RolloutStatus#STARTING} that means starting the first
* {@link RolloutGroup}s in line and when finished switch to
* {@link RolloutStatus#RUNNING}.
*
* For {@link RolloutStatus#RUNNING} that means checking to activate further
* groups based on the defined thresholds. Switched to
* {@link RolloutStatus#FINISHED} is all groups are finished.
*
* For {@link RolloutStatus#DELETING} that means either soft delete in case
* rollout was already {@link RolloutStatus#RUNNING} which results in status
* change {@link RolloutStatus#DELETED} or hard delete from the persistence
* otherwise.
*
*/
@PreAuthorize(SpringEvalExpressions.IS_SYSTEM_CODE)
void execute(Rollout rollout);
}

View File

@@ -0,0 +1,660 @@
/**
* Copyright (c) 2021 Bosch.IO GmbH and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*/
package org.eclipse.hawkbit.repository.jpa;
import static org.springframework.data.domain.Sort.Direction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.persistence.EntityManager;
import org.eclipse.hawkbit.repository.DeploymentManagement;
import org.eclipse.hawkbit.repository.QuotaManagement;
import org.eclipse.hawkbit.repository.RolloutApprovalStrategy;
import org.eclipse.hawkbit.repository.RolloutGroupManagement;
import org.eclipse.hawkbit.repository.RolloutExecutor;
import org.eclipse.hawkbit.repository.RolloutHelper;
import org.eclipse.hawkbit.repository.RolloutManagement;
import org.eclipse.hawkbit.repository.TargetManagement;
import org.eclipse.hawkbit.repository.event.remote.RolloutGroupDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutUpdatedEvent;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.RolloutIllegalStateException;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitExecutor;
import org.eclipse.hawkbit.repository.jpa.model.JpaAction;
import org.eclipse.hawkbit.repository.jpa.model.JpaRollout;
import org.eclipse.hawkbit.repository.jpa.model.JpaRolloutGroup;
import org.eclipse.hawkbit.repository.jpa.model.RolloutTargetGroup;
import org.eclipse.hawkbit.repository.jpa.rollout.condition.RolloutGroupActionEvaluator;
import org.eclipse.hawkbit.repository.jpa.rollout.condition.RolloutGroupConditionEvaluator;
import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper;
import org.eclipse.hawkbit.repository.jpa.utils.QuotaHelper;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Action.ActionType;
import org.eclipse.hawkbit.repository.model.Action.Status;
import org.eclipse.hawkbit.repository.model.DistributionSet;
import org.eclipse.hawkbit.repository.model.Rollout;
import org.eclipse.hawkbit.repository.model.Rollout.RolloutStatus;
import org.eclipse.hawkbit.repository.model.RolloutGroup;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorCondition;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupStatus;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupSuccessCondition;
import org.eclipse.hawkbit.repository.model.Target;
import org.eclipse.hawkbit.repository.model.helper.EventPublisherHolder;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.util.StringUtils;
/**
* A Jpa implementation of {@link RolloutExecutor}
*/
public class JpaRolloutExecutor implements RolloutExecutor {
private static final Logger LOGGER = LoggerFactory.getLogger(JpaRolloutExecutor.class);
/**
* Max amount of targets that are handled in one transaction.
*/
private static final int TRANSACTION_TARGETS = 5_000;
/**
* Maximum amount of actions that are deleted in one transaction.
*/
private static final int TRANSACTION_ACTIONS = 5_000;
/**
* Action statuses that result in a terminated action
*/
private static final List<Status> DEFAULT_ACTION_TERMINATION_STATUSES = Arrays.asList(Status.ERROR, Status.FINISHED,
Status.CANCELED);
/**
* In case of DOWNLOAD_ONLY, actions can be finished with DOWNLOADED status.
*/
private static final List<Status> DOWNLOAD_ONLY_ACTION_TERMINATION_STATUSES = Arrays.asList(Status.ERROR,
Status.FINISHED, Status.CANCELED, Status.DOWNLOADED);
private final RolloutTargetGroupRepository rolloutTargetGroupRepository;
private final EntityManager entityManager;
private final RolloutRepository rolloutRepository;
private final ActionRepository actionRepository;
private final RolloutGroupRepository rolloutGroupRepository;
private final AfterTransactionCommitExecutor afterCommit;
private final TenantAware tenantAware;
private final RolloutGroupManagement rolloutGroupManagement;
private final QuotaManagement quotaManagement;
private final DeploymentManagement deploymentManagement;
private final TargetManagement targetManagement;
private final EventPublisherHolder eventPublisherHolder;
private final PlatformTransactionManager txManager;
private final RolloutApprovalStrategy rolloutApprovalStrategy;
private final ApplicationContext context;
/**
* Constructor
*/
public JpaRolloutExecutor(final RolloutTargetGroupRepository rolloutTargetGroupRepository,
final EntityManager entityManager, final RolloutRepository rolloutRepository,
final ActionRepository actionRepository, final RolloutGroupRepository rolloutGroupRepository,
final AfterTransactionCommitExecutor afterCommit, final TenantAware tenantAware,
final RolloutGroupManagement rolloutGroupManagement, final QuotaManagement quotaManagement,
final DeploymentManagement deploymentManagement, final TargetManagement targetManagement,
final EventPublisherHolder eventPublisherHolder, final PlatformTransactionManager txManager,
final RolloutApprovalStrategy rolloutApprovalStrategy, final ApplicationContext context) {
this.rolloutTargetGroupRepository = rolloutTargetGroupRepository;
this.entityManager = entityManager;
this.rolloutRepository = rolloutRepository;
this.actionRepository = actionRepository;
this.rolloutGroupRepository = rolloutGroupRepository;
this.afterCommit = afterCommit;
this.tenantAware = tenantAware;
this.rolloutGroupManagement = rolloutGroupManagement;
this.quotaManagement = quotaManagement;
this.deploymentManagement = deploymentManagement;
this.targetManagement = targetManagement;
this.eventPublisherHolder = eventPublisherHolder;
this.txManager = txManager;
this.rolloutApprovalStrategy = rolloutApprovalStrategy;
this.context = context;
}
@Override
public void execute(final Rollout rollout) {
LOGGER.debug("handle rollout {}", rollout.getId());
switch (rollout.getStatus()) {
case CREATING:
handleCreateRollout((JpaRollout) rollout);
break;
case DELETING:
handleDeleteRollout((JpaRollout) rollout);
break;
case READY:
handleReadyRollout(rollout);
break;
case STARTING:
handleStartingRollout(rollout);
break;
case RUNNING:
handleRunningRollout((JpaRollout) rollout);
break;
default:
LOGGER.error("Rollout in status {} not supposed to be handled!", rollout.getStatus());
break;
}
}
private void handleCreateRollout(final JpaRollout rollout) {
LOGGER.debug("handleCreateRollout called for rollout {}", rollout.getId());
final List<RolloutGroup> rolloutGroups = rolloutGroupManagement.findByRollout(
PageRequest.of(0, quotaManagement.getMaxRolloutGroupsPerRollout(), Sort.by(Direction.ASC, "id")),
rollout.getId()).getContent();
int readyGroups = 0;
int totalTargets = 0;
for (final RolloutGroup group : rolloutGroups) {
if (RolloutGroupStatus.READY == group.getStatus()) {
readyGroups++;
totalTargets += group.getTotalTargets();
continue;
}
final RolloutGroup filledGroup = fillRolloutGroupWithTargets(rollout, group);
if (RolloutGroupStatus.READY == filledGroup.getStatus()) {
readyGroups++;
totalTargets += filledGroup.getTotalTargets();
}
}
// When all groups are ready the rollout status can be changed to be
// ready, too.
if (readyGroups == rolloutGroups.size()) {
if (!rolloutApprovalStrategy.isApprovalNeeded(rollout)) {
rollout.setStatus(RolloutStatus.READY);
LOGGER.debug("rollout {} creation done. Switch to READY.", rollout.getId());
} else {
LOGGER.debug("rollout {} creation done. Switch to WAITING_FOR_APPROVAL.", rollout.getId());
rollout.setStatus(RolloutStatus.WAITING_FOR_APPROVAL);
rolloutApprovalStrategy.onApprovalRequired(rollout);
}
rollout.setLastCheck(0);
rollout.setTotalTargets(totalTargets);
rolloutRepository.save(rollout);
}
}
private void handleDeleteRollout(final JpaRollout rollout) {
LOGGER.debug("handleDeleteRollout called for {}", rollout.getId());
// check if there are actions beyond schedule
boolean hardDeleteRolloutGroups = !actionRepository.existsByRolloutIdAndStatusNotIn(rollout.getId(),
Status.SCHEDULED);
if (hardDeleteRolloutGroups) {
LOGGER.debug("Rollout {} has no actions other than scheduled -> hard delete", rollout.getId());
hardDeleteRollout(rollout);
return;
}
// clean up all scheduled actions
final Slice<JpaAction> scheduledActions = findScheduledActionsByRollout(rollout);
deleteScheduledActions(rollout, scheduledActions);
// avoid another scheduler round and re-check if all scheduled actions
// has been cleaned up. we flush first to ensure that the we include the
// deletion above
entityManager.flush();
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(),
Status.SCHEDULED) > 0;
if (hasScheduledActionsLeft) {
return;
}
// only hard delete the rollout if no actions are left for the rollout.
// In case actions are left, they are probably are running or were
// running before, so only soft delete.
hardDeleteRolloutGroups = !actionRepository.existsByRolloutId(rollout.getId());
if (hardDeleteRolloutGroups) {
hardDeleteRollout(rollout);
return;
}
// set soft delete
rollout.setStatus(RolloutStatus.DELETED);
rollout.setDeleted(true);
rolloutRepository.save(rollout);
sendRolloutGroupDeletedEvents(rollout);
}
private void handleReadyRollout(final Rollout rollout) {
if (rollout.getStartAt() != null && rollout.getStartAt() <= System.currentTimeMillis()) {
LOGGER.debug(
"handleReadyRollout called for rollout {} with autostart beyond define time. Switch to STARTING",
rollout.getId());
context.getBean(RolloutManagement.class).start(rollout.getId());
}
}
private void handleStartingRollout(final Rollout rollout) {
LOGGER.debug("handleStartingRollout called for rollout {}", rollout.getId());
if (ensureAllGroupsAreScheduled(rollout)) {
startFirstRolloutGroup(rollout);
}
}
private void handleRunningRollout(final JpaRollout rollout) {
LOGGER.debug("handleRunningRollout called for rollout {}", rollout.getId());
final List<JpaRolloutGroup> rolloutGroupsRunning = rolloutGroupRepository.findByRolloutAndStatus(rollout,
RolloutGroupStatus.RUNNING);
if (rolloutGroupsRunning.isEmpty()) {
// no running rollouts, probably there was an error
// somewhere at the latest group. And the latest group has
// been switched from running into error state. So we need
// to find the latest group which
executeLatestRolloutGroup(rollout);
} else {
LOGGER.debug("Rollout {} has {} running groups", rollout.getId(), rolloutGroupsRunning.size());
executeRolloutGroups(rollout, rolloutGroupsRunning);
}
if (isRolloutComplete(rollout)) {
LOGGER.info("Rollout {} is finished, setting FINISHED status", rollout);
rollout.setStatus(RolloutStatus.FINISHED);
rolloutRepository.save(rollout);
}
}
private void hardDeleteRollout(final JpaRollout rollout) {
sendRolloutGroupDeletedEvents(rollout);
rolloutRepository.delete(rollout);
}
private void deleteScheduledActions(final JpaRollout rollout, final Slice<JpaAction> scheduledActions) {
final boolean hasScheduledActions = scheduledActions.getNumberOfElements() > 0;
if (hasScheduledActions) {
try {
final Iterable<JpaAction> iterable = scheduledActions::iterator;
final List<Long> actionIds = StreamSupport.stream(iterable.spliterator(), false)
.map(Action::getId)
.collect(Collectors.toList());
actionRepository.deleteByIdIn(actionIds);
afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher()
.publishEvent(new RolloutUpdatedEvent(rollout, eventPublisherHolder.getApplicationId())));
} catch (final RuntimeException e) {
LOGGER.error("Exception during deletion of actions of rollout {}", rollout, e);
}
}
}
private Slice<JpaAction> findScheduledActionsByRollout(final JpaRollout rollout) {
return actionRepository.findByRolloutIdAndStatus(PageRequest.of(0, TRANSACTION_ACTIONS), rollout.getId(),
Status.SCHEDULED);
}
private void sendRolloutGroupDeletedEvents(final JpaRollout rollout) {
final List<Long> groupIds = rollout.getRolloutGroups().stream().map(RolloutGroup::getId)
.collect(Collectors.toList());
afterCommit.afterCommit(() -> groupIds.forEach(rolloutGroupId -> eventPublisherHolder.getEventPublisher()
.publishEvent(new RolloutGroupDeletedEvent(tenantAware.getCurrentTenant(), rolloutGroupId,
JpaRolloutGroup.class.getName(), eventPublisherHolder.getApplicationId()))));
}
private boolean isRolloutComplete(final JpaRollout rollout) {
// ensure that changes in the same transaction count
entityManager.flush();
final Long groupsActiveLeft = rolloutGroupRepository.countByRolloutIdAndStatusOrStatus(rollout.getId(),
RolloutGroupStatus.RUNNING, RolloutGroupStatus.SCHEDULED);
return groupsActiveLeft == 0;
}
private void executeLatestRolloutGroup(final JpaRollout rollout) {
final List<JpaRolloutGroup> latestRolloutGroup = rolloutGroupRepository
.findByRolloutAndStatusNotOrderByIdDesc(rollout, RolloutGroupStatus.SCHEDULED);
if (latestRolloutGroup.isEmpty()) {
return;
}
executeRolloutGroupSuccessAction(rollout, latestRolloutGroup.get(0));
}
private void executeRolloutGroups(final JpaRollout rollout, final List<JpaRolloutGroup> rolloutGroups) {
for (final JpaRolloutGroup rolloutGroup : rolloutGroups) {
final long targetCount = countTargetsFrom(rolloutGroup);
if (rolloutGroup.getTotalTargets() != targetCount) {
updateTotalTargetCount(rolloutGroup, targetCount);
}
// error state check, do we need to stop the whole
// rollout because of error?
final boolean isError = checkErrorState(rollout, rolloutGroup);
if (isError) {
LOGGER.info("Rollout {} {} has error, calling error action", rollout.getName(), rollout.getId());
callErrorAction(rollout, rolloutGroup);
} else {
// not in error so check finished state, do we need to
// start the next group?
final RolloutGroupSuccessCondition finishedCondition = rolloutGroup.getSuccessCondition();
checkFinishCondition(rollout, rolloutGroup, finishedCondition);
if (isRolloutGroupComplete(rollout, rolloutGroup)) {
rolloutGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(rolloutGroup);
}
}
}
}
private void updateTotalTargetCount(final JpaRolloutGroup rolloutGroup, final long countTargetsOfRolloutGroup) {
final JpaRollout jpaRollout = (JpaRollout) rolloutGroup.getRollout();
final long updatedTargetCount =
jpaRollout.getTotalTargets() - (rolloutGroup.getTotalTargets() - countTargetsOfRolloutGroup);
jpaRollout.setTotalTargets(updatedTargetCount);
rolloutGroup.setTotalTargets((int) countTargetsOfRolloutGroup);
rolloutRepository.save(jpaRollout);
rolloutGroupRepository.save(rolloutGroup);
}
private long countTargetsFrom(final JpaRolloutGroup rolloutGroup) {
return rolloutGroupManagement.countTargetsOfRolloutsGroup(rolloutGroup.getId());
}
private void callErrorAction(final Rollout rollout, final RolloutGroup rolloutGroup) {
try {
context.getBean(rolloutGroup.getErrorAction().getBeanName(), RolloutGroupActionEvaluator.class)
.eval(rollout, rolloutGroup, rolloutGroup.getErrorActionExp());
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the error action bean {}",
rolloutGroup.getErrorAction().getBeanName(), e);
}
}
private boolean isRolloutGroupComplete(final JpaRollout rollout, final JpaRolloutGroup rolloutGroup) {
final Long actionsLeftForRollout = ActionType.DOWNLOAD_ONLY == rollout.getActionType()
? actionRepository.countByRolloutAndRolloutGroupAndStatusNotIn(rollout, rolloutGroup,
DOWNLOAD_ONLY_ACTION_TERMINATION_STATUSES)
: actionRepository.countByRolloutAndRolloutGroupAndStatusNotIn(rollout, rolloutGroup,
DEFAULT_ACTION_TERMINATION_STATUSES);
return actionsLeftForRollout == 0;
}
private boolean checkErrorState(final Rollout rollout, final RolloutGroup rolloutGroup) {
final RolloutGroupErrorCondition errorCondition = rolloutGroup.getErrorCondition();
if (errorCondition == null) {
// there is no error condition, so return false, don't have error.
return false;
}
try {
return context.getBean(errorCondition.getBeanName(), RolloutGroupConditionEvaluator.class).eval(rollout,
rolloutGroup, rolloutGroup.getErrorConditionExp());
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the error condition bean {}",
errorCondition.getBeanName(), e);
return false;
}
}
private boolean checkFinishCondition(final Rollout rollout, final RolloutGroup rolloutGroup,
final RolloutGroupSuccessCondition finishCondition) {
LOGGER.trace("Checking finish condition {} on rolloutgroup {}", finishCondition, rolloutGroup);
try {
final boolean isFinished = context
.getBean(finishCondition.getBeanName(), RolloutGroupConditionEvaluator.class)
.eval(rollout, rolloutGroup, rolloutGroup.getSuccessConditionExp());
if (isFinished) {
LOGGER.debug("Rolloutgroup {} is finished, starting next group", rolloutGroup);
executeRolloutGroupSuccessAction(rollout, rolloutGroup);
} else {
LOGGER.debug("Rolloutgroup {} is still running", rolloutGroup);
}
return isFinished;
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the finish condition bean {}",
finishCondition.getBeanName(), e);
return false;
}
}
private void executeRolloutGroupSuccessAction(final Rollout rollout, final RolloutGroup rolloutGroup) {
context.getBean(rolloutGroup.getSuccessAction().getBeanName(), RolloutGroupActionEvaluator.class)
.eval(rollout, rolloutGroup, rolloutGroup.getSuccessActionExp());
}
private void startFirstRolloutGroup(final Rollout rollout) {
LOGGER.debug("startFirstRolloutGroup called for rollout {}", rollout.getId());
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.STARTING);
final JpaRollout jpaRollout = (JpaRollout) rollout;
final List<JpaRolloutGroup> rolloutGroups = rolloutGroupRepository.findByRolloutOrderByIdAsc(jpaRollout);
final JpaRolloutGroup rolloutGroup = rolloutGroups.get(0);
if (rolloutGroup.getParent() != null) {
throw new RolloutIllegalStateException("First Group is not the first group.");
}
deploymentManagement.startScheduledActionsByRolloutGroupParent(rollout.getId(),
rollout.getDistributionSet().getId(), null);
rolloutGroup.setStatus(RolloutGroupStatus.RUNNING);
rolloutGroupRepository.save(rolloutGroup);
jpaRollout.setStatus(RolloutStatus.RUNNING);
jpaRollout.setLastCheck(0);
rolloutRepository.save(jpaRollout);
}
private boolean ensureAllGroupsAreScheduled(final Rollout rollout) {
final JpaRollout jpaRollout = (JpaRollout) rollout;
final List<JpaRolloutGroup> groupsToBeScheduled = rolloutGroupRepository.findByRolloutAndStatus(rollout,
RolloutGroupStatus.READY);
final long scheduledGroups = groupsToBeScheduled.stream()
.filter(group -> scheduleRolloutGroup(jpaRollout, group)).count();
return scheduledGroups == groupsToBeScheduled.size();
}
private RolloutGroup fillRolloutGroupWithTargets(final JpaRollout rollout, final RolloutGroup group1) {
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.CREATING);
final JpaRolloutGroup group = (JpaRolloutGroup) group1;
final String baseFilter = RolloutHelper.getTargetFilterQuery(rollout);
final String groupTargetFilter;
if (StringUtils.isEmpty(group.getTargetFilterQuery())) {
groupTargetFilter = baseFilter;
} else {
groupTargetFilter = baseFilter + ";" + group.getTargetFilterQuery();
}
final List<Long> readyGroups = RolloutHelper.getGroupsByStatusIncludingGroup(rollout.getRolloutGroups(),
RolloutGroupStatus.READY, group);
final long targetsInGroupFilter = DeploymentHelper.runInNewTransaction(txManager,
"countAllTargetsByTargetFilterQueryAndNotInRolloutGroups",
count -> targetManagement.countByRsqlAndNotInRolloutGroups(readyGroups, groupTargetFilter));
final long expectedInGroup = Math.round(group.getTargetPercentage() / 100 * (double) targetsInGroupFilter);
final long currentlyInGroup = DeploymentHelper.runInNewTransaction(txManager,
"countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group));
// Switch the Group status to READY, when there are enough Targets in
// the Group
if (currentlyInGroup >= expectedInGroup) {
group.setStatus(RolloutGroupStatus.READY);
return rolloutGroupRepository.save(group);
}
try {
long targetsLeftToAdd = expectedInGroup - currentlyInGroup;
do {
// Add up to TRANSACTION_TARGETS of the left targets
// In case a TransactionException is thrown this loop aborts
targetsLeftToAdd -= assignTargetsToGroupInNewTransaction(rollout, group, groupTargetFilter,
Math.min(TRANSACTION_TARGETS, targetsLeftToAdd));
} while (targetsLeftToAdd > 0);
group.setStatus(RolloutGroupStatus.READY);
group.setTotalTargets(
DeploymentHelper.runInNewTransaction(txManager, "countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group)).intValue());
return rolloutGroupRepository.save(group);
} catch (final TransactionException e) {
LOGGER.warn("Transaction assigning Targets to RolloutGroup failed", e);
return group;
}
}
private Long assignTargetsToGroupInNewTransaction(final JpaRollout rollout, final RolloutGroup group,
final String targetFilter, final long limit) {
return DeploymentHelper.runInNewTransaction(txManager, "assignTargetsToRolloutGroup", status -> {
final PageRequest pageRequest = PageRequest.of(0, Math.toIntExact(limit));
final List<Long> readyGroups = RolloutHelper.getGroupsByStatusIncludingGroup(rollout.getRolloutGroups(),
RolloutGroupStatus.READY, group);
final Page<Target> targets = targetManagement.findByTargetFilterQueryAndNotInRolloutGroups(pageRequest,
readyGroups, targetFilter);
createAssignmentOfTargetsToGroup(targets, group);
return Long.valueOf(targets.getNumberOfElements());
});
}
/**
* Schedules a group of the rollout. Scheduled Actions are created to
* achieve this. The creation of those Actions is allowed to fail.
*/
private boolean scheduleRolloutGroup(final JpaRollout rollout, final JpaRolloutGroup group) {
final long targetsInGroup = rolloutTargetGroupRepository.countByRolloutGroup(group);
final long countOfActions = actionRepository.countByRolloutAndRolloutGroup(rollout, group);
long actionsLeft = targetsInGroup - countOfActions;
if (actionsLeft > 0) {
actionsLeft -= createActionsForRolloutGroup(rollout, group);
}
if (actionsLeft <= 0) {
group.setStatus(RolloutGroupStatus.SCHEDULED);
rolloutGroupRepository.save(group);
return true;
}
return false;
}
private long createActionsForRolloutGroup(final Rollout rollout, final RolloutGroup group) {
long totalActionsCreated = 0;
try {
long actionsCreated;
do {
actionsCreated = createActionsForTargetsInNewTransaction(rollout.getId(), group.getId(), TRANSACTION_TARGETS);
totalActionsCreated += actionsCreated;
} while (actionsCreated > 0);
} catch (final TransactionException e) {
LOGGER.warn("Transaction assigning Targets to RolloutGroup failed", e);
return 0;
}
return totalActionsCreated;
}
private Long createActionsForTargetsInNewTransaction(final long rolloutId, final long groupId,
final int limit) {
return DeploymentHelper.runInNewTransaction(txManager, "createActionsForTargets", status -> {
final PageRequest pageRequest = PageRequest.of(0, limit);
final Rollout rollout = rolloutRepository.findById(rolloutId)
.orElseThrow(() -> new EntityNotFoundException(Rollout.class, rolloutId));
final RolloutGroup group = rolloutGroupRepository.findById(groupId)
.orElseThrow(() -> new EntityNotFoundException(RolloutGroup.class, groupId));
final DistributionSet distributionSet = rollout.getDistributionSet();
final ActionType actionType = rollout.getActionType();
final long forceTime = rollout.getForcedTime();
final Page<Target> targets = targetManagement.findByInRolloutGroupWithoutAction(pageRequest, groupId);
if (targets.getTotalElements() > 0) {
createScheduledAction(targets.getContent(), distributionSet, actionType, forceTime, rollout, group);
}
return Long.valueOf(targets.getNumberOfElements());
});
}
private void createAssignmentOfTargetsToGroup(final Page<Target> targets, final RolloutGroup group) {
targets.forEach(target -> rolloutTargetGroupRepository.save(new RolloutTargetGroup(group, target)));
}
/**
* Creates an action entry into the action repository. In case of existing
* scheduled actions the scheduled actions gets canceled. A scheduled action
* is created in-active.
*/
private void createScheduledAction(final Collection<Target> targets, final DistributionSet distributionSet,
final ActionType actionType, final Long forcedTime, final Rollout rollout,
final RolloutGroup rolloutGroup) {
// cancel all current scheduled actions for this target. E.g. an action
// is already scheduled and a next action is created then cancel the
// current scheduled action to cancel. E.g. a new scheduled action is
// created.
final List<Long> targetIds = targets.stream().map(Target::getId).collect(Collectors.toList());
deploymentManagement.cancelInactiveScheduledActionsForTargets(targetIds);
targets.forEach(target -> {
assertActionsPerTargetQuota(target, 1);
final JpaAction action = new JpaAction();
action.setTarget(target);
action.setActive(false);
action.setDistributionSet(distributionSet);
action.setActionType(actionType);
action.setForcedTime(forcedTime);
action.setStatus(Status.SCHEDULED);
action.setRollout(rollout);
action.setRolloutGroup(rolloutGroup);
action.setInitiatedBy(rollout.getCreatedBy());
rollout.getWeight().ifPresent(action::setWeight);
actionRepository.save(action);
});
}
/**
* Enforces the quota defining the maximum number of {@link Action}s per
* {@link Target}.
*
* @param target
* The target
* @param requested
* number of actions to check
*/
private void assertActionsPerTargetQuota(final Target target, final int requested) {
final int quota = quotaManagement.getMaxActionsPerTarget();
QuotaHelper.assertAssignmentQuota(target.getId(), requested, quota, Action.class, Target.class,
actionRepository::countByTargetId);
}
}

View File

@@ -11,16 +11,13 @@ package org.eclipse.hawkbit.repository.jpa;
import static org.eclipse.hawkbit.repository.jpa.builder.JpaRolloutGroupCreate.addSuccessAndErrorConditionsAndActions;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.persistence.EntityManager;
import javax.validation.ConstraintDeclarationException;
import javax.validation.ValidationException;
@@ -29,6 +26,7 @@ import org.eclipse.hawkbit.repository.DeploymentManagement;
import org.eclipse.hawkbit.repository.DistributionSetManagement;
import org.eclipse.hawkbit.repository.QuotaManagement;
import org.eclipse.hawkbit.repository.RolloutApprovalStrategy;
import org.eclipse.hawkbit.repository.RolloutExecutor;
import org.eclipse.hawkbit.repository.RolloutFields;
import org.eclipse.hawkbit.repository.RolloutGroupManagement;
import org.eclipse.hawkbit.repository.RolloutHelper;
@@ -40,37 +38,26 @@ import org.eclipse.hawkbit.repository.builder.GenericRolloutUpdate;
import org.eclipse.hawkbit.repository.builder.RolloutCreate;
import org.eclipse.hawkbit.repository.builder.RolloutGroupCreate;
import org.eclipse.hawkbit.repository.builder.RolloutUpdate;
import org.eclipse.hawkbit.repository.event.remote.RolloutGroupDeletedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutGroupCreatedEvent;
import org.eclipse.hawkbit.repository.event.remote.entity.RolloutUpdatedEvent;
import org.eclipse.hawkbit.repository.exception.EntityNotFoundException;
import org.eclipse.hawkbit.repository.exception.EntityReadOnlyException;
import org.eclipse.hawkbit.repository.exception.RolloutIllegalStateException;
import org.eclipse.hawkbit.repository.jpa.configuration.Constants;
import org.eclipse.hawkbit.repository.jpa.executor.AfterTransactionCommitExecutor;
import org.eclipse.hawkbit.repository.jpa.model.JpaAction;
import org.eclipse.hawkbit.repository.jpa.model.JpaRollout;
import org.eclipse.hawkbit.repository.jpa.model.JpaRolloutGroup;
import org.eclipse.hawkbit.repository.jpa.model.RolloutTargetGroup;
import org.eclipse.hawkbit.repository.jpa.rollout.condition.RolloutGroupActionEvaluator;
import org.eclipse.hawkbit.repository.jpa.rollout.condition.RolloutGroupConditionEvaluator;
import org.eclipse.hawkbit.repository.jpa.rsql.RSQLUtility;
import org.eclipse.hawkbit.repository.jpa.specifications.RolloutSpecification;
import org.eclipse.hawkbit.repository.jpa.specifications.SpecificationsBuilder;
import org.eclipse.hawkbit.repository.jpa.utils.DeploymentHelper;
import org.eclipse.hawkbit.repository.jpa.utils.QuotaHelper;
import org.eclipse.hawkbit.repository.jpa.utils.WeightValidationHelper;
import org.eclipse.hawkbit.repository.model.Action;
import org.eclipse.hawkbit.repository.model.Action.ActionType;
import org.eclipse.hawkbit.repository.model.Action.Status;
import org.eclipse.hawkbit.repository.model.BaseEntity;
import org.eclipse.hawkbit.repository.model.DistributionSet;
import org.eclipse.hawkbit.repository.model.Rollout;
import org.eclipse.hawkbit.repository.model.Rollout.RolloutStatus;
import org.eclipse.hawkbit.repository.model.RolloutGroup;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupErrorCondition;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupStatus;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupSuccessCondition;
import org.eclipse.hawkbit.repository.model.RolloutGroupConditions;
import org.eclipse.hawkbit.repository.model.RolloutGroupsValidation;
import org.eclipse.hawkbit.repository.model.Target;
@@ -82,16 +69,12 @@ import org.eclipse.hawkbit.security.SystemSecurityContext;
import org.eclipse.hawkbit.tenancy.TenantAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.dao.ConcurrencyFailureException;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.domain.Slice;
import org.springframework.data.domain.Sort;
import org.springframework.data.domain.Sort.Direction;
import org.springframework.data.jpa.domain.Specification;
import org.springframework.integration.support.locks.LockRegistry;
import org.springframework.orm.jpa.vendor.Database;
@@ -100,11 +83,9 @@ import org.springframework.retry.annotation.Retryable;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionException;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.validation.annotation.Validated;
@@ -118,66 +99,47 @@ import com.google.common.collect.Lists;
public class JpaRolloutManagement extends AbstractRolloutManagement {
private static final Logger LOGGER = LoggerFactory.getLogger(JpaRolloutManagement.class);
/**
* Max amount of targets that are handled in one transaction.
*/
private static final int TRANSACTION_TARGETS = 5_000;
/**
* Maximum amount of actions that are deleted in one transaction.
*/
private static final int TRANSACTION_ACTIONS = 5_000;
private static final List<RolloutStatus> ACTIVE_ROLLOUTS = Arrays.asList(RolloutStatus.CREATING,
RolloutStatus.DELETING, RolloutStatus.STARTING, RolloutStatus.READY, RolloutStatus.RUNNING);
// In case of DOWNLOAD_ONLY, actions can be finished with DOWNLOADED status.
private static final List<Status> DOWNLOAD_ONLY_ACTION_TERMINATION_STATUSES = Arrays.asList(Status.ERROR,
Status.FINISHED, Status.CANCELED, Status.DOWNLOADED);
private static final List<Status> DEFAULT_ACTION_TERMINATION_STATUSES = Arrays.asList(Status.ERROR, Status.FINISHED,
Status.CANCELED);
@Autowired
private RolloutRepository rolloutRepository;
@Autowired
private RolloutGroupRepository rolloutGroupRepository;
@Autowired
private RolloutTargetGroupRepository rolloutTargetGroupRepository;
@Autowired
private ActionRepository actionRepository;
@Autowired
private AfterTransactionCommitExecutor afterCommit;
@Autowired
private EntityManager entityManager;
@Autowired
private QuotaManagement quotaManagement;
@Autowired
private RolloutStatusCache rolloutStatusCache;
private final RolloutExecutor rolloutExecutor;
private final EventPublisherHolder eventPublisherHolder;
private final Database database;
JpaRolloutManagement(final TargetManagement targetManagement, final DeploymentManagement deploymentManagement,
public JpaRolloutManagement(final TargetManagement targetManagement, final DeploymentManagement deploymentManagement,
final RolloutGroupManagement rolloutGroupManagement,
final DistributionSetManagement distributionSetManagement, final ApplicationContext context,
final EventPublisherHolder eventPublisherHolder, final VirtualPropertyReplacer virtualPropertyReplacer,
final PlatformTransactionManager txManager, final TenantAware tenantAware, final LockRegistry lockRegistry,
final Database database, final RolloutApprovalStrategy rolloutApprovalStrategy,
final TenantConfigurationManagement tenantConfigurationManagement,
final SystemSecurityContext systemSecurityContext) {
final SystemSecurityContext systemSecurityContext, final RolloutExecutor rolloutExecutor) {
super(targetManagement, deploymentManagement, rolloutGroupManagement, distributionSetManagement, context,
virtualPropertyReplacer, txManager, tenantAware, lockRegistry, rolloutApprovalStrategy,
tenantConfigurationManagement, systemSecurityContext);
this.eventPublisherHolder = eventPublisherHolder;
this.database = database;
this.rolloutExecutor = rolloutExecutor;
}
@Override
@@ -331,120 +293,6 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
new RolloutGroupCreatedEvent(group, rollout.getId(), eventPublisherHolder.getApplicationId())));
}
private void handleCreateRollout(final JpaRollout rollout) {
LOGGER.debug("handleCreateRollout called for rollout {}", rollout.getId());
final List<RolloutGroup> rolloutGroups = rolloutGroupManagement.findByRollout(
PageRequest.of(0, quotaManagement.getMaxRolloutGroupsPerRollout(), Sort.by(Direction.ASC, "id")),
rollout.getId()).getContent();
int readyGroups = 0;
int totalTargets = 0;
for (final RolloutGroup group : rolloutGroups) {
if (RolloutGroupStatus.READY == group.getStatus()) {
readyGroups++;
totalTargets += group.getTotalTargets();
continue;
}
final RolloutGroup filledGroup = fillRolloutGroupWithTargets(rollout, group);
if (RolloutGroupStatus.READY == filledGroup.getStatus()) {
readyGroups++;
totalTargets += filledGroup.getTotalTargets();
}
}
// When all groups are ready the rollout status can be changed to be
// ready, too.
if (readyGroups == rolloutGroups.size()) {
if (!rolloutApprovalStrategy.isApprovalNeeded(rollout)) {
rollout.setStatus(RolloutStatus.READY);
LOGGER.debug("rollout {} creation done. Switch to READY.", rollout.getId());
} else {
LOGGER.debug("rollout {} creation done. Switch to WAITING_FOR_APPROVAL.", rollout.getId());
rollout.setStatus(RolloutStatus.WAITING_FOR_APPROVAL);
rolloutApprovalStrategy.onApprovalRequired(rollout);
}
rollout.setLastCheck(0);
rollout.setTotalTargets(totalTargets);
rolloutRepository.save(rollout);
}
}
private RolloutGroup fillRolloutGroupWithTargets(final JpaRollout rollout, final RolloutGroup group1) {
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.CREATING);
final JpaRolloutGroup group = (JpaRolloutGroup) group1;
final String baseFilter = RolloutHelper.getTargetFilterQuery(rollout);
final String groupTargetFilter;
if (StringUtils.isEmpty(group.getTargetFilterQuery())) {
groupTargetFilter = baseFilter;
} else {
groupTargetFilter = baseFilter + ";" + group.getTargetFilterQuery();
}
final List<Long> readyGroups = RolloutHelper.getGroupsByStatusIncludingGroup(rollout.getRolloutGroups(),
RolloutGroupStatus.READY, group);
final long targetsInGroupFilter = DeploymentHelper.runInNewTransaction(txManager,
"countAllTargetsByTargetFilterQueryAndNotInRolloutGroups",
count -> targetManagement.countByRsqlAndNotInRolloutGroups(readyGroups, groupTargetFilter));
final long expectedInGroup = Math.round(group.getTargetPercentage() / 100 * (double) targetsInGroupFilter);
final long currentlyInGroup = DeploymentHelper.runInNewTransaction(txManager,
"countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group));
// Switch the Group status to READY, when there are enough Targets in
// the Group
if (currentlyInGroup >= expectedInGroup) {
group.setStatus(RolloutGroupStatus.READY);
return rolloutGroupRepository.save(group);
}
try {
long targetsLeftToAdd = expectedInGroup - currentlyInGroup;
do {
// Add up to TRANSACTION_TARGETS of the left targets
// In case a TransactionException is thrown this loop aborts
targetsLeftToAdd -= assignTargetsToGroupInNewTransaction(rollout, group, groupTargetFilter,
Math.min(TRANSACTION_TARGETS, targetsLeftToAdd));
} while (targetsLeftToAdd > 0);
group.setStatus(RolloutGroupStatus.READY);
group.setTotalTargets(
DeploymentHelper.runInNewTransaction(txManager, "countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group)).intValue());
return rolloutGroupRepository.save(group);
} catch (final TransactionException e) {
LOGGER.warn("Transaction assigning Targets to RolloutGroup failed", e);
return group;
}
}
private Long assignTargetsToGroupInNewTransaction(final JpaRollout rollout, final RolloutGroup group,
final String targetFilter, final long limit) {
return DeploymentHelper.runInNewTransaction(txManager, "assignTargetsToRolloutGroup", status -> {
final PageRequest pageRequest = PageRequest.of(0, Math.toIntExact(limit));
final List<Long> readyGroups = RolloutHelper.getGroupsByStatusIncludingGroup(rollout.getRolloutGroups(),
RolloutGroupStatus.READY, group);
final Page<Target> targets = targetManagement.findByTargetFilterQueryAndNotInRolloutGroups(pageRequest,
readyGroups, targetFilter);
createAssignmentOfTargetsToGroup(targets, group);
return Long.valueOf(targets.getNumberOfElements());
});
}
private void createAssignmentOfTargetsToGroup(final Page<Target> targets, final RolloutGroup group) {
targets.forEach(target -> rolloutTargetGroupRepository.save(new RolloutTargetGroup(group, target)));
}
@Override
@Async
public ListenableFuture<RolloutGroupsValidation> validateTargetsInGroups(final List<RolloutGroupCreate> groups,
@@ -507,131 +355,6 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
return rolloutRepository.save(rollout);
}
private void startFirstRolloutGroup(final Rollout rollout) {
LOGGER.debug("startFirstRolloutGroup called for rollout {}", rollout.getId());
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.STARTING);
final JpaRollout jpaRollout = (JpaRollout) rollout;
final List<JpaRolloutGroup> rolloutGroups = rolloutGroupRepository.findByRolloutOrderByIdAsc(jpaRollout);
final JpaRolloutGroup rolloutGroup = rolloutGroups.get(0);
if (rolloutGroup.getParent() != null) {
throw new RolloutIllegalStateException("First Group is not the first group.");
}
deploymentManagement.startScheduledActionsByRolloutGroupParent(rollout.getId(),
rollout.getDistributionSet().getId(), null);
rolloutGroup.setStatus(RolloutGroupStatus.RUNNING);
rolloutGroupRepository.save(rolloutGroup);
jpaRollout.setStatus(RolloutStatus.RUNNING);
jpaRollout.setLastCheck(0);
rolloutRepository.save(jpaRollout);
}
private boolean ensureAllGroupsAreScheduled(final Rollout rollout) {
final JpaRollout jpaRollout = (JpaRollout) rollout;
final List<JpaRolloutGroup> groupsToBeScheduled = rolloutGroupRepository.findByRolloutAndStatus(rollout,
RolloutGroupStatus.READY);
final long scheduledGroups = groupsToBeScheduled.stream()
.filter(group -> scheduleRolloutGroup(jpaRollout, group)).count();
return scheduledGroups == groupsToBeScheduled.size();
}
/**
* Schedules a group of the rollout. Scheduled Actions are created to
* achieve this. The creation of those Actions is allowed to fail.
*/
private boolean scheduleRolloutGroup(final JpaRollout rollout, final JpaRolloutGroup group) {
final long targetsInGroup = rolloutTargetGroupRepository.countByRolloutGroup(group);
final long countOfActions = actionRepository.countByRolloutAndRolloutGroup(rollout, group);
long actionsLeft = targetsInGroup - countOfActions;
if (actionsLeft > 0) {
actionsLeft -= createActionsForRolloutGroup(rollout, group);
}
if (actionsLeft <= 0) {
group.setStatus(RolloutGroupStatus.SCHEDULED);
rolloutGroupRepository.save(group);
return true;
}
return false;
}
private long createActionsForRolloutGroup(final Rollout rollout, final RolloutGroup group) {
long totalActionsCreated = 0;
try {
long actionsCreated;
do {
actionsCreated = createActionsForTargetsInNewTransaction(rollout.getId(), group.getId(), TRANSACTION_TARGETS);
totalActionsCreated += actionsCreated;
} while (actionsCreated > 0);
} catch (final TransactionException e) {
LOGGER.warn("Transaction assigning Targets to RolloutGroup failed", e);
return 0;
}
return totalActionsCreated;
}
private Long createActionsForTargetsInNewTransaction(final long rolloutId, final long groupId,
final int limit) {
return DeploymentHelper.runInNewTransaction(txManager, "createActionsForTargets", status -> {
final PageRequest pageRequest = PageRequest.of(0, limit);
final Rollout rollout = rolloutRepository.findById(rolloutId)
.orElseThrow(() -> new EntityNotFoundException(Rollout.class, rolloutId));
final RolloutGroup group = rolloutGroupRepository.findById(groupId)
.orElseThrow(() -> new EntityNotFoundException(RolloutGroup.class, groupId));
final DistributionSet distributionSet = rollout.getDistributionSet();
final ActionType actionType = rollout.getActionType();
final long forceTime = rollout.getForcedTime();
final Page<Target> targets = targetManagement.findByInRolloutGroupWithoutAction(pageRequest, groupId);
if (targets.getTotalElements() > 0) {
createScheduledAction(targets.getContent(), distributionSet, actionType, forceTime, rollout, group);
}
return Long.valueOf(targets.getNumberOfElements());
});
}
/**
* Creates an action entry into the action repository. In case of existing
* scheduled actions the scheduled actions gets canceled. A scheduled action
* is created in-active.
*/
private void createScheduledAction(final Collection<Target> targets, final DistributionSet distributionSet,
final ActionType actionType, final Long forcedTime, final Rollout rollout,
final RolloutGroup rolloutGroup) {
// cancel all current scheduled actions for this target. E.g. an action
// is already scheduled and a next action is created then cancel the
// current scheduled action to cancel. E.g. a new scheduled action is
// created.
final List<Long> targetIds = targets.stream().map(Target::getId).collect(Collectors.toList());
deploymentManagement.cancelInactiveScheduledActionsForTargets(targetIds);
targets.forEach(target -> {
assertActionsPerTargetQuota(target, 1);
final JpaAction action = new JpaAction();
action.setTarget(target);
action.setActive(false);
action.setDistributionSet(distributionSet);
action.setActionType(actionType);
action.setForcedTime(forcedTime);
action.setStatus(Status.SCHEDULED);
action.setRollout(rollout);
action.setRolloutGroup(rolloutGroup);
action.setInitiatedBy(rollout.getCreatedBy());
rollout.getWeight().ifPresent(action::setWeight);
actionRepository.save(action);
});
}
@Override
@Transactional
@Retryable(include = {
@@ -665,151 +388,6 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
rolloutRepository.save(rollout);
}
private void handleRunningRollout(final JpaRollout rollout) {
LOGGER.debug("handleRunningRollout called for rollout {}", rollout.getId());
final List<JpaRolloutGroup> rolloutGroupsRunning = rolloutGroupRepository.findByRolloutAndStatus(rollout,
RolloutGroupStatus.RUNNING);
if (rolloutGroupsRunning.isEmpty()) {
// no running rollouts, probably there was an error
// somewhere at the latest group. And the latest group has
// been switched from running into error state. So we need
// to find the latest group which
executeLatestRolloutGroup(rollout);
} else {
LOGGER.debug("Rollout {} has {} running groups", rollout.getId(), rolloutGroupsRunning.size());
executeRolloutGroups(rollout, rolloutGroupsRunning);
}
if (isRolloutComplete(rollout)) {
LOGGER.info("Rollout {} is finished, setting FINISHED status", rollout);
rollout.setStatus(RolloutStatus.FINISHED);
rolloutRepository.save(rollout);
}
}
private void executeRolloutGroups(final JpaRollout rollout, final List<JpaRolloutGroup> rolloutGroups) {
for (final JpaRolloutGroup rolloutGroup : rolloutGroups) {
final long targetCount = countTargetsFrom(rolloutGroup);
if (rolloutGroup.getTotalTargets() != targetCount) {
updateTotalTargetCount(rolloutGroup, targetCount);
}
// error state check, do we need to stop the whole
// rollout because of error?
final boolean isError = checkErrorState(rollout, rolloutGroup);
if (isError) {
LOGGER.info("Rollout {} {} has error, calling error action", rollout.getName(), rollout.getId());
callErrorAction(rollout, rolloutGroup);
} else {
// not in error so check finished state, do we need to
// start the next group?
final RolloutGroupSuccessCondition finishedCondition = rolloutGroup.getSuccessCondition();
checkFinishCondition(rollout, rolloutGroup, finishedCondition);
if (isRolloutGroupComplete(rollout, rolloutGroup)) {
rolloutGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(rolloutGroup);
}
}
}
}
private void updateTotalTargetCount(final JpaRolloutGroup rolloutGroup, final long countTargetsOfRolloutGroup) {
final JpaRollout jpaRollout = (JpaRollout) rolloutGroup.getRollout();
final long updatedTargetCount = jpaRollout.getTotalTargets()
- (rolloutGroup.getTotalTargets() - countTargetsOfRolloutGroup);
jpaRollout.setTotalTargets(updatedTargetCount);
rolloutGroup.setTotalTargets((int) countTargetsOfRolloutGroup);
rolloutRepository.save(jpaRollout);
rolloutGroupRepository.save(rolloutGroup);
}
private long countTargetsFrom(final JpaRolloutGroup rolloutGroup) {
return rolloutGroupManagement.countTargetsOfRolloutsGroup(rolloutGroup.getId());
}
private void executeLatestRolloutGroup(final JpaRollout rollout) {
final List<JpaRolloutGroup> latestRolloutGroup = rolloutGroupRepository
.findByRolloutAndStatusNotOrderByIdDesc(rollout, RolloutGroupStatus.SCHEDULED);
if (latestRolloutGroup.isEmpty()) {
return;
}
executeRolloutGroupSuccessAction(rollout, latestRolloutGroup.get(0));
}
private void callErrorAction(final Rollout rollout, final RolloutGroup rolloutGroup) {
try {
context.getBean(rolloutGroup.getErrorAction().getBeanName(), RolloutGroupActionEvaluator.class)
.eval(rollout, rolloutGroup, rolloutGroup.getErrorActionExp());
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the error action bean {}",
rolloutGroup.getErrorAction().getBeanName(), e);
}
}
private boolean isRolloutComplete(final JpaRollout rollout) {
// ensure that changes in the same transaction count
entityManager.flush();
final Long groupsActiveLeft = rolloutGroupRepository.countByRolloutIdAndStatusOrStatus(rollout.getId(),
RolloutGroupStatus.RUNNING, RolloutGroupStatus.SCHEDULED);
return groupsActiveLeft == 0;
}
private boolean isRolloutGroupComplete(final JpaRollout rollout, final JpaRolloutGroup rolloutGroup) {
final Long actionsLeftForRollout = ActionType.DOWNLOAD_ONLY == rollout.getActionType()
? actionRepository.countByRolloutAndRolloutGroupAndStatusNotIn(rollout, rolloutGroup,
DOWNLOAD_ONLY_ACTION_TERMINATION_STATUSES)
: actionRepository.countByRolloutAndRolloutGroupAndStatusNotIn(rollout, rolloutGroup,
DEFAULT_ACTION_TERMINATION_STATUSES);
return actionsLeftForRollout == 0;
}
private boolean checkErrorState(final Rollout rollout, final RolloutGroup rolloutGroup) {
final RolloutGroupErrorCondition errorCondition = rolloutGroup.getErrorCondition();
if (errorCondition == null) {
// there is no error condition, so return false, don't have error.
return false;
}
try {
return context.getBean(errorCondition.getBeanName(), RolloutGroupConditionEvaluator.class).eval(rollout,
rolloutGroup, rolloutGroup.getErrorConditionExp());
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the error condition bean {}",
errorCondition.getBeanName(), e);
return false;
}
}
private boolean checkFinishCondition(final Rollout rollout, final RolloutGroup rolloutGroup,
final RolloutGroupSuccessCondition finishCondition) {
LOGGER.trace("Checking finish condition {} on rolloutgroup {}", finishCondition, rolloutGroup);
try {
final boolean isFinished = context
.getBean(finishCondition.getBeanName(), RolloutGroupConditionEvaluator.class)
.eval(rollout, rolloutGroup, rolloutGroup.getSuccessConditionExp());
if (isFinished) {
LOGGER.debug("Rolloutgroup {} is finished, starting next group", rolloutGroup);
executeRolloutGroupSuccessAction(rollout, rolloutGroup);
} else {
LOGGER.debug("Rolloutgroup {} is still running", rolloutGroup);
}
return isFinished;
} catch (final BeansException e) {
LOGGER.error("Something bad happend when accessing the finish condition bean {}",
finishCondition.getBeanName(), e);
return false;
}
}
private void executeRolloutGroupSuccessAction(final Rollout rollout, final RolloutGroup rolloutGroup) {
context.getBean(rolloutGroup.getSuccessAction().getBeanName(), RolloutGroupActionEvaluator.class).eval(rollout,
rolloutGroup, rolloutGroup.getSuccessActionExp());
}
@Override
// No transaction, will be created per handled rollout
@Transactional(propagation = Propagation.NEVER)
@@ -839,51 +417,10 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
private long handleRollout(final long rolloutId) {
final JpaRollout rollout = rolloutRepository.findById(rolloutId)
.orElseThrow(() -> new EntityNotFoundException(Rollout.class, rolloutId));
runInUserContext(rollout, () -> handleRollout(rollout));
runInUserContext(rollout, () -> rolloutExecutor.execute(rollout));
return 0;
}
private void handleRollout(final JpaRollout rollout) {
LOGGER.debug("Handle rollout {}", rollout.getId());
switch (rollout.getStatus()) {
case CREATING:
handleCreateRollout(rollout);
break;
case DELETING:
handleDeleteRollout(rollout);
break;
case READY:
handleReadyRollout(rollout);
break;
case STARTING:
handleStartingRollout(rollout);
break;
case RUNNING:
handleRunningRollout(rollout);
break;
default:
LOGGER.error("Rollout in status {} not supposed to be handled!", rollout.getStatus());
break;
}
}
private void handleStartingRollout(final Rollout rollout) {
LOGGER.debug("handleStartingRollout called for rollout {}", rollout.getId());
if (ensureAllGroupsAreScheduled(rollout)) {
startFirstRolloutGroup(rollout);
}
}
private void handleReadyRollout(final Rollout rollout) {
if (rollout.getStartAt() != null && rollout.getStartAt() <= System.currentTimeMillis()) {
LOGGER.debug(
"handleReadyRollout called for rollout {} with autostart beyond define time. Switch to STARTING",
rollout.getId());
start(rollout.getId());
}
}
@Override
@Transactional
@Retryable(include = {
@@ -904,85 +441,6 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
rolloutRepository.save(jpaRollout);
}
private void handleDeleteRollout(final JpaRollout rollout) {
LOGGER.debug("handleDeleteRollout called for {}", rollout.getId());
// check if there are actions beyond schedule
boolean hardDeleteRolloutGroups = !actionRepository.existsByRolloutIdAndStatusNotIn(rollout.getId(),
Status.SCHEDULED);
if (hardDeleteRolloutGroups) {
LOGGER.debug("Rollout {} has no actions other than scheduled -> hard delete", rollout.getId());
hardDeleteRollout(rollout);
return;
}
// clean up all scheduled actions
final Slice<JpaAction> scheduledActions = findScheduledActionsByRollout(rollout);
deleteScheduledActions(rollout, scheduledActions);
// avoid another scheduler round and re-check if all scheduled actions
// has been cleaned up. we flush first to ensure that the we include the
// deletion above
entityManager.flush();
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(),
Status.SCHEDULED) > 0;
if (hasScheduledActionsLeft) {
return;
}
// only hard delete the rollout if no actions are left for the rollout.
// In case actions are left, they are probably are running or were
// running before, so only soft delete.
hardDeleteRolloutGroups = !actionRepository.existsByRolloutId(rollout.getId());
if (hardDeleteRolloutGroups) {
hardDeleteRollout(rollout);
return;
}
// set soft delete
rollout.setStatus(RolloutStatus.DELETED);
rollout.setDeleted(true);
rolloutRepository.save(rollout);
sendRolloutGroupDeletedEvents(rollout);
}
private void sendRolloutGroupDeletedEvents(final JpaRollout rollout) {
final List<Long> groupIds = rollout.getRolloutGroups().stream().map(RolloutGroup::getId)
.collect(Collectors.toList());
afterCommit.afterCommit(() -> groupIds.forEach(rolloutGroupId -> eventPublisherHolder.getEventPublisher()
.publishEvent(new RolloutGroupDeletedEvent(tenantAware.getCurrentTenant(), rolloutGroupId,
JpaRolloutGroup.class.getName(), eventPublisherHolder.getApplicationId()))));
}
private void hardDeleteRollout(final JpaRollout rollout) {
sendRolloutGroupDeletedEvents(rollout);
rolloutRepository.delete(rollout);
}
private void deleteScheduledActions(final JpaRollout rollout, final Slice<JpaAction> scheduledActions) {
final boolean hasScheduledActions = scheduledActions.getNumberOfElements() > 0;
if (hasScheduledActions) {
try {
final Iterable<JpaAction> iterable = scheduledActions::iterator;
final List<Long> actionIds = StreamSupport.stream(iterable.spliterator(), false).map(Action::getId)
.collect(Collectors.toList());
actionRepository.deleteByIdIn(actionIds);
afterCommit.afterCommit(() -> eventPublisherHolder.getEventPublisher()
.publishEvent(new RolloutUpdatedEvent(rollout, eventPublisherHolder.getApplicationId())));
} catch (final RuntimeException e) {
LOGGER.error("Exception during deletion of actions of rollout {}", rollout, e);
}
}
}
private Slice<JpaAction> findScheduledActionsByRollout(final JpaRollout rollout) {
return actionRepository.findByRolloutIdAndStatus(PageRequest.of(0, TRANSACTION_ACTIONS), rollout.getId(),
Status.SCHEDULED);
}
@Override
public long count() {
return rolloutRepository.count(RolloutSpecification.isDeletedWithDistributionSet(false));
@@ -1137,21 +595,6 @@ public class JpaRolloutManagement extends AbstractRolloutManagement {
QuotaHelper.assertAssignmentQuota(requested, quota, Target.class, RolloutGroup.class);
}
/**
* Enforces the quota defining the maximum number of {@link Action}s per
* {@link Target}.
*
* @param target
* The target
* @param requested
* number of actions to check
*/
private void assertActionsPerTargetQuota(final Target target, final int requested) {
final int quota = quotaManagement.getMaxActionsPerTarget();
QuotaHelper.assertAssignmentQuota(target.getId(), requested, quota, Action.class, Target.class,
actionRepository::countByTargetId);
}
private void runInUserContext(final BaseEntity rollout, final Runnable handler) {
DeploymentHelper.runInNonSystemContext(handler, () -> Objects.requireNonNull(rollout.getCreatedBy()), tenantAware);
}

View File

@@ -31,6 +31,7 @@ import org.eclipse.hawkbit.repository.RepositoryDefaultConfiguration;
import org.eclipse.hawkbit.repository.RepositoryProperties;
import org.eclipse.hawkbit.repository.RolloutApprovalStrategy;
import org.eclipse.hawkbit.repository.RolloutGroupManagement;
import org.eclipse.hawkbit.repository.RolloutExecutor;
import org.eclipse.hawkbit.repository.RolloutManagement;
import org.eclipse.hawkbit.repository.RolloutStatusCache;
import org.eclipse.hawkbit.repository.SoftwareModuleManagement;
@@ -617,6 +618,23 @@ public class RepositoryApplicationConfiguration extends JpaBaseConfiguration {
virtualPropertyReplacer, softwareModuleRepository, criteriaNoCountDao, properties.getDatabase());
}
@Bean
@ConditionalOnMissingBean
RolloutExecutor rolloutExecutor(
final RolloutTargetGroupRepository rolloutTargetGroupRepository, final EntityManager entityManager,
final RolloutRepository rolloutRepository, final ActionRepository actionRepository,
final RolloutGroupRepository rolloutGroupRepository, final AfterTransactionCommitExecutor afterCommit,
final TenantAware tenantAware, final RolloutGroupManagement rolloutGroupManagement,
final QuotaManagement quotaManagement, final DeploymentManagement deploymentManagement,
final TargetManagement targetManagement, final EventPublisherHolder eventPublisherHolder,
final PlatformTransactionManager txManager, final RolloutApprovalStrategy rolloutApprovalStrategy,
final ApplicationContext context) {
return new JpaRolloutExecutor(rolloutTargetGroupRepository, entityManager, rolloutRepository, actionRepository,
rolloutGroupRepository, afterCommit, tenantAware, rolloutGroupManagement, quotaManagement,
deploymentManagement, targetManagement, eventPublisherHolder, txManager, rolloutApprovalStrategy,
context);
}
@Bean
@ConditionalOnMissingBean
RolloutManagement rolloutManagement(final TargetManagement targetManagement,
@@ -626,11 +644,11 @@ public class RepositoryApplicationConfiguration extends JpaBaseConfiguration {
final PlatformTransactionManager txManager, final TenantAware tenantAware, final LockRegistry lockRegistry,
final JpaProperties properties, final RolloutApprovalStrategy rolloutApprovalStrategy,
final TenantConfigurationManagement tenantConfigurationManagement,
final SystemSecurityContext systemSecurityContext) {
final SystemSecurityContext systemSecurityContext, final RolloutExecutor rolloutExecutor) {
return new JpaRolloutManagement(targetManagement, deploymentManagement, rolloutGroupManagement,
distributionSetManagement, context, eventPublisherHolder, virtualPropertyReplacer, txManager,
tenantAware, lockRegistry, properties.getDatabase(), rolloutApprovalStrategy,
tenantConfigurationManagement, systemSecurityContext);
tenantConfigurationManagement, systemSecurityContext, rolloutExecutor);
}
/**