Fix / improve management of the Rollout's total target counter (#2449)

Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
Avgustin Marinov
2025-06-12 15:15:25 +03:00
committed by GitHub
parent 2098dc6223
commit 103a0dade0
2 changed files with 69 additions and 129 deletions

View File

@@ -13,7 +13,6 @@ import static org.eclipse.hawkbit.repository.jpa.builder.JpaRolloutGroupCreate.a
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
@@ -205,8 +204,9 @@ public class JpaRolloutExecutor implements RolloutExecutor {
log.debug("handleCreateRollout called for rollout {}", rollout.getId());
final List<RolloutGroup> rolloutGroups = rolloutGroupManagement.findByRollout(
rollout.getId(), PageRequest.of(0, quotaManagement.getMaxRolloutGroupsPerRollout(), Sort.by(Direction.ASC, "id"))
).getContent();
rollout.getId(),
PageRequest.of(0, quotaManagement.getMaxRolloutGroupsPerRollout(), Sort.by(Direction.ASC, "id")))
.getContent();
int readyGroups = 0;
int totalTargets = 0;
@@ -240,8 +240,8 @@ public class JpaRolloutExecutor implements RolloutExecutor {
rollout.setStatus(RolloutStatus.WAITING_FOR_APPROVAL);
rolloutApprovalStrategy.onApprovalRequired(rollout);
}
rollout.setLastCheck(0);
rollout.setTotalTargets(totalTargets);
rollout.setLastCheck(0);
rolloutRepository.save(rollout);
}
}
@@ -261,19 +261,17 @@ public class JpaRolloutExecutor implements RolloutExecutor {
final Slice<JpaAction> scheduledActions = findScheduledActionsByRollout(rollout);
deleteScheduledActions(rollout, scheduledActions);
// avoid another scheduler round and re-check if all scheduled actions
// has been cleaned up. we flush first to ensure that will include the deletion above
// avoid another scheduler round and re-check if all scheduled actions have been cleaned up.
// we flush first to ensure that the re-check will include the deletion above
entityManager.flush();
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(),
Status.SCHEDULED) > 0;
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(), Status.SCHEDULED) > 0;
if (hasScheduledActionsLeft) {
return;
}
// only hard delete the rollout if no actions are left for the rollout.
// In case actions are left, they are probably are running or were
// running before, so only soft delete.
// In case actions are left, they are probably running or were running before, so only soft delete.
hardDeleteRolloutGroups = !actionRepository.existsByRolloutId(rollout.getId());
if (hardDeleteRolloutGroups) {
hardDeleteRollout(rollout);
@@ -284,8 +282,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
rollout.setStatus(RolloutStatus.DELETED);
rollout.setDeleted(true);
rolloutRepository.save(rollout);
//
// sendRolloutGroupDeletedEvents(rollout);
}
private void handleStopRollout(final JpaRollout rollout) {
@@ -294,22 +290,20 @@ public class JpaRolloutExecutor implements RolloutExecutor {
final Slice<JpaAction> scheduledActions = findScheduledActionsByRollout(rollout);
deleteScheduledActions(rollout, scheduledActions);
// avoid another scheduler round and re-check if all scheduled actions
// has been cleaned up. we flush first to ensure that the we include the
// deletion above
// avoid another scheduler round and re-check if all scheduled actions have been cleaned up. we flush first to ensure that
// we include the deletion above
entityManager.flush();
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(),
Status.SCHEDULED) > 0;
final boolean hasScheduledActionsLeft = actionRepository.countByRolloutIdAndStatus(rollout.getId(), Status.SCHEDULED) > 0;
if (hasScheduledActionsLeft) {
return;
}
rolloutGroupRepository.findByRolloutAndStatusNotIn(rollout,
Arrays.asList(RolloutGroupStatus.FINISHED, RolloutGroupStatus.ERROR)).forEach(rolloutGroup -> {
rolloutGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(rolloutGroup);
});
rolloutGroupRepository.findByRolloutAndStatusNotIn(rollout, List.of(RolloutGroupStatus.FINISHED, RolloutGroupStatus.ERROR))
.forEach(rolloutGroup -> {
rolloutGroup.setStatus(RolloutGroupStatus.FINISHED);
rolloutGroupRepository.save(rolloutGroup);
});
rollout.setStatus(RolloutStatus.FINISHED);
rolloutRepository.save(rollout);
@@ -347,9 +341,9 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
final List<JpaRolloutGroup> runningGroups = rollout.getRolloutGroups().stream()
.filter(group -> group.getStatus() == RolloutGroupStatus.RUNNING)
.map(JpaRolloutGroup.class::cast)
.toList();
.filter(group -> group.getStatus() == RolloutGroupStatus.RUNNING)
.map(JpaRolloutGroup.class::cast)
.toList();
if (runningGroups.isEmpty()) {
// no running rollouts, probably there was an error somewhere at the latest group. And the latest group has
@@ -368,7 +362,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
private void hardDeleteRollout(final JpaRollout rollout) {
// sendRolloutGroupDeletedEvents(rollout);
rolloutRepository.delete(rollout);
}
@@ -393,13 +386,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
return actionRepository.findByRolloutIdAndStatus(PageRequest.of(0, TRANSACTION_ACTIONS), rollout.getId(),
Status.SCHEDULED);
}
//
// private void sendRolloutGroupDeletedEvents(final JpaRollout rollout) {
// final List<Long> groupIds = rollout.getRolloutGroups().stream().map(RolloutGroup::getId).toList();
// afterCommit.afterCommit(() -> groupIds.forEach(rolloutGroupId -> eventPublisherHolder.getEventPublisher()
// .publishEvent(new RolloutGroupDeletedEvent(tenantAware.getCurrentTenant(), rolloutGroupId,
// JpaRolloutGroup.class, eventPublisherHolder.getApplicationId()))));
// }
private boolean isRolloutComplete(final JpaRollout rollout) {
// ensure that changes in the same transaction count
@@ -447,13 +433,13 @@ public class JpaRolloutExecutor implements RolloutExecutor {
private void executeRunningGroups(final JpaRollout rollout, final List<JpaRolloutGroup> runningGroups, final RolloutGroup lastGroup) {
for (final JpaRolloutGroup rolloutGroup : runningGroups) {
// handle possible deletion of devices, which might affect the success condition
final long targetCount = countTargetsFrom(rolloutGroup);
if (rolloutGroup.getTotalTargets() != targetCount) {
updateTotalTargetCount(rolloutGroup, targetCount);
}
final RolloutGroup evalProxy = rolloutGroup == runningGroups.get(runningGroups.size() - 1) ?
evalProxy(rolloutGroup) : rolloutGroup;
final RolloutGroup evalProxy = rolloutGroup == runningGroups.get(runningGroups.size() - 1) ? evalProxy(rolloutGroup) : rolloutGroup;
// error state check, do we need to stop the whole rollout because of error?
final boolean isError = checkErrorState(rollout, evalProxy);
if (isError) {
@@ -491,8 +477,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
try {
evaluationManager.getErrorActionEvaluator(rolloutGroup.getErrorAction()).exec(rollout, rolloutGroup);
} catch (final EvaluatorNotConfiguredException e) {
log.error("Something bad happened when accessing the error action bean {}",
rolloutGroup.getErrorAction().name(), e);
log.error("Something bad happened when accessing the error action bean {}", rolloutGroup.getErrorAction().name(), e);
}
}
@@ -506,16 +491,15 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
private boolean checkErrorState(final Rollout rollout, final RolloutGroup rolloutGroup) {
final RolloutGroupErrorCondition errorCondition = rolloutGroup.getErrorCondition();
if (errorCondition == null) {
// there is no error condition, so return false, don't have error.
return false;
}
try {
return evaluationManager.getErrorConditionEvaluator(errorCondition).eval(rollout, rolloutGroup,
rolloutGroup.getErrorConditionExp());
return evaluationManager
.getErrorConditionEvaluator(errorCondition)
.eval(rollout, rolloutGroup, rolloutGroup.getErrorConditionExp());
} catch (final EvaluatorNotConfiguredException e) {
log.error("Something bad happened when accessing the error condition bean {}", errorCondition.name(), e);
return false;
@@ -526,17 +510,17 @@ public class JpaRolloutExecutor implements RolloutExecutor {
final RolloutGroupSuccessCondition successCondition) {
log.trace("Checking finish condition {} on rolloutgroup {}", successCondition, rolloutGroup);
try {
final boolean isFinished = evaluationManager.getSuccessConditionEvaluator(successCondition).eval(rollout,
evalProxy, rolloutGroup.getSuccessConditionExp());
final boolean isFinished = evaluationManager
.getSuccessConditionEvaluator(successCondition)
.eval(rollout, evalProxy, rolloutGroup.getSuccessConditionExp());
if (isFinished) {
log.debug("Rolloutgroup {} is finished, starting next group", rolloutGroup);
log.debug("Rollout group {} is finished, starting next group", rolloutGroup);
executeRolloutGroupSuccessAction(rollout, rolloutGroup);
} else {
log.debug("Rolloutgroup {} is still running", rolloutGroup);
log.debug("Rollout group {} is still running", rolloutGroup);
}
} catch (final EvaluatorNotConfiguredException e) {
log.error("Something bad happened when accessing the finish condition or success action bean {}",
successCondition.name(), e);
log.error("Something bad happened when accessing the finish condition or success action bean {}", successCondition.name(), e);
}
}
@@ -546,7 +530,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
private void startFirstRolloutGroup(final JpaRollout rollout) {
log.debug("startFirstRolloutGroup called for rollout {}", rollout.getId());
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.STARTING);
final List<JpaRolloutGroup> rolloutGroups = rolloutGroupRepository.findByRolloutOrderByIdAsc(rollout);
final JpaRolloutGroup rolloutGroup = rolloutGroups.get(0);
@@ -554,8 +537,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
throw new RolloutIllegalStateException("First found group is not the first group (has a parent).");
}
deploymentManagement.startScheduledActionsByRolloutGroupParent(
rollout.getId(), rollout.getDistributionSet().getId(), null);
deploymentManagement.startScheduledActionsByRolloutGroupParent(rollout.getId(), rollout.getDistributionSet().getId(), null);
rolloutGroup.setStatus(RolloutGroupStatus.RUNNING);
rolloutGroupRepository.save(rolloutGroup);
@@ -574,23 +556,19 @@ public class JpaRolloutExecutor implements RolloutExecutor {
private JpaRolloutGroup fillRolloutGroupWithTargets(
final JpaRollout rollout, final JpaRolloutGroup group, final List<RolloutGroup> rolloutGroups) {
// TODO - is this needed? execute has already checked that the rollout is in CREATING.
// if it has been paused/deleted in the meantime - no problem, the race condition cannot be prevented anyway
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.CREATING);
final String groupTargetFilter = RolloutHelper.getGroupTargetFilter(RolloutHelper.getTargetFilterQuery(rollout), group);
final List<Long> readyGroups = RolloutHelper.getGroupsByStatusIncludingGroup(
rollout.getRolloutGroups(), RolloutGroupStatus.READY, group);
final long targetsInGroupFilter;
if (!RolloutHelper.isRolloutRetried(rollout.getTargetFilterQuery())) { // default case
targetsInGroupFilter = DeploymentHelper.runInNewTransaction(txManager,
targetsInGroupFilter = DeploymentHelper.runInNewTransaction(
txManager,
"countByRsqlAndNotInRolloutGroupsAndCompatibleAndUpdatable",
count -> targetManagement.countByRsqlAndNotInRolloutGroupsAndCompatibleAndUpdatable(
groupTargetFilter, readyGroups, rollout.getDistributionSet().getType()));
} else { // if it is a rollout retry
targetsInGroupFilter = DeploymentHelper.runInNewTransaction(txManager,
targetsInGroupFilter = DeploymentHelper.runInNewTransaction(
txManager,
"countByFailedRolloutAndNotInRolloutGroupsAndCompatible",
count -> targetManagement.countByFailedRolloutAndNotInRolloutGroups(
RolloutHelper.getIdFromRetriedTargetFilter(rollout.getTargetFilterQuery()), readyGroups));
@@ -604,19 +582,12 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
final long expectedInGroup = Math.round(percentFromTheRest * targetsInGroupFilter / 100);
final long currentlyInGroup = DeploymentHelper.runInNewTransaction(txManager,
long targetsLeftToAdd = expectedInGroup - DeploymentHelper.runInNewTransaction(
txManager,
"countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group));
// if there are enough Targets in the group, switch the group status to READY,
if (currentlyInGroup >= expectedInGroup) {
group.setStatus(RolloutGroupStatus.READY);
return rolloutGroupRepository.save(group);
}
try {
long targetsLeftToAdd = expectedInGroup - currentlyInGroup;
do {
while (targetsLeftToAdd > 0) {
// Add up to TRANSACTION_TARGETS of the left targets. In case a TransactionException is thrown this loop aborts
final long assigned = assignTargetsToGroupInNewTransaction(
rollout, group, groupTargetFilter, Math.min(TRANSACTION_TARGETS, targetsLeftToAdd));
@@ -625,12 +596,10 @@ public class JpaRolloutExecutor implements RolloutExecutor {
} else {
targetsLeftToAdd -= assigned;
}
} while (targetsLeftToAdd > 0);
}
group.setStatus(RolloutGroupStatus.READY);
group.setTotalTargets(DeploymentHelper.runInNewTransaction(txManager,
"countRolloutTargetGroupByRolloutGroup",
count -> rolloutTargetGroupRepository.countByRolloutGroup(group)).intValue());
group.setTotalTargets((int) (expectedInGroup - targetsLeftToAdd));
return rolloutGroupRepository.save(group);
} catch (final TransactionException e) {
log.warn(TRANSACTION_ASSIGNING_TARGETS_TO_ROLLOUT_GROUP_FAILED, e);
@@ -638,7 +607,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
}
}
private Long assignTargetsToGroupInNewTransaction(
private int assignTargetsToGroupInNewTransaction(
final JpaRollout rollout, final RolloutGroup group, final String targetFilter, final long limit) {
return DeploymentHelper.runInNewTransaction(txManager, "assignTargetsToRolloutGroup", status -> {
final PageRequest pageRequest = PageRequest.of(0, Math.toIntExact(limit));
@@ -655,7 +624,7 @@ public class JpaRolloutExecutor implements RolloutExecutor {
rolloutTargetGroupRepository.saveAll(targets.stream().map(target -> new RolloutTargetGroup(group, target)).toList());
return Long.valueOf(targets.getNumberOfElements());
return targets.getNumberOfElements();
});
}
@@ -668,11 +637,8 @@ public class JpaRolloutExecutor implements RolloutExecutor {
return false;
}
RolloutHelper.verifyRolloutInStatus(rollout, RolloutStatus.RUNNING);
final List<RolloutGroup> rolloutGroups = rollout.getRolloutGroups();
final JpaRolloutGroup group = (JpaRolloutGroup) rolloutGroups.get(rolloutGroups.size() - 1);
final long expectedInGroup = Math.max((int) group.getTargetPercentage(), 1);
final long currentlyInGroup = group.getTotalTargets();
if (currentlyInGroup >= expectedInGroup || group.getStatus() == RolloutGroupStatus.FINISHED) {
@@ -689,22 +655,23 @@ public class JpaRolloutExecutor implements RolloutExecutor {
// don't use RolloutHelper.getTargetFilterQuery(rollout)
// since it contains condition for device to be created before the rollout
rollout.getTargetFilterQuery(), group);
long newActions = 0;
do {
// Add up to TRANSACTION_TARGETS actions of the left targets
// In case a TransactionException is thrown this loop aborts
final int createdActions = createActionsForDynamicGroupInNewTransaction(rollout, group, groupTargetFilter,
Math.min(TRANSACTION_TARGETS, targetsLeftToAdd));
if (createdActions == 0) {
break; // no more to assign
} else {
newActions += createdActions;
targetsLeftToAdd -= createdActions;
}
} while (targetsLeftToAdd > 0);
if (newActions > 0) {
updateTotalTargetCount(group, group.getTotalTargets() + newActions);
final Slice<Target> targets = targetManagement.findByRsqlAndNoOverridingActionsAndNotInRolloutAndCompatibleAndUpdatable(
rollout.getId(), groupTargetFilter, rollout.getDistributionSet().getType(),
PageRequest.of(0, Math.toIntExact(Math.min(TRANSACTION_TARGETS, targetsLeftToAdd))));
if (targets.getNumberOfElements() > 0) {
final List<Action> newActions = createActions(
targets.getContent(), rollout.getDistributionSet(), rollout.getActionType(), rollout.getForcedTime(),
rollout, group);
if (!newActions.isEmpty() && group.getStatus() == RolloutGroupStatus.RUNNING) {
deploymentManagement.startScheduledActions(newActions);
}
// updates the total targets of the current group and the rollout - new actions size is the same as targets size
group.setTotalTargets(group.getTotalTargets() + newActions.size());
rolloutGroupRepository.save(group);
rollout.setTotalTargets(rollout.getTotalTargets() + newActions.size());
rolloutRepository.save(rollout);
// TODO - try to return false and proceed with handleRunningRollout
// the problem is that OptimisticLockException is thrown in that case
@@ -766,32 +733,8 @@ public class JpaRolloutExecutor implements RolloutExecutor {
((JpaRolloutManagement) rolloutManagement).publishRolloutGroupCreatedEventAfterCommit(savedGroup, rollout);
}
private int createActionsForDynamicGroupInNewTransaction(
final JpaRollout rollout, final RolloutGroup group, final String targetFilter, final long limit) {
return DeploymentHelper.runInNewTransaction(txManager, "createActionsForRolloutDynamicGroup", status -> {
// Dynamic rollouts shall always have weight!
final Slice<Target> targets = targetManagement.findByRsqlAndNoOverridingActionsAndNotInRolloutAndCompatibleAndUpdatable(
rollout.getId(), targetFilter, rollout.getDistributionSet().getType(), PageRequest.of(0, Math.toIntExact(limit)));
if (targets.getNumberOfElements() == 0) {
return 0;
}
final DistributionSet distributionSet = rollout.getDistributionSet();
final ActionType actionType = rollout.getActionType();
final long forceTime = rollout.getForcedTime();
final List<Action> newActions = createActions(targets.getContent(), distributionSet, actionType, forceTime, rollout, group);
if (!newActions.isEmpty() && group.getStatus() == RolloutGroupStatus.RUNNING) {
deploymentManagement.startScheduledActions(newActions);
}
return newActions.size();
});
}
/**
* Schedules a group of the rollout. Scheduled Actions are created to
* achieve this. The creation of those Actions is allowed to fail.
* Schedules a group of the rollout. Scheduled Actions are created to achieve this. The creation of those Actions is allowed to fail.
*/
private boolean scheduleRolloutGroup(final JpaRollout rollout, final JpaRolloutGroup group) {
final long targetsInGroup = rolloutTargetGroupRepository.countByRolloutGroup(group);
@@ -802,14 +745,15 @@ public class JpaRolloutExecutor implements RolloutExecutor {
actionsLeft -= createActionsForRolloutGroup(rollout, group);
}
if (actionsLeft <= 0) {
if (actionsLeft > 0) {
return false;
} else {
if (group.getStatus() != RolloutGroupStatus.SCHEDULED && group.getStatus() != RolloutGroupStatus.RUNNING) { // dynamic groups could already be running
group.setStatus(RolloutGroupStatus.SCHEDULED);
rolloutGroupRepository.save(group);
}
return true;
}
return false;
}
private long createActionsForRolloutGroup(final Rollout rollout, final RolloutGroup group) {
@@ -820,7 +764,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
actionsCreated = createActionsForTargetsInNewTransaction(rollout, group);
totalActionsCreated += actionsCreated;
} while (actionsCreated > 0);
} catch (final TransactionException e) {
log.warn(TRANSACTION_ASSIGNING_TARGETS_TO_ROLLOUT_GROUP_FAILED, e);
return 0;
@@ -849,8 +792,9 @@ public class JpaRolloutExecutor implements RolloutExecutor {
* Creates an action entry in the action repository. In case of existing scheduled actions the scheduled actions get canceled.
* A scheduled action is created in-active for static and running for dynamic groups.
*/
private List<Action> createActions(final Collection<Target> targets, final DistributionSet distributionSet,
final ActionType actionType, final Long forcedTime, final Rollout rollout, final RolloutGroup rolloutGroup) {
private List<Action> createActions(
final Collection<Target> targets, final DistributionSet distributionSet, final ActionType actionType, final Long forcedTime,
final Rollout rollout, final RolloutGroup rolloutGroup) {
// cancel all currently scheduled actions for the given targets. E.g. if an action is already scheduled and a new action is created,
// then the currently scheduled action is canceled and the new scheduled action is created.
final List<Long> targetIds = targets.stream().map(Target::getId).toList();
@@ -886,7 +830,6 @@ public class JpaRolloutExecutor implements RolloutExecutor {
*/
private void assertActionsPerTargetQuota(final Target target, final int requested) {
final int quota = quotaManagement.getMaxActionsPerTarget();
QuotaHelper.assertAssignmentQuota(target.getId(), requested, quota, Action.class, Target.class,
actionRepository::countByTargetId);
QuotaHelper.assertAssignmentQuota(target.getId(), requested, quota, Action.class, Target.class, actionRepository::countByTargetId);
}
}

View File

@@ -417,18 +417,15 @@ class RolloutManagementTest extends AbstractJpaIntegrationTest {
}
@Test
// @Title("Deleting targets of a rollout")
@Description("Verifying that next group is started when targets of the group have been deleted.")
void checkRunningRolloutsStartsNextGroupIfTargetsDeleted() {
final int amountTargetsForRollout = 15;
final int amountOtherTargets = 0;
final int amountGroups = 3;
final String successCondition = "100";
final String errorCondition = "80";
final Rollout createdRollout = testdataFactory.createAndStartRollout(amountTargetsForRollout,
amountOtherTargets, amountGroups,
successCondition, errorCondition);
final Rollout createdRollout = testdataFactory.createAndStartRollout(
amountTargetsForRollout, amountOtherTargets, amountGroups, successCondition, errorCondition);
finishActionAndDeleteTargetsOfFirstRunningGroup(createdRollout);
checkSecondGroupStatusIsRunning(createdRollout);