Fixed missing controller poll update in case of EVENT. Fixed eventBus
duplicate event post. Signed-off-by: Kai Zimmermann <kai.zimmermann@bosch-si.com>
This commit is contained in:
@@ -243,12 +243,16 @@ public class AmqpConfiguration {
|
||||
|
||||
/**
|
||||
* Create amqp handler service bean.
|
||||
*
|
||||
* @param amqpMessageDispatcherService
|
||||
* to sending events to DMF client
|
||||
*
|
||||
* @return handler service bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpMessageHandlerService amqpMessageHandlerService() {
|
||||
return new AmqpMessageHandlerService(rabbitTemplate());
|
||||
public AmqpMessageHandlerService amqpMessageHandlerService(
|
||||
final AmqpMessageDispatcherService amqpMessageDispatcherService) {
|
||||
return new AmqpMessageHandlerService(rabbitTemplate(), amqpMessageDispatcherService);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -73,8 +73,6 @@ import org.springframework.security.core.context.SecurityContextHolder;
|
||||
import org.springframework.security.core.context.SecurityContextImpl;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
/**
|
||||
*
|
||||
* {@link AmqpMessageHandlerService} handles all incoming AMQP messages for the
|
||||
@@ -85,6 +83,8 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class);
|
||||
|
||||
private final AmqpMessageDispatcherService amqpMessageDispatcherService;
|
||||
|
||||
@Autowired
|
||||
private ControllerManagement controllerManagement;
|
||||
|
||||
@@ -94,9 +94,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
@Autowired
|
||||
private ArtifactManagement artifactManagement;
|
||||
|
||||
@Autowired
|
||||
private EventBus eventBus;
|
||||
|
||||
@Autowired
|
||||
@Qualifier(CacheConstants.DOWNLOAD_ID_CACHE)
|
||||
private Cache cache;
|
||||
@@ -115,9 +112,13 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
*
|
||||
* @param defaultTemplate
|
||||
* the configured amqp template.
|
||||
* @param amqpMessageDispatcherService
|
||||
* to sending events to DMF client
|
||||
*/
|
||||
public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate) {
|
||||
public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate,
|
||||
final AmqpMessageDispatcherService amqpMessageDispatcherService) {
|
||||
super(defaultTemplate);
|
||||
this.amqpMessageDispatcherService = amqpMessageDispatcherService;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -352,9 +353,9 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
final List<SoftwareModule> softwareModuleList = controllerManagement
|
||||
.findSoftwareModulesByDistributionSet(distributionSet);
|
||||
final String targetSecurityToken = systemSecurityContext.runAsSystem(() -> target.getSecurityToken());
|
||||
eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(),
|
||||
target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress(),
|
||||
targetSecurityToken));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(new TargetAssignDistributionSetEvent(
|
||||
target.getOptLockRevision(), target.getTenant(), target.getControllerId(), action.getId(),
|
||||
softwareModuleList, target.getTargetInfo().getAddress(), targetSecurityToken));
|
||||
|
||||
}
|
||||
|
||||
@@ -385,6 +386,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
final Action action = checkActionExist(message, actionUpdateStatus);
|
||||
|
||||
final ActionStatus actionStatus = createActionStatus(message, actionUpdateStatus, action);
|
||||
updateLastPollTime(action);
|
||||
|
||||
switch (actionUpdateStatus.getActionStatus()) {
|
||||
case DOWNLOAD:
|
||||
@@ -422,6 +424,11 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
}
|
||||
}
|
||||
|
||||
private void updateLastPollTime(final Action action) {
|
||||
controllerManagement.updateTargetStatus(action.getTarget().getTargetInfo(), null, System.currentTimeMillis(),
|
||||
null);
|
||||
}
|
||||
|
||||
private ActionStatus createActionStatus(final Message message, final ActionUpdateStatus actionUpdateStatus,
|
||||
final Action action) {
|
||||
final ActionStatus actionStatus = entityFactory.generateActionStatus();
|
||||
@@ -508,10 +515,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
this.cache = cache;
|
||||
}
|
||||
|
||||
void setEventBus(final EventBus eventBus) {
|
||||
this.eventBus = eventBus;
|
||||
}
|
||||
|
||||
void setEntityFactory(final EntityFactory entityFactory) {
|
||||
this.entityFactory = entityFactory;
|
||||
}
|
||||
|
||||
@@ -74,7 +74,8 @@ public class AmqpControllerAuthenticationTest {
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
final RabbitTemplate rabbitTemplate = mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate,
|
||||
mock(AmqpMessageDispatcherService.class));
|
||||
|
||||
authenticationManager = new AmqpControllerAuthentfication();
|
||||
authenticationManager.setControllerManagement(mock(ControllerManagement.class));
|
||||
|
||||
@@ -51,6 +51,7 @@ import org.eclipse.hawkbit.repository.model.Action.Status;
|
||||
import org.eclipse.hawkbit.repository.model.LocalArtifact;
|
||||
import org.eclipse.hawkbit.repository.model.SoftwareModule;
|
||||
import org.eclipse.hawkbit.repository.model.TargetInfo;
|
||||
import org.eclipse.hawkbit.repository.model.TargetUpdateStatus;
|
||||
import org.eclipse.hawkbit.security.SecurityTokenGenerator;
|
||||
import org.eclipse.hawkbit.security.SystemSecurityContext;
|
||||
import org.junit.Before;
|
||||
@@ -69,8 +70,6 @@ import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.cache.Cache;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import com.google.common.eventbus.EventBus;
|
||||
|
||||
import ru.yandex.qatools.allure.annotations.Description;
|
||||
import ru.yandex.qatools.allure.annotations.Features;
|
||||
import ru.yandex.qatools.allure.annotations.Stories;
|
||||
@@ -86,6 +85,9 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
||||
@Mock
|
||||
private AmqpMessageDispatcherService amqpMessageDispatcherServiceMock;
|
||||
|
||||
@Mock
|
||||
private ControllerManagement controllerManagementMock;
|
||||
|
||||
@@ -107,9 +109,6 @@ public class AmqpMessageHandlerServiceTest {
|
||||
@Mock
|
||||
private HostnameResolver hostnameResolverMock;
|
||||
|
||||
@Mock
|
||||
private EventBus eventBus;
|
||||
|
||||
@Mock
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@@ -120,13 +119,12 @@ public class AmqpMessageHandlerServiceTest {
|
||||
public void before() throws Exception {
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate, amqpMessageDispatcherServiceMock);
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagementMock);
|
||||
amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock);
|
||||
amqpMessageHandlerService.setArtifactManagement(artifactManagementMock);
|
||||
amqpMessageHandlerService.setCache(cacheMock);
|
||||
amqpMessageHandlerService.setHostnameResolver(hostnameResolverMock);
|
||||
amqpMessageHandlerService.setEventBus(eventBus);
|
||||
amqpMessageHandlerService.setEntityFactory(entityFactoryMock);
|
||||
amqpMessageHandlerService.setSystemSecurityContext(systemSecurityContextMock);
|
||||
|
||||
@@ -134,7 +132,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests not allowed content-type in message")
|
||||
public void testWrongContentType() {
|
||||
public void wrongContentType() {
|
||||
final MessageProperties messageProperties = new MessageProperties();
|
||||
messageProperties.setContentType("xml");
|
||||
final Message message = new Message(new byte[0], messageProperties);
|
||||
@@ -147,7 +145,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the creation of a target/thing by calling the same method that incoming RabbitMQ messages would access.")
|
||||
public void testCreateThing() {
|
||||
public void createThing() {
|
||||
final String knownThingId = "1";
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED);
|
||||
messageProperties.setHeader(MessageHeaderKey.THING_ID, "1");
|
||||
@@ -168,7 +166,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the creation of a thing without a 'reply to' header in message.")
|
||||
public void testCreateThingWitoutReplyTo() {
|
||||
public void createThingWitoutReplyTo() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED, null);
|
||||
messageProperties.setHeader(MessageHeaderKey.THING_ID, "1");
|
||||
final Message message = messageConverter.toMessage("", messageProperties);
|
||||
@@ -184,7 +182,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the creation of a target/thing without a thingID by calling the same method that incoming RabbitMQ messages would access.")
|
||||
public void testCreateThingWithoutID() {
|
||||
public void createThingWithoutID() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED);
|
||||
final Message message = messageConverter.toMessage(new byte[0], messageProperties);
|
||||
try {
|
||||
@@ -197,7 +195,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the call of the same method that incoming RabbitMQ messages would access with an unknown message type.")
|
||||
public void testUnknownMessageType() {
|
||||
public void unknownMessageType() {
|
||||
final String type = "bumlux";
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED);
|
||||
messageProperties.setHeader(MessageHeaderKey.THING_ID, "");
|
||||
@@ -213,7 +211,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests a invalid message without event topic")
|
||||
public void testInvalidEventTopic() {
|
||||
public void invalidEventTopic() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
|
||||
final Message message = new Message(new byte[0], messageProperties);
|
||||
try {
|
||||
@@ -241,7 +239,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the update of an action of a target without a exist action id")
|
||||
public void testUpdateActionStatusWithoutActionId() {
|
||||
public void updateActionStatusWithoutActionId() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
|
||||
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name());
|
||||
final ActionUpdateStatus actionUpdateStatus = new ActionUpdateStatus();
|
||||
@@ -259,7 +257,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
@Test
|
||||
@Description("Tests the update of an action of a target without a exist action id")
|
||||
public void testUpdateActionStatusWithoutExistActionId() {
|
||||
public void updateActionStatusWithoutExistActionId() {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
|
||||
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.UPDATE_ACTION_STATUS.name());
|
||||
final ActionUpdateStatus actionUpdateStatus = createActionUpdateStatus(ActionStatus.DOWNLOAD);
|
||||
@@ -384,9 +382,13 @@ public class AmqpMessageHandlerServiceTest {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
verify(controllerManagementMock).updateTargetStatus(Matchers.any(TargetInfo.class),
|
||||
Matchers.isNull(TargetUpdateStatus.class), Matchers.isNotNull(Long.class), Matchers.isNull(URI.class));
|
||||
|
||||
final ArgumentCaptor<TargetAssignDistributionSetEvent> captorTargetAssignDistributionSetEvent = ArgumentCaptor
|
||||
.forClass(TargetAssignDistributionSetEvent.class);
|
||||
verify(eventBus, times(1)).post(captorTargetAssignDistributionSetEvent.capture());
|
||||
verify(amqpMessageDispatcherServiceMock, times(1))
|
||||
.targetAssignDistributionSet(captorTargetAssignDistributionSetEvent.capture());
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = captorTargetAssignDistributionSetEvent
|
||||
.getValue();
|
||||
|
||||
|
||||
@@ -341,7 +341,7 @@ public class JpaControllerManagement implements ControllerManagement {
|
||||
targetInfo.setInstalledDistributionSet(ds);
|
||||
targetInfo.setInstallationDate(System.currentTimeMillis());
|
||||
|
||||
// check if the assigned set is equal no to the installed set (not
|
||||
// check if the assigned set is equal to the installed set (not
|
||||
// necessarily the case as another update might be pending already).
|
||||
if (target.getAssignedDistributionSet() != null && target.getAssignedDistributionSet().getId()
|
||||
.equals(targetInfo.getInstalledDistributionSet().getId())) {
|
||||
|
||||
Reference in New Issue
Block a user