From c737614e9b6aed1dc45dd3d9441112f218900c4c Mon Sep 17 00:00:00 2001 From: SirWayne Date: Mon, 15 Feb 2016 08:55:44 +0100 Subject: [PATCH 01/11] Add sender service to customize sending messages Signed-off-by: SirWayne --- .../hawkbit/amqp/AmqpConfiguration.java | 12 ++- .../amqp/AmqpMessageDispatcherService.java | 62 +++++--------- .../amqp/AmqpMessageHandlerService.java | 80 ++++++++----------- .../hawkbit/amqp/AmqpSenderService.java | 24 ++++++ .../eclipse/hawkbit/amqp/BaseAmqpService.java | 60 ++++++++++++++ .../amqp/DefaultAmqpSenderService.java | 29 +++++++ .../AmqpControllerAuthentficationTest.java | 6 +- .../AmqpMessageDispatcherServiceTest.java | 21 ++--- .../amqp/AmqpMessageHandlerServiceTest.java | 10 +-- .../java/org/eclipse/hawkbit/util/IpUtil.java | 6 +- .../org/eclipse/hawkbit/util/IpUtilTest.java | 17 ++-- 11 files changed, 205 insertions(+), 122 deletions(-) create mode 100644 hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java create mode 100644 hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java create mode 100644 hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 988a68ada..7ad557717 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -125,7 +125,17 @@ public class AmqpConfiguration { */ @Bean public AmqpMessageHandlerService amqpMessageHandlerService() { - return new AmqpMessageHandlerService(); + return new AmqpMessageHandlerService(jsonMessageConverter(), rabbitTemplate); + } + + /** + * Create amqp handler service bean. + * + * @return + */ + @Bean + public AmqpSenderService amqpSenderServiceBean() { + return new DefaultAmqpSenderService(rabbitTemplate); } /** diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 3708f942b..06809d47e 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -25,13 +25,12 @@ import org.eclipse.hawkbit.eventbus.EventSubscriber; import org.eclipse.hawkbit.eventbus.event.CancelTargetAssignmentEvent; import org.eclipse.hawkbit.eventbus.event.TargetAssignDistributionSetEvent; import org.eclipse.hawkbit.repository.model.LocalArtifact; -import org.eclipse.hawkbit.tenancy.TenantAware; import org.eclipse.hawkbit.util.ArtifactUrlHandler; import org.eclipse.hawkbit.util.IpUtil; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; +import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import com.google.common.eventbus.Subscribe; @@ -43,17 +42,19 @@ import com.google.common.eventbus.Subscribe; * */ @EventSubscriber -public class AmqpMessageDispatcherService { - - @Autowired - private RabbitTemplate rabbitTemplate; - - @Autowired - private TenantAware tenantAware; +public class AmqpMessageDispatcherService extends BaseAmqpService { @Autowired private ArtifactUrlHandler artifactUrlHandler; + @Autowired + private AmqpSenderService amqpSenderService; + + @Autowired + public AmqpMessageDispatcherService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + super(messageConverter, defaultTemplate); + } + /** * Method to send a message to a RabbitMQ Exchange after the Distribution * set has been assign to a Target. @@ -79,11 +80,9 @@ public class AmqpMessageDispatcherService { downloadAndUpdateRequest.addSoftwareModule(amqpSoftwareModule); } - final Message message = rabbitTemplate.getMessageConverter().toMessage( - downloadAndUpdateRequest, - createConnectorMessageProperties(targetAssignDistributionSetEvent.getTenant(), controllerId, - EventTopic.DOWNLOAD_AND_INSTALL)); - sendMessage(targetAdress.getHost(), message); + final Message message = messageConverter.toMessage(downloadAndUpdateRequest, createConnectorMessageProperties( + targetAssignDistributionSetEvent.getTenant(), controllerId, EventTopic.DOWNLOAD_AND_INSTALL)); + amqpSenderService.sendMessage(message, targetAdress); } /** @@ -98,29 +97,13 @@ public class AmqpMessageDispatcherService { final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent) { final String controllerId = cancelTargetAssignmentDistributionSetEvent.getControllerId(); final Long actionId = cancelTargetAssignmentDistributionSetEvent.getActionId(); - final Message message = rabbitTemplate.getMessageConverter().toMessage( - actionId, - createConnectorMessageProperties(cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, - EventTopic.CANCEL_DOWNLOAD)); + final Message message = messageConverter.toMessage(actionId, createConnectorMessageProperties( + cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, EventTopic.CANCEL_DOWNLOAD)); - sendMessage(cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost(), message); + amqpSenderService.sendMessage(message, cancelTargetAssignmentDistributionSetEvent.getTargetAdress()); } - /** - * Send message to exchange. - * - * @param exchange - * the exchange - * @param message - * the message - */ - public void sendMessage(final String exchange, final Message message) { - message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); - rabbitTemplate.setExchange(exchange); - rabbitTemplate.send(message); - } - private MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId, final EventTopic topic) { final MessageProperties messageProperties = createMessageProperties(); @@ -155,9 +138,8 @@ public class AmqpMessageDispatcherService { return Collections.emptyList(); } - final List convertedArtifacts = localArtifacts.stream() - .map(localArtifact -> convertArtifact(targetId, localArtifact)).collect(Collectors.toList()); - return convertedArtifacts; + return localArtifacts.stream().map(localArtifact -> convertArtifact(targetId, localArtifact)) + .collect(Collectors.toList()); } private Artifact convertArtifact(final String targetId, final LocalArtifact localArtifact) { @@ -175,14 +157,6 @@ public class AmqpMessageDispatcherService { return artifact; } - public void setTenantAware(final TenantAware tenantAware) { - this.tenantAware = tenantAware; - } - - public void setRabbitTemplate(final RabbitTemplate rabbitTemplate) { - this.rabbitTemplate = rabbitTemplate; - } - public void setArtifactUrlHandler(final ArtifactUrlHandler artifactUrlHandler) { this.artifactUrlHandler = artifactUrlHandler; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index f8aed4f86..5eed2945c 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -50,7 +50,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; @@ -78,13 +77,10 @@ import com.google.common.eventbus.EventBus; * * */ -public class AmqpMessageHandlerService { +public class AmqpMessageHandlerService extends BaseAmqpService { private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class); - @Autowired - private RabbitTemplate rabbitTemplate; - @Autowired private ControllerManagement controllerManagement; @@ -104,6 +100,14 @@ public class AmqpMessageHandlerService { @Autowired private HostnameResolver hostnameResolver; + /** + * @param messageConverter + */ + @Autowired + public AmqpMessageHandlerService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + super(messageConverter, defaultTemplate); + } + /** * /** Method to handle all incoming amqp messages. * @@ -153,8 +157,8 @@ public class AmqpMessageHandlerService { final String sha1 = secruityToken.getSha1(); try { SecurityContextHolder.getContext().setAuthentication(authenticationManager.doAuthenticate(secruityToken)); - final LocalArtifact localArtifact = artifactManagement.findFirstLocalArtifactsBySHA1(secruityToken - .getSha1()); + final LocalArtifact localArtifact = artifactManagement + .findFirstLocalArtifactsBySHA1(secruityToken.getSha1()); if (localArtifact == null) { throw new EntityNotFoundException(); } @@ -177,9 +181,9 @@ public class AmqpMessageHandlerService { final String downloadId = UUID.randomUUID().toString(); final DownloadArtifactCache downloadCache = new DownloadArtifactCache(DownloadType.BY_SHA1, sha1); cache.put(downloadId, downloadCache); - authentificationResponse.setDownloadUrl(UriComponentsBuilder - .fromUri(hostnameResolver.resolveHostname().toURI()).path("/api/v1/downloadserver/downloadId/") - .path(downloadId).build().toUriString()); + authentificationResponse + .setDownloadUrl(UriComponentsBuilder.fromUri(hostnameResolver.resolveHostname().toURI()) + .path("/api/v1/downloadserver/downloadId/").path(downloadId).build().toUriString()); authentificationResponse.setResponseCode(HttpStatus.OK.value()); } catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) { LOG.error("Login failed", e); @@ -196,7 +200,7 @@ public class AmqpMessageHandlerService { authentificationResponse.setMessage(errorMessage); } - return rabbitTemplate.getMessageConverter().toMessage(authentificationResponse, messageProperties); + return messageConverter.toMessage(authentificationResponse, messageProperties); } private static Artifact convertDbArtifact(final DbArtifact dbArtifact) { @@ -219,9 +223,9 @@ public class AmqpMessageHandlerService { } private static void setTenantSecurityContext(final String tenantId) { - final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(UUID.randomUUID() - .toString(), "AMQP-Controller", Collections.singletonList(new SimpleGrantedAuthority( - SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS))); + final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken( + UUID.randomUUID().toString(), "AMQP-Controller", + Collections.singletonList(new SimpleGrantedAuthority(SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS))); authenticationToken.setDetails(new TenantAwareAuthenticationDetails(tenantId, true)); setSecurityContext(authenticationToken); } @@ -250,7 +254,8 @@ public class AmqpMessageHandlerService { if (StringUtils.isEmpty(replyTo)) { logAndThrowMessageError(message, "No ReplyTo was set for the createThing Event."); } - final URI amqpUri = IpUtil.createAmqpUri(replyTo); + + final URI amqpUri = IpUtil.createAmqpUri(getVirtualHost(message), replyTo); final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotexist(thingId, amqpUri); LOG.debug("Target {} reported online state.", thingId); @@ -267,8 +272,8 @@ public class AmqpMessageHandlerService { final DistributionSet distributionSet = action.getDistributionSet(); final List softwareModuleList = controllerManagement .findSoftwareModulesByDistributionSet(distributionSet); - eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), target - .getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress())); + eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), + target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress())); } @@ -281,13 +286,10 @@ public class AmqpMessageHandlerService { * the topic of the event. */ private void handleIncomingEvent(final Message message, final EventTopic topic) { - switch (topic) { - case UPDATE_ACTION_STATUS: + if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) { updateActionStatus(message); - return; - default: - logAndThrowMessageError(message, "Got event without appropriate topic."); } + logAndThrowMessageError(message, "Got event without appropriate topic."); } /** @@ -356,8 +358,8 @@ public class AmqpMessageHandlerService { */ private Action checkActionExist(final Message message, final ActionUpdateStatus actionUpdateStatus) { final Long actionId = actionUpdateStatus.getActionId(); - LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, actionUpdateStatus - .getActionStatus().name()); + LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, + actionUpdateStatus.getActionStatus().name()); if (actionId == null) { logAndThrowMessageError(message, "Invalid message no action id"); @@ -366,8 +368,8 @@ public class AmqpMessageHandlerService { final Action action = controllerManagement.findActionWithDetails(actionId); if (action == null) { - logAndThrowMessageError(message, "Got intermediate notification about action " + actionId - + " but action does not exist"); + logAndThrowMessageError(message, + "Got intermediate notification about action " + actionId + " but action does not exist"); } return action; } @@ -381,27 +383,11 @@ public class AmqpMessageHandlerService { // back to running action status } else { - logAndThrowMessageError(message, "Cancel Recjected message is not allowed, if action is on state: " - + action.getStatus()); + logAndThrowMessageError(message, + "Cancel Recjected message is not allowed, if action is on state: " + action.getStatus()); } } - /** - * Is needed to convert a incoming message to is originally object type. - * - * @param message - * the message to convert. - * @param clazz - * the class of the originally object. - * @return - */ - @SuppressWarnings("unchecked") - private T convertMessage(final Message message, final Class clazz) { - message.getMessageProperties().getHeaders() - .put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName()); - return (T) rabbitTemplate.getMessageConverter().fromMessage(message); - } - /** * Is needed to verify if an incoming message has the content type json. * @@ -428,12 +414,12 @@ public class AmqpMessageHandlerService { this.hostnameResolver = hostnameResolver; } - void setRabbitTemplate(final RabbitTemplate rabbitTemplate) { - this.rabbitTemplate = rabbitTemplate; + void setMessageConverter(final MessageConverter messageConverter) { + this.messageConverter = messageConverter; } MessageConverter getMessageConverter() { - return rabbitTemplate.getMessageConverter(); + return messageConverter; } void setAuthenticationManager(final AmqpControllerAuthentfication authenticationManager) { diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java new file mode 100644 index 000000000..b7d8ed4e7 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java @@ -0,0 +1,24 @@ +package org.eclipse.hawkbit.amqp; + +import java.net.URI; + +import org.springframework.amqp.core.Message; + +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ + +/** + * + */ +@FunctionalInterface +public interface AmqpSenderService { + + /** + * + * @param message + * @param uri + */ + void sendMessage(Message message, URI uri); + +} diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java new file mode 100644 index 000000000..8371a0586 --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -0,0 +1,60 @@ +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ +package org.eclipse.hawkbit.amqp; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Autowired; + +/** + * @author Dennis Melzer + * + */ +public class BaseAmqpService { + + protected static final String VIRTUAL_HOST_MESSAGE_HEADER = "VHOST_HEADER"; + + protected MessageConverter messageConverter; + + protected RabbitTemplate spInternalConnectorTemplate; + + @Autowired + public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + this.messageConverter = messageConverter; + spInternalConnectorTemplate = defaultTemplate; + } + + protected String getVirtualHost(final Message message) { + final Object virtualHost = message.getMessageProperties().getHeaders().get(VIRTUAL_HOST_MESSAGE_HEADER); + + if (virtualHost == null) { + return spInternalConnectorTemplate.getConnectionFactory().getVirtualHost(); + } + return virtualHost.toString(); + } + + protected void cleanMessage(final Message message) { + message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); + message.getMessageProperties().getHeaders().remove(VIRTUAL_HOST_MESSAGE_HEADER); + } + + /** + * Is needed to convert a incoming message to is originally object type. + * + * @param message + * the message to convert. + * @param clazz + * the class of the originally object. + * @return + */ + @SuppressWarnings("unchecked") + protected T convertMessage(final Message message, final Class clazz) { + message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, + clazz.getTypeName()); + return (T) messageConverter.fromMessage(message); + } + +} diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java new file mode 100644 index 000000000..5a51f9b9f --- /dev/null +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + */ +package org.eclipse.hawkbit.amqp; + +import java.net.URI; + +import org.springframework.amqp.core.Message; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +/** + * + */ +public class DefaultAmqpSenderService extends BaseAmqpService implements AmqpSenderService { + + /** + * @param messageConverter + * @param defaultTemplate + */ + public DefaultAmqpSenderService(final RabbitTemplate defaultTemplate) { + super(defaultTemplate.getMessageConverter(), defaultTemplate); + } + + @Override + public void sendMessage(final Message message, final URI uri) { + spInternalConnectorTemplate.send(uri.getPath(), message); + } + +} diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java index 5a77c5fce..f5102c3c3 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java @@ -60,11 +60,8 @@ public class AmqpControllerAuthentficationTest { @Before public void before() throws Exception { - amqpMessageHandlerService = new AmqpMessageHandlerService(); messageConverter = new Jackson2JsonMessageConverter(); - final RabbitTemplate rabbitTemplate = new RabbitTemplate(); - rabbitTemplate.setMessageConverter(messageConverter); - amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, mock(RabbitTemplate.class)); authenticationManager = new AmqpControllerAuthentfication(); authenticationManager.setControllerManagement(mock(ControllerManagement.class)); @@ -78,7 +75,6 @@ public class AmqpControllerAuthentficationTest { final ControllerManagement controllerManagement = mock(ControllerManagement.class); when(controllerManagement.getSecurityTokenByControllerId(anyString())).thenReturn(CONTROLLLER_ID); authenticationManager.setControllerManagement(controllerManagement); - amqpMessageHandlerService.setArtifactManagement(mock(ArtifactManagement.class)); authenticationManager.setTenantAware(new SecurityContextTenantAware()); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java index 348e8dea4..dc9cd8e01 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java @@ -15,7 +15,6 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; @@ -59,6 +58,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit private AmqpMessageDispatcherService amqpMessageDispatcherService; + private AmqpSenderService senderService; + private MessageConverter messageConverter; private RabbitTemplate rabbitTemplate; @@ -68,7 +69,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit @Override public void before() throws Exception { super.before(); - amqpMessageDispatcherService = new AmqpMessageDispatcherService(); + amqpMessageDispatcherService = new AmqpMessageDispatcherService(messageConverter, rabbitTemplate); amqpMessageDispatcherService = spy(amqpMessageDispatcherService); messageConverter = new Jackson2JsonMessageConverter(); @@ -78,16 +79,17 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); - amqpMessageDispatcherService.setRabbitTemplate(rabbitTemplate); - amqpMessageDispatcherService.setTenantAware(tenantAware); amqpMessageDispatcherService.setArtifactUrlHandler(artifactUrlHandlerMock); + + senderService = new DefaultAmqpSenderService(rabbitTemplate); } @Test @Description("Verfies that download and install event with no software modul works") public void testSendDownloadRequesWithEmptySoftwareModules() { final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, new ArrayList(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, new ArrayList(), + IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -100,7 +102,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit final DistributionSet dsA = TestDataUtil.generateDistributionSet("", softwareManagement, distributionSetManagement); final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -134,7 +136,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit Mockito.when(rabbitTemplate.convertSendAndReceive(any())).thenReturn(receivedList); final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( - 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); @@ -152,7 +154,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit @Description("Verfies that send cancel event works") public void testSendCancelRequest() { final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent( - 1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("mytest")); + 1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService .targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent); final Message sendMessage = createArgumentCapture( @@ -194,7 +196,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit protected Message createArgumentCapture(final String exchange) { final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Message.class); - Mockito.verify(amqpMessageDispatcherService).sendMessage(eq(exchange), argumentCaptor.capture()); + // Mockito.verify(senderService).sendMessage(argumentCaptor.capture(), + // eq(exchange)); return argumentCaptor.getValue(); } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index bea75e9d6..189d79487 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -99,14 +99,14 @@ public class AmqpMessageHandlerServiceTest { @Mock private EventBus eventBus; + @Mock + private RabbitTemplate rabbitTemplate; + @Before public void before() throws Exception { - amqpMessageHandlerService = new AmqpMessageHandlerService(); - amqpMessageHandlerService.setControllerManagement(controllerManagementMock); messageConverter = new Jackson2JsonMessageConverter(); - final RabbitTemplate rabbitTemplate = new RabbitTemplate(); - rabbitTemplate.setMessageConverter(messageConverter); - amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate); + amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, rabbitTemplate); + amqpMessageHandlerService.setControllerManagement(controllerManagementMock); amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock); amqpMessageHandlerService.setArtifactManagement(artifactManagementMock); amqpMessageHandlerService.setCache(cacheMock); diff --git a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java index 0068fd0c8..79285dfd6 100644 --- a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java +++ b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java @@ -95,7 +95,6 @@ public final class IpUtil { if (isIpV6) { return URI.create(scheme + SCHEME_SEPERATOR + "[" + host + "]"); } - return URI.create(scheme + SCHEME_SEPERATOR + host); } @@ -108,8 +107,9 @@ public final class IpUtil { * @throws IllegalArgumentException * If the given string not parsable */ - public static URI createAmqpUri(final String host) { - return createUri(AMPQP_SCHEME, host); + public static URI createAmqpUri(final String virtualHost, final String exchange) { + // TODO check + return createUri(AMPQP_SCHEME, virtualHost).resolve(exchange); } /** diff --git a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java index e1e809ab8..30aa161ad 100644 --- a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java +++ b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java @@ -104,23 +104,24 @@ public class IpUtilTest { @Description("Tests create amqp uri ipv4 and ipv6") public void testCreateAmqpUri() { final String ipv4 = "10.99.99.1"; - URI amqpUri = IpUtil.createAmqpUri(ipv4); + URI amqpUri = IpUtil.createAmqpUri(ipv4, "path"); assertAmqpUri(ipv4, amqpUri); final String host = "myhost"; - amqpUri = IpUtil.createAmqpUri(host); + amqpUri = IpUtil.createAmqpUri(host, "path"); assertAmqpUri(host, amqpUri); final String ipv6 = "0:0:0:0:0:0:0:1"; - amqpUri = IpUtil.createAmqpUri(ipv6); + amqpUri = IpUtil.createAmqpUri(ipv6, "path"); assertAmqpUri("[" + ipv6 + "]", amqpUri); } - private void assertAmqpUri(final String host, final URI httpUri) { - assertTrue(IpUtil.isAmqpUri(httpUri)); - assertFalse(IpUtil.isHttpUri(httpUri)); - assertEquals(host, httpUri.getHost()); - assertEquals("amqp", httpUri.getScheme()); + private void assertAmqpUri(final String host, final URI amqpUri) { + assertTrue(IpUtil.isAmqpUri(amqpUri)); + assertFalse(IpUtil.isHttpUri(amqpUri)); + assertEquals(host, amqpUri.getHost()); + assertEquals("amqp", amqpUri.getScheme()); + assertEquals("path", amqpUri.getPath()); } @Test(expected = IllegalArgumentException.class) From 5d7ade1cf2de189f94aec645002935da26e7d100 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Mon, 15 Feb 2016 16:20:45 +0100 Subject: [PATCH 02/11] Add create tenant Signed-off-by: SirWayne --- .../hawkbit/cache/TenantAwareCacheManager.java | 14 ++++++++++++-- .../eclipse/hawkbit/amqp/AmqpConfiguration.java | 2 ++ .../org/eclipse/hawkbit/amqp/BaseAmqpService.java | 2 -- .../hawkbit/MultiTenantJpaTransactionManager.java | 4 +++- 4 files changed, 17 insertions(+), 5 deletions(-) diff --git a/hawkbit-core/src/main/java/org/eclipse/hawkbit/cache/TenantAwareCacheManager.java b/hawkbit-core/src/main/java/org/eclipse/hawkbit/cache/TenantAwareCacheManager.java index 06d6e1719..435f1b2e1 100644 --- a/hawkbit-core/src/main/java/org/eclipse/hawkbit/cache/TenantAwareCacheManager.java +++ b/hawkbit-core/src/main/java/org/eclipse/hawkbit/cache/TenantAwareCacheManager.java @@ -51,7 +51,12 @@ public class TenantAwareCacheManager implements TenancyCacheManager { @Override public Cache getCache(final String name) { - final String currentTenant = tenantAware.getCurrentTenant().toUpperCase(); + String currentTenant = tenantAware.getCurrentTenant(); + if (currentTenant == null) { + return null; + } + + currentTenant = currentTenant.toUpperCase(); if (currentTenant.contains(TENANT_CACHE_DELIMITER)) { return null; } @@ -60,7 +65,12 @@ public class TenantAwareCacheManager implements TenancyCacheManager { @Override public Collection getCacheNames() { - final String currentTenant = tenantAware.getCurrentTenant().toUpperCase(); + String currentTenant = tenantAware.getCurrentTenant(); + if (currentTenant == null) { + return null; + } + + currentTenant = currentTenant.toUpperCase(); if (currentTenant.contains(TENANT_CACHE_DELIMITER)) { return Collections.emptyList(); } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 7ad557717..42b91e89b 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -23,6 +23,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; @@ -134,6 +135,7 @@ public class AmqpConfiguration { * @return */ @Bean + @ConditionalOnMissingBean public AmqpSenderService amqpSenderServiceBean() { return new DefaultAmqpSenderService(rabbitTemplate); } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index 8371a0586..adc5b43f6 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -7,7 +7,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; -import org.springframework.beans.factory.annotation.Autowired; /** * @author Dennis Melzer @@ -21,7 +20,6 @@ public class BaseAmqpService { protected RabbitTemplate spInternalConnectorTemplate; - @Autowired public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { this.messageConverter = messageConverter; spInternalConnectorTemplate = defaultTemplate; diff --git a/hawkbit-repository/src/main/java/org/eclipse/hawkbit/MultiTenantJpaTransactionManager.java b/hawkbit-repository/src/main/java/org/eclipse/hawkbit/MultiTenantJpaTransactionManager.java index 2ddbfe870..e19f08b4e 100644 --- a/hawkbit-repository/src/main/java/org/eclipse/hawkbit/MultiTenantJpaTransactionManager.java +++ b/hawkbit-repository/src/main/java/org/eclipse/hawkbit/MultiTenantJpaTransactionManager.java @@ -48,7 +48,9 @@ public class MultiTenantJpaTransactionManager extends JpaTransactionManager { && !definition.getName().startsWith(SystemManagement.class.getCanonicalName() + ".deleteTenant") && !definition.getName() .startsWith(SystemManagement.class.getCanonicalName() + ".currentTenantKeyGenerator") - && !definition.getName().startsWith(RolloutManagement.class.getCanonicalName() + ".rolloutScheduler")) { + && !definition.getName().startsWith(RolloutManagement.class.getCanonicalName() + ".rolloutScheduler") + && !definition.getName() + .startsWith(SystemManagement.class.getCanonicalName() + ".getOrCreateTenantMetadata")) { final String currentTenant = tenantAware.getCurrentTenant(); if (currentTenant == null) { From 91dfbbd3a6128ec3ba6089519e41c9633fa266ba Mon Sep 17 00:00:00 2001 From: SirWayne Date: Mon, 15 Feb 2016 16:48:02 +0100 Subject: [PATCH 03/11] Extract exchange from URI Signed-off-by: SirWayne --- .../main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java | 6 ++++++ .../org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java | 2 +- .../src/main/java/org/eclipse/hawkbit/util/IpUtil.java | 3 +-- .../src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java | 2 +- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index adc5b43f6..faed3eb74 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -3,6 +3,8 @@ */ package org.eclipse.hawkbit.amqp; +import java.net.URI; + import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; @@ -55,4 +57,8 @@ public class BaseAmqpService { return (T) messageConverter.fromMessage(message); } + protected String getExchangeFromAmqpUri(final URI amqpUri) { + return amqpUri.getPath().substring(1); + } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java index 5a51f9b9f..1b962da19 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -23,7 +23,7 @@ public class DefaultAmqpSenderService extends BaseAmqpService implements AmqpSen @Override public void sendMessage(final Message message, final URI uri) { - spInternalConnectorTemplate.send(uri.getPath(), message); + spInternalConnectorTemplate.send(getExchangeFromAmqpUri(uri), message); } } diff --git a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java index 79285dfd6..07c22e796 100644 --- a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java +++ b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java @@ -108,8 +108,7 @@ public final class IpUtil { * If the given string not parsable */ public static URI createAmqpUri(final String virtualHost, final String exchange) { - // TODO check - return createUri(AMPQP_SCHEME, virtualHost).resolve(exchange); + return createUri(AMPQP_SCHEME, virtualHost).resolve("/" + exchange); } /** diff --git a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java index 30aa161ad..47e9bfe9a 100644 --- a/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java +++ b/hawkbit-security-core/src/test/java/org/eclipse/hawkbit/util/IpUtilTest.java @@ -121,7 +121,7 @@ public class IpUtilTest { assertFalse(IpUtil.isHttpUri(amqpUri)); assertEquals(host, amqpUri.getHost()); assertEquals("amqp", amqpUri.getScheme()); - assertEquals("path", amqpUri.getPath()); + assertEquals("/path", amqpUri.getRawPath()); } @Test(expected = IllegalArgumentException.class) From be68ad32f521e1c00b1ec6d919802ed24524faa9 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Tue, 16 Feb 2016 09:22:54 +0100 Subject: [PATCH 04/11] Add JavaDoc and refactor staff for clean code convention Signed-off-by: SirWayne --- .../amqp/AmqpMessageHandlerService.java | 21 ++----------------- .../eclipse/hawkbit/amqp/AmqpProperties.java | 4 ++-- .../eclipse/hawkbit/amqp/BaseAmqpService.java | 19 ++++++++++++++++- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 5eed2945c..539d652b6 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -12,7 +12,6 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.Collections; import java.util.List; -import java.util.Map; import java.util.UUID; import org.apache.commons.lang3.StringUtils; @@ -71,10 +70,8 @@ import com.google.common.eventbus.EventBus; /** * - * {@link AmqpMessageHandlerService} handles all incoming AMQP messages. - * - * - * + * {@link AmqpMessageHandlerService} handles all incoming AMQP messages for the + * queue which is configure for the property hawkbit.dmf.rabbitmq.receiverQueue. * */ public class AmqpMessageHandlerService extends BaseAmqpService { @@ -211,11 +208,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return artifact; } - protected void logAndThrowMessageError(final Message message, final String error) { - LOG.error("Error \"{}\" reported by message {}", error, message.getMessageProperties().getMessageId()); - throw new IllegalArgumentException(error); - } - private static void setSecurityContext(final Authentication authentication) { final SecurityContextImpl securityContextImpl = new SecurityContextImpl(); securityContextImpl.setAuthentication(authentication); @@ -230,15 +222,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { setSecurityContext(authenticationToken); } - private String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) { - final Map header = message.getMessageProperties().getHeaders(); - final Object value = header.get(key); - if (value == null) { - logAndThrowMessageError(message, errorMessageIfNull); - } - return value.toString(); - } - /** * Method to create a new target or to find the target if it already exists. * diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index ecd2dc3d7..2c3477c1f 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -22,8 +22,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties("hawkbit.dmf.rabbitmq") public class AmqpProperties { - private String deadLetterQueue = "dmf_connector_deadletter"; - private String deadLetterExchange = "dmf.connector.deadletter"; + private String deadLetterQueue = "dmf_receiver_deadletter"; + private String deadLetterExchange = "dmf.receiver.deadletter"; private String receiverQueue = "dmf_receiver"; private boolean missingQueuesFatal = false; diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index faed3eb74..9b0702d41 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -4,18 +4,21 @@ package org.eclipse.hawkbit.amqp; import java.net.URI; +import java.util.Map; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; /** - * @author Dennis Melzer * */ public class BaseAmqpService { + private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class); protected static final String VIRTUAL_HOST_MESSAGE_HEADER = "VHOST_HEADER"; protected MessageConverter messageConverter; @@ -61,4 +64,18 @@ public class BaseAmqpService { return amqpUri.getPath().substring(1); } + protected String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) { + final Map header = message.getMessageProperties().getHeaders(); + final Object value = header.get(key); + if (value == null) { + logAndThrowMessageError(message, errorMessageIfNull); + } + return value.toString(); + } + + protected void logAndThrowMessageError(final Message message, final String error) { + LOGGER.error("Error \"{}\" reported by message {}", error, message.getMessageProperties().getMessageId()); + throw new IllegalArgumentException(error); + } + } From b4421e7e4419c6b68f1a41e2f0a8b5e03f297742 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Tue, 16 Feb 2016 12:24:33 +0100 Subject: [PATCH 05/11] Add JavaDoc, refactor staff for clean code convention and modify unit tests Signed-off-by: SirWayne --- .../hawkbit/amqp/AmqpConfiguration.java | 4 +- .../amqp/AmqpControllerAuthentfication.java | 5 +- .../amqp/AmqpMessageDispatcherService.java | 12 +++-- .../amqp/AmqpMessageHandlerService.java | 42 ++++++++++------ .../eclipse/hawkbit/amqp/AmqpProperties.java | 37 -------------- .../hawkbit/amqp/AmqpSenderService.java | 30 ++++++++++-- .../eclipse/hawkbit/amqp/BaseAmqpService.java | 30 ++++-------- .../amqp/DefaultAmqpSenderService.java | 27 +++++++---- .../hawkbit/AmqpTestConfiguration.java | 48 +++++++++++++++++++ .../AmqpControllerAuthentficationTest.java | 6 +-- .../AmqpMessageDispatcherServiceTest.java | 36 +++++++------- .../amqp/AmqpMessageHandlerServiceTest.java | 30 ++++++------ .../PropertyBasedArtifactUrlHandlerTest.java | 6 +++ .../security/SystemSecurityContext.java | 22 ++++----- .../java/org/eclipse/hawkbit/util/IpUtil.java | 9 ++-- 15 files changed, 194 insertions(+), 150 deletions(-) create mode 100644 hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index 42b91e89b..ad106c0bd 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -130,9 +130,9 @@ public class AmqpConfiguration { } /** - * Create amqp handler service bean. + * Create default amqp sender service bean. * - * @return + * @return the default amqp sender service bean */ @Bean @ConditionalOnMissingBean diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentfication.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentfication.java index dd36ef1fd..9b98cadfa 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentfication.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentfication.java @@ -127,10 +127,7 @@ public class AmqpControllerAuthentfication { LOGGER.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal); - final PreAuthenticatedAuthenticationToken authRequest = new PreAuthenticatedAuthenticationToken(principal, - credentials); - - return authRequest; + return new PreAuthenticatedAuthenticationToken(principal, credentials); } public void setControllerManagement(final ControllerManagement controllerManagement) { diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 06809d47e..a2ffd06e3 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -36,9 +36,11 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.common.eventbus.Subscribe; /** - * {@link AmqpMessageDispatcherService} handles all outgoing AMQP messages. - * - * + * {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and + * delegate the messages to a {@link AmqpSenderService}. + * + * Additionally the dispatcher listener/subscribe for some target events e.g. + * assignment. * */ @EventSubscriber @@ -160,4 +162,8 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { public void setArtifactUrlHandler(final ArtifactUrlHandler artifactUrlHandler) { this.artifactUrlHandler = artifactUrlHandler; } + + public void setAmqpSenderService(final AmqpSenderService amqpSenderService) { + this.amqpSenderService = amqpSenderService; + } } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 539d652b6..33ca6837e 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -98,15 +98,25 @@ public class AmqpMessageHandlerService extends BaseAmqpService { private HostnameResolver hostnameResolver; /** + * Constructor. + * * @param messageConverter + * the message converter. + * @param defaultTemplate + * the configured amqp template. */ - @Autowired public AmqpMessageHandlerService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { super(messageConverter, defaultTemplate); } + @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory") + private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type, + @Header(MessageHeaderKey.TENANT) final String tenant) { + return onMessage(message, type, tenant, internalAmqpTemplate.getConnectionFactory().getVirtualHost()); + } + /** - * /** Method to handle all incoming amqp messages. + * Method to handle all incoming amqp messages. * * @param message * incoming message @@ -116,11 +126,11 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * the contentType of the message * @param tenant * the contentType of the message + * @param virtualHost + * the virtual host * @return a message if no message is send back to sender */ - @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory") - public Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type, - @Header(MessageHeaderKey.TENANT) final String tenant) { + public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { checkContentTypeJson(message); final SecurityContext oldContext = SecurityContextHolder.getContext(); try { @@ -128,7 +138,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { switch (messageType) { case THING_CREATED: setTenantSecurityContext(tenant); - registerTarget(message); + registerTarget(message, virtualHost); break; case EVENT: setTenantSecurityContext(tenant); @@ -230,7 +240,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { * @param ip * the ip of the target/thing */ - private void registerTarget(final Message message) { + private void registerTarget(final Message message, final String virtualHost) { final String thingId = getStringHeaderKey(message, MessageHeaderKey.THING_ID, "ThingId is null"); final String replyTo = message.getMessageProperties().getReplyTo(); @@ -238,7 +248,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { logAndThrowMessageError(message, "No ReplyTo was set for the createThing Event."); } - final URI amqpUri = IpUtil.createAmqpUri(getVirtualHost(message), replyTo); + final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo); final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotexist(thingId, amqpUri); LOG.debug("Target {} reported online state.", thingId); @@ -271,6 +281,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { private void handleIncomingEvent(final Message message, final EventTopic topic) { if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) { updateActionStatus(message); + return; } logAndThrowMessageError(message, "Got event without appropriate topic."); } @@ -321,19 +332,20 @@ public class AmqpMessageHandlerService extends BaseAmqpService { logAndThrowMessageError(message, "Status for action does not exisit."); } - Action addUpdateActionStatus; - - if (!actionStatus.getStatus().equals(Status.CANCELED)) { - addUpdateActionStatus = controllerManagement.addUpdateActionStatus(actionStatus, action); - } else { - addUpdateActionStatus = controllerManagement.addCancelActionStatus(actionStatus, action); - } + final Action addUpdateActionStatus = getUpdateActionStatus(action, actionStatus); if (!addUpdateActionStatus.isActive()) { lookIfUpdateAvailable(action.getTarget()); } } + private Action getUpdateActionStatus(final Action action, final ActionStatus actionStatus) { + if (actionStatus.getStatus().equals(Status.CANCELED)) { + return controllerManagement.addCancelActionStatus(actionStatus, action); + } + return controllerManagement.addUpdateActionStatus(actionStatus, action); + } + /** * @param message * @param actionUpdateStatus diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index 2c3477c1f..c3a807f48 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -8,15 +8,11 @@ */ package org.eclipse.hawkbit.amqp; -import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.boot.context.properties.ConfigurationProperties; /** * Bean which holds the necessary properties for configuring the AMQP * connection. - * - * - * * */ @ConfigurationProperties("hawkbit.dmf.rabbitmq") @@ -27,59 +23,26 @@ public class AmqpProperties { private String receiverQueue = "dmf_receiver"; private boolean missingQueuesFatal = false; - /** - * Is missingQueuesFatal enabled - * - * @see SimpleMessageListenerContainer#setMissingQueuesFatal - * @return the missingQueuesFatal enabled disabled - */ public boolean isMissingQueuesFatal() { return missingQueuesFatal; } - /** - * @param missingQueuesFatal - * the missingQueuesFatal to set. - * @see SimpleMessageListenerContainer#setMissingQueuesFatal - */ public void setMissingQueuesFatal(final boolean missingQueuesFatal) { this.missingQueuesFatal = missingQueuesFatal; } - /** - * Returns the dead letter exchange. - * - * @return dead letter exchange - */ public String getDeadLetterExchange() { return deadLetterExchange; } - /** - * Sets the dead letter exchange. - * - * @param deadLetterExchange - * the deadLetterExchange to be set - */ public void setDeadLetterExchange(final String deadLetterExchange) { this.deadLetterExchange = deadLetterExchange; } - /** - * Returns the dead letter queue. - * - * @return the dead letter queue - */ public String getDeadLetterQueue() { return deadLetterQueue; } - /** - * Sets the dead letter queue. - * - * @param deadLetterQueue - * the deadLetterQueue ro be set - */ public void setDeadLetterQueue(final String deadLetterQueue) { this.deadLetterQueue = deadLetterQueue; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java index b7d8ed4e7..936495aba 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java @@ -1,3 +1,11 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ package org.eclipse.hawkbit.amqp; import java.net.URI; @@ -5,20 +13,32 @@ import java.net.URI; import org.springframework.amqp.core.Message; /** - * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. - */ - -/** - * + * Interface to send a amqp message. */ @FunctionalInterface public interface AmqpSenderService { /** + * Send the given message to the given uri. The uri contains the (virtual) + * host and exchange. * * @param message + * the amqp message * @param uri + * the reply to uri */ void sendMessage(Message message, URI uri); + /** + * Extract the exchange from the uri. Default implementation removes the + * first /. + * + * @param amqpUri + * the amqp uri + * @return the exchange. + */ + default String extractExchange(final URI amqpUri) { + return amqpUri.getPath().substring(1); + } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index 9b0702d41..f7702e3a6 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -1,9 +1,13 @@ /** - * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html */ package org.eclipse.hawkbit.amqp; -import java.net.URI; import java.util.Map; import org.slf4j.Logger; @@ -14,34 +18,22 @@ import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; /** - * + * A base class which provide basis amqp staff. */ public class BaseAmqpService { private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class); - protected static final String VIRTUAL_HOST_MESSAGE_HEADER = "VHOST_HEADER"; - protected MessageConverter messageConverter; - protected RabbitTemplate spInternalConnectorTemplate; + protected RabbitTemplate internalAmqpTemplate; public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { this.messageConverter = messageConverter; - spInternalConnectorTemplate = defaultTemplate; - } - - protected String getVirtualHost(final Message message) { - final Object virtualHost = message.getMessageProperties().getHeaders().get(VIRTUAL_HOST_MESSAGE_HEADER); - - if (virtualHost == null) { - return spInternalConnectorTemplate.getConnectionFactory().getVirtualHost(); - } - return virtualHost.toString(); + internalAmqpTemplate = defaultTemplate; } protected void cleanMessage(final Message message) { message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); - message.getMessageProperties().getHeaders().remove(VIRTUAL_HOST_MESSAGE_HEADER); } /** @@ -60,10 +52,6 @@ public class BaseAmqpService { return (T) messageConverter.fromMessage(message); } - protected String getExchangeFromAmqpUri(final URI amqpUri) { - return amqpUri.getPath().substring(1); - } - protected String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) { final Map header = message.getMessageProperties().getHeaders(); final Object value = header.get(key); diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java index 1b962da19..3dad77f43 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpSenderService.java @@ -1,5 +1,10 @@ /** - * Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved. + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html */ package org.eclipse.hawkbit.amqp; @@ -9,21 +14,27 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.core.RabbitTemplate; /** - * + * A default implementation for the sender service. The service sends all amqp + * message to the configured spring rabbitmq connections. The exchange is + * extracted from the uri. */ -public class DefaultAmqpSenderService extends BaseAmqpService implements AmqpSenderService { +public class DefaultAmqpSenderService implements AmqpSenderService { + + private final RabbitTemplate internalAmqpTemplate; /** - * @param messageConverter - * @param defaultTemplate + * Constructor. + * + * @param internalAmqpTemplate + * the amqp template */ - public DefaultAmqpSenderService(final RabbitTemplate defaultTemplate) { - super(defaultTemplate.getMessageConverter(), defaultTemplate); + public DefaultAmqpSenderService(final RabbitTemplate internalAmqpTemplate) { + this.internalAmqpTemplate = internalAmqpTemplate; } @Override public void sendMessage(final Message message, final URI uri) { - spInternalConnectorTemplate.send(getExchangeFromAmqpUri(uri), message); + internalAmqpTemplate.send(extractExchange(uri), message); } } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java new file mode 100644 index 000000000..a1dd54710 --- /dev/null +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2015 Bosch Software Innovations GmbH and others. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + */ +package org.eclipse.hawkbit; + +import org.eclipse.hawkbit.amqp.AmqpSenderService; +import org.eclipse.hawkbit.amqp.DefaultAmqpSenderService; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * + */ +@Configuration +public class AmqpTestConfiguration { + + /** + * Method to set the Jackson2JsonMessageConverter. + * + * @return the Jackson2JsonMessageConverter + */ + @Bean + public MessageConverter jsonMessageConverter() { + return new Jackson2JsonMessageConverter(); + } + + /** + * Create default amqp sender service bean. + * + * @param rabbitTemplate + * + * @return the default amqp sender service bean + */ + @Bean + @Autowired + public AmqpSenderService amqpSenderServiceBean(final RabbitTemplate rabbitTemplate) { + return new DefaultAmqpSenderService(rabbitTemplate); + } +} diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java index f5102c3c3..9d41df1ad 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java @@ -125,7 +125,7 @@ public class AmqpControllerAuthentficationTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -147,7 +147,7 @@ public class AmqpControllerAuthentficationTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -169,7 +169,7 @@ public class AmqpControllerAuthentficationTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java index dc9cd8e01..4e6e74ea6 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java @@ -15,9 +15,11 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyObject; import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; +import java.net.URI; import java.util.ArrayList; import java.util.List; @@ -44,7 +46,6 @@ import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; -import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.test.context.ActiveProfiles; import ru.yandex.qatools.allure.annotations.Description; @@ -58,30 +59,29 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit private AmqpMessageDispatcherService amqpMessageDispatcherService; - private AmqpSenderService senderService; - - private MessageConverter messageConverter; - private RabbitTemplate rabbitTemplate; + private DefaultAmqpSenderService senderService; + private static final String CONTROLLER_ID = "1"; @Override public void before() throws Exception { super.before(); - amqpMessageDispatcherService = new AmqpMessageDispatcherService(messageConverter, rabbitTemplate); + this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); + when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter()); + amqpMessageDispatcherService = new AmqpMessageDispatcherService(new Jackson2JsonMessageConverter(), + rabbitTemplate); amqpMessageDispatcherService = spy(amqpMessageDispatcherService); - messageConverter = new Jackson2JsonMessageConverter(); + + senderService = Mockito.mock(DefaultAmqpSenderService.class); + amqpMessageDispatcherService.setAmqpSenderService(senderService); final ArtifactUrlHandler artifactUrlHandlerMock = Mockito.mock(ArtifactUrlHandler.class); when(artifactUrlHandlerMock.getUrl(anyString(), any(), anyObject())).thenReturn("http://mockurl"); - this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); - when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); - amqpMessageDispatcherService.setArtifactUrlHandler(artifactUrlHandlerMock); - senderService = new DefaultAmqpSenderService(rabbitTemplate); } @Test @@ -91,7 +91,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit 1L, "default", CONTROLLER_ID, 1l, new ArrayList(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); - final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); + final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); assertTrue(downloadAndUpdateRequest.getSoftwareModules().isEmpty()); } @@ -104,7 +104,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); - final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); + final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); assertEquals(3, downloadAndUpdateRequest.getSoftwareModules().size()); for (final org.eclipse.hawkbit.dmf.json.model.SoftwareModule softwareModule : downloadAndUpdateRequest @@ -138,7 +138,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent( 1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent); - final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost()); + final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress()); final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage); assertEquals(3, downloadAndUpdateRequest.getSoftwareModules().size()); for (final org.eclipse.hawkbit.dmf.json.model.SoftwareModule softwareModule : downloadAndUpdateRequest @@ -157,8 +157,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit 1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("vHost", "mytest")); amqpMessageDispatcherService .targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent); - final Message sendMessage = createArgumentCapture( - cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost()); + final Message sendMessage = createArgumentCapture(cancelTargetAssignmentDistributionSetEvent.getTargetAdress()); assertCancelMessage(sendMessage); } @@ -194,10 +193,9 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit assertEquals(MessageProperties.CONTENT_TYPE_JSON, sendMessage.getMessageProperties().getContentType()); } - protected Message createArgumentCapture(final String exchange) { + protected Message createArgumentCapture(final URI uri) { final ArgumentCaptor argumentCaptor = ArgumentCaptor.forClass(Message.class); - // Mockito.verify(senderService).sendMessage(argumentCaptor.capture(), - // eq(exchange)); + Mockito.verify(senderService).sendMessage(argumentCaptor.capture(), eq(uri)); return argumentCaptor.getValue(); } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 189d79487..860403f0c 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -121,7 +121,7 @@ public class AmqpMessageHandlerServiceTest { final MessageProperties messageProperties = new MessageProperties(); messageProperties.setContentType("xml"); final Message message = new Message(new byte[0], messageProperties); - amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail(); } @@ -140,11 +140,11 @@ public class AmqpMessageHandlerServiceTest { uriCaptor.capture())).thenReturn(null); // test - amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); // verify assertThat(targetIdCaptor.getValue()).isEqualTo(knownThingId); - assertThat(uriCaptor.getValue().toString()).isEqualTo("amqp://MyTest"); + assertThat(uriCaptor.getValue().toString()).isEqualTo("amqp://vHost/MyTest"); } @@ -156,7 +156,7 @@ public class AmqpMessageHandlerServiceTest { final Message message = messageConverter.toMessage("", messageProperties); try { - amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no replyTo header was set"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -170,7 +170,7 @@ public class AmqpMessageHandlerServiceTest { final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED); final Message message = messageConverter.toMessage(new byte[0], messageProperties); try { - amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no thingID was set"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -186,7 +186,7 @@ public class AmqpMessageHandlerServiceTest { final Message message = messageConverter.toMessage(new byte[0], messageProperties); try { - amqpMessageHandlerService.onMessage(message, type, TENANT); + amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost"); fail("IllegalArgumentException was excepeted due to unknown message type"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -199,21 +199,21 @@ public class AmqpMessageHandlerServiceTest { final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT); final Message message = new Message(new byte[0], messageProperties); try { - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail(); } catch (final IllegalArgumentException e) { } try { messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic"); - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail(); } catch (final IllegalArgumentException e) { } messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name()); try { - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted because there was no event topic"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -232,7 +232,7 @@ public class AmqpMessageHandlerServiceTest { messageProperties); try { - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no action id was set"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -249,7 +249,7 @@ public class AmqpMessageHandlerServiceTest { messageProperties); try { - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); fail("IllegalArgumentException was excepeted since no action id was set"); } catch (final IllegalArgumentException exception) { // test ok - exception was excepted @@ -267,7 +267,7 @@ public class AmqpMessageHandlerServiceTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -290,7 +290,7 @@ public class AmqpMessageHandlerServiceTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -321,7 +321,7 @@ public class AmqpMessageHandlerServiceTest { // test final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT); + TENANT, "vHost"); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -355,7 +355,7 @@ public class AmqpMessageHandlerServiceTest { messageProperties); // test - amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT); + amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost"); // verify final ArgumentCaptor captorTargetAssignDistributionSetEvent = ArgumentCaptor diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java index fcafb23e4..772b9f261 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/util/PropertyBasedArtifactUrlHandlerTest.java @@ -11,6 +11,9 @@ package org.eclipse.hawkbit.util; import static org.junit.Assert.assertEquals; import org.eclipse.hawkbit.AbstractIntegrationTestWithMongoDB; +import org.eclipse.hawkbit.AmqpTestConfiguration; +import org.eclipse.hawkbit.RepositoryApplicationConfiguration; +import org.eclipse.hawkbit.TestConfiguration; import org.eclipse.hawkbit.TestDataUtil; import org.eclipse.hawkbit.dmf.json.model.Artifact; import org.eclipse.hawkbit.repository.model.DistributionSet; @@ -20,6 +23,7 @@ import org.eclipse.hawkbit.tenancy.TenantAware; import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.SpringApplicationConfiguration; import ru.yandex.qatools.allure.annotations.Description; import ru.yandex.qatools.allure.annotations.Features; @@ -31,6 +35,8 @@ import ru.yandex.qatools.allure.annotations.Stories; */ @Features("Component Tests - Artifact URL Handler") @Stories("Test to generate the artifact download URL") +@SpringApplicationConfiguration(classes = { RepositoryApplicationConfiguration.class, TestConfiguration.class, + AmqpTestConfiguration.class }) public class PropertyBasedArtifactUrlHandlerTest extends AbstractIntegrationTestWithMongoDB { @Autowired diff --git a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/security/SystemSecurityContext.java b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/security/SystemSecurityContext.java index 7e3dc8de7..c1125667e 100644 --- a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/security/SystemSecurityContext.java +++ b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/security/SystemSecurityContext.java @@ -15,7 +15,6 @@ import java.util.concurrent.Callable; import org.eclipse.hawkbit.im.authentication.SpPermission.SpringEvalExpressions; import org.eclipse.hawkbit.tenancy.TenantAware; -import org.eclipse.hawkbit.tenancy.TenantAware.TenantRunner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -30,8 +29,7 @@ import org.springframework.stereotype.Service; import com.google.common.base.Throwables; /** - * @author Michael Hirsch - * + * */ @Service public class SystemSecurityContext { @@ -45,15 +43,12 @@ public class SystemSecurityContext { final SecurityContext oldContext = SecurityContextHolder.getContext(); try { logger.debug("entering system code execution"); - return tenantAware.runAsTenant(tenantAware.getCurrentTenant(), new TenantRunner() { - @Override - public T run() { - try { - setSystemContext(); - return callable.call(); - } catch (final Exception e) { - throw Throwables.propagate(e); - } + return tenantAware.runAsTenant(tenantAware.getCurrentTenant(), () -> { + try { + setSystemContext(); + return callable.call(); + } catch (final Exception e) { + throw Throwables.propagate(e); } }); @@ -106,7 +101,8 @@ public class SystemSecurityContext { } @Override - public void setAuthenticated(final boolean isAuthenticated) throws IllegalArgumentException { + public void setAuthenticated(final boolean isAuthenticated) { + // not needed } } } diff --git a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java index 07c22e796..4e08d8bfe 100644 --- a/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java +++ b/hawkbit-security-core/src/main/java/org/eclipse/hawkbit/util/IpUtil.java @@ -20,9 +20,6 @@ import com.google.common.net.HttpHeaders; /** * A utility which determines the correct IP of a connected {@link Target}. E.g * from a {@link HttpServletRequest}. - * - * - * * */ public final class IpUtil { @@ -103,12 +100,14 @@ public final class IpUtil { * * @param host * the host + * @param exchange + * the exchange will store in the path * @return the {@link URI} * @throws IllegalArgumentException * If the given string not parsable */ - public static URI createAmqpUri(final String virtualHost, final String exchange) { - return createUri(AMPQP_SCHEME, virtualHost).resolve("/" + exchange); + public static URI createAmqpUri(final String host, final String exchange) { + return createUri(AMPQP_SCHEME, host).resolve("/" + exchange); } /** From f42fbe32b37c0bfe08313a43ebcbe8308e9a80d9 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Wed, 17 Feb 2016 15:16:24 +0100 Subject: [PATCH 06/11] Add null check in convert methode Signed-off-by: SirWayne --- .../main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index f7702e3a6..c3811626e 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -47,6 +47,9 @@ public class BaseAmqpService { */ @SuppressWarnings("unchecked") protected T convertMessage(final Message message, final Class clazz) { + if (message == null) { + return null; + } message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName()); return (T) messageConverter.fromMessage(message); From cfaf02779fb4e7c2b70e58d093c1387920398679 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Wed, 17 Feb 2016 23:43:52 +0100 Subject: [PATCH 07/11] Convert Json List Signed-off-by: SirWayne --- .../eclipse/hawkbit/amqp/BaseAmqpService.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index c3811626e..d9af2b533 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -8,6 +8,9 @@ */ package org.eclipse.hawkbit.amqp; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Map; import org.slf4j.Logger; @@ -55,6 +58,27 @@ public class BaseAmqpService { return (T) messageConverter.fromMessage(message); } + /** + * Is needed to convert a incoming message to is originally object type. + * + * @param message + * the message to convert. + * @param clazz + * the class of the originally object. + * @return + */ + @SuppressWarnings("unchecked") + protected List convertMessageList(final Message message, final Class clazz) { + if (message == null) { + return Collections.emptyList(); + } + message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, + ArrayList.class); + message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, + clazz.getTypeName()); + return (List) messageConverter.fromMessage(message); + } + protected String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) { final Map header = message.getMessageProperties().getHeaders(); final Object value = header.get(key); From 031be3e8cb6ae6651e70c7c84c847bd0b4606523 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Thu, 18 Feb 2016 00:26:45 +0100 Subject: [PATCH 08/11] Convert Json List Signed-off-by: SirWayne --- .../main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index d9af2b533..ca0a04485 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -54,7 +54,7 @@ public class BaseAmqpService { return null; } message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, - clazz.getTypeName()); + clazz.getName()); return (T) messageConverter.fromMessage(message); } @@ -73,9 +73,9 @@ public class BaseAmqpService { return Collections.emptyList(); } message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, - ArrayList.class); + ArrayList.class.getName()); message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, - clazz.getTypeName()); + clazz.getName()); return (List) messageConverter.fromMessage(message); } From 661d3e2d417727867e78d17b8fa5cc002c41e352 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Thu, 18 Feb 2016 10:27:58 +0100 Subject: [PATCH 09/11] Add JavaDoc Signed-off-by: SirWayne --- .../main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index ca0a04485..6cfa89358 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -59,12 +59,13 @@ public class BaseAmqpService { } /** - * Is needed to convert a incoming message to is originally object type. + * Is needed to convert a incoming message to is originally list object + * type. * * @param message * the message to convert. * @param clazz - * the class of the originally object. + * the class of the list content. * @return */ @SuppressWarnings("unchecked") From 5973413cbeba331a97f39490f7510b2a4adae793 Mon Sep 17 00:00:00 2001 From: SirWayne Date: Tue, 1 Mar 2016 08:37:50 +0100 Subject: [PATCH 10/11] Add Javadoc and remove message converter from base class Signed-off-by: SirWayne --- .../hawkbit/amqp/AmqpConfiguration.java | 4 +-- .../amqp/AmqpMessageDispatcherService.java | 11 ++++-- .../amqp/AmqpMessageHandlerService.java | 35 ++++--------------- .../hawkbit/amqp/AmqpSenderService.java | 2 +- .../eclipse/hawkbit/amqp/BaseAmqpService.java | 33 +++++++++++------ .../AmqpControllerAuthentficationTest.java | 4 ++- .../AmqpMessageDispatcherServiceTest.java | 3 +- .../amqp/AmqpMessageHandlerServiceTest.java | 3 +- 8 files changed, 46 insertions(+), 49 deletions(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index ad106c0bd..d2cd1eab8 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -122,11 +122,11 @@ public class AmqpConfiguration { /** * Create amqp handler service bean. * - * @return + * @return handler service bean */ @Bean public AmqpMessageHandlerService amqpMessageHandlerService() { - return new AmqpMessageHandlerService(jsonMessageConverter(), rabbitTemplate); + return new AmqpMessageHandlerService(rabbitTemplate); } /** diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index a2ffd06e3..8681d3a7a 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -29,7 +29,6 @@ import org.eclipse.hawkbit.util.ArtifactUrlHandler; import org.eclipse.hawkbit.util.IpUtil; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; @@ -52,9 +51,15 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { @Autowired private AmqpSenderService amqpSenderService; + /** + * Constructor. + * + * @param messageConverter + * message converter + */ @Autowired - public AmqpMessageDispatcherService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { - super(messageConverter, defaultTemplate); + public AmqpMessageDispatcherService(final MessageConverter messageConverter) { + super(messageConverter); } /** diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 33ca6837e..66de7ada0 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -49,7 +49,6 @@ import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.cache.Cache; @@ -97,16 +96,17 @@ public class AmqpMessageHandlerService extends BaseAmqpService { @Autowired private HostnameResolver hostnameResolver; + private final RabbitTemplate internalAmqpTemplate; + /** * Constructor. * - * @param messageConverter - * the message converter. * @param defaultTemplate * the configured amqp template. */ - public AmqpMessageHandlerService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { - super(messageConverter, defaultTemplate); + public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate) { + super(defaultTemplate.getMessageConverter()); + this.internalAmqpTemplate = defaultTemplate; } @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory") @@ -346,11 +346,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return controllerManagement.addUpdateActionStatus(actionStatus, action); } - /** - * @param message - * @param actionUpdateStatus - * @return - */ private Action checkActionExist(final Message message, final ActionUpdateStatus actionUpdateStatus) { final Long actionId = actionUpdateStatus.getActionId(); LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, @@ -383,17 +378,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } } - /** - * Is needed to verify if an incoming message has the content type json. - * - * @param message - * the to verify - * @param contentType - * the content type - * @return true if the content type has json, false it not. - */ - - private static void checkContentTypeJson(final Message message) { + private void checkContentTypeJson(final Message message) { final MessageProperties messageProperties = message.getMessageProperties(); if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) { return; @@ -409,14 +394,6 @@ public class AmqpMessageHandlerService extends BaseAmqpService { this.hostnameResolver = hostnameResolver; } - void setMessageConverter(final MessageConverter messageConverter) { - this.messageConverter = messageConverter; - } - - MessageConverter getMessageConverter() { - return messageConverter; - } - void setAuthenticationManager(final AmqpControllerAuthentfication authenticationManager) { this.authenticationManager = authenticationManager; } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java index 936495aba..6cb3dd9be 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpSenderService.java @@ -20,7 +20,7 @@ public interface AmqpSenderService { /** * Send the given message to the given uri. The uri contains the (virtual) - * host and exchange. + * host and exchange e.g amqp://host/exchange. * * @param message * the amqp message diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java index 6cfa89358..5ad27c041 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/BaseAmqpService.java @@ -16,7 +16,6 @@ import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.amqp.support.converter.MessageConverter; @@ -28,14 +27,23 @@ public class BaseAmqpService { private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class); protected MessageConverter messageConverter; - protected RabbitTemplate internalAmqpTemplate; - - public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) { + /** + * Constructor. + * + * @param messageConverter + * the message messageConverter. + */ + public BaseAmqpService(final MessageConverter messageConverter) { this.messageConverter = messageConverter; - internalAmqpTemplate = defaultTemplate; } - protected void cleanMessage(final Message message) { + /** + * Clean message properties before sending a message. + * + * @param message + * the message to cleaned up + */ + protected void cleanMessageHeaderProperties(final Message message) { message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME); } @@ -46,7 +54,7 @@ public class BaseAmqpService { * the message to convert. * @param clazz * the class of the originally object. - * @return + * @return the converted object */ @SuppressWarnings("unchecked") protected T convertMessage(final Message message, final Class clazz) { @@ -66,7 +74,7 @@ public class BaseAmqpService { * the message to convert. * @param clazz * the class of the list content. - * @return + * @return the list of converted objects */ @SuppressWarnings("unchecked") protected List convertMessageList(final Message message, final Class clazz) { @@ -80,7 +88,12 @@ public class BaseAmqpService { return (List) messageConverter.fromMessage(message); } - protected String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) { + public MessageConverter getMessageConverter() { + return messageConverter; + } + + protected final String getStringHeaderKey(final Message message, final String key, + final String errorMessageIfNull) { final Map header = message.getMessageProperties().getHeaders(); final Object value = header.get(key); if (value == null) { @@ -89,7 +102,7 @@ public class BaseAmqpService { return value.toString(); } - protected void logAndThrowMessageError(final Message message, final String error) { + protected final void logAndThrowMessageError(final Message message, final String error) { LOGGER.error("Error \"{}\" reported by message {}", error, message.getMessageProperties().getMessageId()); throw new IllegalArgumentException(error); } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java index e8b8a089b..ce2db47d5 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthentficationTest.java @@ -61,7 +61,9 @@ public class AmqpControllerAuthentficationTest { @Before public void before() throws Exception { messageConverter = new Jackson2JsonMessageConverter(); - amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, mock(RabbitTemplate.class)); + final RabbitTemplate rabbitTemplate = mock(RabbitTemplate.class); + when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); + amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate); authenticationManager = new AmqpControllerAuthentfication(); authenticationManager.setControllerManagement(mock(ControllerManagement.class)); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java index 4c8300bfd..5967976ca 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherServiceTest.java @@ -70,8 +70,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit super.before(); this.rabbitTemplate = Mockito.mock(RabbitTemplate.class); when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter()); - amqpMessageDispatcherService = new AmqpMessageDispatcherService(new Jackson2JsonMessageConverter(), - rabbitTemplate); + amqpMessageDispatcherService = new AmqpMessageDispatcherService(new Jackson2JsonMessageConverter()); amqpMessageDispatcherService = spy(amqpMessageDispatcherService); senderService = Mockito.mock(DefaultAmqpSenderService.class); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 9bc2f2da7..582a1857b 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -105,7 +105,8 @@ public class AmqpMessageHandlerServiceTest { @Before public void before() throws Exception { messageConverter = new Jackson2JsonMessageConverter(); - amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, rabbitTemplate); + when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter); + amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate); amqpMessageHandlerService.setControllerManagement(controllerManagementMock); amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock); amqpMessageHandlerService.setArtifactManagement(artifactManagementMock); From d014e81b738644cbfd8a8b1377fc45de29e3b6ac Mon Sep 17 00:00:00 2001 From: Dennis Melzer Date: Wed, 2 Mar 2016 12:50:00 +0100 Subject: [PATCH 11/11] Update AmqpMessageHandlerService.java --- .../org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 66de7ada0..ae1926331 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -374,7 +374,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } else { logAndThrowMessageError(message, - "Cancel Recjected message is not allowed, if action is on state: " + action.getStatus()); + "Cancel recjected message is not allowed, if action is on state: " + action.getStatus()); } }