From 63d9ffae2ce4bd7d9e2863c18cbaee736d68dcd7 Mon Sep 17 00:00:00 2001 From: suoerkerokero Date: Sun, 31 May 2020 15:16:31 +0900 Subject: [PATCH 1/2] Allow AsyncMessageBroker to recieve data from subscribers. --- .../UniRx/Scripts/Notifiers/MessageBroker.cs | 245 ++++++++++-------- 1 file changed, 139 insertions(+), 106 deletions(-) diff --git a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs index a951831d..f0497a7c 100644 --- a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs +++ b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs @@ -2,102 +2,93 @@ using System.Collections.Generic; using UniRx.InternalUtil; -namespace UniRx -{ - public interface IMessagePublisher - { +namespace UniRx { + public interface IMessagePublisher { /// /// Send Message to all receiver. /// - void Publish(T message); + void Publish (T message); } - public interface IMessageReceiver - { + public interface IMessageReceiver { /// /// Subscribe typed message. /// - IObservable Receive(); + IObservable Receive (); } - public interface IMessageBroker : IMessagePublisher, IMessageReceiver - { - } + public interface IMessageBroker : IMessagePublisher, IMessageReceiver { } - public interface IAsyncMessagePublisher - { + public interface IAsyncMessagePublisher { /// /// Send Message to all receiver and await complete. /// - IObservable PublishAsync(T message); + IObservable PublishAsync (T message); + + /// + /// Send Message to all recievers and await data from recievers + /// + IObservable PublishAsync (T1 message); } - public interface IAsyncMessageReceiver - { + public interface IAsyncMessageReceiver { /// /// Subscribe typed message. /// - IDisposable Subscribe(Func> asyncMessageReceiver); - } + IDisposable Subscribe (Func> asyncMessageReceiver); - public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver - { + /// + /// Subscribe typed message with payload. + /// + IDisposable Subscribe (Func> asyncMessageReceiver); } + public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver { } + /// /// In-Memory PubSub filtered by Type. /// - public class MessageBroker : IMessageBroker, IDisposable - { + public class MessageBroker : IMessageBroker, IDisposable { /// /// MessageBroker in Global scope. /// - public static readonly IMessageBroker Default = new MessageBroker(); + public static readonly IMessageBroker Default = new MessageBroker (); bool isDisposed = false; - readonly Dictionary notifiers = new Dictionary(); + readonly Dictionary notifiers = new Dictionary (); - public void Publish(T message) - { + public void Publish (T message) { object notifier; - lock (notifiers) - { - if (isDisposed) return; + lock (notifiers) { + if (isDisposed) return; - if (!notifiers.TryGetValue(typeof(T), out notifier)) - { - return; + if (!notifiers.TryGetValue (typeof (T), out notifier)) { + return; + } } - } - ((ISubject)notifier).OnNext(message); + ((ISubject) notifier).OnNext (message); } - public IObservable Receive() - { + public IObservable Receive () { object notifier; - lock (notifiers) - { - if (isDisposed) throw new ObjectDisposedException("MessageBroker"); + lock (notifiers) { + if (isDisposed) throw new ObjectDisposedException ("MessageBroker"); - if (!notifiers.TryGetValue(typeof(T), out notifier)) - { - ISubject n = new Subject().Synchronize(); + if (!notifiers.TryGetValue (typeof (T), out notifier)) { + ISubject n = new Subject ().Synchronize (); notifier = n; - notifiers.Add(typeof(T), notifier); + notifiers.Add (typeof (T), notifier); } } - return ((IObservable)notifier).AsObservable(); + return ((IObservable) notifier).AsObservable (); } - public void Dispose() - { - lock (notifiers) - { - if (!isDisposed) - { + public void Dispose () { + lock (notifiers) { + if (!isDisposed) { isDisposed = true; - notifiers.Clear(); + notifiers.Clear (); } } } @@ -106,101 +97,143 @@ public void Dispose() /// /// In-Memory PubSub filtered by Type. /// - public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable - { + public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable { /// /// AsyncMessageBroker in Global scope. /// - public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker(); + public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker (); bool isDisposed = false; - readonly Dictionary notifiers = new Dictionary(); + readonly Dictionary notifiers = new Dictionary (); - public IObservable PublishAsync(T message) - { + public IObservable PublishAsync (T message) { UniRx.InternalUtil.ImmutableList>> notifier; - lock (notifiers) - { - if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); + lock (notifiers) { + if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); object _notifier; - if (notifiers.TryGetValue(typeof(T), out _notifier)) - { - notifier = (UniRx.InternalUtil.ImmutableList>>)_notifier; - } - else - { - return Observable.ReturnUnit(); + if (notifiers.TryGetValue (typeof (T), out _notifier)) { + notifier = (UniRx.InternalUtil.ImmutableList>>) _notifier; + } else { + return Observable.ReturnUnit (); } } var data = notifier.Data; var awaiter = new IObservable[data.Length]; - for (int i = 0; i < data.Length; i++) - { - awaiter[i] = data[i].Invoke(message); + for (int i = 0; i < data.Length; i++) { + awaiter[i] = data[i].Invoke (message); + } + return Observable.WhenAll (awaiter); + } + + public IObservable PublishAsync (T1 message) { + UniRx.InternalUtil.ImmutableList>> notifier; + lock (notifiers) { + if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + + object _notifier; + if (notifiers.TryGetValue (typeof (T1), out _notifier)) { + notifier = (UniRx.InternalUtil.ImmutableList>>) _notifier; + } else { + return null; + } + } + + var data = notifier.Data; + var awaiter = new IObservable[data.Length]; + for (int i = 0; i < data.Length; i++) { + awaiter[i] = data[i].Invoke (message); } - return Observable.WhenAll(awaiter); + return Observable.WhenAll (awaiter); } - public IDisposable Subscribe(Func> asyncMessageReceiver) - { - lock (notifiers) - { - if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); + public IDisposable Subscribe (Func> asyncMessageReceiver) { + lock (notifiers) { + if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); object _notifier; - if (!notifiers.TryGetValue(typeof(T), out _notifier)) - { + if (!notifiers.TryGetValue (typeof (T), out _notifier)) { var notifier = UniRx.InternalUtil.ImmutableList>>.Empty; - notifier = notifier.Add(asyncMessageReceiver); - notifiers.Add(typeof(T), notifier); + notifier = notifier.Add (asyncMessageReceiver); + notifiers.Add (typeof (T), notifier); + } else { + var notifier = (ImmutableList>>) _notifier; + notifier = notifier.Add (asyncMessageReceiver); + notifiers[typeof (T)] = notifier; } - else - { - var notifier = (ImmutableList>>)_notifier; - notifier = notifier.Add(asyncMessageReceiver); - notifiers[typeof(T)] = notifier; + } + + return new Subscription (this, asyncMessageReceiver); + } + + public IDisposable Subscribe (Func> asyncMessageReceiver) { + lock (notifiers) { + if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + + object _notifier; + if (!notifiers.TryGetValue (typeof (T), out _notifier)) { + var notifier = UniRx.InternalUtil.ImmutableList>>.Empty; + notifier = notifier.Add (asyncMessageReceiver); + notifiers.Add (typeof (T), notifier); + } else { + var notifier = (ImmutableList>>) _notifier; + notifier = notifier.Add (asyncMessageReceiver); + notifiers[typeof (T)] = notifier; } } - return new Subscription(this, asyncMessageReceiver); + return new Subscription (this, asyncMessageReceiver); } - public void Dispose() - { - lock (notifiers) - { - if (!isDisposed) - { + public void Dispose () { + lock (notifiers) { + if (!isDisposed) { isDisposed = true; - notifiers.Clear(); + notifiers.Clear (); } } } - class Subscription : IDisposable - { + class Subscription : IDisposable { readonly AsyncMessageBroker parent; readonly Func> asyncMessageReceiver; - public Subscription(AsyncMessageBroker parent, Func> asyncMessageReceiver) - { + public Subscription (AsyncMessageBroker parent, Func> asyncMessageReceiver) { + this.parent = parent; + this.asyncMessageReceiver = asyncMessageReceiver; + } + + public void Dispose () { + lock (parent.notifiers) { + object _notifier; + if (parent.notifiers.TryGetValue (typeof (T), out _notifier)) { + var notifier = (ImmutableList>>) _notifier; + notifier = notifier.Remove (asyncMessageReceiver); + + parent.notifiers[typeof (T)] = notifier; + } + } + } + } + + class Subscription : IDisposable { + readonly AsyncMessageBroker parent; + readonly Func> asyncMessageReceiver; + + public Subscription (AsyncMessageBroker parent, Func> asyncMessageReceiver) { this.parent = parent; this.asyncMessageReceiver = asyncMessageReceiver; } - public void Dispose() - { - lock (parent.notifiers) - { + public void Dispose () { + lock (parent.notifiers) { object _notifier; - if (parent.notifiers.TryGetValue(typeof(T), out _notifier)) - { - var notifier = (ImmutableList>>)_notifier; - notifier = notifier.Remove(asyncMessageReceiver); + if (parent.notifiers.TryGetValue (typeof (T), out _notifier)) { + var notifier = (ImmutableList>>) _notifier; + notifier = notifier.Remove (asyncMessageReceiver); - parent.notifiers[typeof(T)] = notifier; + parent.notifiers[typeof (T)] = notifier; } } } From 06d40ca753f97f018e8d02703e173649ef97bfe2 Mon Sep 17 00:00:00 2001 From: suoerkerokero Date: Sun, 31 May 2020 15:49:58 +0900 Subject: [PATCH 2/2] Fix format --- .../UniRx/Scripts/Notifiers/MessageBroker.cs | 252 +++++++++++------- 1 file changed, 154 insertions(+), 98 deletions(-) diff --git a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs index f0497a7c..91341d77 100644 --- a/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs +++ b/Assets/Plugins/UniRx/Scripts/Notifiers/MessageBroker.cs @@ -2,93 +2,112 @@ using System.Collections.Generic; using UniRx.InternalUtil; -namespace UniRx { - public interface IMessagePublisher { +namespace UniRx +{ + public interface IMessagePublisher + { /// /// Send Message to all receiver. /// - void Publish (T message); + void Publish(T message); } - public interface IMessageReceiver { + public interface IMessageReceiver + { /// /// Subscribe typed message. /// - IObservable Receive (); + IObservable Receive(); } - public interface IMessageBroker : IMessagePublisher, IMessageReceiver { } + public interface IMessageBroker : IMessagePublisher, IMessageReceiver + { + } - public interface IAsyncMessagePublisher { + public interface IAsyncMessagePublisher + { /// /// Send Message to all receiver and await complete. /// - IObservable PublishAsync (T message); + IObservable PublishAsync(T message); /// /// Send Message to all recievers and await data from recievers /// - IObservable PublishAsync (T1 message); + IObservable PublishAsync(T1 message); } - public interface IAsyncMessageReceiver { + public interface IAsyncMessageReceiver + { /// /// Subscribe typed message. /// - IDisposable Subscribe (Func> asyncMessageReceiver); + IDisposable Subscribe(Func> asyncMessageReceiver); /// /// Subscribe typed message with payload. /// - IDisposable Subscribe (Func> asyncMessageReceiver); + IDisposable Subscribe(Func> asyncMessageReceiver); } - public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver { } + public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver + { + } /// /// In-Memory PubSub filtered by Type. /// - public class MessageBroker : IMessageBroker, IDisposable { + public class MessageBroker : IMessageBroker, IDisposable + { /// /// MessageBroker in Global scope. /// - public static readonly IMessageBroker Default = new MessageBroker (); + public static readonly IMessageBroker Default = new MessageBroker(); bool isDisposed = false; - readonly Dictionary notifiers = new Dictionary (); + readonly Dictionary notifiers = new Dictionary(); - public void Publish (T message) { + public void Publish(T message) + { object notifier; - lock (notifiers) { - if (isDisposed) return; + lock (notifiers) + { + if (isDisposed) return; - if (!notifiers.TryGetValue (typeof (T), out notifier)) { - return; - } + if (!notifiers.TryGetValue(typeof(T), out notifier)) + { + return; } - ((ISubject) notifier).OnNext (message); + } + ((ISubject)notifier).OnNext(message); } - public IObservable Receive () { + public IObservable Receive() + { object notifier; - lock (notifiers) { - if (isDisposed) throw new ObjectDisposedException ("MessageBroker"); + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("MessageBroker"); - if (!notifiers.TryGetValue (typeof (T), out notifier)) { - ISubject n = new Subject ().Synchronize (); + if (!notifiers.TryGetValue(typeof(T), out notifier)) + { + ISubject n = new Subject().Synchronize(); notifier = n; - notifiers.Add (typeof (T), notifier); + notifiers.Add(typeof(T), notifier); } } - return ((IObservable) notifier).AsObservable (); + return ((IObservable)notifier).AsObservable(); } - public void Dispose () { - lock (notifiers) { - if (!isDisposed) { + public void Dispose() + { + lock (notifiers) + { + if (!isDisposed) + { isDisposed = true; - notifiers.Clear (); + notifiers.Clear(); } } } @@ -97,143 +116,180 @@ public void Dispose () { /// /// In-Memory PubSub filtered by Type. /// - public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable { + public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable + { /// /// AsyncMessageBroker in Global scope. /// - public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker (); + public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker(); bool isDisposed = false; - readonly Dictionary notifiers = new Dictionary (); + readonly Dictionary notifiers = new Dictionary(); - public IObservable PublishAsync (T message) { + public IObservable PublishAsync(T message) + { UniRx.InternalUtil.ImmutableList>> notifier; - lock (notifiers) { - if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); object _notifier; - if (notifiers.TryGetValue (typeof (T), out _notifier)) { - notifier = (UniRx.InternalUtil.ImmutableList>>) _notifier; - } else { - return Observable.ReturnUnit (); + if (notifiers.TryGetValue(typeof(T), out _notifier)) + { + notifier = (UniRx.InternalUtil.ImmutableList>>)_notifier; + } + else + { + return Observable.ReturnUnit(); } } var data = notifier.Data; var awaiter = new IObservable[data.Length]; - for (int i = 0; i < data.Length; i++) { - awaiter[i] = data[i].Invoke (message); + for (int i = 0; i < data.Length; i++) + { + awaiter[i] = data[i].Invoke(message); } - return Observable.WhenAll (awaiter); + return Observable.WhenAll(awaiter); } - public IObservable PublishAsync (T1 message) { + public IObservable PublishAsync(T1 message) + { UniRx.InternalUtil.ImmutableList>> notifier; - lock (notifiers) { - if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); object _notifier; - if (notifiers.TryGetValue (typeof (T1), out _notifier)) { - notifier = (UniRx.InternalUtil.ImmutableList>>) _notifier; - } else { + if (notifiers.TryGetValue(typeof(T1), out _notifier)) + { + notifier = (UniRx.InternalUtil.ImmutableList>>)_notifier; + } + else + { return null; } } var data = notifier.Data; var awaiter = new IObservable[data.Length]; - for (int i = 0; i < data.Length; i++) { - awaiter[i] = data[i].Invoke (message); + for (int i = 0; i < data.Length; i++) + { + awaiter[i] = data[i].Invoke(message); } - return Observable.WhenAll (awaiter); + return Observable.WhenAll(awaiter); } - public IDisposable Subscribe (Func> asyncMessageReceiver) { - lock (notifiers) { - if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + + public IDisposable Subscribe(Func> asyncMessageReceiver) + { + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); object _notifier; - if (!notifiers.TryGetValue (typeof (T), out _notifier)) { + if (!notifiers.TryGetValue(typeof(T), out _notifier)) + { var notifier = UniRx.InternalUtil.ImmutableList>>.Empty; - notifier = notifier.Add (asyncMessageReceiver); - notifiers.Add (typeof (T), notifier); - } else { - var notifier = (ImmutableList>>) _notifier; - notifier = notifier.Add (asyncMessageReceiver); - notifiers[typeof (T)] = notifier; + notifier = notifier.Add(asyncMessageReceiver); + notifiers.Add(typeof(T), notifier); + } + else + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Add(asyncMessageReceiver); + notifiers[typeof(T)] = notifier; } } - return new Subscription (this, asyncMessageReceiver); + return new Subscription(this, asyncMessageReceiver); } - public IDisposable Subscribe (Func> asyncMessageReceiver) { - lock (notifiers) { - if (isDisposed) throw new ObjectDisposedException ("AsyncMessageBroker"); + public IDisposable Subscribe(Func> asyncMessageReceiver) + { + lock (notifiers) + { + if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); object _notifier; - if (!notifiers.TryGetValue (typeof (T), out _notifier)) { + if (!notifiers.TryGetValue(typeof(T), out _notifier)) + { var notifier = UniRx.InternalUtil.ImmutableList>>.Empty; - notifier = notifier.Add (asyncMessageReceiver); - notifiers.Add (typeof (T), notifier); - } else { - var notifier = (ImmutableList>>) _notifier; - notifier = notifier.Add (asyncMessageReceiver); - notifiers[typeof (T)] = notifier; + notifier = notifier.Add(asyncMessageReceiver); + notifiers.Add(typeof(T), notifier); + } + else + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Add(asyncMessageReceiver); + notifiers[typeof(T)] = notifier; } } - return new Subscription (this, asyncMessageReceiver); + return new Subscription(this, asyncMessageReceiver); } - public void Dispose () { - lock (notifiers) { - if (!isDisposed) { + public void Dispose() + { + lock (notifiers) + { + if (!isDisposed) + { isDisposed = true; - notifiers.Clear (); + notifiers.Clear(); } } } - class Subscription : IDisposable { + class Subscription : IDisposable + { readonly AsyncMessageBroker parent; readonly Func> asyncMessageReceiver; - public Subscription (AsyncMessageBroker parent, Func> asyncMessageReceiver) { + public Subscription(AsyncMessageBroker parent, Func> asyncMessageReceiver) + { this.parent = parent; this.asyncMessageReceiver = asyncMessageReceiver; } - public void Dispose () { - lock (parent.notifiers) { + public void Dispose() + { + lock (parent.notifiers) + { object _notifier; - if (parent.notifiers.TryGetValue (typeof (T), out _notifier)) { - var notifier = (ImmutableList>>) _notifier; - notifier = notifier.Remove (asyncMessageReceiver); + if (parent.notifiers.TryGetValue(typeof(T), out _notifier)) + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Remove(asyncMessageReceiver); - parent.notifiers[typeof (T)] = notifier; + parent.notifiers[typeof(T)] = notifier; } } } } - class Subscription : IDisposable { + class Subscription : IDisposable + { readonly AsyncMessageBroker parent; readonly Func> asyncMessageReceiver; - public Subscription (AsyncMessageBroker parent, Func> asyncMessageReceiver) { + public Subscription(AsyncMessageBroker parent, Func> asyncMessageReceiver) + { this.parent = parent; this.asyncMessageReceiver = asyncMessageReceiver; } - public void Dispose () { - lock (parent.notifiers) { + public void Dispose() + { + lock (parent.notifiers) + { object _notifier; - if (parent.notifiers.TryGetValue (typeof (T), out _notifier)) { - var notifier = (ImmutableList>>) _notifier; - notifier = notifier.Remove (asyncMessageReceiver); + if (parent.notifiers.TryGetValue(typeof(T), out _notifier)) + { + var notifier = (ImmutableList>>)_notifier; + notifier = notifier.Remove(asyncMessageReceiver); - parent.notifiers[typeof (T)] = notifier; + parent.notifiers[typeof(T)] = notifier; } } }