Merge pull request #57 from bsinno/Multiple_VHost_Connection
ok merging.
This commit is contained in:
@@ -51,7 +51,12 @@ public class TenantAwareCacheManager implements TenancyCacheManager {
|
||||
|
||||
@Override
|
||||
public Cache getCache(final String name) {
|
||||
final String currentTenant = tenantAware.getCurrentTenant().toUpperCase();
|
||||
String currentTenant = tenantAware.getCurrentTenant();
|
||||
if (currentTenant == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
currentTenant = currentTenant.toUpperCase();
|
||||
if (currentTenant.contains(TENANT_CACHE_DELIMITER)) {
|
||||
return null;
|
||||
}
|
||||
@@ -60,7 +65,12 @@ public class TenantAwareCacheManager implements TenancyCacheManager {
|
||||
|
||||
@Override
|
||||
public Collection<String> getCacheNames() {
|
||||
final String currentTenant = tenantAware.getCurrentTenant().toUpperCase();
|
||||
String currentTenant = tenantAware.getCurrentTenant();
|
||||
if (currentTenant == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
currentTenant = currentTenant.toUpperCase();
|
||||
if (currentTenant.contains(TENANT_CACHE_DELIMITER)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@@ -23,6 +23,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
|
||||
@@ -121,11 +122,22 @@ public class AmqpConfiguration {
|
||||
/**
|
||||
* Create amqp handler service bean.
|
||||
*
|
||||
* @return
|
||||
* @return handler service bean
|
||||
*/
|
||||
@Bean
|
||||
public AmqpMessageHandlerService amqpMessageHandlerService() {
|
||||
return new AmqpMessageHandlerService();
|
||||
return new AmqpMessageHandlerService(rabbitTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create default amqp sender service bean.
|
||||
*
|
||||
* @return the default amqp sender service bean
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnMissingBean
|
||||
public AmqpSenderService amqpSenderServiceBean() {
|
||||
return new DefaultAmqpSenderService(rabbitTemplate);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -127,10 +127,7 @@ public class AmqpControllerAuthentfication {
|
||||
|
||||
LOGGER.debug("preAuthenticatedPrincipal = {} trying to authenticate", principal);
|
||||
|
||||
final PreAuthenticatedAuthenticationToken authRequest = new PreAuthenticatedAuthenticationToken(principal,
|
||||
credentials);
|
||||
|
||||
return authRequest;
|
||||
return new PreAuthenticatedAuthenticationToken(principal, credentials);
|
||||
}
|
||||
|
||||
public void setControllerManagement(final ControllerManagement controllerManagement) {
|
||||
|
||||
@@ -25,35 +25,43 @@ import org.eclipse.hawkbit.eventbus.EventSubscriber;
|
||||
import org.eclipse.hawkbit.eventbus.event.CancelTargetAssignmentEvent;
|
||||
import org.eclipse.hawkbit.eventbus.event.TargetAssignDistributionSetEvent;
|
||||
import org.eclipse.hawkbit.repository.model.LocalArtifact;
|
||||
import org.eclipse.hawkbit.tenancy.TenantAware;
|
||||
import org.eclipse.hawkbit.util.ArtifactUrlHandler;
|
||||
import org.eclipse.hawkbit.util.IpUtil;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import com.google.common.eventbus.Subscribe;
|
||||
|
||||
/**
|
||||
* {@link AmqpMessageDispatcherService} handles all outgoing AMQP messages.
|
||||
*
|
||||
*
|
||||
* {@link AmqpMessageDispatcherService} create all outgoing AMQP messages and
|
||||
* delegate the messages to a {@link AmqpSenderService}.
|
||||
*
|
||||
* Additionally the dispatcher listener/subscribe for some target events e.g.
|
||||
* assignment.
|
||||
*
|
||||
*/
|
||||
@EventSubscriber
|
||||
public class AmqpMessageDispatcherService {
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private TenantAware tenantAware;
|
||||
public class AmqpMessageDispatcherService extends BaseAmqpService {
|
||||
|
||||
@Autowired
|
||||
private ArtifactUrlHandler artifactUrlHandler;
|
||||
|
||||
@Autowired
|
||||
private AmqpSenderService amqpSenderService;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param messageConverter
|
||||
* message converter
|
||||
*/
|
||||
@Autowired
|
||||
public AmqpMessageDispatcherService(final MessageConverter messageConverter) {
|
||||
super(messageConverter);
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to send a message to a RabbitMQ Exchange after the Distribution
|
||||
* set has been assign to a Target.
|
||||
@@ -79,11 +87,9 @@ public class AmqpMessageDispatcherService {
|
||||
downloadAndUpdateRequest.addSoftwareModule(amqpSoftwareModule);
|
||||
}
|
||||
|
||||
final Message message = rabbitTemplate.getMessageConverter().toMessage(
|
||||
downloadAndUpdateRequest,
|
||||
createConnectorMessageProperties(targetAssignDistributionSetEvent.getTenant(), controllerId,
|
||||
EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
sendMessage(targetAdress.getHost(), message);
|
||||
final Message message = messageConverter.toMessage(downloadAndUpdateRequest, createConnectorMessageProperties(
|
||||
targetAssignDistributionSetEvent.getTenant(), controllerId, EventTopic.DOWNLOAD_AND_INSTALL));
|
||||
amqpSenderService.sendMessage(message, targetAdress);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -98,29 +104,13 @@ public class AmqpMessageDispatcherService {
|
||||
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent) {
|
||||
final String controllerId = cancelTargetAssignmentDistributionSetEvent.getControllerId();
|
||||
final Long actionId = cancelTargetAssignmentDistributionSetEvent.getActionId();
|
||||
final Message message = rabbitTemplate.getMessageConverter().toMessage(
|
||||
actionId,
|
||||
createConnectorMessageProperties(cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId,
|
||||
EventTopic.CANCEL_DOWNLOAD));
|
||||
final Message message = messageConverter.toMessage(actionId, createConnectorMessageProperties(
|
||||
cancelTargetAssignmentDistributionSetEvent.getTenant(), controllerId, EventTopic.CANCEL_DOWNLOAD));
|
||||
|
||||
sendMessage(cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost(), message);
|
||||
amqpSenderService.sendMessage(message, cancelTargetAssignmentDistributionSetEvent.getTargetAdress());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Send message to exchange.
|
||||
*
|
||||
* @param exchange
|
||||
* the exchange
|
||||
* @param message
|
||||
* the message
|
||||
*/
|
||||
public void sendMessage(final String exchange, final Message message) {
|
||||
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
|
||||
rabbitTemplate.setExchange(exchange);
|
||||
rabbitTemplate.send(message);
|
||||
}
|
||||
|
||||
private MessageProperties createConnectorMessageProperties(final String tenant, final String controllerId,
|
||||
final EventTopic topic) {
|
||||
final MessageProperties messageProperties = createMessageProperties();
|
||||
@@ -155,9 +145,8 @@ public class AmqpMessageDispatcherService {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
final List<Artifact> convertedArtifacts = localArtifacts.stream()
|
||||
.map(localArtifact -> convertArtifact(targetId, localArtifact)).collect(Collectors.toList());
|
||||
return convertedArtifacts;
|
||||
return localArtifacts.stream().map(localArtifact -> convertArtifact(targetId, localArtifact))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
private Artifact convertArtifact(final String targetId, final LocalArtifact localArtifact) {
|
||||
@@ -175,15 +164,11 @@ public class AmqpMessageDispatcherService {
|
||||
return artifact;
|
||||
}
|
||||
|
||||
public void setTenantAware(final TenantAware tenantAware) {
|
||||
this.tenantAware = tenantAware;
|
||||
}
|
||||
|
||||
public void setRabbitTemplate(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
public void setArtifactUrlHandler(final ArtifactUrlHandler artifactUrlHandler) {
|
||||
this.artifactUrlHandler = artifactUrlHandler;
|
||||
}
|
||||
|
||||
public void setAmqpSenderService(final AmqpSenderService amqpSenderService) {
|
||||
this.amqpSenderService = amqpSenderService;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -12,7 +12,6 @@ import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -50,8 +49,6 @@ import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.cache.Cache;
|
||||
@@ -72,19 +69,14 @@ import com.google.common.eventbus.EventBus;
|
||||
|
||||
/**
|
||||
*
|
||||
* {@link AmqpMessageHandlerService} handles all incoming AMQP messages.
|
||||
*
|
||||
*
|
||||
*
|
||||
* {@link AmqpMessageHandlerService} handles all incoming AMQP messages for the
|
||||
* queue which is configure for the property hawkbit.dmf.rabbitmq.receiverQueue.
|
||||
*
|
||||
*/
|
||||
public class AmqpMessageHandlerService {
|
||||
public class AmqpMessageHandlerService extends BaseAmqpService {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(AmqpMessageHandlerService.class);
|
||||
|
||||
@Autowired
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Autowired
|
||||
private ControllerManagement controllerManagement;
|
||||
|
||||
@@ -104,8 +96,27 @@ public class AmqpMessageHandlerService {
|
||||
@Autowired
|
||||
private HostnameResolver hostnameResolver;
|
||||
|
||||
private final RabbitTemplate internalAmqpTemplate;
|
||||
|
||||
/**
|
||||
* /** Method to handle all incoming amqp messages.
|
||||
* Constructor.
|
||||
*
|
||||
* @param defaultTemplate
|
||||
* the configured amqp template.
|
||||
*/
|
||||
public AmqpMessageHandlerService(final RabbitTemplate defaultTemplate) {
|
||||
super(defaultTemplate.getMessageConverter());
|
||||
this.internalAmqpTemplate = defaultTemplate;
|
||||
}
|
||||
|
||||
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
|
||||
private Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
|
||||
@Header(MessageHeaderKey.TENANT) final String tenant) {
|
||||
return onMessage(message, type, tenant, internalAmqpTemplate.getConnectionFactory().getVirtualHost());
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to handle all incoming amqp messages.
|
||||
*
|
||||
* @param message
|
||||
* incoming message
|
||||
@@ -115,11 +126,11 @@ public class AmqpMessageHandlerService {
|
||||
* the contentType of the message
|
||||
* @param tenant
|
||||
* the contentType of the message
|
||||
* @param virtualHost
|
||||
* the virtual host
|
||||
* @return a message if <null> no message is send back to sender
|
||||
*/
|
||||
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.receiverQueue}", containerFactory = "listenerContainerFactory")
|
||||
public Message onMessage(final Message message, @Header(MessageHeaderKey.TYPE) final String type,
|
||||
@Header(MessageHeaderKey.TENANT) final String tenant) {
|
||||
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
|
||||
checkContentTypeJson(message);
|
||||
final SecurityContext oldContext = SecurityContextHolder.getContext();
|
||||
try {
|
||||
@@ -127,7 +138,7 @@ public class AmqpMessageHandlerService {
|
||||
switch (messageType) {
|
||||
case THING_CREATED:
|
||||
setTenantSecurityContext(tenant);
|
||||
registerTarget(message);
|
||||
registerTarget(message, virtualHost);
|
||||
break;
|
||||
case EVENT:
|
||||
setTenantSecurityContext(tenant);
|
||||
@@ -153,8 +164,8 @@ public class AmqpMessageHandlerService {
|
||||
final String sha1 = secruityToken.getSha1();
|
||||
try {
|
||||
SecurityContextHolder.getContext().setAuthentication(authenticationManager.doAuthenticate(secruityToken));
|
||||
final LocalArtifact localArtifact = artifactManagement.findFirstLocalArtifactsBySHA1(secruityToken
|
||||
.getSha1());
|
||||
final LocalArtifact localArtifact = artifactManagement
|
||||
.findFirstLocalArtifactsBySHA1(secruityToken.getSha1());
|
||||
if (localArtifact == null) {
|
||||
throw new EntityNotFoundException();
|
||||
}
|
||||
@@ -177,9 +188,9 @@ public class AmqpMessageHandlerService {
|
||||
final String downloadId = UUID.randomUUID().toString();
|
||||
final DownloadArtifactCache downloadCache = new DownloadArtifactCache(DownloadType.BY_SHA1, sha1);
|
||||
cache.put(downloadId, downloadCache);
|
||||
authentificationResponse.setDownloadUrl(UriComponentsBuilder
|
||||
.fromUri(hostnameResolver.resolveHostname().toURI()).path("/api/v1/downloadserver/downloadId/")
|
||||
.path(downloadId).build().toUriString());
|
||||
authentificationResponse
|
||||
.setDownloadUrl(UriComponentsBuilder.fromUri(hostnameResolver.resolveHostname().toURI())
|
||||
.path("/api/v1/downloadserver/downloadId/").path(downloadId).build().toUriString());
|
||||
authentificationResponse.setResponseCode(HttpStatus.OK.value());
|
||||
} catch (final BadCredentialsException | AuthenticationServiceException | CredentialsExpiredException e) {
|
||||
LOG.error("Login failed", e);
|
||||
@@ -196,7 +207,7 @@ public class AmqpMessageHandlerService {
|
||||
authentificationResponse.setMessage(errorMessage);
|
||||
}
|
||||
|
||||
return rabbitTemplate.getMessageConverter().toMessage(authentificationResponse, messageProperties);
|
||||
return messageConverter.toMessage(authentificationResponse, messageProperties);
|
||||
}
|
||||
|
||||
private static Artifact convertDbArtifact(final DbArtifact dbArtifact) {
|
||||
@@ -207,11 +218,6 @@ public class AmqpMessageHandlerService {
|
||||
return artifact;
|
||||
}
|
||||
|
||||
protected void logAndThrowMessageError(final Message message, final String error) {
|
||||
LOG.error("Error \"{}\" reported by message {}", error, message.getMessageProperties().getMessageId());
|
||||
throw new IllegalArgumentException(error);
|
||||
}
|
||||
|
||||
private static void setSecurityContext(final Authentication authentication) {
|
||||
final SecurityContextImpl securityContextImpl = new SecurityContextImpl();
|
||||
securityContextImpl.setAuthentication(authentication);
|
||||
@@ -219,22 +225,13 @@ public class AmqpMessageHandlerService {
|
||||
}
|
||||
|
||||
private static void setTenantSecurityContext(final String tenantId) {
|
||||
final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(UUID.randomUUID()
|
||||
.toString(), "AMQP-Controller", Collections.singletonList(new SimpleGrantedAuthority(
|
||||
SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS)));
|
||||
final AnonymousAuthenticationToken authenticationToken = new AnonymousAuthenticationToken(
|
||||
UUID.randomUUID().toString(), "AMQP-Controller",
|
||||
Collections.singletonList(new SimpleGrantedAuthority(SpringEvalExpressions.CONTROLLER_ROLE_ANONYMOUS)));
|
||||
authenticationToken.setDetails(new TenantAwareAuthenticationDetails(tenantId, true));
|
||||
setSecurityContext(authenticationToken);
|
||||
}
|
||||
|
||||
private String getStringHeaderKey(final Message message, final String key, final String errorMessageIfNull) {
|
||||
final Map<String, Object> header = message.getMessageProperties().getHeaders();
|
||||
final Object value = header.get(key);
|
||||
if (value == null) {
|
||||
logAndThrowMessageError(message, errorMessageIfNull);
|
||||
}
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Method to create a new target or to find the target if it already exists.
|
||||
*
|
||||
@@ -243,14 +240,15 @@ public class AmqpMessageHandlerService {
|
||||
* @param ip
|
||||
* the ip of the target/thing
|
||||
*/
|
||||
private void registerTarget(final Message message) {
|
||||
private void registerTarget(final Message message, final String virtualHost) {
|
||||
final String thingId = getStringHeaderKey(message, MessageHeaderKey.THING_ID, "ThingId is null");
|
||||
final String replyTo = message.getMessageProperties().getReplyTo();
|
||||
|
||||
if (StringUtils.isEmpty(replyTo)) {
|
||||
logAndThrowMessageError(message, "No ReplyTo was set for the createThing Event.");
|
||||
}
|
||||
final URI amqpUri = IpUtil.createAmqpUri(replyTo);
|
||||
|
||||
final URI amqpUri = IpUtil.createAmqpUri(virtualHost, replyTo);
|
||||
final Target target = controllerManagement.findOrRegisterTargetIfItDoesNotexist(thingId, amqpUri);
|
||||
LOG.debug("Target {} reported online state.", thingId);
|
||||
|
||||
@@ -267,8 +265,8 @@ public class AmqpMessageHandlerService {
|
||||
final DistributionSet distributionSet = action.getDistributionSet();
|
||||
final List<SoftwareModule> softwareModuleList = controllerManagement
|
||||
.findSoftwareModulesByDistributionSet(distributionSet);
|
||||
eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(), target
|
||||
.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress()));
|
||||
eventBus.post(new TargetAssignDistributionSetEvent(target.getOptLockRevision(), target.getTenant(),
|
||||
target.getControllerId(), action.getId(), softwareModuleList, target.getTargetInfo().getAddress()));
|
||||
|
||||
}
|
||||
|
||||
@@ -281,13 +279,11 @@ public class AmqpMessageHandlerService {
|
||||
* the topic of the event.
|
||||
*/
|
||||
private void handleIncomingEvent(final Message message, final EventTopic topic) {
|
||||
switch (topic) {
|
||||
case UPDATE_ACTION_STATUS:
|
||||
if (EventTopic.UPDATE_ACTION_STATUS.equals(topic)) {
|
||||
updateActionStatus(message);
|
||||
return;
|
||||
default:
|
||||
logAndThrowMessageError(message, "Got event without appropriate topic.");
|
||||
}
|
||||
logAndThrowMessageError(message, "Got event without appropriate topic.");
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -336,28 +332,24 @@ public class AmqpMessageHandlerService {
|
||||
logAndThrowMessageError(message, "Status for action does not exisit.");
|
||||
}
|
||||
|
||||
Action addUpdateActionStatus;
|
||||
|
||||
if (!actionStatus.getStatus().equals(Status.CANCELED)) {
|
||||
addUpdateActionStatus = controllerManagement.addUpdateActionStatus(actionStatus, action);
|
||||
} else {
|
||||
addUpdateActionStatus = controllerManagement.addCancelActionStatus(actionStatus, action);
|
||||
}
|
||||
final Action addUpdateActionStatus = getUpdateActionStatus(action, actionStatus);
|
||||
|
||||
if (!addUpdateActionStatus.isActive()) {
|
||||
lookIfUpdateAvailable(action.getTarget());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param message
|
||||
* @param actionUpdateStatus
|
||||
* @return
|
||||
*/
|
||||
private Action getUpdateActionStatus(final Action action, final ActionStatus actionStatus) {
|
||||
if (actionStatus.getStatus().equals(Status.CANCELED)) {
|
||||
return controllerManagement.addCancelActionStatus(actionStatus, action);
|
||||
}
|
||||
return controllerManagement.addUpdateActionStatus(actionStatus, action);
|
||||
}
|
||||
|
||||
private Action checkActionExist(final Message message, final ActionUpdateStatus actionUpdateStatus) {
|
||||
final Long actionId = actionUpdateStatus.getActionId();
|
||||
LOG.debug("Target notifies intermediate about action {} with status {}.", actionId, actionUpdateStatus
|
||||
.getActionStatus().name());
|
||||
LOG.debug("Target notifies intermediate about action {} with status {}.", actionId,
|
||||
actionUpdateStatus.getActionStatus().name());
|
||||
|
||||
if (actionId == null) {
|
||||
logAndThrowMessageError(message, "Invalid message no action id");
|
||||
@@ -366,8 +358,8 @@ public class AmqpMessageHandlerService {
|
||||
final Action action = controllerManagement.findActionWithDetails(actionId);
|
||||
|
||||
if (action == null) {
|
||||
logAndThrowMessageError(message, "Got intermediate notification about action " + actionId
|
||||
+ " but action does not exist");
|
||||
logAndThrowMessageError(message,
|
||||
"Got intermediate notification about action " + actionId + " but action does not exist");
|
||||
}
|
||||
return action;
|
||||
}
|
||||
@@ -381,38 +373,12 @@ public class AmqpMessageHandlerService {
|
||||
// back to running action status
|
||||
|
||||
} else {
|
||||
logAndThrowMessageError(message, "Cancel Recjected message is not allowed, if action is on state: "
|
||||
+ action.getStatus());
|
||||
logAndThrowMessageError(message,
|
||||
"Cancel recjected message is not allowed, if action is on state: " + action.getStatus());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to convert a incoming message to is originally object type.
|
||||
*
|
||||
* @param message
|
||||
* the message to convert.
|
||||
* @param clazz
|
||||
* the class of the originally object.
|
||||
* @return
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private <T> T convertMessage(final Message message, final Class<T> clazz) {
|
||||
message.getMessageProperties().getHeaders()
|
||||
.put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME, clazz.getTypeName());
|
||||
return (T) rabbitTemplate.getMessageConverter().fromMessage(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to verify if an incoming message has the content type json.
|
||||
*
|
||||
* @param message
|
||||
* the to verify
|
||||
* @param contentType
|
||||
* the content type
|
||||
* @return true if the content type has json, false it not.
|
||||
*/
|
||||
|
||||
private static void checkContentTypeJson(final Message message) {
|
||||
private void checkContentTypeJson(final Message message) {
|
||||
final MessageProperties messageProperties = message.getMessageProperties();
|
||||
if (messageProperties.getContentType() != null && messageProperties.getContentType().contains("json")) {
|
||||
return;
|
||||
@@ -428,14 +394,6 @@ public class AmqpMessageHandlerService {
|
||||
this.hostnameResolver = hostnameResolver;
|
||||
}
|
||||
|
||||
void setRabbitTemplate(final RabbitTemplate rabbitTemplate) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
}
|
||||
|
||||
MessageConverter getMessageConverter() {
|
||||
return rabbitTemplate.getMessageConverter();
|
||||
}
|
||||
|
||||
void setAuthenticationManager(final AmqpControllerAuthentfication authenticationManager) {
|
||||
this.authenticationManager = authenticationManager;
|
||||
}
|
||||
|
||||
@@ -8,78 +8,41 @@
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Bean which holds the necessary properties for configuring the AMQP
|
||||
* connection.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
@ConfigurationProperties("hawkbit.dmf.rabbitmq")
|
||||
public class AmqpProperties {
|
||||
|
||||
private String deadLetterQueue = "dmf_connector_deadletter";
|
||||
private String deadLetterExchange = "dmf.connector.deadletter";
|
||||
private String deadLetterQueue = "dmf_receiver_deadletter";
|
||||
private String deadLetterExchange = "dmf.receiver.deadletter";
|
||||
private String receiverQueue = "dmf_receiver";
|
||||
private boolean missingQueuesFatal = false;
|
||||
|
||||
/**
|
||||
* Is missingQueuesFatal enabled
|
||||
*
|
||||
* @see SimpleMessageListenerContainer#setMissingQueuesFatal
|
||||
* @return the missingQueuesFatal <true> enabled <false> disabled
|
||||
*/
|
||||
public boolean isMissingQueuesFatal() {
|
||||
return missingQueuesFatal;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param missingQueuesFatal
|
||||
* the missingQueuesFatal to set.
|
||||
* @see SimpleMessageListenerContainer#setMissingQueuesFatal
|
||||
*/
|
||||
public void setMissingQueuesFatal(final boolean missingQueuesFatal) {
|
||||
this.missingQueuesFatal = missingQueuesFatal;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the dead letter exchange.
|
||||
*
|
||||
* @return dead letter exchange
|
||||
*/
|
||||
public String getDeadLetterExchange() {
|
||||
return deadLetterExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the dead letter exchange.
|
||||
*
|
||||
* @param deadLetterExchange
|
||||
* the deadLetterExchange to be set
|
||||
*/
|
||||
public void setDeadLetterExchange(final String deadLetterExchange) {
|
||||
this.deadLetterExchange = deadLetterExchange;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the dead letter queue.
|
||||
*
|
||||
* @return the dead letter queue
|
||||
*/
|
||||
public String getDeadLetterQueue() {
|
||||
return deadLetterQueue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the dead letter queue.
|
||||
*
|
||||
* @param deadLetterQueue
|
||||
* the deadLetterQueue ro be set
|
||||
*/
|
||||
public void setDeadLetterQueue(final String deadLetterQueue) {
|
||||
this.deadLetterQueue = deadLetterQueue;
|
||||
}
|
||||
|
||||
@@ -0,0 +1,44 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
|
||||
/**
|
||||
* Interface to send a amqp message.
|
||||
*/
|
||||
@FunctionalInterface
|
||||
public interface AmqpSenderService {
|
||||
|
||||
/**
|
||||
* Send the given message to the given uri. The uri contains the (virtual)
|
||||
* host and exchange e.g amqp://host/exchange.
|
||||
*
|
||||
* @param message
|
||||
* the amqp message
|
||||
* @param uri
|
||||
* the reply to uri
|
||||
*/
|
||||
void sendMessage(Message message, URI uri);
|
||||
|
||||
/**
|
||||
* Extract the exchange from the uri. Default implementation removes the
|
||||
* first /.
|
||||
*
|
||||
* @param amqpUri
|
||||
* the amqp uri
|
||||
* @return the exchange.
|
||||
*/
|
||||
default String extractExchange(final URI amqpUri) {
|
||||
return amqpUri.getPath().substring(1);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
|
||||
/**
|
||||
* A base class which provide basis amqp staff.
|
||||
*/
|
||||
public class BaseAmqpService {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(BaseAmqpService.class);
|
||||
protected MessageConverter messageConverter;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param messageConverter
|
||||
* the message messageConverter.
|
||||
*/
|
||||
public BaseAmqpService(final MessageConverter messageConverter) {
|
||||
this.messageConverter = messageConverter;
|
||||
}
|
||||
|
||||
/**
|
||||
* Clean message properties before sending a message.
|
||||
*
|
||||
* @param message
|
||||
* the message to cleaned up
|
||||
*/
|
||||
protected void cleanMessageHeaderProperties(final Message message) {
|
||||
message.getMessageProperties().getHeaders().remove(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to convert a incoming message to is originally object type.
|
||||
*
|
||||
* @param message
|
||||
* the message to convert.
|
||||
* @param clazz
|
||||
* the class of the originally object.
|
||||
* @return the converted object
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> T convertMessage(final Message message, final Class<T> clazz) {
|
||||
if (message == null) {
|
||||
return null;
|
||||
}
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME,
|
||||
clazz.getName());
|
||||
return (T) messageConverter.fromMessage(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Is needed to convert a incoming message to is originally list object
|
||||
* type.
|
||||
*
|
||||
* @param message
|
||||
* the message to convert.
|
||||
* @param clazz
|
||||
* the class of the list content.
|
||||
* @return the list of converted objects
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
protected <T> List<T> convertMessageList(final Message message, final Class<T> clazz) {
|
||||
if (message == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME,
|
||||
ArrayList.class.getName());
|
||||
message.getMessageProperties().getHeaders().put(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME,
|
||||
clazz.getName());
|
||||
return (List<T>) messageConverter.fromMessage(message);
|
||||
}
|
||||
|
||||
public MessageConverter getMessageConverter() {
|
||||
return messageConverter;
|
||||
}
|
||||
|
||||
protected final String getStringHeaderKey(final Message message, final String key,
|
||||
final String errorMessageIfNull) {
|
||||
final Map<String, Object> header = message.getMessageProperties().getHeaders();
|
||||
final Object value = header.get(key);
|
||||
if (value == null) {
|
||||
logAndThrowMessageError(message, errorMessageIfNull);
|
||||
}
|
||||
return value.toString();
|
||||
}
|
||||
|
||||
protected final void logAndThrowMessageError(final Message message, final String error) {
|
||||
LOGGER.error("Error \"{}\" reported by message {}", error, message.getMessageProperties().getMessageId());
|
||||
throw new IllegalArgumentException(error);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit.amqp;
|
||||
|
||||
import java.net.URI;
|
||||
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
/**
|
||||
* A default implementation for the sender service. The service sends all amqp
|
||||
* message to the configured spring rabbitmq connections. The exchange is
|
||||
* extracted from the uri.
|
||||
*/
|
||||
public class DefaultAmqpSenderService implements AmqpSenderService {
|
||||
|
||||
private final RabbitTemplate internalAmqpTemplate;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param internalAmqpTemplate
|
||||
* the amqp template
|
||||
*/
|
||||
public DefaultAmqpSenderService(final RabbitTemplate internalAmqpTemplate) {
|
||||
this.internalAmqpTemplate = internalAmqpTemplate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sendMessage(final Message message, final URI uri) {
|
||||
internalAmqpTemplate.send(extractExchange(uri), message);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,48 @@
|
||||
/**
|
||||
* Copyright (c) 2015 Bosch Software Innovations GmbH and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v1.0
|
||||
* which accompanies this distribution, and is available at
|
||||
* http://www.eclipse.org/legal/epl-v10.html
|
||||
*/
|
||||
package org.eclipse.hawkbit;
|
||||
|
||||
import org.eclipse.hawkbit.amqp.AmqpSenderService;
|
||||
import org.eclipse.hawkbit.amqp.DefaultAmqpSenderService;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
@Configuration
|
||||
public class AmqpTestConfiguration {
|
||||
|
||||
/**
|
||||
* Method to set the Jackson2JsonMessageConverter.
|
||||
*
|
||||
* @return the Jackson2JsonMessageConverter
|
||||
*/
|
||||
@Bean
|
||||
public MessageConverter jsonMessageConverter() {
|
||||
return new Jackson2JsonMessageConverter();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create default amqp sender service bean.
|
||||
*
|
||||
* @param rabbitTemplate
|
||||
*
|
||||
* @return the default amqp sender service bean
|
||||
*/
|
||||
@Bean
|
||||
@Autowired
|
||||
public AmqpSenderService amqpSenderServiceBean(final RabbitTemplate rabbitTemplate) {
|
||||
return new DefaultAmqpSenderService(rabbitTemplate);
|
||||
}
|
||||
}
|
||||
@@ -60,11 +60,10 @@ public class AmqpControllerAuthentficationTest {
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService();
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
|
||||
rabbitTemplate.setMessageConverter(messageConverter);
|
||||
amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate);
|
||||
final RabbitTemplate rabbitTemplate = mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate);
|
||||
|
||||
authenticationManager = new AmqpControllerAuthentfication();
|
||||
authenticationManager.setControllerManagement(mock(ControllerManagement.class));
|
||||
@@ -78,7 +77,6 @@ public class AmqpControllerAuthentficationTest {
|
||||
final ControllerManagement controllerManagement = mock(ControllerManagement.class);
|
||||
when(controllerManagement.getSecurityTokenByControllerId(anyString())).thenReturn(CONTROLLLER_ID);
|
||||
authenticationManager.setControllerManagement(controllerManagement);
|
||||
|
||||
amqpMessageHandlerService.setArtifactManagement(mock(ArtifactManagement.class));
|
||||
|
||||
authenticationManager.setTenantAware(new SecurityContextTenantAware());
|
||||
@@ -139,7 +137,7 @@ public class AmqpControllerAuthentficationTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
@@ -161,7 +159,7 @@ public class AmqpControllerAuthentficationTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
@@ -183,7 +181,7 @@ public class AmqpControllerAuthentficationTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
|
||||
@@ -19,6 +19,7 @@ import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Mockito.spy;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
@@ -45,7 +46,6 @@ import org.springframework.amqp.core.MessageProperties;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
|
||||
import org.springframework.amqp.support.converter.MessageConverter;
|
||||
import org.springframework.test.context.ActiveProfiles;
|
||||
|
||||
import ru.yandex.qatools.allure.annotations.Description;
|
||||
@@ -59,37 +59,38 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
|
||||
private AmqpMessageDispatcherService amqpMessageDispatcherService;
|
||||
|
||||
private MessageConverter messageConverter;
|
||||
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
private DefaultAmqpSenderService senderService;
|
||||
|
||||
private static final String CONTROLLER_ID = "1";
|
||||
|
||||
@Override
|
||||
public void before() throws Exception {
|
||||
super.before();
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService();
|
||||
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(new Jackson2JsonMessageConverter());
|
||||
amqpMessageDispatcherService = new AmqpMessageDispatcherService(new Jackson2JsonMessageConverter());
|
||||
amqpMessageDispatcherService = spy(amqpMessageDispatcherService);
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
|
||||
senderService = Mockito.mock(DefaultAmqpSenderService.class);
|
||||
amqpMessageDispatcherService.setAmqpSenderService(senderService);
|
||||
|
||||
final ArtifactUrlHandler artifactUrlHandlerMock = Mockito.mock(ArtifactUrlHandler.class);
|
||||
when(artifactUrlHandlerMock.getUrl(anyString(), any(), anyObject())).thenReturn("http://mockurl");
|
||||
|
||||
this.rabbitTemplate = Mockito.mock(RabbitTemplate.class);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
|
||||
amqpMessageDispatcherService.setRabbitTemplate(rabbitTemplate);
|
||||
amqpMessageDispatcherService.setTenantAware(tenantAware);
|
||||
amqpMessageDispatcherService.setArtifactUrlHandler(artifactUrlHandlerMock);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@Description("Verfies that download and install event with no software modul works")
|
||||
public void testSendDownloadRequesWithEmptySoftwareModules() {
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, new ArrayList<SoftwareModule>(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, new ArrayList<SoftwareModule>(),
|
||||
IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
assertTrue("No softwaremmodule should be contained in the request",
|
||||
downloadAndUpdateRequest.getSoftwareModules().isEmpty());
|
||||
@@ -101,9 +102,9 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
final DistributionSet dsA = TestDataUtil.generateDistributionSet("", softwareManagement,
|
||||
distributionSetManagement);
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
assertEquals("Expecting a size of 3 software modules in the reuqest", 3,
|
||||
downloadAndUpdateRequest.getSoftwareModules().size());
|
||||
@@ -140,9 +141,9 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
Mockito.when(rabbitTemplate.convertSendAndReceive(any())).thenReturn(receivedList);
|
||||
|
||||
final TargetAssignDistributionSetEvent targetAssignDistributionSetEvent = new TargetAssignDistributionSetEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, dsA.getModules(), IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService.targetAssignDistributionSet(targetAssignDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress().getHost());
|
||||
final Message sendMessage = createArgumentCapture(targetAssignDistributionSetEvent.getTargetAdress());
|
||||
final DownloadAndUpdateRequest downloadAndUpdateRequest = assertDownloadAndInstallMessage(sendMessage);
|
||||
assertEquals("DownloadAndUpdateRequest event should contains 3 software modules", 3,
|
||||
downloadAndUpdateRequest.getSoftwareModules().size());
|
||||
@@ -159,11 +160,10 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
@Description("Verfies that send cancel event works")
|
||||
public void testSendCancelRequest() {
|
||||
final CancelTargetAssignmentEvent cancelTargetAssignmentDistributionSetEvent = new CancelTargetAssignmentEvent(
|
||||
1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("mytest"));
|
||||
1L, "default", CONTROLLER_ID, 1l, IpUtil.createAmqpUri("vHost", "mytest"));
|
||||
amqpMessageDispatcherService
|
||||
.targetCancelAssignmentToDistributionSet(cancelTargetAssignmentDistributionSetEvent);
|
||||
final Message sendMessage = createArgumentCapture(
|
||||
cancelTargetAssignmentDistributionSetEvent.getTargetAdress().getHost());
|
||||
final Message sendMessage = createArgumentCapture(cancelTargetAssignmentDistributionSetEvent.getTargetAdress());
|
||||
assertCancelMessage(sendMessage);
|
||||
|
||||
}
|
||||
@@ -203,9 +203,9 @@ public class AmqpMessageDispatcherServiceTest extends AbstractIntegrationTestWit
|
||||
MessageProperties.CONTENT_TYPE_JSON, sendMessage.getMessageProperties().getContentType());
|
||||
}
|
||||
|
||||
protected Message createArgumentCapture(final String exchange) {
|
||||
protected Message createArgumentCapture(final URI uri) {
|
||||
final ArgumentCaptor<Message> argumentCaptor = ArgumentCaptor.forClass(Message.class);
|
||||
Mockito.verify(amqpMessageDispatcherService).sendMessage(eq(exchange), argumentCaptor.capture());
|
||||
Mockito.verify(senderService).sendMessage(argumentCaptor.capture(), eq(uri));
|
||||
return argumentCaptor.getValue();
|
||||
}
|
||||
|
||||
|
||||
@@ -99,14 +99,15 @@ public class AmqpMessageHandlerServiceTest {
|
||||
@Mock
|
||||
private EventBus eventBus;
|
||||
|
||||
@Mock
|
||||
private RabbitTemplate rabbitTemplate;
|
||||
|
||||
@Before
|
||||
public void before() throws Exception {
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService();
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagementMock);
|
||||
messageConverter = new Jackson2JsonMessageConverter();
|
||||
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
|
||||
rabbitTemplate.setMessageConverter(messageConverter);
|
||||
amqpMessageHandlerService.setRabbitTemplate(rabbitTemplate);
|
||||
when(rabbitTemplate.getMessageConverter()).thenReturn(messageConverter);
|
||||
amqpMessageHandlerService = new AmqpMessageHandlerService(rabbitTemplate);
|
||||
amqpMessageHandlerService.setControllerManagement(controllerManagementMock);
|
||||
amqpMessageHandlerService.setAuthenticationManager(authenticationManagerMock);
|
||||
amqpMessageHandlerService.setArtifactManagement(artifactManagementMock);
|
||||
amqpMessageHandlerService.setCache(cacheMock);
|
||||
@@ -121,7 +122,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
messageProperties.setContentType("xml");
|
||||
final Message message = new Message(new byte[0], messageProperties);
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted due to worng content type");
|
||||
} catch (final IllegalArgumentException e) {
|
||||
}
|
||||
@@ -140,10 +141,11 @@ public class AmqpMessageHandlerServiceTest {
|
||||
when(controllerManagementMock.findOrRegisterTargetIfItDoesNotexist(targetIdCaptor.capture(),
|
||||
uriCaptor.capture())).thenReturn(null);
|
||||
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
|
||||
|
||||
assertThat(targetIdCaptor.getValue()).as("Extraxted Thing should be the same").isEqualTo(knownThingId);
|
||||
assertThat(uriCaptor.getValue().toString()).as("Extraxted Uri should be the same").isEqualTo("amqp://MyTest");
|
||||
// verify
|
||||
assertThat(targetIdCaptor.getValue()).as("Thing id is wrong").isEqualTo(knownThingId);
|
||||
assertThat(uriCaptor.getValue().toString()).as("Uri is not right").isEqualTo("amqp://vHost/MyTest");
|
||||
|
||||
}
|
||||
|
||||
@@ -155,7 +157,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
final Message message = messageConverter.toMessage("", messageProperties);
|
||||
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted since no replyTo header was set");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -169,7 +171,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.THING_CREATED);
|
||||
final Message message = messageConverter.toMessage(new byte[0], messageProperties);
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.THING_CREATED.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted since no thingID was set");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -185,7 +187,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
final Message message = messageConverter.toMessage(new byte[0], messageProperties);
|
||||
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, type, TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, type, TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted due to unknown message type");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -198,21 +200,21 @@ public class AmqpMessageHandlerServiceTest {
|
||||
final MessageProperties messageProperties = createMessageProperties(MessageType.EVENT);
|
||||
final Message message = new Message(new byte[0], messageProperties);
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted due to unknown message type");
|
||||
} catch (final IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
try {
|
||||
messageProperties.setHeader(MessageHeaderKey.TOPIC, "wrongTopic");
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted due to unknown topic");
|
||||
} catch (final IllegalArgumentException e) {
|
||||
}
|
||||
|
||||
messageProperties.setHeader(MessageHeaderKey.TOPIC, EventTopic.CANCEL_DOWNLOAD.name());
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted because there was no event topic");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -231,7 +233,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
messageProperties);
|
||||
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted since no action id was set");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -248,7 +250,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
messageProperties);
|
||||
|
||||
try {
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
fail("IllegalArgumentException was excepeted since no action id was set");
|
||||
} catch (final IllegalArgumentException exception) {
|
||||
// test ok - exception was excepted
|
||||
@@ -266,7 +268,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
@@ -290,7 +292,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
@@ -322,7 +324,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
|
||||
// test
|
||||
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
|
||||
TENANT);
|
||||
TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
|
||||
@@ -358,7 +360,7 @@ public class AmqpMessageHandlerServiceTest {
|
||||
messageProperties);
|
||||
|
||||
// test
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT);
|
||||
amqpMessageHandlerService.onMessage(message, MessageType.EVENT.name(), TENANT, "vHost");
|
||||
|
||||
// verify
|
||||
final ArgumentCaptor<TargetAssignDistributionSetEvent> captorTargetAssignDistributionSetEvent = ArgumentCaptor
|
||||
|
||||
@@ -11,6 +11,9 @@ package org.eclipse.hawkbit.util;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import org.eclipse.hawkbit.AbstractIntegrationTestWithMongoDB;
|
||||
import org.eclipse.hawkbit.AmqpTestConfiguration;
|
||||
import org.eclipse.hawkbit.RepositoryApplicationConfiguration;
|
||||
import org.eclipse.hawkbit.TestConfiguration;
|
||||
import org.eclipse.hawkbit.TestDataUtil;
|
||||
import org.eclipse.hawkbit.dmf.json.model.Artifact;
|
||||
import org.eclipse.hawkbit.repository.model.DistributionSet;
|
||||
@@ -20,6 +23,7 @@ import org.eclipse.hawkbit.tenancy.TenantAware;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.test.SpringApplicationConfiguration;
|
||||
|
||||
import ru.yandex.qatools.allure.annotations.Description;
|
||||
import ru.yandex.qatools.allure.annotations.Features;
|
||||
@@ -31,6 +35,8 @@ import ru.yandex.qatools.allure.annotations.Stories;
|
||||
*/
|
||||
@Features("Component Tests - Artifact URL Handler")
|
||||
@Stories("Test to generate the artifact download URL")
|
||||
@SpringApplicationConfiguration(classes = { RepositoryApplicationConfiguration.class, TestConfiguration.class,
|
||||
AmqpTestConfiguration.class })
|
||||
public class PropertyBasedArtifactUrlHandlerTest extends AbstractIntegrationTestWithMongoDB {
|
||||
|
||||
@Autowired
|
||||
|
||||
@@ -48,7 +48,9 @@ public class MultiTenantJpaTransactionManager extends JpaTransactionManager {
|
||||
&& !definition.getName().startsWith(SystemManagement.class.getCanonicalName() + ".deleteTenant")
|
||||
&& !definition.getName()
|
||||
.startsWith(SystemManagement.class.getCanonicalName() + ".currentTenantKeyGenerator")
|
||||
&& !definition.getName().startsWith(RolloutManagement.class.getCanonicalName() + ".rolloutScheduler")) {
|
||||
&& !definition.getName().startsWith(RolloutManagement.class.getCanonicalName() + ".rolloutScheduler")
|
||||
&& !definition.getName()
|
||||
.startsWith(SystemManagement.class.getCanonicalName() + ".getOrCreateTenantMetadata")) {
|
||||
|
||||
final String currentTenant = tenantAware.getCurrentTenant();
|
||||
if (currentTenant == null) {
|
||||
|
||||
@@ -15,7 +15,6 @@ import java.util.concurrent.Callable;
|
||||
|
||||
import org.eclipse.hawkbit.im.authentication.SpPermission.SpringEvalExpressions;
|
||||
import org.eclipse.hawkbit.tenancy.TenantAware;
|
||||
import org.eclipse.hawkbit.tenancy.TenantAware.TenantRunner;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
@@ -30,8 +29,7 @@ import org.springframework.stereotype.Service;
|
||||
import com.google.common.base.Throwables;
|
||||
|
||||
/**
|
||||
* @author Michael Hirsch
|
||||
*
|
||||
*
|
||||
*/
|
||||
@Service
|
||||
public class SystemSecurityContext {
|
||||
@@ -45,15 +43,12 @@ public class SystemSecurityContext {
|
||||
final SecurityContext oldContext = SecurityContextHolder.getContext();
|
||||
try {
|
||||
logger.debug("entering system code execution");
|
||||
return tenantAware.runAsTenant(tenantAware.getCurrentTenant(), new TenantRunner<T>() {
|
||||
@Override
|
||||
public T run() {
|
||||
try {
|
||||
setSystemContext();
|
||||
return callable.call();
|
||||
} catch (final Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
return tenantAware.runAsTenant(tenantAware.getCurrentTenant(), () -> {
|
||||
try {
|
||||
setSystemContext();
|
||||
return callable.call();
|
||||
} catch (final Exception e) {
|
||||
throw Throwables.propagate(e);
|
||||
}
|
||||
});
|
||||
|
||||
@@ -106,7 +101,8 @@ public class SystemSecurityContext {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAuthenticated(final boolean isAuthenticated) throws IllegalArgumentException {
|
||||
public void setAuthenticated(final boolean isAuthenticated) {
|
||||
// not needed
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,9 +20,6 @@ import com.google.common.net.HttpHeaders;
|
||||
/**
|
||||
* A utility which determines the correct IP of a connected {@link Target}. E.g
|
||||
* from a {@link HttpServletRequest}.
|
||||
*
|
||||
*
|
||||
*
|
||||
*
|
||||
*/
|
||||
public final class IpUtil {
|
||||
@@ -95,7 +92,6 @@ public final class IpUtil {
|
||||
if (isIpV6) {
|
||||
return URI.create(scheme + SCHEME_SEPERATOR + "[" + host + "]");
|
||||
}
|
||||
|
||||
return URI.create(scheme + SCHEME_SEPERATOR + host);
|
||||
}
|
||||
|
||||
@@ -104,12 +100,14 @@ public final class IpUtil {
|
||||
*
|
||||
* @param host
|
||||
* the host
|
||||
* @param exchange
|
||||
* the exchange will store in the path
|
||||
* @return the {@link URI}
|
||||
* @throws IllegalArgumentException
|
||||
* If the given string not parsable
|
||||
*/
|
||||
public static URI createAmqpUri(final String host) {
|
||||
return createUri(AMPQP_SCHEME, host);
|
||||
public static URI createAmqpUri(final String host, final String exchange) {
|
||||
return createUri(AMPQP_SCHEME, host).resolve("/" + exchange);
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -106,23 +106,24 @@ public class IpUtilTest {
|
||||
@Description("Tests create amqp uri ipv4 and ipv6")
|
||||
public void testCreateAmqpUri() {
|
||||
final String ipv4 = "10.99.99.1";
|
||||
URI amqpUri = IpUtil.createAmqpUri(ipv4);
|
||||
URI amqpUri = IpUtil.createAmqpUri(ipv4, "path");
|
||||
assertAmqpUri(ipv4, amqpUri);
|
||||
|
||||
final String host = "myhost";
|
||||
amqpUri = IpUtil.createAmqpUri(host);
|
||||
amqpUri = IpUtil.createAmqpUri(host, "path");
|
||||
assertAmqpUri(host, amqpUri);
|
||||
|
||||
final String ipv6 = "0:0:0:0:0:0:0:1";
|
||||
amqpUri = IpUtil.createAmqpUri(ipv6);
|
||||
amqpUri = IpUtil.createAmqpUri(ipv6, "path");
|
||||
assertAmqpUri("[" + ipv6 + "]", amqpUri);
|
||||
}
|
||||
|
||||
private void assertAmqpUri(final String host, final URI httpUri) {
|
||||
assertTrue("The given URI is an AMQP scheme", IpUtil.isAmqpUri(httpUri));
|
||||
assertFalse("The given URI is not an HTTP scheme", IpUtil.isHttpUri(httpUri));
|
||||
assertEquals("The given host matches the URI host", host, httpUri.getHost());
|
||||
assertEquals("The given URI has an AMQP scheme", "amqp", httpUri.getScheme());
|
||||
private void assertAmqpUri(final String host, final URI amqpUri) {
|
||||
assertTrue("The given URI is an AMQP scheme", IpUtil.isAmqpUri(amqpUri));
|
||||
assertFalse("The given URI is not an HTTP scheme", IpUtil.isHttpUri(amqpUri));
|
||||
assertEquals("The given host matches the URI host", host, amqpUri.getHost());
|
||||
assertEquals("The given URI has an AMQP scheme", "amqp", amqpUri.getScheme());
|
||||
assertEquals("The given URI has an AMQP path", "/path", amqpUri.getRawPath());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
Reference in New Issue
Block a user