Skip to content
This repository has been archived by the owner on Feb 16, 2024. It is now read-only.

Allow AsyncMessageBroker to receive data from subscribers. #457

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 89 additions & 0 deletions Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ public interface IAsyncMessagePublisher
/// Send Message to all receiver and await complete.
/// </summary>
IObservable<Unit> PublishAsync<T>(T message);

/// <summary>
/// Send Message to all recievers and await data from recievers
/// </summary>
IObservable<T[]> PublishAsync<T, T1>(T1 message);
}

public interface IAsyncMessageReceiver
Expand All @@ -38,6 +43,11 @@ public interface IAsyncMessageReceiver
/// Subscribe typed message.
/// </summary>
IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver);

/// <summary>
/// Subscribe typed message with payload.
/// </summary>
IDisposable Subscribe<T, T1>(Func<T, IObservable<T1>> asyncMessageReceiver);
}

public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver
Expand Down Expand Up @@ -143,6 +153,34 @@ public IObservable<Unit> PublishAsync<T>(T message)
return Observable.WhenAll(awaiter);
}

public IObservable<T[]> PublishAsync<T, T1>(T1 message)
{
UniRx.InternalUtil.ImmutableList<Func<T1, IObservable<T>>> notifier;
lock (notifiers)
{
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

object _notifier;
if (notifiers.TryGetValue(typeof(T1), out _notifier))
{
notifier = (UniRx.InternalUtil.ImmutableList<Func<T1, IObservable<T>>>)_notifier;
}
else
{
return null;
}
}

var data = notifier.Data;
var awaiter = new IObservable<T>[data.Length];
for (int i = 0; i < data.Length; i++)
{
awaiter[i] = data[i].Invoke(message);
}
return Observable.WhenAll(awaiter);
}


public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
{
lock (notifiers)
Expand All @@ -167,6 +205,30 @@ public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
return new Subscription<T>(this, asyncMessageReceiver);
}

public IDisposable Subscribe<T, T1>(Func<T, IObservable<T1>> asyncMessageReceiver)
{
lock (notifiers)
{
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

object _notifier;
if (!notifiers.TryGetValue(typeof(T), out _notifier))
{
var notifier = UniRx.InternalUtil.ImmutableList<Func<T, IObservable<T1>>>.Empty;
notifier = notifier.Add(asyncMessageReceiver);
notifiers.Add(typeof(T), notifier);
}
else
{
var notifier = (ImmutableList<Func<T, IObservable<T1>>>)_notifier;
notifier = notifier.Add(asyncMessageReceiver);
notifiers[typeof(T)] = notifier;
}
}

return new Subscription<T, T1>(this, asyncMessageReceiver);
}

public void Dispose()
{
lock (notifiers)
Expand Down Expand Up @@ -205,5 +267,32 @@ public void Dispose()
}
}
}

class Subscription<T, T1> : IDisposable
{
readonly AsyncMessageBroker parent;
readonly Func<T, IObservable<T1>> asyncMessageReceiver;

public Subscription(AsyncMessageBroker parent, Func<T, IObservable<T1>> asyncMessageReceiver)
{
this.parent = parent;
this.asyncMessageReceiver = asyncMessageReceiver;
}

public void Dispose()
{
lock (parent.notifiers)
{
object _notifier;
if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
{
var notifier = (ImmutableList<Func<T, IObservable<T1>>>)_notifier;
notifier = notifier.Remove(asyncMessageReceiver);

parent.notifiers[typeof(T)] = notifier;
}
}
}
}
}
}