Simplify AMQP sender interface. (#578)
Signed-off-by: kaizimmerm <kai.zimmermann@bosch-si.com>
This commit is contained in:
@@ -144,14 +144,15 @@ public class AmqpMessageDispatcherService extends BaseAmqpService {
|
|||||||
amqpSenderService.sendMessage(message, targetAdress);
|
amqpSenderService.sendMessage(message, targetAdress);
|
||||||
}
|
}
|
||||||
|
|
||||||
void sendPingReponseToDmfReceiver(final Message ping, final String tenant) {
|
void sendPingReponseToDmfReceiver(final Message ping, final String tenant, final String virtualHost) {
|
||||||
final Message message = MessageBuilder.withBody(String.valueOf(System.currentTimeMillis()).getBytes())
|
final Message message = MessageBuilder.withBody(String.valueOf(System.currentTimeMillis()).getBytes())
|
||||||
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
|
.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
|
||||||
.setCorrelationId(ping.getMessageProperties().getCorrelationId())
|
.setCorrelationId(ping.getMessageProperties().getCorrelationId())
|
||||||
.setHeader(MessageHeaderKey.TYPE, MessageType.PING_RESPONSE).setHeader(MessageHeaderKey.TENANT, tenant)
|
.setHeader(MessageHeaderKey.TYPE, MessageType.PING_RESPONSE).setHeader(MessageHeaderKey.TENANT, tenant)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
amqpSenderService.sendMessage(message, ping.getMessageProperties().getReplyTo());
|
amqpSenderService.sendMessage(message,
|
||||||
|
IpUtil.createAmqpUri(virtualHost, ping.getMessageProperties().getReplyTo()));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -132,7 +132,7 @@ public class AmqpMessageHandlerService extends BaseAmqpService {
|
|||||||
break;
|
break;
|
||||||
case PING:
|
case PING:
|
||||||
if (isCorrelationIdNotEmpty(message)) {
|
if (isCorrelationIdNotEmpty(message)) {
|
||||||
amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant);
|
amqpMessageDispatcherService.sendPingReponseToDmfReceiver(message, tenant, virtualHost);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -12,7 +12,6 @@ import java.net.URI;
|
|||||||
|
|
||||||
import javax.validation.constraints.NotNull;
|
import javax.validation.constraints.NotNull;
|
||||||
|
|
||||||
import org.eclipse.hawkbit.util.IpUtil;
|
|
||||||
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.core.Message;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -30,24 +29,6 @@ public interface AmqpMessageSenderService {
|
|||||||
* @param replyTo
|
* @param replyTo
|
||||||
* the reply to uri
|
* the reply to uri
|
||||||
*/
|
*/
|
||||||
default void sendMessage(@NotNull final Message message, @NotNull final URI replyTo) {
|
void sendMessage(@NotNull final Message message, @NotNull final URI replyTo);
|
||||||
if (!IpUtil.isAmqpUri(replyTo)) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
sendMessage(message, replyTo.getPath().substring(1));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Send the given message to the given host and exchange.
|
|
||||||
*
|
|
||||||
* @param message
|
|
||||||
* the amqp message
|
|
||||||
* @param exchange
|
|
||||||
* to send to
|
|
||||||
* @param virtualHost
|
|
||||||
* to send to
|
|
||||||
*/
|
|
||||||
void sendMessage(@NotNull final Message message, @NotNull final String exchange);
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -8,9 +8,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.eclipse.hawkbit.amqp;
|
package org.eclipse.hawkbit.amqp;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.eclipse.hawkbit.util.IpUtil;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.springframework.amqp.core.Message;
|
import org.springframework.amqp.core.Message;
|
||||||
@@ -36,8 +38,12 @@ public class DefaultAmqpMessageSenderService extends BaseAmqpService implements
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendMessage(final Message message, final String exchange) {
|
public void sendMessage(final Message message, final URI sendTo) {
|
||||||
|
if (!IpUtil.isAmqpUri(sendTo)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final String exchange = sendTo.getPath().substring(1);
|
||||||
final String correlationId = UUID.randomUUID().toString();
|
final String correlationId = UUID.randomUUID().toString();
|
||||||
|
|
||||||
if (isCorrelationIdEmpty(message)) {
|
if (isCorrelationIdEmpty(message)) {
|
||||||
|
|||||||
Reference in New Issue
Block a user