Skip to content

Commit

Permalink
Pub/Sub implementation
Browse files Browse the repository at this point in the history
There are more things to do, but covers most #7
  • Loading branch information
darkl committed Apr 18, 2014
1 parent 497bb62 commit 9c5c00d
Show file tree
Hide file tree
Showing 15 changed files with 706 additions and 17 deletions.
32 changes: 27 additions & 5 deletions src/JWampSharp.TestConsole/src/Program.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import Client.Rpc.WampRawRpcOperationCallback;
import Client.Rpc.WampRpcOperationCatalogProxy;
import Client.PubSub.WampRawTopicSubscriber;
import Client.Realm.WampRealmProxy;
import Client.WampChannel;
import Core.Serialization.WampFormatter;
import Rpc.WampRpcOperation;
import Rpc.WampRpcOperationCallback;
import com.fasterxml.jackson.databind.JsonNode;

import java.net.URI;
Expand All @@ -28,8 +26,30 @@ public static void main(String[] args) {
Object o =
open.toCompletableFuture().get();

WampRealmProxy realmProxy = channel.getRealmProxy();

realmProxy.getTopicContainer().getTopic("com.myapp.topic2")
.subscribe(new WampRawTopicSubscriber() {
@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details) {

}

@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments) {

}

@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments, TMessage argumentsKeywords) {

}
},
new HashMap<String,String>());

/*
WampRpcOperationCatalogProxy rpcCatalog =
channel.getRealmProxy().getRpcCatalog();
realmProxy.getRpcCatalog();
CompletionStage register = rpcCatalog.register(new WampRpcOperation() {
@Override
Expand Down Expand Up @@ -93,6 +113,8 @@ public <TMessage> void error(WampFormatter<TMessage> formatter, TMessage details
"com.arguments.add2",
new Object[]{2, 4}
);
*/


} catch (Exception ex) {
System.out.println(ex);
Expand Down
1 change: 1 addition & 0 deletions src/JWampSharp/JWampSharp.iml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
</content>
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
<orderEntry type="library" name="com.netflix.rxjava:rxjava-core:0.14.3" level="project" />
</component>
</module>

6 changes: 2 additions & 4 deletions src/JWampSharp/src/Client/DefaultWampClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ public class DefaultWampClient<TMessage> implements WampClient<TMessage>, WampSe
private final WampRealmProxy realmProxy;
private WampSessionClientExtended<TMessage> sessionClient;
private WampCaller<TMessage> caller;
private WampPublisher<TMessage> publisher;
private WampSubscriber<TMessage> subscriber;
private WampError<TMessage> error;

public DefaultWampClient(WampRealmProxyFactory<TMessage> realmProxyFactory) {
Expand All @@ -42,11 +40,11 @@ private WampCaller<TMessage> getCaller() {
}

private WampPublisher<TMessage> getPublisher() {
return publisher;
return (WampPublisher<TMessage>)realmProxy.getTopicContainer();
}

private WampSubscriber<TMessage> getSubscriber() {
return subscriber;
return (WampSubscriber<TMessage>)realmProxy.getTopicContainer();
}

private WampError<TMessage> getError() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package Client.PubSub;

import Core.Contracts.PubSub.WampPublisher;
import Core.Contracts.PubSub.WampSubscriber;
import Core.Contracts.WampServerProxy;
import Core.Serialization.WampFormatter;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

public class DefaultWampTopicContainerProxy<TMessage> implements WampTopicContainerProxy,
WampSubscriber<TMessage>, WampPublisher<TMessage> {

private final ConcurrentMap<String, WampTopicProxy> topicUriToTopicProxy =
new ConcurrentHashMap<String, WampTopicProxy>();

private final WampSubscriberClient<TMessage> subscriber;
private final WampPublisherClient<TMessage> publisher;

public DefaultWampTopicContainerProxy(WampServerProxy proxy, WampFormatter<TMessage> formatter) {
this.subscriber = new WampSubscriberClient<TMessage>(proxy, formatter);
this.publisher = new WampPublisherClient<TMessage>();
}

@Override
public WampTopicProxy getTopic(String topicUri) {
return topicUriToTopicProxy.computeIfAbsent(topicUri,
new Function<String, WampTopicProxy>() {
@Override
public WampTopicProxy apply(String uri) {
return new DefaultWampTopicProxy(uri,
subscriber,
publisher,
new RemoveFromContainerCloseable(uri));
}
}
);
}

@Override
public void subscribed(long requestId, long subscriptionId) {
subscriber.subscribed(requestId, subscriptionId);
}

@Override
public void unsubscribed(long requestId, long subscriptionId) {
subscriber.unsubscribed(requestId, subscriptionId);
}

@Override
public void event(long subscriptionId, long publicationId, TMessage details) {
subscriber.event(subscriptionId, publicationId, details);
}

@Override
public void event(long subscriptionId, long publicationId, TMessage details, TMessage[] arguments) {
subscriber.event(subscriptionId, publicationId, details, arguments);
}

@Override
public void event(long subscriptionId, long publicationId, TMessage details, TMessage[] arguments, TMessage argumentsKeywords) {
subscriber.event(subscriptionId, publicationId, details, arguments, argumentsKeywords);
}

@Override
public void published(long requestId, long publicationId) {
publisher.published(requestId, publicationId);
}

private class RemoveFromContainerCloseable implements AutoCloseable {
private String topicUri;

private RemoveFromContainerCloseable(String topicUri) {
this.topicUri = topicUri;
}

@Override
public void close() throws Exception {
topicUriToTopicProxy.remove(topicUri);
}
}
}
114 changes: 114 additions & 0 deletions src/JWampSharp/src/Client/PubSub/DefaultWampTopicProxy.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package Client.PubSub;

import Core.Serialization.WampFormatter;
import PubSub.WampTopicSubscriber;

import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/**
* Created by Elad on 18/04/2014.
*/
public class DefaultWampTopicProxy implements WampTopicProxy {
private final String topicUri;
private final WampTopicSubscriptionProxy subscriber;
private final WampTopicPublicationProxy publisher;
private final AutoCloseable conatinerDisposable;

private final Object lock = new Object();

private ConcurrentMap<Object, WampTopicProxySection> optionsToSection =
new ConcurrentHashMap<Object, WampTopicProxySection>();

public DefaultWampTopicProxy(String topicUri,
WampTopicSubscriptionProxy subscriber,
WampTopicPublicationProxy publisher,
AutoCloseable conatinerDisposable) {
this.topicUri = topicUri;
this.subscriber = subscriber;
this.publisher = publisher;
this.conatinerDisposable = conatinerDisposable;
}

@Override
public String getTopicUri() {
return topicUri;
}

@Override
public CompletionStage<Long> publish(Object options) {
return this.publisher.publish(this.getTopicUri(), options);
}

@Override
public CompletionStage<Long> publish(Object options, Object[] arguments) {
return this.publisher.publish(this.getTopicUri(), options, arguments);
}

@Override
public CompletionStage<Long> publish(Object options, Object[] arguments, Object argumentKeywords) {
return this.publisher.publish(this.getTopicUri(), options, arguments, argumentKeywords);
}

@Override
public CompletionStage<AutoCloseable> subscribe(WampTopicSubscriber subscriber, Object options) {
RawSubscriberAdapter adapter = new RawSubscriberAdapter(subscriber);
return subscribe(adapter, options);
}

@Override
public CompletionStage<AutoCloseable> subscribe(WampRawTopicSubscriber subscriber, Object options) {
synchronized (lock) {
Function<Object, WampTopicProxySection> topicProxySectionGenerator =
new Function<Object, WampTopicProxySection>() {
@Override
public WampTopicProxySection apply(Object givenOptions) {
return createTopicProxySection(givenOptions);
}
};

WampTopicProxySection section =
this.optionsToSection.computeIfAbsent(options,
topicProxySectionGenerator);

return section.subscribe(subscriber);
}
}

private WampTopicProxySection createTopicProxySection(Object options) {
return new WampTopicProxySection(topicUri, subscriber, options);
// TODO: subscribe to "SectionEmpty" event and remove this from hashmap when empty.
}

@Override
public void close() throws Exception {
synchronized (lock) {
// TODO: clean hashmap.
}
}

private class RawSubscriberAdapter implements WampRawTopicSubscriber {
private final WampTopicSubscriber subscriber;

public RawSubscriberAdapter(WampTopicSubscriber subscriber) {
this.subscriber = subscriber;
}

@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details) {
this.subscriber.event(publicationId, details);
}

@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments) {
this.subscriber.event(publicationId, details, arguments);
}

@Override
public <TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments, TMessage argumentsKeywords) {
this.subscriber.event(publicationId, details, arguments, argumentsKeywords);
}
}
}
30 changes: 30 additions & 0 deletions src/JWampSharp/src/Client/PubSub/WampPublisherClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package Client.PubSub;

import Core.Contracts.PubSub.WampPublisher;

import java.util.concurrent.CompletionStage;

/**
* Created by Elad on 18/04/2014.
*/
public class WampPublisherClient<TMessage> implements WampPublisher<TMessage>, WampTopicPublicationProxy {
@Override
public void published(long requestId, long publicationId) {

}

@Override
public CompletionStage<Long> publish(String topicUri, Object options) {
return null;
}

@Override
public CompletionStage<Long> publish(String topicUri, Object options, Object[] arguments) {
return null;
}

@Override
public CompletionStage<Long> publish(String topicUri, Object options, Object[] arguments, Object argumentKeywords) {
return null;
}
}
12 changes: 12 additions & 0 deletions src/JWampSharp/src/Client/PubSub/WampRawTopicSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package Client.PubSub;

import Core.Serialization.WampFormatter;

/**
* Created by Elad on 18/04/2014.
*/
public interface WampRawTopicSubscriber {
<TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details);
<TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments);
<TMessage> void event(WampFormatter<TMessage> formatter, long publicationId, TMessage details, TMessage[] arguments, TMessage argumentsKeywords);
}
Loading

0 comments on commit 9c5c00d

Please sign in to comment.