Fix concurrent starting the next group (#1853)

when in StartNextGroupRolloutGroupSuccessAction#startNextGroup:
    1. start all scheduled actions
    2. if started are > 0 -> RUNNING, otherwise -> FINISHED (if not dynamic rollout)

what could possibly happen is that at same time:
    * because of a success condition met the JpaRolloutsExecutor triggers start the group
    * user triggers start of the next group (via RolloutsManagement#triggerNextGroup)

then it could:
    * the 'first' one succeeds to start next group
    * the second attempts to start it (JpaRolloutsExecutor found the previous had met the success condition or trigger next found it SCHEDULED and next to run)
    * the second finds no scheduled actions (just running) and decides there are no actions. So, it assumes (wrongly) no actions in group - and set it as FINISHED

This way we could have FINISHED group with still running actions

Signed-off-by: Marinov Avgustin <Avgustin.Marinov@bosch.com>
This commit is contained in:
Avgustin Marinov
2024-10-04 11:05:05 +03:00
committed by GitHub
parent 7ca5cbe1aa
commit de323b66d1
7 changed files with 65 additions and 138 deletions

View File

@@ -359,20 +359,6 @@ public interface DeploymentManagement {
@PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET)
Page<String> findMessagesByActionStatusId(@NotNull Pageable pageable, long actionStatusId);
/**
* Counts all messages for an {@link ActionStatus}.
* <p/>
* No access control applied.
*
* @deprecated Used by UI only. With future removal of UI it could be removed.
* @param actionStatusId
* the id of {@link ActionStatus} to count the messages from
* @return count of messages by a specific {@link ActionStatus} id
*/
@Deprecated(forRemoval = true)
@PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET)
long countMessagesByActionStatusId(long actionStatusId);
/**
* Get the {@link Action} entity for given actionId with all lazy attributes
* (i.e. distributionSet, target, target.assignedDs).
@@ -498,7 +484,7 @@ public interface DeploymentManagement {
* @return the amount of started actions
*/
@PreAuthorize(SpringEvalExpressions.HAS_AUTH_READ_TARGET)
long startScheduledActionsByRolloutGroupParent(long rolloutId, long distributionSetId, Long rolloutGroupParentId);
void startScheduledActionsByRolloutGroupParent(long rolloutId, long distributionSetId, Long rolloutGroupParentId);
/**
* Handles the target assignments. Shall be part of same group

View File

@@ -267,8 +267,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
deleteScheduledActions(rollout, scheduledActions);
// avoid another scheduler round and re-check if all scheduled actions
// has been cleaned up. we flush first to ensure that the we include the
// deletion above
// has been cleaned up. we flush first to ensure that will include the deletion above
entityManager.flush();
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(),
Status.SCHEDULED) > 0;
@@ -354,21 +353,19 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
}
final List<JpaRolloutGroup> rolloutGroupsRunning =
final List<JpaRolloutGroup> runningGroups =
rollout.getRolloutGroups().stream()
.filter(group -> group.getStatus() == RolloutGroupStatus.RUNNING)
.map(JpaRolloutGroup.class::cast)
.toList();
if (rolloutGroupsRunning.isEmpty()) {
// no running rollouts, probably there was an error
// somewhere at the latest group. And the latest group has
// been switched from running into error state. So we need
// to find the latest group which
if (runningGroups.isEmpty()) {
// no running rollouts, probably there was an error somewhere at the latest group. And the latest group has
// been switched from running into error state. So we need to find the latest group which
executeLatestRolloutGroup(rollout);
} else {
log.debug("Rollout {} has {} running groups", rollout.getId(), rolloutGroupsRunning.size());
executeRolloutGroups(rollout, rolloutGroupsRunning, rollout.getRolloutGroups().get(rollout.getRolloutGroups().size() - 1));
log.debug("Rollout {} has {} running groups", rollout.getId(), runningGroups.size());
executeRunningGroups(rollout, runningGroups, rollout.getRolloutGroups().get(rollout.getRolloutGroups().size() - 1));
}
if (isRolloutComplete(rollout)) {
@@ -458,26 +455,24 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
}
private void executeRolloutGroups(final JpaRollout rollout, final List<JpaRolloutGroup> rolloutGroups, final RolloutGroup lastRolloutGroup) {
for (final JpaRolloutGroup rolloutGroup : rolloutGroups) {
private void executeRunningGroups(final JpaRollout rollout, final List<JpaRolloutGroup> runningGroups, final RolloutGroup lastGroup) {
for (final JpaRolloutGroup rolloutGroup : runningGroups) {
final long targetCount = countTargetsFrom(rolloutGroup);
if (rolloutGroup.getTotalTargets() != targetCount) {
updateTotalTargetCount(rolloutGroup, targetCount);
}
final RolloutGroup evalProxy = rolloutGroup == rolloutGroups.get(rolloutGroups.size() - 1) ?
final RolloutGroup evalProxy = rolloutGroup == runningGroups.get(runningGroups.size() - 1) ?
evalProxy(rolloutGroup) : rolloutGroup;
// error state check, do we need to stop the whole
// rollout because of error?
// error state check, do we need to stop the whole rollout because of error?
final boolean isError = checkErrorState(rollout, evalProxy);
if (isError) {
log.info("Rollout {} {} has error, calling error action", rollout.getName(), rollout.getId());
callErrorAction(rollout, rolloutGroup);
} else {
// not in error so check finished state, do we need to
// start the next group?
// not in error so check finished state, do we need to start the next group?
checkSuccessCondition(rollout, rolloutGroup, evalProxy, rolloutGroup.getSuccessCondition());
if (!(rolloutGroup == lastRolloutGroup && rolloutGroup.isDynamic()) && isRolloutGroupComplete(rollout, rolloutGroup)) {
if (!(rolloutGroup == lastGroup && rolloutGroup.isDynamic()) && isRolloutGroupComplete(rollout, rolloutGroup)) {
rolloutGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(rolloutGroup);
}
@@ -757,6 +752,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
return;
}
final JpaRolloutGroup group = new JpaRolloutGroup();
final String lastGroupWithoutSuffix = "group-" + groupCount;
final String suffix = lastGroup.getName().startsWith(lastGroupWithoutSuffix) ? lastGroup.getName().substring(lastGroupWithoutSuffix.length()) : "";

View File

@@ -645,40 +645,38 @@ public class JpaDeploymentManagement extends JpaActionManagement implements Depl
}
@Override
public long startScheduledActionsByRolloutGroupParent(final long rolloutId, final long distributionSetId,
public void startScheduledActionsByRolloutGroupParent(final long rolloutId, final long distributionSetId,
final Long rolloutGroupParentId) {
long totalActionsCount = 0L;
long lastStartedActionsCount;
do {
lastStartedActionsCount = DeploymentHelper.runInNewTransaction(
while (DeploymentHelper.runInNewTransaction(
txManager,
"startScheduledActions-" + rolloutId,
status -> {
final Page<Action> rolloutGroupActions = findActionsByRolloutAndRolloutGroupParent(
rolloutId, rolloutGroupParentId, ACTION_PAGE_LIMIT);
if (rolloutGroupActions.getContent().isEmpty()) {
return 0L;
final PageRequest pageRequest = PageRequest.of(0, ACTION_PAGE_LIMIT);
final Page<Action> groupScheduledActions;
if (rolloutGroupParentId == null) {
groupScheduledActions = actionRepository.findByRolloutIdAndRolloutGroupParentIsNullAndStatus(pageRequest, rolloutId, Action.Status.SCHEDULED);
} else {
groupScheduledActions = actionRepository.findByRolloutIdAndRolloutGroupParentIdAndStatus(pageRequest, rolloutId, rolloutGroupParentId, Action.Status.SCHEDULED);
}
// self invocation won't check @PreAuthorize but it is already checked for the method
startScheduledActions(rolloutGroupActions.getContent());
return rolloutGroupActions.getTotalElements();
});
totalActionsCount += lastStartedActionsCount;
} while (lastStartedActionsCount > 0);
return totalActionsCount;
if (groupScheduledActions.getContent().isEmpty()) {
return 0L;
} else {
// self invocation won't check @PreAuthorize but it is already checked for the method
startScheduledActions(groupScheduledActions.getContent());
return groupScheduledActions.getTotalElements();
}
}) > 0);
}
@Override
public void startScheduledActions(final List<Action> rolloutGroupActions) {
// Close actions already assigned and collect pending assignments
final List<JpaAction> pendingTargetAssignments = rolloutGroupActions.stream()
.map(JpaAction.class::cast).map(this::closeActionIfSetWasAlreadyAssigned).filter(Objects::nonNull)
.collect(Collectors.toList());
.map(JpaAction.class::cast)
.map(this::closeActionIfSetWasAlreadyAssigned)
.filter(Objects::nonNull)
.toList();
if (pendingTargetAssignments.isEmpty()) {
return;
}
@@ -689,21 +687,7 @@ public class JpaDeploymentManagement extends JpaActionManagement implements Depl
}
}
private Page<Action> findActionsByRolloutAndRolloutGroupParent(final Long rolloutId,
final Long rolloutGroupParentId, final int limit) {
final PageRequest pageRequest = PageRequest.of(0, limit);
if (rolloutGroupParentId == null) {
return actionRepository.findByRolloutIdAndRolloutGroupParentIsNullAndStatus(pageRequest, rolloutId,
Action.Status.SCHEDULED);
} else {
return actionRepository.findByRolloutIdAndRolloutGroupParentIdAndStatus(pageRequest, rolloutId,
rolloutGroupParentId, Action.Status.SCHEDULED);
}
}
private JpaAction closeActionIfSetWasAlreadyAssigned(final JpaAction action) {
if (isMultiAssignmentsEnabled()) {
return action;
}
@@ -899,19 +883,6 @@ public class JpaDeploymentManagement extends JpaActionManagement implements Depl
return new PageImpl<>(result, pageable, result.size());
}
@Override
public long countMessagesByActionStatusId(final long actionStatusId) {
final CriteriaBuilder cb = entityManager.getCriteriaBuilder();
final CriteriaQuery<Long> countMsgQuery = cb.createQuery(Long.class);
final Root<JpaActionStatus> countMsgQueryFrom = countMsgQuery.distinct(true).from(JpaActionStatus.class);
final ListJoin<JpaActionStatus, String> cJoin = countMsgQueryFrom.joinList("messages", JoinType.LEFT);
countMsgQuery.select(cb.count(cJoin))
.where(cb.equal(countMsgQueryFrom.get(JpaActionStatus_.id), actionStatusId));
return entityManager.createQuery(countMsgQuery).getSingleResult();
}
@Override
public long countActionsAll() {
return actionRepository.count();

View File

@@ -452,7 +452,7 @@ public class JpaRolloutManagement implements RolloutManagement {
ConcurrencyFailureException.class }, maxAttempts = Constants.TX_RT_MAX, backoff = @Backoff(delay = Constants.TX_RT_DELAY))
public Rollout approveOrDeny(final long rolloutId, final Rollout.ApprovalDecision decision, final String remark) {
log.debug("approveOrDeny rollout called for rollout {} with decision {}", rolloutId, decision);
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(rolloutId);
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(rolloutId);
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.WAITING_FOR_APPROVAL);
switch (decision) {
case APPROVED:
@@ -478,7 +478,7 @@ public class JpaRolloutManagement implements RolloutManagement {
public Rollout start(final long rolloutId) {
log.debug("startRollout called for rollout {}", rolloutId);
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(rolloutId);
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(rolloutId);
RolloutHelper.checkIfRolloutCanStarted(rollout, rollout);
rollout.setStatus(RolloutStatus.STARTING);
rollout.setLastCheck(0);
@@ -490,7 +490,7 @@ public class JpaRolloutManagement implements RolloutManagement {
@Retryable(include = {
ConcurrencyFailureException.class }, maxAttempts = Constants.TX_RT_MAX, backoff = @Backoff(delay = Constants.TX_RT_DELAY))
public void pauseRollout(final long rolloutId) {
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(rolloutId);
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(rolloutId);
if (RolloutStatus.RUNNING != rollout.getStatus()) {
throw new RolloutIllegalStateException("Rollout can only be paused in state running but current state is "
+ rollout.getStatus().name().toLowerCase());
@@ -509,7 +509,7 @@ public class JpaRolloutManagement implements RolloutManagement {
@Retryable(include = {
ConcurrencyFailureException.class }, maxAttempts = Constants.TX_RT_MAX, backoff = @Backoff(delay = Constants.TX_RT_DELAY))
public void resumeRollout(final long rolloutId) {
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(rolloutId);
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(rolloutId);
if (RolloutStatus.PAUSED != rollout.getStatus()) {
throw new RolloutIllegalStateException("Rollout can only be resumed in state paused but current state is "
+ rollout.getStatus().name().toLowerCase());
@@ -583,7 +583,7 @@ public class JpaRolloutManagement implements RolloutManagement {
ConcurrencyFailureException.class }, maxAttempts = Constants.TX_RT_MAX, backoff = @Backoff(delay = Constants.TX_RT_DELAY))
public Rollout update(final RolloutUpdate u) {
final GenericRolloutUpdate update = (GenericRolloutUpdate) u;
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(update.getId());
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(update.getId());
checkIfDeleted(update.getId(), rollout.getStatus());
@@ -598,7 +598,7 @@ public class JpaRolloutManagement implements RolloutManagement {
}
}
private JpaRollout getRolloutAndThrowExceptionIfNotFound(final Long rolloutId) {
private JpaRollout getRolloutOrThrowExceptionIfNotFound(final Long rolloutId) {
return rolloutRepository.findById(rolloutId)
.orElseThrow(() -> new EntityNotFoundException(Rollout.class, rolloutId));
}
@@ -809,7 +809,7 @@ public class JpaRolloutManagement implements RolloutManagement {
@Retryable(include = {
ConcurrencyFailureException.class }, maxAttempts = Constants.TX_RT_MAX, backoff = @Backoff(delay = Constants.TX_RT_DELAY))
public void triggerNextGroup(final long rolloutId) {
final JpaRollout rollout = getRolloutAndThrowExceptionIfNotFound(rolloutId);
final JpaRollout rollout = getRolloutOrThrowExceptionIfNotFound(rolloutId);
if (RolloutStatus.RUNNING != rollout.getStatus()) {
throw new RolloutIllegalStateException("Rollout is not in running state");
}
@@ -824,7 +824,8 @@ public class JpaRolloutManagement implements RolloutManagement {
final RolloutGroup latestRunning = groups.stream()
.sorted(Comparator.comparingLong(RolloutGroup::getId).reversed())
.filter(g -> RolloutGroupStatus.RUNNING.equals(g.getStatus())).findFirst()
.filter(g -> RolloutGroupStatus.RUNNING.equals(g.getStatus()))
.findFirst()
.orElseThrow(() -> new RolloutIllegalStateException("No group is running"));
startNextRolloutGroupAction.exec(rollout, latestRunning);

View File

@@ -121,7 +121,7 @@ public interface RolloutGroupRepository
@Modifying
@Transactional
@Query("UPDATE JpaRolloutGroup g SET g.status = :status WHERE g.parent = :parent")
void setStatusForCildren(@Param("status") RolloutGroupStatus status, @Param("parent") RolloutGroup parent);
void setStatusForChildren(@Param("status") RolloutGroupStatus status, @Param("parent") RolloutGroup parent);
/**
* Retrieves all {@link RolloutGroup} for a specific rollout and status not

View File

@@ -9,12 +9,9 @@
*/
package org.eclipse.hawkbit.repository.jpa.rollout.condition;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.hawkbit.repository.DeploymentManagement;
import org.eclipse.hawkbit.repository.jpa.repository.RolloutGroupRepository;
import org.eclipse.hawkbit.repository.jpa.model.JpaRolloutGroup;
import org.eclipse.hawkbit.repository.model.Rollout;
import org.eclipse.hawkbit.repository.model.RolloutGroup;
import org.eclipse.hawkbit.repository.model.RolloutGroup.RolloutGroupStatus;
@@ -27,13 +24,12 @@ import org.eclipse.hawkbit.security.SystemSecurityContext;
public class StartNextGroupRolloutGroupSuccessAction implements RolloutGroupActionEvaluator<RolloutGroup.RolloutGroupSuccessAction> {
private final RolloutGroupRepository rolloutGroupRepository;
private final DeploymentManagement deploymentManagement;
private final SystemSecurityContext systemSecurityContext;
public StartNextGroupRolloutGroupSuccessAction(final RolloutGroupRepository rolloutGroupRepository,
final DeploymentManagement deploymentManagement, final SystemSecurityContext systemSecurityContext) {
public StartNextGroupRolloutGroupSuccessAction(
final RolloutGroupRepository rolloutGroupRepository, final DeploymentManagement deploymentManagement,
final SystemSecurityContext systemSecurityContext) {
this.rolloutGroupRepository = rolloutGroupRepository;
this.deploymentManagement = deploymentManagement;
this.systemSecurityContext = systemSecurityContext;
@@ -44,45 +40,25 @@ public class StartNextGroupRolloutGroupSuccessAction implements RolloutGroupActi
return RolloutGroup.RolloutGroupSuccessAction.NEXTGROUP;
}
// Note - the exec could be called by JpaRolloutsExecutor and buy JpaRolloutsManagement#triggerNextGroup
// this means it cold be called by concurrently.
@Override
public void exec(final Rollout rollout, final RolloutGroup rolloutGroup) {
systemSecurityContext.runAsSystem(() -> {
startNextGroup(rollout, rolloutGroup);
// retrieve all actions according to the parent group of the finished rolloutGroup,
// so retrieve all child-group actions which need to be started.
deploymentManagement.startScheduledActionsByRolloutGroupParent(
rollout.getId(), rollout.getDistributionSet().getId(), rolloutGroup.getId());
log.debug("Next actions started for rollout {} and parent group {}", rollout, rolloutGroup);
if (!rolloutGroupRepository
.findByParentIdAndStatus(rolloutGroup.getId(), RolloutGroupStatus.SCHEDULED).isEmpty()) {
// get next scheduled group and set them in state running
// there could be a case that the next group is empty (e.g. targets has been deleted after the group has been scheduled)
// but then, on the next run it will be finished and next will be started.
rolloutGroupRepository.setStatusForChildren(RolloutGroupStatus.RUNNING, rolloutGroup);
log.debug("Next group set to RUNNING for rollout {} and parent group {}", rollout, rolloutGroup);
}
return null;
});
}
private void startNextGroup(final Rollout rollout, final RolloutGroup rolloutGroup) {
// retrieve all actions according to the parent group of the finished
// rolloutGroup, so retrieve all child-group actions which need to be
// started.
final long countOfStartedActions = deploymentManagement.startScheduledActionsByRolloutGroupParent(
rollout.getId(), rollout.getDistributionSet().getId(), rolloutGroup.getId());
log.debug("{} Next actions started for rollout {} and parent group {}", countOfStartedActions, rollout,
rolloutGroup);
if (countOfStartedActions > 0) {
// get all next scheduled groups and set them in state running
rolloutGroupRepository.setStatusForCildren(RolloutGroupStatus.RUNNING, rolloutGroup);
} else {
log.debug("No actions to start for next rolloutgroup of parent {} {}", rolloutGroup.getId(),
rolloutGroup.getName());
// nothing for next group, just finish the group, this can happen
// e.g. if targets has been deleted after the group has been
// scheduled. If the group is empty now, we just finish the group if
// there are not actions available for this group.
final List<JpaRolloutGroup> findByRolloutGroupParent = rolloutGroupRepository
.findByParentIdAndStatus(rolloutGroup.getId(), RolloutGroupStatus.SCHEDULED);
findByRolloutGroupParent.forEach(nextGroup -> {
if (nextGroup.isDynamic()) {
nextGroup.setStatus(RolloutGroupStatus.RUNNING);
} else {
log.debug("Rolloutgroup {} is finished, starting next group", nextGroup);
nextGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(nextGroup);
// find the next group to set in running state
startNextGroup(rollout, nextGroup);
}
});
}
}
}
}

View File

@@ -374,15 +374,12 @@ class RolloutManagementTest extends AbstractJpaIntegrationTest {
successCondition, errorCondition);
finishActionAndDeleteTargetsOfFirstRunningGroup(createdRollout);
checkSecondGroupStatusIsRunning(createdRollout);
finishActionAndDeleteTargetsOfSecondRunningGroup(createdRollout);
deleteAllTargetsFromThirdGroup(createdRollout);
rolloutHandler.handleAll(); // one more time to finish the second group
verifyRolloutAndAllGroupsAreFinished(createdRollout);
}
@Step("Finish three actions of the rollout group and delete two targets")