From 23cb62b9d9b7e537fa81aaed75ef2f780ee92827 Mon Sep 17 00:00:00 2001 From: kaizimmerm Date: Fri, 24 Jun 2016 13:59:19 +0200 Subject: [PATCH] Fix scheduled executor, auth exchange and simulator poll. Signed-off-by: kaizimmerm --- .../simulator/DeviceSimulatorUpdater.java | 3 +- .../simulator/SimulatedDeviceFactory.java | 2 +- .../simulator/SimulationController.java | 4 - .../hawkbit/simulator/SimulatorStartup.java | 5 - .../simulator/amqp/AmqpConfiguration.java | 10 +- .../AsyncConfigurerThreadpoolProperties.java | 13 +++ .../scheduling/ExecutorAutoConfiguration.java | 50 +++++----- .../main/resources/hawkbitdefaults.properties | 3 +- .../hawkbit/amqp/AmqpConfiguration.java | 93 +++++++++++++------ .../amqp/AmqpMessageHandlerService.java | 25 ++++- .../eclipse/hawkbit/amqp/AmqpProperties.java | 24 ++++- .../hawkbit/AmqpTestConfiguration.java | 9 +- .../AmqpControllerAuthenticationTest.java | 19 ++-- .../amqp/AmqpMessageHandlerServiceTest.java | 19 ++-- .../hawkbit/dmf/amqp/api/AmqpSettings.java | 2 + .../hawkbit/dmf/amqp/api/MessageType.java | 5 - .../test/util/TestConfiguration.java | 4 +- 17 files changed, 185 insertions(+), 105 deletions(-) diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorUpdater.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorUpdater.java index 0931f6f3e..81237beb7 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorUpdater.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/DeviceSimulatorUpdater.java @@ -99,7 +99,8 @@ public class DeviceSimulatorUpdater { // plug and play - non existing device will be auto created if (device == null) { - device = repository.add(deviceFactory.createSimulatedDevice(id, tenant, Protocol.DMF_AMQP, -1, null, null)); + device = repository + .add(deviceFactory.createSimulatedDevice(id, tenant, Protocol.DMF_AMQP, 1800, null, null)); } device.setProgress(0.0); diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatedDeviceFactory.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatedDeviceFactory.java index 35829c673..8d5c5e0b1 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatedDeviceFactory.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatedDeviceFactory.java @@ -45,7 +45,7 @@ public class SimulatedDeviceFactory { */ public AbstractSimulatedDevice createSimulatedDevice(final String id, final String tenant, final Protocol protocol) { - return createSimulatedDevice(id, tenant, protocol, 30, null, null); + return createSimulatedDevice(id, tenant, protocol, 1800, null, null); } /** diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulationController.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulationController.java index 3bd6b41f2..43f954c00 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulationController.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulationController.java @@ -86,10 +86,6 @@ public class SimulationController { final String deviceId = name + i; repository.add(deviceFactory.createSimulatedDevice(deviceId, tenant, protocol, pollDelay, new URL(endpoint), gatewayToken)); - - if (protocol == Protocol.DMF_AMQP) { - spSenderService.createOrUpdateThing(tenant, deviceId); - } } return ResponseEntity.ok("Updated " + amount + " DMF connected targets!"); diff --git a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatorStartup.java b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatorStartup.java index 19367a9d4..486eb4b5e 100644 --- a/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatorStartup.java +++ b/examples/hawkbit-device-simulator/src/main/java/org/eclipse/hawkbit/simulator/SimulatorStartup.java @@ -11,7 +11,6 @@ package org.eclipse.hawkbit.simulator; import java.net.MalformedURLException; import java.net.URL; -import org.eclipse.hawkbit.simulator.AbstractSimulatedDevice.Protocol; import org.eclipse.hawkbit.simulator.amqp.SpSenderService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,10 +53,6 @@ public class SimulatorStartup implements ApplicationListener arguments = getDeadLetterExchangeArgs(); arguments.putAll(getTTLMaxArgs()); - return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).withArguments(arguments) - .build(); + return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete() + .withArguments(arguments).build(); } /** @@ -133,12 +133,12 @@ public class AmqpConfiguration { */ @Bean public FanoutExchange exchangeQueueToConnector() { - return new FanoutExchange(amqpProperties.getSenderForSpExchange()); + return new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true); } /** * Create the Binding - * {@link AmqpConfiguration#receiverConnectorQueueFromSp()} to + * {@link AmqpConfiguration#receiverConnectorQueueFromHawkBit()} to * {@link AmqpConfiguration#exchangeQueueToConnector()}. * * @return the binding and create the queue and exchange @@ -165,7 +165,7 @@ public class AmqpConfiguration { */ @Bean public FanoutExchange exchangeDeadLetter() { - return new FanoutExchange(amqpProperties.getDeadLetterExchange()); + return new FanoutExchange(amqpProperties.getDeadLetterExchange(), false, true); } /** diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java index 4425c7278..d6f3ca430 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/AsyncConfigurerThreadpoolProperties.java @@ -32,6 +32,11 @@ public class AsyncConfigurerThreadpoolProperties { */ private Integer maxthreads = 20; + /** + * Core processing threads for scheduled event executor. + */ + private Integer schedulerThreads = 3; + /** * When the number of threads is greater than the core, this is the maximum * time that excess idle threads will wait for new tasks before terminating. @@ -70,4 +75,12 @@ public class AsyncConfigurerThreadpoolProperties { this.idletimeout = idletimeout; } + public Integer getSchedulerThreads() { + return schedulerThreads; + } + + public void setSchedulerThreads(final Integer schedulerThreads) { + this.schedulerThreads = schedulerThreads; + } + } diff --git a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java index 0aeaf75bb..917e4e4ef 100644 --- a/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java +++ b/hawkbit-autoconfigure/src/main/java/org/eclipse/hawkbit/autoconfigure/scheduling/ExecutorAutoConfiguration.java @@ -11,6 +11,8 @@ package org.eclipse.hawkbit.autoconfigure.scheduling; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; @@ -24,9 +26,12 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; +import org.springframework.scheduling.TaskScheduler; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; -import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; +import org.springframework.scheduling.concurrent.ConcurrentTaskScheduler; import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService; +import org.springframework.security.concurrent.DelegatingSecurityContextScheduledExecutorService; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -45,20 +50,28 @@ public class ExecutorAutoConfiguration { /** * @return ExecutorService with security context availability in thread - * execution.. + * execution. + */ + @Bean(destroyMethod = "shutdown") + @ConditionalOnMissingBean + public ExecutorService asyncExecutor() { + return new DelegatingSecurityContextExecutorService(threadPoolExecutor()); + } + + /** + * @return {@link TaskExecutor} for task execution */ @Bean @ConditionalOnMissingBean - public Executor asyncExecutor() { - return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + public TaskExecutor taskExecutor() { + return new ConcurrentTaskExecutor(asyncExecutor()); } /** * @return central ThreadPoolExecutor for general purpose multi threaded * operations. Tries an orderly shutdown when destroyed. */ - @Bean(destroyMethod = "shutdown") - public ThreadPoolExecutor threadPoolExecutor() { + private ThreadPoolExecutor threadPoolExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>( asyncConfigurerProperties.getQueuesize()); return new ThreadPoolExecutor(asyncConfigurerProperties.getCorethreads(), @@ -92,31 +105,24 @@ public class ExecutorAutoConfiguration { } /** - * @return {@link TaskExecutor} for task execution + * @return {@link ScheduledExecutorService} with security context + * availability in thread execution. */ - @Bean - @ConditionalOnMissingBean - public TaskExecutor taskExecutor() { - return new ConcurrentTaskExecutor(asyncExecutor()); - } - - /** - * @return {@link ScheduledExecutorService} based on - * {@link #threadPoolTaskScheduler()}. - */ - @Bean + @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean public ScheduledExecutorService scheduledExecutorService() { - return threadPoolTaskScheduler().getScheduledExecutor(); + return new DelegatingSecurityContextScheduledExecutorService( + Executors.newScheduledThreadPool(asyncConfigurerProperties.getSchedulerThreads(), + new ThreadFactoryBuilder().setNameFormat("central-scheduled-executor-pool-%d").build())); } /** - * @return {@link ThreadPoolTaskScheduler} for scheduled operations. + * @return {@link TaskScheduler} for task execution */ @Bean @ConditionalOnMissingBean - public ThreadPoolTaskScheduler threadPoolTaskScheduler() { - return new ThreadPoolTaskScheduler(); + public TaskScheduler taskScheduler() { + return new ConcurrentTaskScheduler(scheduledExecutorService()); } } diff --git a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties index 9c197fbb3..cb7793168 100644 --- a/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties +++ b/hawkbit-autoconfigure/src/main/resources/hawkbitdefaults.properties @@ -41,4 +41,5 @@ hawkbit.controller.minPollingTime=00:00:30 # Configuration for RabbitMQ integration hawkbit.dmf.rabbitmq.deadLetterQueue=dmf_connector_deadletter_ttl hawkbit.dmf.rabbitmq.deadLetterExchange=dmf.connector.deadletter -hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver \ No newline at end of file +hawkbit.dmf.rabbitmq.receiverQueue=dmf_receiver +hawkbit.dmf.rabbitmq.authenticationReceiverQueue=authentication_receiver diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java index c26b5547e..ae25d0086 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpConfiguration.java @@ -8,8 +8,11 @@ */ package org.eclipse.hawkbit.amqp; +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadPoolExecutor; import org.eclipse.hawkbit.dmf.amqp.api.AmqpSettings; import org.slf4j.Logger; @@ -18,6 +21,7 @@ import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.QueueBuilder; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; @@ -51,10 +55,6 @@ public class AmqpConfiguration { @Autowired private AmqpDeadletterProperties amqpDeadletterProperties; - @Autowired - @Qualifier("threadPoolExecutor") - private ThreadPoolExecutor threadPoolExecutor; - @Autowired private ConnectionFactory rabbitConnectionFactory; @@ -66,8 +66,8 @@ public class AmqpConfiguration { private AmqpProperties amqpProperties; @Autowired - @Qualifier("threadPoolExecutor") - private ThreadPoolExecutor threadPoolExecutor; + @Qualifier("asyncExecutor") + private Executor threadPoolExecutor; @Autowired private ScheduledExecutorService scheduledExecutorService; @@ -145,26 +145,71 @@ public class AmqpConfiguration { } /** - * Create the sp receiver queue. + * Create the DMF API receiver queue for * * @return the receiver queue */ @Bean - public Queue receiverQueue() { + public Queue dmfReceiverQueue() { return new Queue(amqpProperties.getReceiverQueue(), true, false, false, amqpDeadletterProperties.getDeadLetterExchangeArgs(amqpProperties.getDeadLetterExchange())); } /** - * Create the dead letter fanout exchange. + * Create the DMF API receiver queue for authentication requests called by + * 3rd party artifact storages for download authorization by devices. + * + * @return the receiver queue + */ + @Bean + public Queue authenticationReceiverQueue() { + return QueueBuilder.nonDurable(amqpProperties.getAuthenticationReceiverQueue()).autoDelete() + .withArguments(getTTLMaxArgsAuthenticationQueue()).build(); + } + + /** + * Create DMF exchange. * * @return the fanout exchange */ @Bean - public FanoutExchange senderExchange() { + public FanoutExchange dmfSenderExchange() { return new FanoutExchange(AmqpSettings.DMF_EXCHANGE); } + /** + * Create the Binding {@link AmqpConfiguration#dmfReceiverQueue()} to + * {@link AmqpConfiguration#dmfSenderExchange()}. + * + * @return the binding and create the queue and exchange + */ + @Bean + public Binding bindDmfSenderExchangeToDmfQueue() { + return BindingBuilder.bind(dmfReceiverQueue()).to(dmfSenderExchange()); + } + + /** + * Create authentication exchange. + * + * @return the fanout exchange + */ + @Bean + public FanoutExchange authenticationExchange() { + return new FanoutExchange(AmqpSettings.AUTHENTICATION_EXCHANGE, false, true); + } + + /** + * Create the Binding + * {@link AmqpConfiguration#authenticationReceiverQueue()} to + * {@link AmqpConfiguration#authenticationExchange()}. + * + * @return the binding and create the queue and exchange + */ + @Bean + public Binding bindAuthenticationSenderExchangeToAuthenticationQueue() { + return BindingBuilder.bind(authenticationReceiverQueue()).to(authenticationExchange()); + } + /** * Create dead letter queue. * @@ -181,29 +226,18 @@ public class AmqpConfiguration { * @return the fanout exchange */ @Bean - public FanoutExchange exchangeDeadLetter() { + public FanoutExchange deadLetterExchange() { return new FanoutExchange(amqpProperties.getDeadLetterExchange()); } /** - * Create the Binding deadLetterQueue to exchangeDeadLetter. + * Create the Binding deadLetterQueue to deadLetterExchange. * * @return the binding */ @Bean - public Binding bindDeadLetterQueueToLwm2mExchange() { - return BindingBuilder.bind(deadLetterQueue()).to(exchangeDeadLetter()); - } - - /** - * Create the Binding {@link AmqpConfiguration#receiverQueue()} to - * {@link AmqpConfiguration#senderExchange()}. - * - * @return the binding and create the queue and exchange - */ - @Bean - public Binding bindSenderExchangeToSpQueue() { - return BindingBuilder.bind(receiverQueue()).to(senderExchange()); + public Binding bindDeadLetterQueueToDeadLetterExchange() { + return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()); } /** @@ -245,4 +279,11 @@ public class AmqpConfiguration { return containerFactory; } + private static Map getTTLMaxArgsAuthenticationQueue() { + final Map args = new HashMap<>(); + args.put("x-message-ttl", Duration.ofSeconds(30).toMillis()); + args.put("x-max-length", 1_000); + return args; + } + } diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 27cbc0f69..7d4e08144 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -116,7 +116,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { } /** - * Method to handle all incoming amqp messages. + * Method to handle all incoming DMF amqp messages. * * @param message * incoming message @@ -133,6 +133,12 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return onMessage(message, type, tenant, getRabbitTemplate().getConnectionFactory().getVirtualHost()); } + @RabbitListener(queues = "${hawkbit.dmf.rabbitmq.authenticationReceiverQueue}", containerFactory = "listenerContainerFactory") + public Message onAuthenticationRequest(final Message message, + @Header(MessageHeaderKey.TENANT) final String tenant) { + return onAuthenticationRequest(message); + } + public Message onMessage(final Message message, final String type, final String tenant, final String virtualHost) { checkContentTypeJson(message); final SecurityContext oldContext = SecurityContextHolder.getContext(); @@ -149,8 +155,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { final EventTopic eventTopic = EventTopic.valueOf(topicValue); handleIncomingEvent(message, eventTopic); break; - case AUTHENTIFICATION: - return handleAuthentifiactionMessage(message); + default: logAndThrowMessageError(message, "No handle method was found for the given message type."); } @@ -164,6 +169,20 @@ public class AmqpMessageHandlerService extends BaseAmqpService { return null; } + public Message onAuthenticationRequest(final Message message) { + checkContentTypeJson(message); + final SecurityContext oldContext = SecurityContextHolder.getContext(); + try { + return handleAuthentifiactionMessage(message); + } catch (final IllegalArgumentException ex) { + throw new AmqpRejectAndDontRequeueException("Invalid message!", ex); + } catch (final TenantNotExistException teex) { + throw new AmqpRejectAndDontRequeueException(teex); + } finally { + SecurityContextHolder.setContext(oldContext); + } + } + private Message handleAuthentifiactionMessage(final Message message) { final DownloadResponse authentificationResponse = new DownloadResponse(); final MessageProperties messageProperties = message.getMessageProperties(); diff --git a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java index ace1fefa2..888b204e7 100644 --- a/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java +++ b/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpProperties.java @@ -31,10 +31,16 @@ public class AmqpProperties { private String deadLetterExchange = "dmf.connector.deadletter"; /** - * DMF API receiving queue. + * DMF API receiving queue for EVENT or THING_CREATED message. */ private String receiverQueue = "dmf_receiver"; + /** + * Authentication request called by 3rd party artifact storages for download + * authorizations. + */ + private String authenticationReceiverQueue = "authentication_receiver"; + /** * Missing queue fatal. */ @@ -62,6 +68,14 @@ public class AmqpProperties { */ private int initialConcurrentConsumers = 3; + public String getAuthenticationReceiverQueue() { + return authenticationReceiverQueue; + } + + public void setAuthenticationReceiverQueue(final String authenticationReceiverQueue) { + this.authenticationReceiverQueue = authenticationReceiverQueue; + } + public int getPrefetchCount() { return prefetchCount; } @@ -147,10 +161,6 @@ public class AmqpProperties { return receiverQueue; } - public void setReceiverQueue(final String receiverQueue) { - this.receiverQueue = receiverQueue; - } - public int getRequestedHeartBeat() { return requestedHeartBeat; } @@ -159,4 +169,8 @@ public class AmqpProperties { this.requestedHeartBeat = requestedHeartBeat; } + public void setReceiverQueue(final String receiverQueue) { + this.receiverQueue = receiverQueue; + } + } diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java index 075313a19..467c650be 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/AmqpTestConfiguration.java @@ -31,7 +31,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.core.task.TaskExecutor; import org.springframework.scheduling.concurrent.ConcurrentTaskExecutor; import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -78,18 +78,17 @@ public class AmqpTestConfiguration { * @return ExecutorService with security context availability in thread * execution.. */ - @Bean + @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean public Executor asyncExecutor() { - return new DelegatingSecurityContextExecutor(threadPoolExecutor()); + return new DelegatingSecurityContextExecutorService(threadPoolExecutor()); } /** * @return central ThreadPoolExecutor for general purpose multi threaded * operations. Tries an orderly shutdown when destroyed. */ - @Bean(destroyMethod = "shutdown") - public ThreadPoolExecutor threadPoolExecutor() { + private ThreadPoolExecutor threadPoolExecutor() { final BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10); final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 10, 1000, TimeUnit.MILLISECONDS, blockingQueue, new ThreadFactoryBuilder().setNameFormat("central-executor-pool-%d").build()); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java index 881260579..4e08e1852 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpControllerAuthenticationTest.java @@ -158,7 +158,7 @@ public class AmqpControllerAuthenticationTest { @Test @Description("Tests authentication message without principal") public void testAuthenticationMessageBadCredantialsWithoutPricipal() { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID, FileResource.sha1("12345")); @@ -166,8 +166,7 @@ public class AmqpControllerAuthenticationTest { messageProperties); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -178,7 +177,7 @@ public class AmqpControllerAuthenticationTest { @Test @Description("Tests authentication message without wrong credential") public void testAuthenticationMessageBadCredantialsWithWrongCredential() { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID, FileResource.sha1("12345")); when(tenantConfigurationManagement.getConfigurationValue( @@ -189,8 +188,7 @@ public class AmqpControllerAuthenticationTest { messageProperties); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -201,7 +199,7 @@ public class AmqpControllerAuthenticationTest { @Test @Description("Tests authentication message successfull") public void testSuccessfullMessageAuthentication() { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, CONTROLLLER_ID, FileResource.sha1("12345")); when(tenantConfigurationManagement.getConfigurationValue( @@ -212,8 +210,7 @@ public class AmqpControllerAuthenticationTest { messageProperties); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -232,7 +229,9 @@ public class AmqpControllerAuthenticationTest { private MessageProperties createMessageProperties(final MessageType type, final String replyTo) { final MessageProperties messageProperties = new MessageProperties(); - messageProperties.setHeader(MessageHeaderKey.TYPE, type.name()); + if (type != null) { + messageProperties.setHeader(MessageHeaderKey.TYPE, type.name()); + } messageProperties.setHeader(MessageHeaderKey.TENANT, TENANT); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setReplyTo(replyTo); diff --git a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java index 338feea62..43d7980df 100644 --- a/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java +++ b/hawkbit-dmf-amqp/src/test/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerServiceTest.java @@ -273,14 +273,13 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests that an download request is denied for an artifact which does not exists") public void authenticationRequestDeniedForArtifactWhichDoesNotExists() { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345")); final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken, messageProperties); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -292,7 +291,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests that an download request is denied for an artifact which is not assigned to the requested target") public void authenticationRequestDeniedForArtifactWhichIsNotAssignedToTarget() { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345")); final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken, messageProperties); @@ -303,8 +302,7 @@ public class AmqpMessageHandlerServiceTest { .thenThrow(EntityNotFoundException.class); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -316,7 +314,7 @@ public class AmqpMessageHandlerServiceTest { @Test @Description("Tests that an download request is allowed for an artifact which exists and assigned to the requested target") public void authenticationRequestAllowedForArtifactWhichExistsAndAssignedToTarget() throws MalformedURLException { - final MessageProperties messageProperties = createMessageProperties(MessageType.AUTHENTIFICATION); + final MessageProperties messageProperties = createMessageProperties(null); final TenantSecurityToken securityToken = new TenantSecurityToken(TENANT, "123", FileResource.sha1("12345")); final Message message = amqpMessageHandlerService.getMessageConverter().toMessage(securityToken, messageProperties); @@ -334,8 +332,7 @@ public class AmqpMessageHandlerServiceTest { when(hostnameResolverMock.resolveHostname()).thenReturn(new URL("http://localhost")); // test - final Message onMessage = amqpMessageHandlerService.onMessage(message, MessageType.AUTHENTIFICATION.name(), - TENANT, "vHost"); + final Message onMessage = amqpMessageHandlerService.onAuthenticationRequest(message); // verify final DownloadResponse downloadResponse = (DownloadResponse) messageConverter.fromMessage(onMessage); @@ -411,7 +408,9 @@ public class AmqpMessageHandlerServiceTest { private MessageProperties createMessageProperties(final MessageType type, final String replyTo) { final MessageProperties messageProperties = new MessageProperties(); - messageProperties.setHeader(MessageHeaderKey.TYPE, type.name()); + if (type != null) { + messageProperties.setHeader(MessageHeaderKey.TYPE, type.name()); + } messageProperties.setHeader(MessageHeaderKey.TENANT, TENANT); messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); messageProperties.setReplyTo(replyTo); diff --git a/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/AmqpSettings.java b/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/AmqpSettings.java index 40ba04419..364f36aa5 100644 --- a/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/AmqpSettings.java +++ b/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/AmqpSettings.java @@ -18,6 +18,8 @@ public final class AmqpSettings { public static final String DMF_EXCHANGE = "dmf.exchange"; + public static final String AUTHENTICATION_EXCHANGE = "authentication.exchange"; + private AmqpSettings() { } diff --git a/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java b/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java index 8cca32b06..e66a0c8c6 100644 --- a/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java +++ b/hawkbit-dmf-api/src/main/java/org/eclipse/hawkbit/dmf/amqp/api/MessageType.java @@ -26,9 +26,4 @@ public enum MessageType { */ THING_CREATED, - /** - * The authentication type. - */ - AUTHENTIFICATION, - } diff --git a/hawkbit-repository/hawkbit-repository-test/src/main/java/org/eclipse/hawkbit/repository/test/util/TestConfiguration.java b/hawkbit-repository/hawkbit-repository-test/src/main/java/org/eclipse/hawkbit/repository/test/util/TestConfiguration.java index e6a8e7686..3a7a2f73d 100644 --- a/hawkbit-repository/hawkbit-repository-test/src/main/java/org/eclipse/hawkbit/repository/test/util/TestConfiguration.java +++ b/hawkbit-repository/hawkbit-repository-test/src/main/java/org/eclipse/hawkbit/repository/test/util/TestConfiguration.java @@ -31,7 +31,7 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Profile; import org.springframework.data.domain.AuditorAware; import org.springframework.scheduling.annotation.AsyncConfigurer; -import org.springframework.security.concurrent.DelegatingSecurityContextExecutor; +import org.springframework.security.concurrent.DelegatingSecurityContextExecutorService; import org.springframework.security.config.annotation.method.configuration.EnableGlobalMethodSecurity; import com.google.common.eventbus.AsyncEventBus; @@ -99,7 +99,7 @@ public class TestConfiguration implements AsyncConfigurer { @Bean public Executor asyncExecutor() { - return new DelegatingSecurityContextExecutor(Executors.newSingleThreadExecutor()); + return new DelegatingSecurityContextExecutorService(Executors.newSingleThreadExecutor()); } @Bean