diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java index 609fa9698..abe32bdf2 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageDispatcherService.java @@ -144,14 +144,15 @@ public class AmqpMessageDispatcherService extends BaseAmqpService { amqpSenderService.sendMessage(message, targetAdress); } - void sendPingReponseToDmfReceiver(final Message ping, final String tenant) { + void sendPingReponseToDmfReceiver(final Message ping, final String tenant, final String virtualHost) { final Message message = MessageBuilder.withBody(String.valueOf(System.currentTimeMillis()).getBytes()) .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) .setCorrelationId(ping.getMessageProperties().getCorrelationId()) .setHeader(MessageHeaderKey.TYPE, MessageType.PING_RESPONSE).setHeader(MessageHeaderKey.TENANT, tenant) .build(); - amqpSenderService.sendMessage(message, ping.getMessageProperties().getReplyTo()); + amqpSenderService.sendMessage(message, + IpUtil.createAmqpUri(virtualHost, ping.getMessageProperties().getReplyTo())); } /** diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java index 84f76214f..1ad903c4e 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageHandlerService.java @@ -132,7 +132,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService { break; case PING: if (isCorrelationIdNotEmpty(message)) { - amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant); + amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant, virtualHost); } break; default: diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java index 2a1d01794..74df554b8 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/AmqpMessageSenderService.java @@ -12,7 +12,6 @@ import java.net.URI; import javax.validation.constraints.NotNull; -import org.eclipse.hawkbit.util.IpUtil; import org.springframework.amqp.core.Message; /** @@ -30,24 +29,6 @@ public interface AmqpMessageSenderService { * @param replyTo * the reply to uri */ - default void sendMessage(@NotNull final Message message, @NotNull final URI replyTo) { - if (!IpUtil.isAmqpUri(replyTo)) { - return; - } - - sendMessage(message, replyTo.getPath().substring(1)); - } - - /** - * Send the given message to the given host and exchange. - * - * @param message - * the amqp message - * @param exchange - * to send to - * @param virtualHost - * to send to - */ - void sendMessage(@NotNull final Message message, @NotNull final String exchange); + void sendMessage(@NotNull final Message message, @NotNull final URI replyTo); } diff --git a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java index 7824b6706..3b60bc8d5 100644 --- a/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java +++ b/hawkbit-dmf/hawkbit-dmf-amqp/src/main/java/org/eclipse/hawkbit/amqp/DefaultAmqpMessageSenderService.java @@ -8,9 +8,11 @@ */ package org.eclipse.hawkbit.amqp; +import java.net.URI; import java.nio.charset.StandardCharsets; import java.util.UUID; +import org.eclipse.hawkbit.util.IpUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; @@ -36,8 +38,12 @@ public class DefaultAmqpMessageSenderService extends BaseAmqpService implements } @Override - public void sendMessage(final Message message, final String exchange) { + public void sendMessage(final Message message, final URI sendTo) { + if (!IpUtil.isAmqpUri(sendTo)) { + return; + } + final String exchange = sendTo.getPath().substring(1); final String correlationId = UUID.randomUUID().toString(); if (isCorrelationIdEmpty(message)) {