Refactor action repository (#2118)
Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -81,9 +81,8 @@ import org.springframework.security.core.context.SecurityContextHolder;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
|
||||
/**
|
||||
* {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and
|
||||
* delegate the messages to a {@link AmqpMessageSenderService}.
|
||||
*
|
||||
* {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and delegate the messages to a {@link AmqpMessageSenderService}.
|
||||
* <p/>
|
||||
* Additionally, the dispatcher listener/subscribe for some target events e.g. assignment.
|
||||
*/
|
||||
@Slf4j
|
||||
@@ -173,7 +172,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
}
|
||||
|
||||
log.debug("MultiActionEvent received for {}", multiActionEvent.getControllerIds());
|
||||
sendMultiActionRequestMessages(multiActionEvent.getTenant(), multiActionEvent.getControllerIds());
|
||||
sendMultiActionRequestMessages(multiActionEvent.getControllerIds());
|
||||
}
|
||||
|
||||
protected void sendUpdateMessageToTarget(
|
||||
@@ -184,27 +183,6 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
sendUpdateMessageToTargets(actionProp, Collections.singletonList(target), softwareModules);
|
||||
}
|
||||
|
||||
protected void sendMultiActionRequestToTarget(
|
||||
final String tenant, final Target target, final List<Action> actions,
|
||||
final Function<Action, Map<SoftwareModule, List<SoftwareModuleMetadata>>> getSoftwareModuleMetaData) {
|
||||
final URI targetAddress = target.getAddress();
|
||||
if (!IpUtil.isAmqpUri(targetAddress) || CollectionUtils.isEmpty(actions)) {
|
||||
return;
|
||||
}
|
||||
|
||||
final DmfMultiActionRequest multiActionRequest = new DmfMultiActionRequest();
|
||||
actions.forEach(action -> {
|
||||
final DmfActionRequest actionRequest = createDmfActionRequest(target, action, getSoftwareModuleMetaData.apply(action));
|
||||
final int weight = deploymentManagement.getWeightConsideringDefault(action);
|
||||
multiActionRequest.addElement(getEventTypeForAction(action), actionRequest, weight);
|
||||
});
|
||||
|
||||
final Message message = getMessageConverter().toMessage(
|
||||
multiActionRequest,
|
||||
createConnectorMessagePropertiesEvent(tenant, target.getControllerId(), EventTopic.MULTI_ACTION));
|
||||
amqpSenderService.sendMessage(message, targetAddress);
|
||||
}
|
||||
|
||||
protected DmfDownloadAndUpdateRequest createDownloadAndUpdateRequest(
|
||||
final Target target, final Long actionId,
|
||||
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules) {
|
||||
@@ -303,14 +281,6 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
return dmfTarget;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a Confirmation request.
|
||||
*
|
||||
* @param target the target
|
||||
* @param actionId the actionId
|
||||
* @param softwareModules the software modules
|
||||
* @return confirm request
|
||||
*/
|
||||
protected DmfConfirmRequest createConfirmRequest(
|
||||
final Target target, final Long actionId, final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModules) {
|
||||
final DmfConfirmRequest request = new DmfConfirmRequest();
|
||||
@@ -325,6 +295,33 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
return request;
|
||||
}
|
||||
|
||||
void sendMultiActionRequestToTarget(
|
||||
final Target target, final List<Action> actions,
|
||||
final Function<SoftwareModule, List<SoftwareModuleMetadata>> getSoftwareModuleMetaData) {
|
||||
final URI targetAddress = target.getAddress();
|
||||
if (!IpUtil.isAmqpUri(targetAddress) || CollectionUtils.isEmpty(actions)) {
|
||||
return;
|
||||
}
|
||||
|
||||
final DmfMultiActionRequest multiActionRequest = new DmfMultiActionRequest();
|
||||
actions.forEach(action -> {
|
||||
final DmfActionRequest actionRequest = createDmfActionRequest(
|
||||
target, action,
|
||||
action.getDistributionSet().getModules().stream()
|
||||
.collect(Collectors.toMap(Function.identity(), module -> {
|
||||
final List<SoftwareModuleMetadata> softwareModuleMetadata = getSoftwareModuleMetaData.apply(module);
|
||||
return softwareModuleMetadata == null ? Collections.emptyList() : softwareModuleMetadata;
|
||||
})));
|
||||
final int weight = deploymentManagement.getWeightConsideringDefault(action);
|
||||
multiActionRequest.addElement(getEventTypeForAction(action), actionRequest, weight);
|
||||
});
|
||||
|
||||
final Message message = getMessageConverter().toMessage(
|
||||
multiActionRequest,
|
||||
createConnectorMessagePropertiesEvent(target.getTenant(), target.getControllerId(), EventTopic.MULTI_ACTION));
|
||||
amqpSenderService.sendMessage(message, targetAddress);
|
||||
}
|
||||
|
||||
private static DmfActionRequest createPlainActionRequest(final Action action) {
|
||||
final DmfActionRequest actionRequest = new DmfActionRequest();
|
||||
actionRequest.setActionId(action.getId());
|
||||
@@ -463,23 +460,27 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
}
|
||||
}
|
||||
|
||||
private void sendMultiActionRequestMessages(final String tenant, final List<String> controllerIds) {
|
||||
final Map<SoftwareModule, List<SoftwareModuleMetadata>> softwareModuleMetadata = new HashMap<>();
|
||||
targetManagement.getByControllerID(controllerIds).stream()
|
||||
.filter(target -> IpUtil.isAmqpUri(target.getAddress())).forEach(target -> {
|
||||
final List<Action> activeActions = deploymentManagement
|
||||
.findActiveActionsWithHighestWeight(target.getControllerId(), MAX_ACTION_COUNT);
|
||||
private void sendMultiActionRequestMessages(final List<String> controllerIds) {
|
||||
final Map<String, List<Action>> controllerIdToActions = controllerIds.stream()
|
||||
.collect(Collectors.toMap(
|
||||
Function.identity(),
|
||||
controllerId -> deploymentManagement.findActiveActionsWithHighestWeight(controllerId, MAX_ACTION_COUNT)));
|
||||
|
||||
activeActions.forEach(action ->
|
||||
action.getDistributionSet().getModules().forEach(module ->
|
||||
softwareModuleMetadata.computeIfAbsent(module, this::getSoftwareModuleMetadata)));
|
||||
// gets all software modules for all action at once
|
||||
final Set<Long> allSmIds = controllerIdToActions.values().stream()
|
||||
.flatMap(actions -> actions.stream()
|
||||
.map(Action::getDistributionSet)
|
||||
.flatMap(ds -> ds.getModules().stream())
|
||||
.map(SoftwareModule::getId))
|
||||
.collect(Collectors.toSet());
|
||||
final Map<Long, List<SoftwareModuleMetadata>> getSoftwareModuleMetadata =
|
||||
allSmIds.isEmpty()
|
||||
? Collections.emptyMap()
|
||||
: softwareModuleManagement.findMetaDataBySoftwareModuleIdsAndTargetVisible(allSmIds);
|
||||
|
||||
if (!activeActions.isEmpty()) {
|
||||
sendMultiActionRequestToTarget(tenant, target, activeActions,
|
||||
action -> action.getDistributionSet().getModules().stream()
|
||||
.collect(Collectors.toMap(m -> m, softwareModuleMetadata::get)));
|
||||
}
|
||||
});
|
||||
targetManagement.getByControllerID(controllerIds).forEach(target ->
|
||||
sendMultiActionRequestToTarget(
|
||||
target, controllerIdToActions.get(target.getControllerId()), module -> getSoftwareModuleMetadata.get(module.getId())));
|
||||
}
|
||||
|
||||
private DmfActionRequest createDmfActionRequest(
|
||||
@@ -602,7 +603,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
}
|
||||
|
||||
private Map<SoftwareModule, List<SoftwareModuleMetadata>> getSoftwareModulesWithMetadata(final DistributionSet distributionSet) {
|
||||
return distributionSet.getModules().stream().collect(Collectors.toMap(m -> m, this::getSoftwareModuleMetadata));
|
||||
return distributionSet.getModules().stream().collect(Collectors.toMap(Function.identity(), this::getSoftwareModuleMetadata));
|
||||
}
|
||||
|
||||
private List<SoftwareModuleMetadata> getSoftwareModuleMetadata(final SoftwareModule module) {
|
||||
|
||||
@@ -20,6 +20,7 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.UUID;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
@@ -67,9 +68,8 @@ import org.springframework.util.ObjectUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* {@link AmqpMessageHandlerService} handles all incoming target interaction
|
||||
* AMQP messages (e.g. create target, check for updates etc.) for the queue
|
||||
* which is configured for the property hawkbit.dmf.rabbitmq.receiverQueue.
|
||||
* {@link AmqpMessageHandlerService} handles all incoming target interaction AMQP messages (e.g. create target, check for updates etc.) for the
|
||||
* queue which is configured for the property hawkbit.dmf.rabbitmq.receiverQueue.
|
||||
*/
|
||||
@Slf4j
|
||||
public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
@@ -344,41 +344,39 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
private void sendCurrentActionsAsMultiActionToTarget(final Target target) {
|
||||
final List<Action> actions = controllerManagement.findActiveActionsWithHighestWeight(target.getControllerId(), MAX_ACTION_COUNT);
|
||||
|
||||
final Set<DistributionSet> distributionSets = actions.stream().map(Action::getDistributionSet).collect(Collectors.toSet());
|
||||
final Map<Long, Map<SoftwareModule, List<SoftwareModuleMetadata>>> softwareModulesPerDistributionSet = distributionSets
|
||||
.stream().collect(Collectors.toMap(DistributionSet::getId, this::getSoftwareModulesWithMetadata));
|
||||
// gets all software modules for all action at once
|
||||
final Set<Long> allSmIds = actions.stream()
|
||||
.map(Action::getDistributionSet)
|
||||
.flatMap(ds -> ds.getModules().stream())
|
||||
.map(SoftwareModule::getId)
|
||||
.collect(Collectors.toSet());
|
||||
final Map<Long, List<SoftwareModuleMetadata>> getSoftwareModuleMetadata =
|
||||
allSmIds.isEmpty() ? Collections.emptyMap() : controllerManagement.findTargetVisibleMetaDataBySoftwareModuleId(allSmIds);
|
||||
|
||||
amqpMessageDispatcherService.sendMultiActionRequestToTarget(
|
||||
target.getTenant(), target, actions,
|
||||
action -> softwareModulesPerDistributionSet.get(action.getDistributionSet().getId()));
|
||||
amqpMessageDispatcherService.sendMultiActionRequestToTarget(target, actions, module -> getSoftwareModuleMetadata.get(module.getId()));
|
||||
}
|
||||
|
||||
private void sendOldestActionToTarget(final Target target) {
|
||||
final Optional<Action> actionOptional = controllerManagement.findActiveActionWithHighestWeight(target.getControllerId());
|
||||
|
||||
if (actionOptional.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
||||
final Action action = actionOptional.get();
|
||||
if (action.isCancelingOrCanceled()) {
|
||||
amqpMessageDispatcherService.sendCancelMessageToTarget(target.getTenant(), target.getControllerId(),
|
||||
action.getId(), target.getAddress());
|
||||
amqpMessageDispatcherService.sendCancelMessageToTarget(
|
||||
target.getTenant(), target.getControllerId(), action.getId(), target.getAddress());
|
||||
} else {
|
||||
amqpMessageDispatcherService.sendUpdateMessageToTarget(new ActionProperties(action), action.getTarget(),
|
||||
getSoftwareModulesWithMetadata(action.getDistributionSet()));
|
||||
amqpMessageDispatcherService.sendUpdateMessageToTarget(
|
||||
new ActionProperties(action), action.getTarget(), getSoftwareModulesWithMetadata(action.getDistributionSet()));
|
||||
}
|
||||
}
|
||||
|
||||
private Map<SoftwareModule, List<SoftwareModuleMetadata>> getSoftwareModulesWithMetadata(final DistributionSet distributionSet) {
|
||||
final List<Long> smIds = distributionSet.getModules().stream().map(SoftwareModule::getId)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final Map<Long, List<SoftwareModuleMetadata>> metadata = controllerManagement
|
||||
.findTargetVisibleMetaDataBySoftwareModuleId(smIds);
|
||||
|
||||
return distributionSet.getModules().stream()
|
||||
.collect(Collectors.toMap(sm -> sm, sm -> metadata.getOrDefault(sm.getId(), Collections.emptyList())));
|
||||
final List<Long> smIds = distributionSet.getModules().stream().map(SoftwareModule::getId).collect(Collectors.toList());
|
||||
final Map<Long, List<SoftwareModuleMetadata>> metadata = controllerManagement.findTargetVisibleMetaDataBySoftwareModuleId(smIds);
|
||||
return distributionSet.getModules().stream().collect(Collectors.toMap(
|
||||
Function.identity(), sm -> metadata.getOrDefault(sm.getId(), Collections.emptyList())));
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -465,8 +465,9 @@ public class AmqpMessageDispatcherServiceIntegrationTest extends AbstractAmqpSer
|
||||
assertLatestMultiActionMessageContainsInstallMessages(controllerId, Arrays.asList(smIds1, smIds2, smIds1));
|
||||
|
||||
final List<Long> installActions = getLatestMultiActionMessageActions(controllerId).stream()
|
||||
.filter(entry -> entry.getValue().equals(EventTopic.DOWNLOAD_AND_INSTALL)).map(Entry::getKey)
|
||||
.collect(Collectors.toList());
|
||||
.filter(entry -> entry.getValue().equals(EventTopic.DOWNLOAD_AND_INSTALL))
|
||||
.map(Entry::getKey)
|
||||
.toList();
|
||||
|
||||
updateActionViaDmfClient(controllerId, installActions.get(0), DmfActionStatus.FINISHED);
|
||||
waitUntilEventMessagesAreDispatchedToTarget(EventTopic.REQUEST_ATTRIBUTES_UPDATE, EventTopic.MULTI_ACTION);
|
||||
|
||||
Reference in New Issue
Block a user