Skip to content

Commit

Permalink
Introduce options for java live message API
Browse files Browse the repository at this point in the history
This allows setting the condition header to live messages.

Co-authored-by: Stanchev Aleksandar <aleksandar.stanchev@bosch.io>
Signed-off-by: David Schwilk <david.schwilk@bosch.io>
  • Loading branch information
2 people authored and thjaeckle committed Jan 5, 2023
1 parent b4bb350 commit 19c2ec5
Show file tree
Hide file tree
Showing 10 changed files with 148 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.MessageBuilder;
import org.eclipse.ditto.messages.model.MessageHeaders;
import org.eclipse.ditto.messages.model.MessageHeadersBuilder;
import org.eclipse.ditto.messages.model.MessagesModelFactory;
import org.eclipse.ditto.policies.model.Policy;
import org.eclipse.ditto.policies.model.PolicyId;
Expand Down Expand Up @@ -544,11 +545,16 @@ public DeleteFeatureProperties deleteFeatureProperties(final ThingId thingId,
* @param <T> the type of the payload.
* @return a sendMessage message.
*/
public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, final Message<T> message) {
public <T> Message<T> sendMessage(final MessageSerializerRegistry registry, final Message<T> message,
final Option<?>... options) {

final MessageHeaders messageHeaders = message.getHeaders().toBuilder()
.correlationId(message.getHeaders().getCorrelationId().orElseGet(() -> UUID.randomUUID().toString()))
.build();
final DittoHeaders dittoHeaders = buildDittoHeaders(EnumSet.of(CONDITION), options);
final MessageHeadersBuilder messageHeadersBuilder = message.getHeaders().toBuilder()
.correlationId(message.getHeaders().getCorrelationId().orElseGet(() -> UUID.randomUUID().toString()));
if (dittoHeaders.getCondition().isPresent()) {
messageHeadersBuilder.condition(dittoHeaders.getCondition().get());
}
MessageHeaders messageHeaders = messageHeadersBuilder.build();

final MessageBuilder<T> messageBuilder = message.getPayload()
.map(payload -> {
Expand Down
21 changes: 21 additions & 0 deletions java/src/main/java/org/eclipse/ditto/client/live/Live.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,27 @@ public interface Live extends CommonManagement<LiveThingHandle, LiveFeatureHandl
*/
<T> PendingMessage<T> message();

/**
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message}
* <em>FROM</em> or <em>TO</em> a "Live" {@link Thing} or a "Live" Thing's {@link
* org.eclipse.ditto.things.model.Feature Feature}. <p> Example: </p>
* <pre>
* client.live().message()
* .from("org.eclipse.ditto:fireDetectionDevice")
* .featureId("smokeDetector")
* .subject("fireAlert")
* .payload(JsonFactory.newObject("{\"action\" : \"call fire department\"}"))
* .send();
* </pre>
*
* @param <T> the type of the Message's payload.
* @param options options sent to the outbound message.
* @param options options sent to the outbound message.
* @return a new message builder that offers the functionality to create and send the message.
* @since 3.1.0
*/
<T> PendingMessage<T> message(Option<?>... options);

/**
* Start consuming changes, messages and commands on this {@code live()} channel.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.ditto.client.live.messages.MessageRegistration;
import org.eclipse.ditto.client.live.messages.PendingMessageWithFeatureId;
import org.eclipse.ditto.client.management.FeatureHandle;
import org.eclipse.ditto.client.options.Option;

/**
* A {@code LiveFeatureHandle} provides management and registration functionality for specific {@code Live Thing}
Expand Down Expand Up @@ -48,4 +49,26 @@ public interface LiveFeatureHandle extends FeatureHandle, MessageRegistration, F
*/
<T> PendingMessageWithFeatureId<T> message();

/**
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message} <em>TO</em>
* or <em>FROM</em> the {@code Feature} handled by this {@code LiveFeatureHandle}. <p> Example: </p>
* <pre>
* client.live()
* .forId("org.eclipse.ditto:fireDetectionDevice")
* .forFeature("smokeDetector")
* .message()
* .from()
* .subject("fireAlert")
* .payload("{\"action\" : \"call fire department\"}")
* .contentType("application/json")
* .send();
* </pre>
*
* @param <T> the type of the Message's payload.
* @param options options sent to the outbound message.
* @return a new message builder that offers the functionality to create and send the message.
* @since 3.1.0
*/
<T> PendingMessageWithFeatureId<T> message(Option<?>... options);

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.ditto.client.live.messages.MessageRegistration;
import org.eclipse.ditto.client.live.messages.PendingMessageWithThingId;
import org.eclipse.ditto.client.management.ThingHandle;
import org.eclipse.ditto.client.options.Option;

/**
* A {@code LiveThingHandle} provides management and registration functionality for specific <em>Live Things</em>.
Expand Down Expand Up @@ -50,4 +51,23 @@ public interface LiveThingHandle
*/
<T> PendingMessageWithThingId<T> message();

/**
* Provides the functionality to create and send a new {@link org.eclipse.ditto.messages.model.Message}
* <em>FROM</em> or <em>TO</em> the {@code Thing} handled by this {@code LiveThingHandle}. <p> Example: </p>
* <pre>
* client.live().forId("org.eclipse.ditto:fireDetectionDevice").message()
* .from()
* .subject("fireAlert")
* .payload("{\"action\" : \"call fire department\"}")
* .contentType("application/json")
* .send();
* </pre>
*
* @param <T> the type of the Message's payload.
* @param options options sent to the outbound message.
* @return a new message builder that offers the functionality to create and send the message.
* @since 3.1.0
*/
<T> PendingMessageWithThingId<T> message(Option<?>... options);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.internal.FeatureHandleImpl;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
import org.slf4j.Logger;
Expand Down Expand Up @@ -89,6 +90,12 @@ public <T> PendingMessageWithFeatureId<T> message() {
messagingProvider).withThingAndFeatureIds(getEntityId(), getFeatureId());
}

@Override
public <T> PendingMessageWithFeatureId<T> message(final Option<?>... options) {
return PendingMessageImpl.<T>of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
messagingProvider).withThingAndFeatureIds(getEntityId(), getFeatureId());
}

@Override
public <T, U> void registerForMessage(final String registrationId,
final String subject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.ClientReconnectingException;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.json.JsonKey;
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
import org.eclipse.ditto.messages.model.Message;
Expand Down Expand Up @@ -224,6 +225,12 @@ public <T> PendingMessage<T> message() {
messagingProvider);
}

@Override
public <T> PendingMessage<T> message(final Option<?>... options) {
return PendingMessageImpl.of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
messagingProvider);
}

@Override
public <T, U> void registerForMessage(final String registrationId,
final String subject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.eclipse.ditto.client.live.messages.RepliableMessage;
import org.eclipse.ditto.client.management.internal.ThingHandleImpl;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.messages.model.KnownMessageSubjects;
import org.eclipse.ditto.protocol.TopicPath;
import org.eclipse.ditto.things.model.ThingId;
Expand Down Expand Up @@ -98,6 +99,12 @@ public <T> PendingMessageWithThingId<T> message() {
messagingProvider).withThingId(getEntityId());
}

@Override
public <T> PendingMessageWithThingId<T> message(final Option<?>... options) {
return PendingMessageImpl.<T>of(LOGGER, outgoingMessageFactory, messageSerializerRegistry, PROTOCOL_ADAPTER,
messagingProvider).withThingId(getEntityId());
}

@Override
public <T, U> void registerForMessage(final String registrationId,
final String subject,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.eclipse.ditto.client.live.messages.PendingMessageWithThingId;
import org.eclipse.ditto.client.live.messages.internal.ImmutableMessageSender;
import org.eclipse.ditto.client.messaging.MessagingProvider;
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.client.options.Options;
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.protocol.adapter.ProtocolAdapter;
Expand All @@ -41,17 +43,21 @@ final class PendingMessageImpl<T> implements PendingMessage<T> {
private final MessageSerializerRegistry messageSerializerRegistry;
private final ProtocolAdapter protocolAdapter;
private final MessagingProvider messagingProvider;
private final Option<?>[] options;

private PendingMessageImpl(final Logger logger,
final OutgoingMessageFactory outgoingMessageFactory,
final MessageSerializerRegistry messageSerializerRegistry,
final ProtocolAdapter protocolAdapter,
final MessagingProvider messagingProvider) {
final MessagingProvider messagingProvider,
final Option<?>... options) {

this.logger = logger;
this.outgoingMessageFactory = outgoingMessageFactory;
this.messageSerializerRegistry = messageSerializerRegistry;
this.protocolAdapter = protocolAdapter;
this.messagingProvider = messagingProvider;
this.options = options;
}

static <T> PendingMessageImpl<T> of(final Logger logger,
Expand Down Expand Up @@ -120,7 +126,7 @@ public MessageSender.SetFeatureIdOrSubject<T> to(final ThingId thingId) {

private void sendMessage(final Message<T> message, @Nullable final ResponseConsumer<?> responseConsumer) {
final Message<?> toBeSentMessage =
outgoingMessageFactory.sendMessage(messageSerializerRegistry, message);
outgoingMessageFactory.sendMessage(messageSerializerRegistry, message, options);
logger.trace("Message about to send: {}", toBeSentMessage);
if (responseConsumer != null) {
toBeSentMessage.getCorrelationId().ifPresent(correlationId ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.eclipse.ditto.client.options.Option;
import org.eclipse.ditto.client.options.OptionName;


/**
* This abstract implementation of {@link OptionVisitor} implements the parts which are common for all option visitors
* like comparing the name of the option with the expected name, getting the value from the option and handling
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,18 @@
import org.assertj.core.api.JUnitSoftAssertions;
import org.eclipse.ditto.base.model.headers.DittoHeaderDefinition;
import org.eclipse.ditto.base.model.json.JsonSchemaVersion;
import org.eclipse.ditto.client.live.messages.MessageSerializerRegistry;
import org.eclipse.ditto.client.live.messages.MessageSerializers;
import org.eclipse.ditto.client.live.messages.internal.DefaultMessageSerializerRegistry;
import org.eclipse.ditto.client.options.OptionName;
import org.eclipse.ditto.client.options.Options;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.messages.model.Message;
import org.eclipse.ditto.messages.model.MessageDirection;
import org.eclipse.ditto.messages.model.MessageHeaders;
import org.eclipse.ditto.messages.model.MessagesModelFactory;
import org.eclipse.ditto.messages.model.signals.commands.SendThingMessage;
import org.eclipse.ditto.things.model.ThingId;
import org.eclipse.ditto.things.model.signals.commands.query.RetrieveFeature;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -81,4 +91,39 @@ public void deleteThingWithLiveChannelConditionExpressionThrowsException() {
.withNoCause();
}

@Test
public void LiveMessageWithOnlyAllowedOptionsReturnsExpected() {
final Message<?> liveMessage = underTest.sendMessage(new DefaultMessageSerializerRegistry(), getMessage(),
Options.condition(CONDITION_EXPRESSION));

softly.assertThat((CharSequence) liveMessage.getEntityId())
.as("entity ID")
.isEqualTo(THING_ID);
softly.assertThat(liveMessage.getHeaders())
.as("Ditto headers")
.satisfies(dittoHeaders -> {
softly.assertThat(dittoHeaders)
.as("condition expression")
.containsEntry(DittoHeaderDefinition.CONDITION.getKey(), CONDITION_EXPRESSION);
});
}

@Test
public void liverMessageWithLiveChannelConditionExpressionThrowsException() {
Assertions.assertThatIllegalArgumentException()
.isThrownBy(() -> underTest.sendMessage(new DefaultMessageSerializerRegistry(), getMessage(),
Options.liveChannelCondition(LIVE_CHANNEL_CONDITION_EXPRESSION)))
.withMessage("Option '%s' is not allowed. This operation only allows [%s, %s, %s].",
OptionName.Global.LIVE_CHANNEL_CONDITION,
OptionName.Global.CONDITION,
OptionName.Global.DITTO_HEADERS,
OptionName.Modify.RESPONSE_REQUIRED)
.withNoCause();
}

private static Message<?> getMessage() {
return MessagesModelFactory.newMessageBuilder(MessageHeaders.newBuilder(MessageDirection.TO,
THING_ID, "subject").build()).build();
}

}

0 comments on commit 19c2ec5

Please sign in to comment.