SDK DMF Support - Configuration for AMQP moved in own class (#1708)
Signed-off-by: Marinov Avgustin <Avgustin.Marinov@bosch.com>
This commit is contained in:
@@ -26,7 +26,7 @@ public class DmfProperties {
|
||||
/**
|
||||
* The prefix for this configuration.
|
||||
*/
|
||||
public static final String CONFIGURATION_PREFIX = "hawkbit.sdk.dmf.amqp";
|
||||
public static final String CONFIGURATION_PREFIX = "hawkbit.sdk.dmf";
|
||||
|
||||
/**
|
||||
* The property string of ~.amqp.enabled
|
||||
@@ -42,21 +42,4 @@ public class DmfProperties {
|
||||
* Set to true for the simulator run DMF health check.
|
||||
*/
|
||||
private boolean healthCheckEnabled;
|
||||
|
||||
/**
|
||||
* Queue for receiving DMF messages from update server.
|
||||
*/
|
||||
private String receiverConnectorQueueFromSp = "sdk_receiver";
|
||||
|
||||
/**
|
||||
* Exchange for sending DMF messages to update server.
|
||||
*/
|
||||
private String senderForSpExchange = "sdk.replyTo";
|
||||
|
||||
/**
|
||||
* Message time to live (ttl) for the deadletter queue. Default ttl is 1 hour.
|
||||
*/
|
||||
private int deadLetterTtl = 60_000;
|
||||
|
||||
private String customVhost;
|
||||
}
|
||||
@@ -12,6 +12,7 @@ package org.eclipse.hawkbit.sdk.dmf;
|
||||
import java.time.Duration;
|
||||
import java.util.Map;
|
||||
|
||||
import org.eclipse.hawkbit.sdk.dmf.amqp.AmqpProperties;
|
||||
import org.eclipse.hawkbit.sdk.dmf.amqp.DmfReceiverService;
|
||||
import org.eclipse.hawkbit.sdk.dmf.amqp.DmfSenderService;
|
||||
import org.springframework.amqp.core.Binding;
|
||||
@@ -32,7 +33,7 @@ import org.springframework.context.annotation.Configuration;
|
||||
* The spring AMQP configuration to use a AMQP for communication with SP update server.
|
||||
*/
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(DmfProperties.class)
|
||||
@EnableConfigurationProperties({DmfProperties.class, AmqpProperties.class})
|
||||
@ConditionalOnProperty(prefix = DmfProperties.CONFIGURATION_PREFIX, name = "enabled", matchIfMissing = true)
|
||||
public class DmfSDKConfiguration {
|
||||
|
||||
@@ -44,8 +45,8 @@ public class DmfSDKConfiguration {
|
||||
@Bean
|
||||
DmfSenderService dmfSenderService(
|
||||
final RabbitTemplate rabbitTemplate,
|
||||
final DmfProperties dmfProperties) {
|
||||
return new DmfSenderService(rabbitTemplate, dmfProperties);
|
||||
final AmqpProperties amqpProperties) {
|
||||
return new DmfSenderService(rabbitTemplate, amqpProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -53,8 +54,8 @@ public class DmfSDKConfiguration {
|
||||
final RabbitTemplate rabbitTemplate,
|
||||
final DmfSenderService dmfSenderService,
|
||||
final DeviceManagement deviceManagement,
|
||||
final DmfProperties dmfProperties) {
|
||||
return new DmfReceiverService(rabbitTemplate, dmfSenderService, deviceManagement, dmfProperties);
|
||||
final AmqpProperties amqpProperties) {
|
||||
return new DmfReceiverService(rabbitTemplate, dmfSenderService, deviceManagement, amqpProperties);
|
||||
}
|
||||
|
||||
@Bean
|
||||
@@ -76,8 +77,8 @@ public class DmfSDKConfiguration {
|
||||
* Creates the receiver queue from update server for receiving message from update server.
|
||||
*/
|
||||
@Bean
|
||||
Queue receiverConnectorQueueFromHawkBit(final DmfProperties dmfProperties) {
|
||||
return QueueBuilder.nonDurable(dmfProperties.getReceiverConnectorQueueFromSp()).autoDelete()
|
||||
Queue receiverConnectorQueueFromHawkBit(final AmqpProperties amqpProperties) {
|
||||
return QueueBuilder.nonDurable(amqpProperties.getReceiverConnectorQueueFromSp()).autoDelete()
|
||||
.withArguments(Map.of(
|
||||
"x-message-ttl", Duration.ofDays(1).toMillis(),
|
||||
"x-max-length", 100_000))
|
||||
@@ -88,8 +89,8 @@ public class DmfSDKConfiguration {
|
||||
* Creates the receiver exchange for sending messages to update server.
|
||||
*/
|
||||
@Bean
|
||||
FanoutExchange exchangeQueueToConnector(final DmfProperties dmfProperties) {
|
||||
return new FanoutExchange(dmfProperties.getSenderForSpExchange(), false, true);
|
||||
FanoutExchange exchangeQueueToConnector(final AmqpProperties amqpProperties) {
|
||||
return new FanoutExchange(amqpProperties.getSenderForSpExchange(), false, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -98,9 +99,9 @@ public class DmfSDKConfiguration {
|
||||
* @return the binding and create the queue and exchange
|
||||
*/
|
||||
@Bean
|
||||
Binding bindReceiverQueueToSpExchange(final DmfProperties dmfProperties) {
|
||||
return BindingBuilder.bind(receiverConnectorQueueFromHawkBit(dmfProperties))
|
||||
.to(exchangeQueueToConnector(dmfProperties));
|
||||
Binding bindReceiverQueueToSpExchange(final AmqpProperties amqpProperties) {
|
||||
return BindingBuilder.bind(receiverConnectorQueueFromHawkBit(amqpProperties))
|
||||
.to(exchangeQueueToConnector(amqpProperties));
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@@ -108,8 +109,8 @@ public class DmfSDKConfiguration {
|
||||
protected static class CachingConnectionFactoryInitializer {
|
||||
|
||||
CachingConnectionFactoryInitializer(
|
||||
final CachingConnectionFactory connectionFactory, final DmfProperties dmfProperties) {
|
||||
connectionFactory.setVirtualHost(dmfProperties.getCustomVhost());
|
||||
final CachingConnectionFactory connectionFactory, final AmqpProperties amqpProperties) {
|
||||
connectionFactory.setVirtualHost(amqpProperties.getCustomVhost());
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,47 @@
|
||||
/**
|
||||
* Copyright (c) 2024 Contributors to the Eclipse Foundation
|
||||
*
|
||||
* This program and the accompanying materials are made
|
||||
* available under the terms of the Eclipse Public License 2.0
|
||||
* which is available at https://www.eclipse.org/legal/epl-2.0/
|
||||
*
|
||||
* SPDX-License-Identifier: EPL-2.0
|
||||
*/
|
||||
package org.eclipse.hawkbit.sdk.dmf.amqp;
|
||||
|
||||
import lombok.Data;
|
||||
import lombok.ToString;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* Bean which holds the necessary properties for configuring the AMQP connection.
|
||||
*/
|
||||
@Data
|
||||
@ToString
|
||||
@Component
|
||||
@ConfigurationProperties(AmqpProperties.CONFIGURATION_PREFIX)
|
||||
public class AmqpProperties {
|
||||
|
||||
/**
|
||||
* The prefix for this configuration.
|
||||
*/
|
||||
public static final String CONFIGURATION_PREFIX = "hawkbit.sdk.dmf.amqp";
|
||||
|
||||
/**
|
||||
* Queue for receiving DMF messages from update server.
|
||||
*/
|
||||
private String receiverConnectorQueueFromSp = "sdk_receiver";
|
||||
|
||||
/**
|
||||
* Exchange for sending DMF messages to update server.
|
||||
*/
|
||||
private String senderForSpExchange = "sdk.replyTo";
|
||||
|
||||
/**
|
||||
* Message time to live (ttl) for the deadletter queue. Default ttl is 1 hour.
|
||||
*/
|
||||
private int deadLetterTtl = 60_000;
|
||||
|
||||
private String customVhost;
|
||||
}
|
||||
@@ -50,8 +50,8 @@ public class DmfReceiverService extends MessageService {
|
||||
private final Set<Long> openActions = Collections.synchronizedSet(new HashSet<>());
|
||||
|
||||
public DmfReceiverService(final RabbitTemplate rabbitTemplate, final DmfSenderService dmfSenderService,
|
||||
final DeviceManagement deviceManagement, final DmfProperties dmfProperties) {
|
||||
super(rabbitTemplate, dmfProperties);
|
||||
final DeviceManagement deviceManagement, final AmqpProperties amqpProperties) {
|
||||
super(rabbitTemplate, amqpProperties);
|
||||
this.dmfSenderService = dmfSenderService;
|
||||
this.deviceManagement = deviceManagement;
|
||||
}
|
||||
|
||||
@@ -25,7 +25,6 @@ import org.eclipse.hawkbit.dmf.json.model.DmfActionStatus;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfActionUpdateStatus;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfAttributeUpdate;
|
||||
import org.eclipse.hawkbit.dmf.json.model.DmfUpdateMode;
|
||||
import org.eclipse.hawkbit.sdk.dmf.DmfProperties;
|
||||
import org.eclipse.hawkbit.sdk.dmf.UpdateInfo;
|
||||
import org.eclipse.hawkbit.sdk.dmf.UpdateStatus;
|
||||
import org.springframework.amqp.core.Message;
|
||||
@@ -46,8 +45,8 @@ public class DmfSenderService extends MessageService {
|
||||
private final String spExchange;
|
||||
private final ConcurrentHashMap<String, BiConsumer<String, Message>> pingListeners = new ConcurrentHashMap<>();
|
||||
|
||||
public DmfSenderService(final RabbitTemplate rabbitTemplate, final DmfProperties dmfProperties) {
|
||||
super(rabbitTemplate, dmfProperties);
|
||||
public DmfSenderService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) {
|
||||
super(rabbitTemplate, amqpProperties);
|
||||
spExchange = AmqpSettings.DMF_EXCHANGE;
|
||||
}
|
||||
|
||||
@@ -58,7 +57,7 @@ public class DmfSenderService extends MessageService {
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.THING_ID, controllerId);
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.SENDER, "hawkBit-sdk");
|
||||
messagePropertiesForSP.setContentType(MessageProperties.CONTENT_TYPE_JSON);
|
||||
messagePropertiesForSP.setReplyTo(dmfProperties.getSenderForSpExchange());
|
||||
messagePropertiesForSP.setReplyTo(amqpProperties.getSenderForSpExchange());
|
||||
|
||||
sendMessage(spExchange, new Message(EMPTY_BODY, messagePropertiesForSP));
|
||||
}
|
||||
@@ -129,7 +128,7 @@ public class DmfSenderService extends MessageService {
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.TENANT, tenant);
|
||||
messagePropertiesForSP.setHeader(MessageHeaderKey.THING_ID, controllerId);
|
||||
messagePropertiesForSP.setContentType(MessageProperties.CONTENT_TYPE_JSON);
|
||||
messagePropertiesForSP.setReplyTo(dmfProperties.getSenderForSpExchange());
|
||||
messagePropertiesForSP.setReplyTo(amqpProperties.getSenderForSpExchange());
|
||||
|
||||
final DmfAttributeUpdate attributeUpdate = new DmfAttributeUpdate();
|
||||
attributeUpdate.setMode(mode);
|
||||
@@ -143,7 +142,7 @@ public class DmfSenderService extends MessageService {
|
||||
messageProperties.getHeaders().put(MessageHeaderKey.TENANT, tenant);
|
||||
messageProperties.getHeaders().put(MessageHeaderKey.TYPE, MessageType.PING.toString());
|
||||
messageProperties.setCorrelationId(correlationId);
|
||||
messageProperties.setReplyTo(dmfProperties.getSenderForSpExchange());
|
||||
messageProperties.setReplyTo(amqpProperties.getSenderForSpExchange());
|
||||
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
|
||||
|
||||
if (listener != null) {
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
package org.eclipse.hawkbit.sdk.dmf.amqp;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.eclipse.hawkbit.sdk.dmf.DmfProperties;
|
||||
import org.springframework.amqp.core.Message;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
@@ -22,11 +21,11 @@ import org.springframework.amqp.support.converter.AbstractJavaTypeMapper;
|
||||
class MessageService {
|
||||
|
||||
protected final RabbitTemplate rabbitTemplate;
|
||||
protected final DmfProperties dmfProperties;
|
||||
protected final AmqpProperties amqpProperties;
|
||||
|
||||
MessageService(final RabbitTemplate rabbitTemplate, final DmfProperties dmfProperties) {
|
||||
MessageService(final RabbitTemplate rabbitTemplate, final AmqpProperties amqpProperties) {
|
||||
this.rabbitTemplate = rabbitTemplate;
|
||||
this.dmfProperties = dmfProperties;
|
||||
this.amqpProperties = amqpProperties;
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -17,7 +17,7 @@ spring.rabbitmq.port=5672
|
||||
spring.rabbitmq.dynamic=true
|
||||
|
||||
## Configuration for sdk dmf
|
||||
hawkbit.sdk.dmf.amqp.enabled=true
|
||||
hawkbit.sdk.dmf.enabled=true
|
||||
hawkbit.sdk.dmf.amqp.receiverConnectorQueueFromSp=sdk_receiver
|
||||
hawkbit.sdk.dmf.amqp.senderForSpExchange=sdk.replyTo
|
||||
|
||||
|
||||
Reference in New Issue
Block a user