Fix scheduled executor, auth exchange and simulator poll.

Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
kaizimmerm
2016-06-24 13:59:19 +02:00
parent 7857107b46
commit 23cb62b9d9
17 changed files with 185 additions and 105 deletions

View File

@@ -99,7 +99,8 @@ public class DeviceSimulatorUpdater {
// plug and play - non existing device will be auto created
if (device == null) {
device = repository.add(deviceFactory.createSimulatedDevice(id, tenant, Protocol.DMF_AMQP, -1, null, null));
device = repository
.add(deviceFactory.createSimulatedDevice(id, tenant, Protocol.DMF_AMQP, 1800, null, null));
}
device.setProgress(0.0);

View File

@@ -45,7 +45,7 @@ public class SimulatedDeviceFactory {
*/
public AbstractSimulatedDevice createSimulatedDevice(final String id, final String tenant,
final Protocol protocol) {
return createSimulatedDevice(id, tenant, protocol, 30, null, null);
return createSimulatedDevice(id, tenant, protocol, 1800, null, null);
}
/**

View File

@@ -86,10 +86,6 @@ public class SimulationController {
final String deviceId = name + i;
repository.add(deviceFactory.createSimulatedDevice(deviceId, tenant, protocol, pollDelay, new URL(endpoint),
gatewayToken));
if (protocol == Protocol.DMF_AMQP) {
spSenderService.createOrUpdateThing(tenant, deviceId);
}
}
return ResponseEntity.ok("Updated " + amount + " DMF connected targets!");

View File

@@ -11,7 +11,6 @@ package org.eclipse.hawkbit.simulator;
import java.net.MalformedURLException;
import java.net.URL;
import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol;
import org.eclipse.hawkbit.simulator.amqp.SpSenderService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,10 +53,6 @@ public class SimulatorStartup implements ApplicationListener<ContextRefreshedEve
} catch (final MalformedURLException e) {
LOGGER.error("Creation of simulated device at startup failed.", e);
}
if (autostart.getApi() == Protocol.DMF_AMQP) {
spSenderService.createOrUpdateThing(autostart.getTenant(), deviceId);
}
}
});
}

View File

@@ -122,8 +122,8 @@ public class AmqpConfiguration {
final Map<String, Object> arguments = getDeadLetterExchangeArgs();
arguments.putAll(getTTLMaxArgs());
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments)
.build();
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete()
.withArguments(arguments).build();
}
/**
@@ -133,12 +133,12 @@ public class AmqpConfiguration {
*/
@Bean
public FanoutExchange exchangeQueueToConnector() {
return new FanoutExchange(amqpProperties.getSenderForSpExchange());
return new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true);
}
/**
* Create the Binding
* {@link AmqpConfiguration#receiverConnectorQueueFromSp()} to
* {@link AmqpConfiguration#receiverConnectorQueueFromHawkBit()} to
* {@link AmqpConfiguration#exchangeQueueToConnector()}.
*
* @return the binding and create the queue and exchange
@@ -165,7 +165,7 @@ public class AmqpConfiguration {
*/
@Bean
public FanoutExchange exchangeDeadLetter() {
return new FanoutExchange(amqpProperties.getDeadLetterExchange());
return new FanoutExchange(amqpProperties.getDeadLetterExchange(), false, true);
}
/**

View File

@@ -32,6 +32,11 @@ public class AsyncConfigurerThreadpoolProperties {
*/
private Integer maxthreads = 20;
/**
* Core processing threads for scheduled event executor.
*/
private Integer schedulerThreads = 3;
/**
* When the number of threads is greater than the core, this is the maximum
* time that excess idle threads will wait for new tasks before terminating.
@@ -70,4 +75,12 @@ public class AsyncConfigurerThreadpoolProperties {
this.idletimeout = idletimeout;
}
public Integer getSchedulerThreads() {
return schedulerThreads;
}
public void setSchedulerThreads(final Integer schedulerThreads) {
this.schedulerThreads = schedulerThreads;
}
}

View File

@@ -11,6 +11,8 @@ package org.eclipse.hawkbit.autoconfigure.scheduling;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
@@ -24,9 +26,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService;
import org.springframework.security.concurrent.DelegatingSecurityContextScheduledExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -45,20 +50,28 @@ public class ExecutorAutoConfiguration {
/**
* @return ExecutorService with security context availability in thread
* execution..
* execution.
*/
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean
public ExecutorService asyncExecutor() {
return new DelegatingSecurityContextExecutorService(threadPoolExecutor());
}
/**
* @return {@link TaskExecutor} for task execution
*/
@Bean
@ConditionalOnMissingBean
public Executor asyncExecutor() {
return new DelegatingSecurityContextExecutor(threadPoolExecutor());
public TaskExecutor taskExecutor() {
return new ConcurrentTaskExecutor(asyncExecutor());
}
/**
* @return central ThreadPoolExecutor for general purpose multi threaded
* operations. Tries an orderly shutdown when destroyed.
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor threadPoolExecutor() {
private ThreadPoolExecutor threadPoolExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(
asyncConfigurerProperties.getQueuesize());
return new ThreadPoolExecutor(asyncConfigurerProperties.getCorethreads(),
@@ -92,31 +105,24 @@ public class ExecutorAutoConfiguration {
}
/**
* @return {@link TaskExecutor} for task execution
* @return {@link ScheduledExecutorService} with security context
* availability in thread execution.
*/
@Bean
@ConditionalOnMissingBean
public TaskExecutor taskExecutor() {
return new ConcurrentTaskExecutor(asyncExecutor());
}
/**
* @return {@link ScheduledExecutorService} based on
* {@link #threadPoolTaskScheduler()}.
*/
@Bean
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean
public ScheduledExecutorService scheduledExecutorService() {
return threadPoolTaskScheduler().getScheduledExecutor();
return new DelegatingSecurityContextScheduledExecutorService(
Executors.newScheduledThreadPool(asyncConfigurerProperties.getSchedulerThreads(),
new ThreadFactoryBuilder().setNameFormat("central-scheduled-executor-pool-%d").build()));
}
/**
* @return {@link ThreadPoolTaskScheduler} for scheduled operations.
* @return {@link TaskScheduler} for task execution
*/
@Bean
@ConditionalOnMissingBean
public ThreadPoolTaskScheduler threadPoolTaskScheduler() {
return new ThreadPoolTaskScheduler();
public TaskScheduler taskScheduler() {
return new ConcurrentTaskScheduler(scheduledExecutorService());
}
}

View File

@@ -41,4 +41,5 @@ hawkbit.controller.minPollingTime=00:00:30
# Configuration for RabbitMQ integration
hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter_ttl
hawkbit.dmf.rabbitmq.deadLetterExchange=dmf.connector.deadletter
hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver
hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver
hawkbit.dmf.rabbitmq.authenticationReceiverQueue=authentication_receiver

View File

@@ -8,8 +8,11 @@
*/
package org.eclipse.hawkbit.amqp;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings;
import org.slf4j.Logger;
@@ -18,6 +21,7 @@ import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
@@ -51,10 +55,6 @@ public class AmqpConfiguration {
@Autowired
private AmqpDeadletterProperties amqpDeadletterProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@@ -66,8 +66,8 @@ public class AmqpConfiguration {
private AmqpProperties amqpProperties;
@Autowired
@Qualifier("threadPoolExecutor")
private ThreadPoolExecutor threadPoolExecutor;
@Qualifier("asyncExecutor")
private Executor threadPoolExecutor;
@Autowired
private ScheduledExecutorService scheduledExecutorService;
@@ -145,26 +145,71 @@ public class AmqpConfiguration {
}
/**
* Create the sp receiver queue.
* Create the DMF API receiver queue for
*
* @return the receiver queue
*/
@Bean
public Queue receiverQueue() {
public Queue dmfReceiverQueue() {
return new Queue(amqpProperties.getReceiverQueue(), true, false, false,
amqpDeadletterProperties.getDeadLetterExchangeArgs(amqpProperties.getDeadLetterExchange()));
}
/**
* Create the dead letter fanout exchange.
* Create the DMF API receiver queue for authentication requests called by
* 3rd party artifact storages for download authorization by devices.
*
* @return the receiver queue
*/
@Bean
public Queue authenticationReceiverQueue() {
return QueueBuilder.nonDurable(amqpProperties.getAuthenticationReceiverQueue()).autoDelete()
.withArguments(getTTLMaxArgsAuthenticationQueue()).build();
}
/**
* Create DMF exchange.
*
* @return the fanout exchange
*/
@Bean
public FanoutExchange senderExchange() {
public FanoutExchange dmfSenderExchange() {
return new FanoutExchange(AmqpSettings.DMF_EXCHANGE);
}
/**
* Create the Binding {@link AmqpConfiguration#dmfReceiverQueue()} to
* {@link AmqpConfiguration#dmfSenderExchange()}.
*
* @return the binding and create the queue and exchange
*/
@Bean
public Binding bindDmfSenderExchangeToDmfQueue() {
return BindingBuilder.bind(dmfReceiverQueue()).to(dmfSenderExchange());
}
/**
* Create authentication exchange.
*
* @return the fanout exchange
*/
@Bean
public FanoutExchange authenticationExchange() {
return new FanoutExchange(AmqpSettings.AUTHENTICATION_EXCHANGE, false, true);
}
/**
* Create the Binding
* {@link AmqpConfiguration#authenticationReceiverQueue()} to
* {@link AmqpConfiguration#authenticationExchange()}.
*
* @return the binding and create the queue and exchange
*/
@Bean
public Binding bindAuthenticationSenderExchangeToAuthenticationQueue() {
return BindingBuilder.bind(authenticationReceiverQueue()).to(authenticationExchange());
}
/**
* Create dead letter queue.
*
@@ -181,29 +226,18 @@ public class AmqpConfiguration {
* @return the fanout exchange
*/
@Bean
public FanoutExchange exchangeDeadLetter() {
public FanoutExchange deadLetterExchange() {
return new FanoutExchange(amqpProperties.getDeadLetterExchange());
}
/**
* Create the Binding deadLetterQueue to exchangeDeadLetter.
* Create the Binding deadLetterQueue to deadLetterExchange.
*
* @return the binding
*/
@Bean
public Binding bindDeadLetterQueueToLwm2mExchange() {
return BindingBuilder.bind(deadLetterQueue()).to(exchangeDeadLetter());
}
/**
* Create the Binding {@link AmqpConfiguration#receiverQueue()} to
* {@link AmqpConfiguration#senderExchange()}.
*
* @return the binding and create the queue and exchange
*/
@Bean
public Binding bindSenderExchangeToSpQueue() {
return BindingBuilder.bind(receiverQueue()).to(senderExchange());
public Binding bindDeadLetterQueueToDeadLetterExchange() {
return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange());
}
/**
@@ -245,4 +279,11 @@ public class AmqpConfiguration {
return containerFactory;
}
private static Map<String, Object> getTTLMaxArgsAuthenticationQueue() {
final Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", Duration.ofSeconds(30).toMillis());
args.put("x-max-length", 1_000);
return args;
}
}

View File

@@ -116,7 +116,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
}
/**
* Method to handle all incoming amqp messages.
* Method to handle all incoming DMF amqp messages.
*
* @param message
* incoming message
@@ -133,6 +133,12 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost());
}
@RabbitListener(queues = "${hawkbit.dmf.rabbitmq.authenticationReceiverQueue}", containerFactory = "listenerContainerFactory")
public Message onAuthenticationRequest(final Message message,
@Header(MessageHeaderKey.TENANT) final String tenant) {
return onAuthenticationRequest(message);
}
public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
@@ -149,8 +155,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
final EventTopic eventTopic = EventTopic.valueOf(topicValue);
handleIncomingEvent(message, eventTopic);
break;
case AUTHENTIFICATION:
return handleAuthentifiactionMessage(message);
default:
logAndThrowMessageError(message, "No handle method was found for the given message type.");
}
@@ -164,6 +169,20 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
return null;
}
public Message onAuthenticationRequest(final Message message) {
checkContentTypeJson(message);
final SecurityContext oldContext = SecurityContextHolder.getContext();
try {
return handleAuthentifiactionMessage(message);
} catch (final IllegalArgumentException ex) {
throw new AmqpRejectAndDontRequeueException("Invalid message!", ex);
} catch (final TenantNotExistException teex) {
throw new AmqpRejectAndDontRequeueException(teex);
} finally {
SecurityContextHolder.setContext(oldContext);
}
}
private Message handleAuthentifiactionMessage(final Message message) {
final DownloadResponse authentificationResponse = new DownloadResponse();
final MessageProperties messageProperties = message.getMessageProperties();

View File

@@ -31,10 +31,16 @@ public class AmqpProperties {
private String deadLetterExchange = "dmf.connector.deadletter";
/**
* DMF API receiving queue.
* DMF API receiving queue for EVENT or THING_CREATED message.
*/
private String receiverQueue = "dmf_receiver";
/**
* Authentication request called by 3rd party artifact storages for download
* authorizations.
*/
private String authenticationReceiverQueue = "authentication_receiver";
/**
* Missing queue fatal.
*/
@@ -62,6 +68,14 @@ public class AmqpProperties {
*/
private int initialConcurrentConsumers = 3;
public String getAuthenticationReceiverQueue() {
return authenticationReceiverQueue;
}
public void setAuthenticationReceiverQueue(final String authenticationReceiverQueue) {
this.authenticationReceiverQueue = authenticationReceiverQueue;
}
public int getPrefetchCount() {
return prefetchCount;
}
@@ -147,10 +161,6 @@ public class AmqpProperties {
return receiverQueue;
}
public void setReceiverQueue(final String receiverQueue) {
this.receiverQueue = receiverQueue;
}
public int getRequestedHeartBeat() {
return requestedHeartBeat;
}
@@ -159,4 +169,8 @@ public class AmqpProperties {
this.requestedHeartBeat = requestedHeartBeat;
}
public void setReceiverQueue(final String receiverQueue) {
this.receiverQueue = receiverQueue;
}
}

View File

@@ -31,7 +31,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -78,18 +78,17 @@ public class AmqpTestConfiguration {
* @return ExecutorService with security context availability in thread
* execution..
*/
@Bean
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean
public Executor asyncExecutor() {
return new DelegatingSecurityContextExecutor(threadPoolExecutor());
return new DelegatingSecurityContextExecutorService(threadPoolExecutor());
}
/**
* @return central ThreadPoolExecutor for general purpose multi threaded
* operations. Tries an orderly shutdown when destroyed.
*/
@Bean(destroyMethod = "shutdown")
public ThreadPoolExecutor threadPoolExecutor() {
private ThreadPoolExecutor threadPoolExecutor() {
final BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<>(10);
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS,
blockingQueue, new ThreadFactoryBuilder().setNameFormat("central-executor-pool-%d").build());

View File

@@ -158,7 +158,7 @@ public class AmqpControllerAuthenticationTest {
@Test
@Description("Tests authentication message without principal")
public void testAuthenticationMessageBadCredantialsWithoutPricipal() {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID,
FileResource.sha1("12345"));
@@ -166,8 +166,7 @@ public class AmqpControllerAuthenticationTest {
messageProperties);
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -178,7 +177,7 @@ public class AmqpControllerAuthenticationTest {
@Test
@Description("Tests authentication message without wrong credential")
public void testAuthenticationMessageBadCredantialsWithWrongCredential() {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID,
FileResource.sha1("12345"));
when(tenantConfigurationManagement.getConfigurationValue(
@@ -189,8 +188,7 @@ public class AmqpControllerAuthenticationTest {
messageProperties);
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -201,7 +199,7 @@ public class AmqpControllerAuthenticationTest {
@Test
@Description("Tests authentication message successfull")
public void testSuccessfullMessageAuthentication() {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID,
FileResource.sha1("12345"));
when(tenantConfigurationManagement.getConfigurationValue(
@@ -212,8 +210,7 @@ public class AmqpControllerAuthenticationTest {
messageProperties);
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -232,7 +229,9 @@ public class AmqpControllerAuthenticationTest {
private MessageProperties createMessageProperties(final MessageType type, final String replyTo) {
final MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader(MessageHeaderKey.TYPE, type.name());
if (type != null) {
messageProperties.setHeader(MessageHeaderKey.TYPE, type.name());
}
messageProperties.setHeader(MessageHeaderKey.TENANT, TENANT);
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setReplyTo(replyTo);

View File

@@ -273,14 +273,13 @@ public class AmqpMessageHandlerServiceTest {
@Test
@Description("Tests that an download request is denied for an artifact which does not exists")
public void authenticationRequestDeniedForArtifactWhichDoesNotExists() {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345"));
final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken,
messageProperties);
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -292,7 +291,7 @@ public class AmqpMessageHandlerServiceTest {
@Test
@Description("Tests that an download request is denied for an artifact which is not assigned to the requested target")
public void authenticationRequestDeniedForArtifactWhichIsNotAssignedToTarget() {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345"));
final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken,
messageProperties);
@@ -303,8 +302,7 @@ public class AmqpMessageHandlerServiceTest {
.thenThrow(EntityNotFoundException.class);
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -316,7 +314,7 @@ public class AmqpMessageHandlerServiceTest {
@Test
@Description("Tests that an download request is allowed for an artifact which exists and assigned to the requested target")
public void authenticationRequestAllowedForArtifactWhichExistsAndAssignedToTarget() throws MalformedURLException {
final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION);
final MessageProperties messageProperties = createMessageProperties(null);
final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345"));
final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken,
messageProperties);
@@ -334,8 +332,7 @@ public class AmqpMessageHandlerServiceTest {
when(hostnameResolverMock.resolveHostname()).thenReturn(new URL("http://localhost"));
// test
final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(),
TENANT, "vHost");
final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message);
// verify
final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage);
@@ -411,7 +408,9 @@ public class AmqpMessageHandlerServiceTest {
private MessageProperties createMessageProperties(final MessageType type, final String replyTo) {
final MessageProperties messageProperties = new MessageProperties();
messageProperties.setHeader(MessageHeaderKey.TYPE, type.name());
if (type != null) {
messageProperties.setHeader(MessageHeaderKey.TYPE, type.name());
}
messageProperties.setHeader(MessageHeaderKey.TENANT, TENANT);
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setReplyTo(replyTo);

View File

@@ -18,6 +18,8 @@ public final class AmqpSettings {
public static final String DMF_EXCHANGE = "dmf.exchange";
public static final String AUTHENTICATION_EXCHANGE = "authentication.exchange";
private AmqpSettings() {
}

View File

@@ -26,9 +26,4 @@ public enum MessageType {
*/
THING_CREATED,
/**
* The authentication type.
*/
AUTHENTIFICATION,
}

View File

@@ -31,7 +31,7 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.data.domain.AuditorAware;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutor;
import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService;
import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity;
import com.google.common.eventbus.AsyncEventBus;
@@ -99,7 +99,7 @@ public class TestConfiguration implements AsyncConfigurer {
@Bean
public Executor asyncExecutor() {
return new DelegatingSecurityContextExecutor(Executors.newSingleThreadExecutor());
return new DelegatingSecurityContextExecutorService(Executors.newSingleThreadExecutor());
}
@Bean