Add sender service to customize sending messages
Signed-off-by: SirWayne <dennis.melzer@bosch-si.com>
This commit is contained in:
@@ -125,7 +125,17 @@ public class AmqpConfiguration {
|
||||
*/
|
||||
@Bean
|
||||
public AmqpMessageHandlerService amqpMessageHandlerService() {
|
||||
return new AmqpMessageHandlerService();
|
||||
return new AmqpMessageHandlerService(jsonMessageConverter(), rabbitTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create amqp handler service bean.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Bean
|
||||
public AmqpSenderService amqpSenderServiceBean() {
|
||||
return new DefaultAmqpSenderService(rabbitTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -25,13 +25,12 @@ import org.eclipse.hawkbit.eventbus.EventSubscriber;
|
||||
import org.eclipse.hawkbit.eventbus.event.CancelTargetAssignmentEvent;
|
||||
import org.eclipse.hawkbit.eventbus.event.TargetAssignDistributionSetEvent;
|
||||
import org.eclipse.hawkbit.repository.model.LocalArtifact;
|
||||
import org.eclipse.hawkbit.tenancy.TenantAware;
|
||||
import org.eclipse.hawkbit.util.ArtifactUrlHandler;
|
||||
import org.eclipse.hawkbit.util.IpUtil;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.google.common.eventbus.Subscribe;
|
||||
@@ -43,17 +42,19 @@ import com.google.common.eventbus.Subscribe;
|
||||
*
|
||||
*/
|
||||
@EventSubscriber
|
||||
public class AmqpMessageDispatcherService {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private TenantAware tenantAware;
|
||||
public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
|
||||
@Autowired
|
||||
private ArtifactUrlHandler artifactUrlHandler;
|
||||
|
||||
@Autowired
|
||||
private AmqpSenderService amqpSenderService;
|
||||
|
||||
@Autowired
|
||||
public AmqpMessageDispatcherService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) {
|
||||
super(messageConverter, defaultTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to send a message to a RabbitMQ Exchange after the Distribution
|
||||
* set has been assign to a Target.
|
||||
@@ -79,11 +80,9 @@ public class AmqpMessageDispatcherService {
|
||||
downloadAndUpdateRequest.addSoftwareModule(amqpSoftwareModule);
|
||||
}
|
||||
|
||||
final Message message = rabbitTemplate.getMessageConverter().toMessage(
|
||||
downloadAndUpdateRequest,
|
||||
createConnectorMessageProperties(targetAssignDistributionSetEvent.getTenant(), controllerId,
|
||||
EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
sendMessage(targetAdress.getHost(), message);
|
||||
final Message message = messageConverter.toMessage(downloadAndUpdateRequest, createConnectorMessageProperties(
|
||||
targetAssignDistributionSetEvent.getTenant(), controllerId, EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
amqpSenderService.sendMessage(message, targetAdress);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -98,29 +97,13 @@ public class AmqpMessageDispatcherService {
|
||||
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent) {
|
||||
final String controllerId = cancelTargetAssignmentDistributionSetEvent.getControllerId();
|
||||
final Long actionId = cancelTargetAssignmentDistributionSetEvent.getActionId();
|
||||
final Message message = rabbitTemplate.getMessageConverter().toMessage(
|
||||
actionId,
|
||||
createConnectorMessageProperties(cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId,
|
||||
EventTopic.CANCEL_DOWNLOAD));
|
||||
final Message message = messageConverter.toMessage(actionId, createConnectorMessageProperties(
|
||||
cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, EventTopic.CANCEL_DOWNLOAD));
|
||||
|
||||
sendMessage(cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost(), message);
|
||||
amqpSenderService.sendMessage(message, cancelTargetAssignmentDistributionSetEvent.getTargetAdress());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message to exchange.
|
||||
*
|
||||
* @param exchange
|
||||
* the exchange
|
||||
* @param message
|
||||
* the message
|
||||
*/
|
||||
public void sendMessage(final String exchange, final Message message) {
|
||||
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
|
||||
rabbitTemplate.setExchange(exchange);
|
||||
rabbitTemplate.send(message);
|
||||
}
|
||||
|
||||
private MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId,
|
||||
final EventTopic topic) {
|
||||
final MessageProperties messageProperties = createMessageProperties();
|
||||
@@ -155,9 +138,8 @@ public class AmqpMessageDispatcherService {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<Artifact> convertedArtifacts = localArtifacts.stream()
|
||||
.map(localArtifact -> convertArtifact(targetId, localArtifact)).collect(Collectors.toList());
|
||||
return convertedArtifacts;
|
||||
return localArtifacts.stream().map(localArtifact -> convertArtifact(targetId, localArtifact))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Artifact convertArtifact(final String targetId, final LocalArtifact localArtifact) {
|
||||
@@ -175,14 +157,6 @@ public class AmqpMessageDispatcherService {
|
||||
return artifact;
|
||||
}
|
||||
|
||||
public void setTenantAware(final TenantAware tenantAware) {
|
||||
this.tenantAware = tenantAware;
|
||||
}
|
||||
|
||||
public void setRabbitTemplate(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
public void setArtifactUrlHandler(final ArtifactUrlHandler artifactUrlHandler) {
|
||||
this.artifactUrlHandler = artifactUrlHandler;
|
||||
}
|
||||
|
||||
@@ -50,7 +50,6 @@ import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
@@ -78,13 +77,10 @@ import com.google.common.eventbus.EventBus;
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class AmqpMessageHandlerService {
|
||||
public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class);
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private ControllerManagement controllerManagement;
|
||||
|
||||
@@ -104,6 +100,14 @@ public class AmqpMessageHandlerService {
|
||||
@Autowired
|
||||
private HostnameResolver hostnameResolver;
|
||||
|
||||
/**
|
||||
* @param messageConverter
|
||||
*/
|
||||
@Autowired
|
||||
public AmqpMessageHandlerService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) {
|
||||
super(messageConverter, defaultTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
* /** Method to handle all incoming amqp messages.
|
||||
*
|
||||
@@ -153,8 +157,8 @@ public class AmqpMessageHandlerService {
|
||||
final String sha1 = secruityToken.getSha1();
|
||||
try {
|
||||
SecurityContextHolder.getContext().setAuthentication(authenticationManager.doAuthenticate(secruityToken));
|
||||
final LocalArtifact localArtifact = artifactManagement.findFirstLocalArtifactsBySHA1(secruityToken
|
||||
.getSha1());
|
||||
final LocalArtifact localArtifact = artifactManagement
|
||||
.findFirstLocalArtifactsBySHA1(secruityToken.getSha1());
|
||||
if (localArtifact == null) {
|
||||
throw new EntityNotFoundException();
|
||||
}
|
||||
@@ -177,9 +181,9 @@ public class AmqpMessageHandlerService {
|
||||
final String downloadId = UUID.randomUUID().toString();
|
||||
final DownloadArtifactCache downloadCache = new DownloadArtifactCache(DownloadType.BY_SHA1, sha1);
|
||||
cache.put(downloadId, downloadCache);
|
||||
authentificationResponse.setDownloadUrl(UriComponentsBuilder
|
||||
.fromUri(hostnameResolver.resolveHostname().toURI()).path("/api/v1/downloadserver/downloadId/")
|
||||
.path(downloadId).build().toUriString());
|
||||
authentificationResponse
|
||||
.setDownloadUrl(UriComponentsBuilder.fromUri(hostnameResolver.resolveHostname().toURI())
|
||||
.path("/api/v1/downloadserver/downloadId/").path(downloadId).build().toUriString());
|
||||
authentificationResponse.setResponseCode(HttpStatus.OK.value());
|
||||
} catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) {
|
||||
LOG.error("Login failed", e);
|
||||
@@ -196,7 +200,7 @@ public class AmqpMessageHandlerService {
|
||||
authentificationResponse.setMessage(errorMessage);
|
||||
}
|
||||
|
||||
return rabbitTemplate.getMessageConverter().toMessage(authentificationResponse, messageProperties);
|
||||
return messageConverter.toMessage(authentificationResponse, messageProperties);
|
||||
}
|
||||
|
||||
private static Artifact convertDbArtifact(final DbArtifact dbArtifact) {
|
||||
@@ -219,9 +223,9 @@ public class AmqpMessageHandlerService {
|
||||
}
|
||||
|
||||
private static void setTenantSecurityContext(final String tenantId) {
|
||||
final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(UUID.randomUUID()
|
||||
.toString(), "AMQP-Controller", Collections.singletonList(new SimpleGrantedAuthority(
|
||||
SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS)));
|
||||
final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(
|
||||
UUID.randomUUID().toString(), "AMQP-Controller",
|
||||
Collections.singletonList(new SimpleGrantedAuthority(SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS)));
|
||||
authenticationToken.setDetails(new TenantAwareAuthenticationDetails(tenantId, true));
|
||||
setSecurityContext(authenticationToken);
|
||||
}
|
||||
@@ -250,7 +254,8 @@ public class AmqpMessageHandlerService {
|
||||
if (StringUtils.isEmpty(replyTo)) {
|
||||
logAndThrowMessageError(message, "No ReplyTo was set for the createThing Event.");
|
||||
}
|
||||
final URI amqpUri = IpUtil.createAmqpUri(replyTo);
|
||||
|
||||
final URI amqpUri = IpUtil.createAmqpUri(getVirtualHost(message), replyTo);
|
||||
final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotexist(thingId, amqpUri);
|
||||
LOG.debug("Target {} reported online state.", thingId);
|
||||
|
||||
@@ -267,8 +272,8 @@ public class AmqpMessageHandlerService {
|
||||
final DistributionSet distributionSet = action.getDistributionSet();
|
||||
final List<SoftwareModule> softwareModuleList = controllerManagement
|
||||
.findSoftwareModulesByDistributionSet(distributionSet);
|
||||
eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), target
|
||||
.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress()));
|
||||
eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(),
|
||||
target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress()));
|
||||
|
||||
}
|
||||
|
||||
@@ -281,13 +286,10 @@ public class AmqpMessageHandlerService {
|
||||
* the topic of the event.
|
||||
*/
|
||||
private void handleIncomingEvent(final Message message, final EventTopic topic) {
|
||||
switch (topic) {
|
||||
case UPDATE_ACTION_STATUS:
|
||||
if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) {
|
||||
updateActionStatus(message);
|
||||
return;
|
||||
default:
|
||||
logAndThrowMessageError(message, "Got event without appropriate topic.");
|
||||
}
|
||||
logAndThrowMessageError(message, "Got event without appropriate topic.");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -356,8 +358,8 @@ public class AmqpMessageHandlerService {
|
||||
*/
|
||||
private Action checkActionExist(final Message message, final ActionUpdateStatus actionUpdateStatus) {
|
||||
final Long actionId = actionUpdateStatus.getActionId();
|
||||
LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, actionUpdateStatus
|
||||
.getActionStatus().name());
|
||||
LOG.debug("Target notifies intermediate about action {} with status {}.", actionId,
|
||||
actionUpdateStatus.getActionStatus().name());
|
||||
|
||||
if (actionId == null) {
|
||||
logAndThrowMessageError(message, "Invalid message no action id");
|
||||
@@ -366,8 +368,8 @@ public class AmqpMessageHandlerService {
|
||||
final Action action = controllerManagement.findActionWithDetails(actionId);
|
||||
|
||||
if (action == null) {
|
||||
logAndThrowMessageError(message, "Got intermediate notification about action " + actionId
|
||||
+ " but action does not exist");
|
||||
logAndThrowMessageError(message,
|
||||
"Got intermediate notification about action " + actionId + " but action does not exist");
|
||||
}
|
||||
return action;
|
||||
}
|
||||
@@ -381,27 +383,11 @@ public class AmqpMessageHandlerService {
|
||||
// back to running action status
|
||||
|
||||
} else {
|
||||
logAndThrowMessageError(message, "Cancel Recjected message is not allowed, if action is on state: "
|
||||
+ action.getStatus());
|
||||
logAndThrowMessageError(message,
|
||||
"Cancel Recjected message is not allowed, if action is on state: " + action.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to convert a incoming message to is originally object type.
|
||||
*
|
||||
* @param message
|
||||
* the message to convert.
|
||||
* @param clazz
|
||||
* the class of the originally object.
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> T convertMessage(final Message message, final Class<T> clazz) {
|
||||
message.getMessageProperties().getHeaders()
|
||||
.put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName());
|
||||
return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to verify if an incoming message has the content type json.
|
||||
*
|
||||
@@ -428,12 +414,12 @@ public class AmqpMessageHandlerService {
|
||||
this.hostnameResolver = hostnameResolver;
|
||||
}
|
||||
|
||||
void setRabbitTemplate(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
void setMessageConverter(final MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
MessageConverter getMessageConverter() {
|
||||
return rabbitTemplate.getMessageConverter();
|
||||
return messageConverter;
|
||||
}
|
||||
|
||||
void setAuthenticationManager(final AmqpControllerAuthentfication authenticationManager) {
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
/**
|
||||
* Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved.
|
||||
*/
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface AmqpSenderService {
|
||||
|
||||
/**
|
||||
*
|
||||
* @param message
|
||||
* @param uri
|
||||
*/
|
||||
void sendMessage(Message message, URI uri);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved.
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
/**
|
||||
* @author Dennis Melzer
|
||||
*
|
||||
*/
|
||||
public class BaseAmqpService {
|
||||
|
||||
protected static final String VIRTUAL_HOST_MESSAGE_HEADER = "VHOST_HEADER";
|
||||
|
||||
protected MessageConverter messageConverter;
|
||||
|
||||
protected RabbitTemplate spInternalConnectorTemplate;
|
||||
|
||||
@Autowired
|
||||
public BaseAmqpService(final MessageConverter messageConverter, final RabbitTemplate defaultTemplate) {
|
||||
this.messageConverter = messageConverter;
|
||||
spInternalConnectorTemplate = defaultTemplate;
|
||||
}
|
||||
|
||||
protected String getVirtualHost(final Message message) {
|
||||
final Object virtualHost = message.getMessageProperties().getHeaders().get(VIRTUAL_HOST_MESSAGE_HEADER);
|
||||
|
||||
if (virtualHost == null) {
|
||||
return spInternalConnectorTemplate.getConnectionFactory().getVirtualHost();
|
||||
}
|
||||
return virtualHost.toString();
|
||||
}
|
||||
|
||||
protected void cleanMessage(final Message message) {
|
||||
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
|
||||
message.getMessageProperties().getHeaders().remove(VIRTUAL_HOST_MESSAGE_HEADER);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to convert a incoming message to is originally object type.
|
||||
*
|
||||
* @param message
|
||||
* the message to convert.
|
||||
* @param clazz
|
||||
* the class of the originally object.
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T convertMessage(final Message message, final Class<T> clazz) {
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME,
|
||||
clazz.getTypeName());
|
||||
return (T) messageConverter.fromMessage(message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
/**
|
||||
* Copyright (c) 2011-2016 Bosch Software Innovations GmbH, Germany. All rights reserved.
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class DefaultAmqpSenderService extends BaseAmqpService implements AmqpSenderService {
|
||||
|
||||
/**
|
||||
* @param messageConverter
|
||||
* @param defaultTemplate
|
||||
*/
|
||||
public DefaultAmqpSenderService(final RabbitTemplate defaultTemplate) {
|
||||
super(defaultTemplate.getMessageConverter(), defaultTemplate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(final Message message, final URI uri) {
|
||||
spInternalConnectorTemplate.send(uri.getPath(), message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -60,11 +60,8 @@ public class AmqpControllerAuthentficationTest {
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService();
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
|
||||
rabbitTemplate.setMessageConverter(messageConverter);
|
||||
amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, mock(RabbitTemplate.class));
|
||||
|
||||
authenticationManager = new AmqpControllerAuthentfication();
|
||||
authenticationManager.setControllerManagement(mock(ControllerManagement.class));
|
||||
@@ -78,7 +75,6 @@ public class AmqpControllerAuthentficationTest {
|
||||
final ControllerManagement controllerManagement = mock(ControllerManagement.class);
|
||||
when(controllerManagement.getSecurityTokenByControllerId(anyString())).thenReturn(CONTROLLLER_ID);
|
||||
authenticationManager.setControllerManagement(controllerManagement);
|
||||
|
||||
amqpMessageHandlerService.setArtifactManagement(mock(ArtifactManagement.class));
|
||||
|
||||
authenticationManager.setTenantAware(new SecurityContextTenantAware());
|
||||
|
||||
@@ -15,7 +15,6 @@ import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Matchers.anyObject;
|
||||
import static org.mockito.Matchers.anyString;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -59,6 +58,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
|
||||
private AmqpMessageDispatcherService amqpMessageDispatcherService;
|
||||
|
||||
private AmqpSenderService senderService;
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
@@ -68,7 +69,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
@Override
|
||||
public void before() throws Exception {
|
||||
super.before();
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService();
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService(messageConverter, rabbitTemplate);
|
||||
amqpMessageDispatcherService = spy(amqpMessageDispatcherService);
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
|
||||
@@ -78,16 +79,17 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
|
||||
amqpMessageDispatcherService.setRabbitTemplate(rabbitTemplate);
|
||||
amqpMessageDispatcherService.setTenantAware(tenantAware);
|
||||
amqpMessageDispatcherService.setArtifactUrlHandler(artifactUrlHandlerMock);
|
||||
|
||||
senderService = new DefaultAmqpSenderService(rabbitTemplate);
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verfies that download and install event with no software modul works")
|
||||
public void testSendDownloadRequesWithEmptySoftwareModules() {
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, new ArrayList<SoftwareModule>(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, new ArrayList<SoftwareModule>(),
|
||||
IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
@@ -100,7 +102,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
final DistributionSet dsA = TestDataUtil.generateDistributionSet("", softwareManagement,
|
||||
distributionSetManagement);
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
@@ -134,7 +136,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
Mockito.when(rabbitTemplate.convertSendAndReceive(any())).thenReturn(receivedList);
|
||||
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
@@ -152,7 +154,7 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
@Description("Verfies that send cancel event works")
|
||||
public void testSendCancelRequest() {
|
||||
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService
|
||||
.targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(
|
||||
@@ -194,7 +196,8 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
|
||||
protected Message createArgumentCapture(final String exchange) {
|
||||
final ArgumentCaptor<Message> argumentCaptor = ArgumentCaptor.forClass(Message.class);
|
||||
Mockito.verify(amqpMessageDispatcherService).sendMessage(eq(exchange), argumentCaptor.capture());
|
||||
// Mockito.verify(senderService).sendMessage(argumentCaptor.capture(),
|
||||
// eq(exchange));
|
||||
return argumentCaptor.getValue();
|
||||
}
|
||||
|
||||
|
||||
@@ -99,14 +99,14 @@ public class AmqpMessageHandlerServiceTest {
|
||||
@Mock
|
||||
private EventBus eventBus;
|
||||
|
||||
@Mock
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService();
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagementMock);
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
|
||||
rabbitTemplate.setMessageConverter(messageConverter);
|
||||
amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(messageConverter, rabbitTemplate);
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagementMock);
|
||||
amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock);
|
||||
amqpMessageHandlerService.setArtifactManagement(artifactManagementMock);
|
||||
amqpMessageHandlerService.setCache(cacheMock);
|
||||
|
||||
@@ -95,7 +95,6 @@ public final class IpUtil {
|
||||
if (isIpV6) {
|
||||
return URI.create(scheme + SCHEME_SEPERATOR + "[" + host + "]");
|
||||
}
|
||||
|
||||
return URI.create(scheme + SCHEME_SEPERATOR + host);
|
||||
}
|
||||
|
||||
@@ -108,8 +107,9 @@ public final class IpUtil {
|
||||
* @throws IllegalArgumentException
|
||||
* If the given string not parsable
|
||||
*/
|
||||
public static URI createAmqpUri(final String host) {
|
||||
return createUri(AMPQP_SCHEME, host);
|
||||
public static URI createAmqpUri(final String virtualHost, final String exchange) {
|
||||
// TODO check
|
||||
return createUri(AMPQP_SCHEME, virtualHost).resolve(exchange);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -104,23 +104,24 @@ public class IpUtilTest {
|
||||
@Description("Tests create amqp uri ipv4 and ipv6")
|
||||
public void testCreateAmqpUri() {
|
||||
final String ipv4 = "10.99.99.1";
|
||||
URI amqpUri = IpUtil.createAmqpUri(ipv4);
|
||||
URI amqpUri = IpUtil.createAmqpUri(ipv4, "path");
|
||||
assertAmqpUri(ipv4, amqpUri);
|
||||
|
||||
final String host = "myhost";
|
||||
amqpUri = IpUtil.createAmqpUri(host);
|
||||
amqpUri = IpUtil.createAmqpUri(host, "path");
|
||||
assertAmqpUri(host, amqpUri);
|
||||
|
||||
final String ipv6 = "0:0:0:0:0:0:0:1";
|
||||
amqpUri = IpUtil.createAmqpUri(ipv6);
|
||||
amqpUri = IpUtil.createAmqpUri(ipv6, "path");
|
||||
assertAmqpUri("[" + ipv6 + "]", amqpUri);
|
||||
}
|
||||
|
||||
private void assertAmqpUri(final String host, final URI httpUri) {
|
||||
assertTrue(IpUtil.isAmqpUri(httpUri));
|
||||
assertFalse(IpUtil.isHttpUri(httpUri));
|
||||
assertEquals(host, httpUri.getHost());
|
||||
assertEquals("amqp", httpUri.getScheme());
|
||||
private void assertAmqpUri(final String host, final URI amqpUri) {
|
||||
assertTrue(IpUtil.isAmqpUri(amqpUri));
|
||||
assertFalse(IpUtil.isHttpUri(amqpUri));
|
||||
assertEquals(host, amqpUri.getHost());
|
||||
assertEquals("amqp", amqpUri.getScheme());
|
||||
assertEquals("path", amqpUri.getPath());
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
|
||||
Reference in New Issue
Block a user