Skip to content

Commit

Permalink
Add comment about Subject.
Browse files Browse the repository at this point in the history
  • Loading branch information
lempiji committed Jun 13, 2016
1 parent 99dbf66 commit 06c30b3
Showing 1 changed file with 21 additions and 9 deletions.
30 changes: 21 additions & 9 deletions source/rx/subject.d
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
module rx.subject;
/+++++++++++++++++++++++++++++
+ This module defines the Subject and some implements.
+/
module rx.subject;

import rx.disposable;
import rx.observer;
Expand All @@ -7,8 +10,10 @@ import rx.observable;
import core.atomic : atomicLoad, cas;
import std.range : put;

///Represents an object that is both an observable sequence as well as an observer.
interface Subject(E) : Observer!E, Observable!E { }

///Represents an object that is both an observable sequence as well as an observer. Each notification is broadcasted to all subscribed observers.
class SubjectObject(E) : Subject!E
{
alias ElementType = E;
Expand All @@ -19,11 +24,13 @@ public:
}

public:
///
void put(E obj)
{
auto temp = atomicLoad(_observer);
temp.put(obj);
}
///
void completed()
{
shared(Observer!E) oldObserver = void;
Expand All @@ -37,7 +44,7 @@ public:
} while (!cas(&_observer, oldObserver, newObserver));
temp.completed();
}

///
void failure(Exception error)
{
shared(Observer!E) oldObserver = void;
Expand All @@ -52,10 +59,12 @@ public:
temp.failure(error);
}

///
Disposable subscribe(T)(T observer)
{
return subscribe(observerObject!E(observer));
}
///
Disposable subscribe(Observer!E observer)
{
shared(Observer!E) oldObserver = void;
Expand Down Expand Up @@ -94,6 +103,7 @@ public:
return subscription(this, observer);
}

///
void unsubscribe(Observer!E observer)
{
shared(Observer!E) oldObserver = void;
Expand All @@ -119,14 +129,8 @@ public:
private:
shared(Observer!E) _observer;
}
unittest
{
static assert(isObserver!(SubjectObject!int, int));
static assert(isObservable!(SubjectObject!int, int));
static assert(!isObservable!(SubjectObject!int, string));
static assert(!isObservable!(SubjectObject!int, string));
}

///
unittest
{
import std.array : appender;
Expand All @@ -144,6 +148,14 @@ unittest
subject.put(2);
assert(equal(data.data, [0, 1]));
}

unittest
{
static assert(isObserver!(SubjectObject!int, int));
static assert(isObservable!(SubjectObject!int, int));
static assert(!isObservable!(SubjectObject!int, string));
static assert(!isObservable!(SubjectObject!int, string));
}
unittest
{
int putCount = 0;
Expand Down

0 comments on commit 06c30b3

Please sign in to comment.