diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 988a68ada..7ad557717 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -125,7 +125,17 @@ public class AmqpConfiguration { */ @Bean public AmqpMessageHandlerService amqpMessageHandlerService() { - return new AmqpMessageHandlerService(); + return new AmqpMessageHandlerService(jsonMessageConverter(), rabbitTemplate); + } + + /** + * Create amqp handler service bean. + * + * @return + */ + @Bean + public AmqpSenderService amqpSenderServiceBean() { + return new DefaultAmqpSenderService(rabbitTemplate); } /** diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 3708f942b..06809d47e 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -25,13 +25,12 @@ import org.eclipse.hawkbit.eventbus.EventSubscriber; import org.eclipse.hawkbit.eventbus.event.CancelTargetAssignmentEvent; import org.eclipse.hawkbit.eventbus.event.TargetAssignDistributionSetEvent; import org.eclipse.hawkbit.repository.model.LocalArtifact; -import org.eclipse.hawkbit.tenancy.TenantAware; import org.eclipse.hawkbit.util.ArtifactUrlHandler; import org.eclipse.hawkbit.util.IpUtil; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import com.google.common.eventbus.Subscribe; @@ -43,17 +42,19 @@ import com.google.common.eventbus.Subscribe; * */ @EventSubscriber -public class AmqpMessageDispatcherService { - - @Autowired - private RabbitTemplate rabbitTemplate; - - @Autowired - private TenantAware tenantAware; +public class AmqpMessageDispatcherService extends BaseAmqpService { @Autowired private ArtifactUrlHandler artifactUrlHandler; + @Autowired + private AmqpSenderService amqpSenderService; + + @Autowired + public AmqpMessageDispatcherService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + super(messageConverter, defaultTemplate); + } + /** * Method to send a message to a RabbitMQ Exchange after the Distribution * set has been assign to a Target. @@ -79,11 +80,9 @@ public class AmqpMessageDispatcherService { downloadAndUpdateRequest.addSoftwareModule(amqpSoftwareModule); } - final Message message = rabbitTemplate.getMessageConverter().toMessage( - downloadAndUpdateRequest, - createConnectorMessageProperties(targetAssignDistributionSetEvent.getTenant(), controllerId, - EventTopic.DOWNLOAD_AND_INSTALL)); - sendMessage(targetAdress.getHost(), message); + final Message message = messageConverter.toMessage(downloadAndUpdateRequest, createConnectorMessageProperties( + targetAssignDistributionSetEvent.getTenant(), controllerId, EventTopic.DOWNLOAD_AND_INSTALL)); + amqpSenderService.sendMessage(message, targetAdress); } /** @@ -98,29 +97,13 @@ public class AmqpMessageDispatcherService { final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent) { final String controllerId = cancelTargetAssignmentDistributionSetEvent.getControllerId(); final Long actionId = cancelTargetAssignmentDistributionSetEvent.getActionId(); - final Message message = rabbitTemplate.getMessageConverter().toMessage( - actionId, - createConnectorMessageProperties(cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, - EventTopic.CANCEL_DOWNLOAD)); + final Message message = messageConverter.toMessage(actionId, createConnectorMessageProperties( + cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, EventTopic.CANCEL_DOWNLOAD)); - sendMessage(cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost(), message); + amqpSenderService.sendMessage(message, cancelTargetAssignmentDistributionSetEvent.getTargetAdress()); } - /** - * Send message to exchange. - * - * @param exchange - * the exchange - * @param message - * the message - */ - public void sendMessage(final String exchange, final Message message) { - message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); - rabbitTemplate.setExchange(exchange); - rabbitTemplate.send(message); - } - private MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId, final EventTopic topic) { final MessageProperties messageProperties = createMessageProperties(); @@ -155,9 +138,8 @@ public class AmqpMessageDispatcherService { return Collections.emptyList(); } - final List convertedArtifacts = localArtifacts.stream() - .map(localArtifact -> convertArtifact(targetId, localArtifact)).collect(Collectors.toList()); - return convertedArtifacts; + return localArtifacts.stream().map(localArtifact -> convertArtifact(targetId, localArtifact)) + .collect(Collectors.toList()); } private Artifact convertArtifact(final String targetId, final LocalArtifact localArtifact) { @@ -175,14 +157,6 @@ public class AmqpMessageDispatcherService { return artifact; } - public void setTenantAware(final TenantAware tenantAware) { - this.tenantAware = tenantAware; - } - - public void setRabbitTemplate(final RabbitTemplate rabbitTemplate) { - this.rabbitTemplate = rabbitTemplate; - } - public void setArtifactUrlHandler(final ArtifactUrlHandler artifactUrlHandler) { this.artifactUrlHandler = artifactUrlHandler; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index f8aed4f86..5eed2945c 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -50,7 +50,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -78,13 +77,10 @@ import com.google.common.eventbus.EventBus; * * */ -public class AmqpMessageHandlerService { +public class AmqpMessageHandlerService extends BaseAmqpService { private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class); - @Autowired - private RabbitTemplate rabbitTemplate; - @Autowired private ControllerManagement controllerManagement; @@ -104,6 +100,14 @@ public class AmqpMessageHandlerService { @Autowired private HostnameResolver hostnameResolver; + /** + * @param messageConverter + */ + @Autowired + public AmqpMessageHandlerService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + super(messageConverter, defaultTemplate); + } + /** * /** Method to handle all incoming amqp messages. * @@ -153,8 +157,8 @@ public class AmqpMessageHandlerService { final String sha1 = secruityToken.getSha1(); try { SecurityContextHolder.getContext().setAuthentication(authenticationManager.doAuthenticate(secruityToken)); - final LocalArtifact localArtifact = artifactManagement.findFirstLocalArtifactsBySHA1(secruityToken - .getSha1()); + final LocalArtifact localArtifact = artifactManagement + .findFirstLocalArtifactsBySHA1(secruityToken.getSha1()); if (localArtifact == null) { throw new EntityNotFoundException(); } @@ -177,9 +181,9 @@ public class AmqpMessageHandlerService { final String downloadId = UUID.randomUUID().toString(); final DownloadArtifactCache downloadCache = new DownloadArtifactCache(DownloadType.BY_SHA1, sha1); cache.put(downloadId, downloadCache); - authentificationResponse.setDownloadUrl(UriComponentsBuilder - .fromUri(hostnameResolver.resolveHostname().toURI()).path("/api/v1/downloadserver/downloadId/") - .path(downloadId).build().toUriString()); + authentificationResponse + .setDownloadUrl(UriComponentsBuilder.fromUri(hostnameResolver.resolveHostname().toURI()) + .path("/api/v1/downloadserver/downloadId/").path(downloadId).build().toUriString()); authentificationResponse.setResponseCode(HttpStatus.OK.value()); } catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) { LOG.error("Login failed", e); @@ -196,7 +200,7 @@ public class AmqpMessageHandlerService { authentificationResponse.setMessage(errorMessage); } - return rabbitTemplate.getMessageConverter().toMessage(authentificationResponse, messageProperties); + return messageConverter.toMessage(authentificationResponse, messageProperties); } private static Artifact convertDbArtifact(final DbArtifact dbArtifact) { @@ -219,9 +223,9 @@ public class AmqpMessageHandlerService { } private static void setTenantSecurityContext(final String tenantId) { - final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(UUID.randomUUID() - .toString(), "AMQP-Controller", Collections.singletonList(new SimpleGrantedAuthority( - SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS))); + final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken( + UUID.randomUUID().toString(), "AMQP-Controller", + Collections.singletonList(new SimpleGrantedAuthority(SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS))); authenticationToken.setDetails(new TenantAwareAuthenticationDetails(tenantId, true)); setSecurityContext(authenticationToken); } @@ -250,7 +254,8 @@ public class AmqpMessageHandlerService { if (StringUtils.isEmpty(replyTo)) { logAndThrowMessageError(message, "No ReplyTo was set for the createThing Event."); } - final URI amqpUri = IpUtil.createAmqpUri(replyTo); + + final URI amqpUri = IpUtil.createAmqpUri(getVirtualHost(message), replyTo); final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotexist(thingId, amqpUri); LOG.debug("Target {} reported online state.", thingId); @@ -267,8 +272,8 @@ public class AmqpMessageHandlerService { final DistributionSet distributionSet = action.getDistributionSet(); final List softwareModuleList = controllerManagement .findSoftwareModulesByDistributionSet(distributionSet); - eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), target - .getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress())); + eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), + target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress())); } @@ -281,13 +286,10 @@ public class AmqpMessageHandlerService { * the topic of the event. */ private void handleIncomingEvent(final Message message, final EventTopic topic) { - switch (topic) { - case UPDATE_ACTION_STATUS: + if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) { updateActionStatus(message); - return; - default: - logAndThrowMessageError(message, "Got event without appropriate topic."); } + logAndThrowMessageError(message, "Got event without appropriate topic."); } /** @@ -356,8 +358,8 @@ public class AmqpMessageHandlerService { */ private Action checkActionExist(final Message message, final ActionUpdateStatus actionUpdateStatus) { final Long actionId = actionUpdateStatus.getActionId(); - LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, actionUpdateStatus - .getActionStatus().name()); + LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, + actionUpdateStatus.getActionStatus().name()); if (actionId == null) { logAndThrowMessageError(message, "Invalid message no action id"); @@ -366,8 +368,8 @@ public class AmqpMessageHandlerService { final Action action = controllerManagement.findActionWithDetails(actionId); if (action == null) { - logAndThrowMessageError(message, "Got intermediate notification about action " + actionId - + " but action does not exist"); + logAndThrowMessageError(message, + "Got intermediate notification about action " + actionId + " but action does not exist"); } return action; } @@ -381,27 +383,11 @@ public class AmqpMessageHandlerService { // back to running action status } else { - logAndThrowMessageError(message, "Cancel Recjected message is not allowed, if action is on state: " - + action.getStatus()); + logAndThrowMessageError(message, + "Cancel Recjected message is not allowed, if action is on state: " + action.getStatus()); } } - /** - * Is needed to convert a incoming message to is originally object type. - * - * @param message - * the message to convert. - * @param clazz - * the class of the originally object. - * @return - */ - @SuppressWarnings("unchecked") - private T convertMessage(final Message message, final Class clazz) { - message.getMessageProperties().getHeaders() - .put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName()); - return (T) rabbitTemplate.getMessageConverter().fromMessage(message); - } - /** * Is needed to verify if an incoming message has the content type json. * @@ -428,12 +414,12 @@ public class AmqpMessageHandlerService { this.hostnameResolver = hostnameResolver; } - void setRabbitTemplate(final RabbitTemplate rabbitTemplate) { - this.rabbitTemplate = rabbitTemplate; + void setMessageConverter(final MessageConverter messageConverter) { + this.messageConverter = messageConverter; } MessageConverter getMessageConverter() { - return rabbitTemplate.getMessageConverter(); + return messageConverter; } void setAuthenticationManager(final AmqpControllerAuthentfication authenticationManager) { diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java new file mode 100644 index 000000000..b7d8ed4e7 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java @@ -0,0 +1,24 @@ +package org.eclipse.hawkbit.amqp; + +import java.net.URI; + +import org.springframework.amqp.core.Message; + +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ + +/** + * + */ +@FunctionalInterface +public interface AmqpSenderService { + + /** + * + * @param message + * @param uri + */ + void sendMessage(Message message, URI uri); + +} diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java new file mode 100644 index 000000000..8371a0586 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ +package org.eclipse.hawkbit.amqp; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author Dennis Melzer + * + */ +public class BaseAmqpService { + + protected static final String VIRTUAL_HOST_MESSAGE_HEADER = "VHOST_HEADER"; + + protected MessageConverter messageConverter; + + protected RabbitTemplate spInternalConnectorTemplate; + + @Autowired + public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + this.messageConverter = messageConverter; + spInternalConnectorTemplate = defaultTemplate; + } + + protected String getVirtualHost(final Message message) { + final Object virtualHost = message.getMessageProperties().getHeaders().get(VIRTUAL_HOST_MESSAGE_HEADER); + + if (virtualHost == null) { + return spInternalConnectorTemplate.getConnectionFactory().getVirtualHost(); + } + return virtualHost.toString(); + } + + protected void cleanMessage(final Message message) { + message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); + message.getMessageProperties().getHeaders().remove(VIRTUAL_HOST_MESSAGE_HEADER); + } + + /** + * Is needed to convert a incoming message to is originally object type. + * + * @param message + * the message to convert. + * @param clazz + * the class of the originally object. + * @return + */ + @SuppressWarnings("unchecked") + protected T convertMessage(final Message message, final Class clazz) { + message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, + clazz.getTypeName()); + return (T) messageConverter.fromMessage(message); + } + +} diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java new file mode 100644 index 000000000..5a51f9b9f --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ +package org.eclipse.hawkbit.amqp; + +import java.net.URI; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * + */ +public class DefaultAmqpSenderService extends BaseAmqpService implements AmqpSenderService { + + /** + * @param messageConverter + * @param defaultTemplate + */ + public DefaultAmqpSenderService(final RabbitTemplate defaultTemplate) { + super(defaultTemplate.getMessageConverter(), defaultTemplate); + } + + @Override + public void sendMessage(final Message message, final URI uri) { + spInternalConnectorTemplate.send(uri.getPath(), message); + } + +} diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java index 5a77c5fce..f5102c3c3 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java @@ -60,11 +60,8 @@ public class AmqpControllerAuthentficationTest { @Before public void before() throws Exception { - amqpMessageHandlerService = new AmqpMessageHandlerService(); messageConverter = new Jackson2JsonMessageConverter(); - final RabbitTemplate rabbitTemplate = new RabbitTemplate(); - rabbitTemplate.setMessageConverter(messageConverter); - amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, mock(RabbitTemplate.class)); authenticationManager = new AmqpControllerAuthentfication(); authenticationManager.setControllerManagement(mock(ControllerManagement.class)); @@ -78,7 +75,6 @@ public class AmqpControllerAuthentficationTest { final ControllerManagement controllerManagement = mock(ControllerManagement.class); when(controllerManagement.getSecurityTokenByControllerId(anyString())).thenReturn(CONTROLLLER_ID); authenticationManager.setControllerManagement(controllerManagement); - amqpMessageHandlerService.setArtifactManagement(mock(ArtifactManagement.class)); authenticationManager.setTenantAware(new SecurityContextTenantAware()); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java index 348e8dea4..dc9cd8e01 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java @@ -15,7 +15,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -59,6 +58,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit private AmqpMessageDispatcherService amqpMessageDispatcherService; + private AmqpSenderService senderService; + private MessageConverter messageConverter; private RabbitTemplate rabbitTemplate; @@ -68,7 +69,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit @Override public void before() throws Exception { super.before(); - amqpMessageDispatcherService = new AmqpMessageDispatcherService(); + amqpMessageDispatcherService = new AmqpMessageDispatcherService(messageConverter, rabbitTemplate); amqpMessageDispatcherService = spy(amqpMessageDispatcherService); messageConverter = new Jackson2JsonMessageConverter(); @@ -78,16 +79,17 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); - amqpMessageDispatcherService.setRabbitTemplate(rabbitTemplate); - amqpMessageDispatcherService.setTenantAware(tenantAware); amqpMessageDispatcherService.setArtifactUrlHandler(artifactUrlHandlerMock); + + senderService = new DefaultAmqpSenderService(rabbitTemplate); } @Test @Description("Verfies that download and install event with no software modul works") public void testSendDownloadRequesWithEmptySoftwareModules() { final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, new ArrayList(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, new ArrayList(), + IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -100,7 +102,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit final DistributionSet dsA = TestDataUtil.generateDistributionSet("", softwareManagement, distributionSetManagement); final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -134,7 +136,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit Mockito.when(rabbitTemplate.convertSendAndReceive(any())).thenReturn(receivedList); final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -152,7 +154,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit @Description("Verfies that send cancel event works") public void testSendCancelRequest() { final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent( - 1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService .targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent); final Message sendMessage = createArgumentCapture( @@ -194,7 +196,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit protected Message createArgumentCapture(final String exchange) { final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Message.class); - Mockito.verify(amqpMessageDispatcherService).sendMessage(eq(exchange), argumentCaptor.capture()); + // Mockito.verify(senderService).sendMessage(argumentCaptor.capture(), + // eq(exchange)); return argumentCaptor.getValue(); } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index bea75e9d6..189d79487 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -99,14 +99,14 @@ public class AmqpMessageHandlerServiceTest { @Mock private EventBus eventBus; + @Mock + private RabbitTemplate rabbitTemplate; + @Before public void before() throws Exception { - amqpMessageHandlerService = new AmqpMessageHandlerService(); - amqpMessageHandlerService.setControllerManagement(controllerManagementMock); messageConverter = new Jackson2JsonMessageConverter(); - final RabbitTemplate rabbitTemplate = new RabbitTemplate(); - rabbitTemplate.setMessageConverter(messageConverter); - amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, rabbitTemplate); + amqpMessageHandlerService.setControllerManagement(controllerManagementMock); amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock); amqpMessageHandlerService.setArtifactManagement(artifactManagementMock); amqpMessageHandlerService.setCache(cacheMock); diff --git a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java index 0068fd0c8..79285dfd6 100644 --- a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java +++ b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java @@ -95,7 +95,6 @@ public final class IpUtil { if (isIpV6) { return URI.create(scheme + SCHEME_SEPERATOR + "[" + host + "]"); } - return URI.create(scheme + SCHEME_SEPERATOR + host); } @@ -108,8 +107,9 @@ public final class IpUtil { * @throws IllegalArgumentException * If the given string not parsable */ - public static URI createAmqpUri(final String host) { - return createUri(AMPQP_SCHEME, host); + public static URI createAmqpUri(final String virtualHost, final String exchange) { + // TODO check + return createUri(AMPQP_SCHEME, virtualHost).resolve(exchange); } /** diff --git a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java index e1e809ab8..30aa161ad 100644 --- a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java +++ b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java @@ -104,23 +104,24 @@ public class IpUtilTest { @Description("Tests create amqp uri ipv4 and ipv6") public void testCreateAmqpUri() { final String ipv4 = "10.99.99.1"; - URI amqpUri = IpUtil.createAmqpUri(ipv4); + URI amqpUri = IpUtil.createAmqpUri(ipv4, "path"); assertAmqpUri(ipv4, amqpUri); final String host = "myhost"; - amqpUri = IpUtil.createAmqpUri(host); + amqpUri = IpUtil.createAmqpUri(host, "path"); assertAmqpUri(host, amqpUri); final String ipv6 = "0:0:0:0:0:0:0:1"; - amqpUri = IpUtil.createAmqpUri(ipv6); + amqpUri = IpUtil.createAmqpUri(ipv6, "path"); assertAmqpUri("[" + ipv6 + "]", amqpUri); } - private void assertAmqpUri(final String host, final URI httpUri) { - assertTrue(IpUtil.isAmqpUri(httpUri)); - assertFalse(IpUtil.isHttpUri(httpUri)); - assertEquals(host, httpUri.getHost()); - assertEquals("amqp", httpUri.getScheme()); + private void assertAmqpUri(final String host, final URI amqpUri) { + assertTrue(IpUtil.isAmqpUri(amqpUri)); + assertFalse(IpUtil.isHttpUri(amqpUri)); + assertEquals(host, amqpUri.getHost()); + assertEquals("amqp", amqpUri.getScheme()); + assertEquals("path", amqpUri.getPath()); } @Test(expected = IllegalArgumentException.class)