Fix unused tenant param in AmqpMessageDispatcherService (#2101)
Signed-off-by: Avgustin Marinov <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -149,12 +149,11 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
*/
|
*/
|
||||||
@EventListener(classes = TargetAssignDistributionSetEvent.class)
|
@EventListener(classes = TargetAssignDistributionSetEvent.class)
|
||||||
protected void targetAssignDistributionSet(final TargetAssignDistributionSetEvent assignedEvent) {
|
protected void targetAssignDistributionSet(final TargetAssignDistributionSetEvent assignedEvent) {
|
||||||
if (!shouldBeProcessed(assignedEvent)) {
|
if (shouldSkip(assignedEvent)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<Target> filteredTargetList = getTargetsWithoutPendingCancellations(
|
final List<Target> filteredTargetList = getTargetsWithoutPendingCancellations(assignedEvent.getActions().keySet());
|
||||||
assignedEvent.getTenant(), assignedEvent.getActions().keySet());
|
|
||||||
|
|
||||||
if (!filteredTargetList.isEmpty()) {
|
if (!filteredTargetList.isEmpty()) {
|
||||||
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
|
log.debug("targetAssignDistributionSet retrieved. I will forward it to DMF broker.");
|
||||||
@@ -169,7 +168,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
*/
|
*/
|
||||||
@EventListener(classes = MultiActionEvent.class)
|
@EventListener(classes = MultiActionEvent.class)
|
||||||
protected void onMultiAction(final MultiActionEvent multiActionEvent) {
|
protected void onMultiAction(final MultiActionEvent multiActionEvent) {
|
||||||
if (!shouldBeProcessed(multiActionEvent)) {
|
if (shouldSkip(multiActionEvent)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -227,7 +226,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
*/
|
*/
|
||||||
@EventListener(classes = CancelTargetAssignmentEvent.class)
|
@EventListener(classes = CancelTargetAssignmentEvent.class)
|
||||||
protected void targetCancelAssignmentToDistributionSet(final CancelTargetAssignmentEvent cancelEvent) {
|
protected void targetCancelAssignmentToDistributionSet(final CancelTargetAssignmentEvent cancelEvent) {
|
||||||
if (!shouldBeProcessed(cancelEvent)) {
|
if (shouldSkip(cancelEvent)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -250,7 +249,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
*/
|
*/
|
||||||
@EventListener(classes = TargetDeletedEvent.class)
|
@EventListener(classes = TargetDeletedEvent.class)
|
||||||
protected void targetDelete(final TargetDeletedEvent deleteEvent) {
|
protected void targetDelete(final TargetDeletedEvent deleteEvent) {
|
||||||
if (!shouldBeProcessed(deleteEvent)) {
|
if (shouldSkip(deleteEvent)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -277,8 +276,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
IpUtil.createAmqpUri(virtualHost, ping.getMessageProperties().getReplyTo()));
|
IpUtil.createAmqpUri(virtualHost, ping.getMessageProperties().getReplyTo()));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean shouldBeProcessed(final RemoteApplicationEvent event) {
|
protected boolean shouldSkip(final RemoteApplicationEvent event) {
|
||||||
return isFromSelf(event);
|
return !isFromSelf(event);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void sendCancelMessageToTarget(final String tenant, final String controllerId, final Long actionId, final URI address) {
|
protected void sendCancelMessageToTarget(final String tenant, final String controllerId, final Long actionId, final URI address) {
|
||||||
@@ -430,7 +429,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
: EventTopic.BATCH_DOWNLOAD_AND_INSTALL;
|
: EventTopic.BATCH_DOWNLOAD_AND_INSTALL;
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<Target> getTargetsWithoutPendingCancellations(final String tenant, final Set<String> controllerIds) {
|
private List<Target> getTargetsWithoutPendingCancellations(final Set<String> controllerIds) {
|
||||||
return partitionedParallelExecution(controllerIds, partition ->
|
return partitionedParallelExecution(controllerIds, partition ->
|
||||||
targetManagement.getByControllerID(partition).stream()
|
targetManagement.getByControllerID(partition).stream()
|
||||||
.filter(target -> {
|
.filter(target -> {
|
||||||
@@ -520,7 +519,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendDeleteMessage(final String tenant, final String controllerId, final String targetAddress) {
|
private void sendDeleteMessage(final String tenant, final String controllerId, final String targetAddress) {
|
||||||
if (!hasValidAddress(targetAddress)) {
|
if (hasInvalidAddress(targetAddress)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -528,8 +527,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
amqpSenderService.sendMessage(message, URI.create(targetAddress));
|
amqpSenderService.sendMessage(message, URI.create(targetAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean hasValidAddress(final String targetAddress) {
|
private boolean hasInvalidAddress(final String targetAddress) {
|
||||||
return targetAddress != null && IpUtil.isAmqpUri(URI.create(targetAddress));
|
return targetAddress == null || !IpUtil.isAmqpUri(URI.create(targetAddress));
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isFromSelf(final RemoteApplicationEvent event) {
|
private boolean isFromSelf(final RemoteApplicationEvent event) {
|
||||||
@@ -541,7 +540,7 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void sendUpdateAttributesMessageToTarget(final String tenant, final String controllerId, final String targetAddress) {
|
private void sendUpdateAttributesMessageToTarget(final String tenant, final String controllerId, final String targetAddress) {
|
||||||
if (!hasValidAddress(targetAddress)) {
|
if (hasInvalidAddress(targetAddress)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user