diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index bcac6a6e5..1cfb0f462 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -243,12 +243,16 @@ public class AmqpConfiguration { /** * Create amqp handler service bean. + * + * @param amqpMessageDispatcherService + * to sending events to DMF client * * @return handler service bean */ @Bean - public AmqpMessageHandlerService amqpMessageHandlerService() { - return new AmqpMessageHandlerService(rabbitTemplate()); + public AmqpMessageHandlerService amqpMessageHandlerService( + final AmqpMessageDispatcherService amqpMessageDispatcherService) { + return new AmqpMessageHandlerService(rabbitTemplate(), amqpMessageDispatcherService); } /** diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 8906bc96c..791757922 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -73,8 +73,6 @@ import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.security.core.context.SecurityContextImpl; import org.springframework.web.util.UriComponentsBuilder; -import com.google.common.eventbus.EventBus; - /** * * {@link AmqpMessageHandlerService} handles all incoming AMQP messages for the @@ -85,6 +83,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService { private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class); + private final AmqpMessageDispatcherService amqpMessageDispatcherService; + @Autowired private ControllerManagement controllerManagement; @@ -94,9 +94,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { @Autowired private ArtifactManagement artifactManagement; - @Autowired - private EventBus eventBus; - @Autowired @Qualifier(CacheConstants.DOWNLOAD_ID_CACHE) private Cache cache; @@ -115,9 +112,13 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * * @param defaultTemplate * the configured amqp template. + * @param amqpMessageDispatcherService + * to sending events to DMF client */ - public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate) { + public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate, + final AmqpMessageDispatcherService amqpMessageDispatcherService) { super(defaultTemplate); + this.amqpMessageDispatcherService = amqpMessageDispatcherService; } /** @@ -352,9 +353,9 @@ public class AmqpMessageHandlerService extends BaseAmqpService { final List softwareModuleList = controllerManagement .findSoftwareModulesByDistributionSet(distributionSet); final String targetSecurityToken = systemSecurityContext.runAsSystem(() -> target.getSecurityToken()); - eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), - target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress(), - targetSecurityToken)); + amqpMessageDispatcherService.targetAssignDistributionSet(new TargetAssignDistributionSetEvent( + target.getOptLockRevision(), target.getTenant(), target.getControllerId(), action.getId(), + softwareModuleList, target.getTargetInfo().getAddress(), targetSecurityToken)); } @@ -385,6 +386,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { final Action action = checkActionExist(message, actionUpdateStatus); final ActionStatus actionStatus = createActionStatus(message, actionUpdateStatus, action); + updateLastPollTime(action); switch (actionUpdateStatus.getActionStatus()) { case DOWNLOAD: @@ -422,6 +424,11 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } } + private void updateLastPollTime(final Action action) { + controllerManagement.updateTargetStatus(action.getTarget().getTargetInfo(), null, System.currentTimeMillis(), + null); + } + private ActionStatus createActionStatus(final Message message, final ActionUpdateStatus actionUpdateStatus, final Action action) { final ActionStatus actionStatus = entityFactory.generateActionStatus(); @@ -508,10 +515,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { this.cache = cache; } - void setEventBus(final EventBus eventBus) { - this.eventBus = eventBus; - } - void setEntityFactory(final EntityFactory entityFactory) { this.entityFactory = entityFactory; } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java index 211ac26a2..aa2a7c150 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java @@ -74,7 +74,8 @@ public class AmqpControllerAuthenticationTest { messageConverter = new Jackson2JsonMessageConverter(); final RabbitTemplate rabbitTemplate = mock(RabbitTemplate.class); when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); - amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate, + mock(AmqpMessageDispatcherService.class)); authenticationManager = new AmqpControllerAuthentfication(); authenticationManager.setControllerManagement(mock(ControllerManagement.class)); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 7a3915e28..9ed314ebc 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -51,6 +51,7 @@ import org.eclipse.hawkbit.repository.model.Action.Status; import org.eclipse.hawkbit.repository.model.LocalArtifact; import org.eclipse.hawkbit.repository.model.SoftwareModule; import org.eclipse.hawkbit.repository.model.TargetInfo; +import org.eclipse.hawkbit.repository.model.TargetUpdateStatus; import org.eclipse.hawkbit.security.SecurityTokenGenerator; import org.eclipse.hawkbit.security.SystemSecurityContext; import org.junit.Before; @@ -69,8 +70,6 @@ import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.cache.Cache; import org.springframework.http.HttpStatus; -import com.google.common.eventbus.EventBus; - import ru.yandex.qatools.allure.annotations.Description; import ru.yandex.qatools.allure.annotations.Features; import ru.yandex.qatools.allure.annotations.Stories; @@ -86,6 +85,9 @@ public class AmqpMessageHandlerServiceTest { private MessageConverter messageConverter; + @Mock + private AmqpMessageDispatcherService amqpMessageDispatcherServiceMock; + @Mock private ControllerManagement controllerManagementMock; @@ -107,9 +109,6 @@ public class AmqpMessageHandlerServiceTest { @Mock private HostnameResolver hostnameResolverMock; - @Mock - private EventBus eventBus; - @Mock private RabbitTemplate rabbitTemplate; @@ -120,13 +119,12 @@ public class AmqpMessageHandlerServiceTest { public void before() throws Exception { messageConverter = new Jackson2JsonMessageConverter(); when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); - amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate, amqpMessageDispatcherServiceMock); amqpMessageHandlerService.setControllerManagement(controllerManagementMock); amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock); amqpMessageHandlerService.setArtifactManagement(artifactManagementMock); amqpMessageHandlerService.setCache(cacheMock); amqpMessageHandlerService.setHostnameResolver(hostnameResolverMock); - amqpMessageHandlerService.setEventBus(eventBus); amqpMessageHandlerService.setEntityFactory(entityFactoryMock); amqpMessageHandlerService.setSystemSecurityContext(systemSecurityContextMock); @@ -134,7 +132,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests not allowed content-type in message") - public void testWrongContentType() { + public void wrongContentType() { final MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("xml"); final Message message = new Message(new byte[0], messageProperties); @@ -147,7 +145,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the creation of a target/thing by calling the same method that incoming RabbitMQ messages would access.") - public void testCreateThing() { + public void createThing() { final String knownThingId = "1"; final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED); messageProperties.setHeader(MessageHeaderKey.THING_ID, "1"); @@ -168,7 +166,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the creation of a thing without a 'reply to' header in message.") - public void testCreateThingWitoutReplyTo() { + public void createThingWitoutReplyTo() { final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED, null); messageProperties.setHeader(MessageHeaderKey.THING_ID, "1"); final Message message = messageConverter.toMessage("", messageProperties); @@ -184,7 +182,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the creation of a target/thing without a thingID by calling the same method that incoming RabbitMQ messages would access.") - public void testCreateThingWithoutID() { + public void createThingWithoutID() { final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED); final Message message = messageConverter.toMessage(new byte[0], messageProperties); try { @@ -197,7 +195,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the call of the same method that incoming RabbitMQ messages would access with an unknown message type.") - public void testUnknownMessageType() { + public void unknownMessageType() { final String type = "bumlux"; final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED); messageProperties.setHeader(MessageHeaderKey.THING_ID, ""); @@ -213,7 +211,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests a invalid message without event topic") - public void testInvalidEventTopic() { + public void invalidEventTopic() { final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT); final Message message = new Message(new byte[0], messageProperties); try { @@ -241,7 +239,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the update of an action of a target without a exist action id") - public void testUpdateActionStatusWithoutActionId() { + public void updateActionStatusWithoutActionId() { final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT); messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name()); final ActionUpdateStatus actionUpdateStatus = new ActionUpdateStatus(); @@ -259,7 +257,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests the update of an action of a target without a exist action id") - public void testUpdateActionStatusWithoutExistActionId() { + public void updateActionStatusWithoutExistActionId() { final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT); messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name()); final ActionUpdateStatus actionUpdateStatus = createActionUpdateStatus(ActionStatus.DOWNLOAD); @@ -384,9 +382,13 @@ public class AmqpMessageHandlerServiceTest { amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); // verify + verify(controllerManagementMock).updateTargetStatus(Matchers.any(TargetInfo.class), + Matchers.isNull(TargetUpdateStatus.class), Matchers.isNotNull(Long.class), Matchers.isNull(URI.class)); + final ArgumentCaptor captorTargetAssignDistributionSetEvent = ArgumentCaptor .forClass(TargetAssignDistributionSetEvent.class); - verify(eventBus, times(1)).post(captorTargetAssignDistributionSetEvent.capture()); + verify(amqpMessageDispatcherServiceMock, times(1)) + .targetAssignDistributionSet(captorTargetAssignDistributionSetEvent.capture()); final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = captorTargetAssignDistributionSetEvent .getValue(); diff --git a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java index 06555bf1c..34160aa37 100644 --- a/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java +++ b/hawkbit-repository/hawkbit-repository-jpa/src/main/java/org/eclipse/hawkbit/repository/jpa/JpaControllerManagement.java @@ -341,7 +341,7 @@ public class JpaControllerManagement implements ControllerManagement { targetInfo.setInstalledDistributionSet(ds); targetInfo.setInstallationDate(System.currentTimeMillis()); - // check if the assigned set is equal no to the installed set (not + // check if the assigned set is equal to the installed set (not // necessarily the case as another update might be pending already). if (target.getAssignedDistributionSet() != null && target.getAssignedDistributionSet().getId() .equals(targetInfo.getInstalledDistributionSet().getId())) {