Skip to content

Commit

Permalink
Merge pull request #1916 from glopesdev/issue-1914
Browse files Browse the repository at this point in the history
Ensure multicast subject preserves source type
  • Loading branch information
glopesdev authored Jul 16, 2024
2 parents 1df14b3 + 458143d commit 9089b6e
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 16 deletions.
88 changes: 74 additions & 14 deletions Bonsai.Core.Tests/SubjectTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reactive.Linq;
using System.Threading.Tasks;
using Bonsai.Expressions;
using Bonsai.Reactive;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -18,30 +21,87 @@ public override IObservable<T> Process(IObservable<T> source)
}
}

class ConstantExpressionBuilder : ZeroArgumentExpressionBuilder
{
public Expression Expression { get; set; }

public override Expression Build(IEnumerable<Expression> arguments)
{
return Expression;
}
}

[TestMethod]
[ExpectedException(typeof(InvalidOperationException))]
public void Build_MulticastSubjectMissingBuildContext_ThrowsBuildException()
{
var source = new UnitBuilder().Build();
var builder = new MulticastSubject { Name = nameof(BehaviorSubject) };
builder.Build(source);
Assert.Fail();
}

[TestMethod]
public void Build_MulticastSubjectMissingName_ReturnsSameSequence()
{
var source = Expression.Constant(Observable.Return(0));
var builder = new TestWorkflow()
.Append(new ConstantExpressionBuilder { Expression = source })
.Append(new MulticastSubject())
.AppendOutput();
var expression = builder.Workflow.Build();
Assert.AreSame(source, expression);
}

[TestMethod]
[ExpectedException(typeof(WorkflowBuildException))]
public void Build_MulticastInterfaceToSubjectOfDifferentInterface_ThrowsBuildException()
{
var builder = new WorkflowBuilder();
builder.Workflow.Add(new BehaviorSubject<IDisposable> { Name = nameof(BehaviorSubject) });
var source = builder.Workflow.Add(new CombinatorBuilder { Combinator = new DoubleProperty { Value = 5.5 } });
var convert1 = builder.Workflow.Add(new CombinatorBuilder { Combinator = new TypeCombinatorMock<IComparable>() });
var convert2 = builder.Workflow.Add(new MulticastSubject { Name = nameof(BehaviorSubject) });
builder.Workflow.AddEdge(source, convert1, new ExpressionBuilderArgument());
builder.Workflow.AddEdge(convert1, convert2, new ExpressionBuilderArgument());
var builder = new TestWorkflow()
.Append(new BehaviorSubject<IDisposable> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new DoubleProperty { Value = 5.5 })
.AppendCombinator(new TypeCombinatorMock<IComparable>())
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) });
var expression = builder.Workflow.Build();
Assert.IsNotNull(expression);
}

[TestMethod]
public async Task Build_MulticastSourceToSubject_ReturnsSameValue()
{
var value = 32;
var workflow = new TestWorkflow()
.Append(new BehaviorSubject<int> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new IntProperty { Value = value })
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) })
.AppendOutput();
var observable = workflow.BuildObservable<int>();
Assert.AreEqual(value, await observable.Take(1));
}

[TestMethod]
public async Task Build_MulticastSourceToObjectSubject_PreservesTypeOfSourceSequence()
{
// related to https://github.com/bonsai-rx/bonsai/issues/1914
var workflow = new TestWorkflow()
.Append(new BehaviorSubject<object> { Name = nameof(BehaviorSubject) })
.ResetCursor()
.AppendCombinator(new IntProperty())
.Append(new MulticastSubject { Name = nameof(BehaviorSubject) })
.AppendOutput();
var observable = workflow.BuildObservable<int>();
Assert.AreEqual(0, await observable.Take(1));
}

[TestMethod]
public void ResourceSubject_SourceTerminatesExceptionally_ShouldNotTryToDispose()
{
var workflowBuilder = new WorkflowBuilder();
var source = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new ThrowSource() });
var subject = workflowBuilder.Workflow.Add(new ResourceSubject { Name = nameof(ResourceSubject) });
var sink = workflowBuilder.Workflow.Add(new CombinatorBuilder { Combinator = new CatchSink() });
workflowBuilder.Workflow.AddEdge(source, subject, new ExpressionBuilderArgument());
workflowBuilder.Workflow.AddEdge(subject, sink, new ExpressionBuilderArgument());
var workflowBuilder = new TestWorkflow()
.AppendCombinator(new ThrowSource())
.Append(new ResourceSubject { Name = nameof(ResourceSubject) })
.AppendCombinator(new CatchSink());
var observable = workflowBuilder.Workflow.BuildObservable();
observable.FirstOrDefaultAsync().Wait();
}
Expand All @@ -58,7 +118,7 @@ class CatchSink : Sink
{
public override IObservable<TSource> Process<TSource>(IObservable<TSource> source)
{
return source.Catch<TSource>(Observable.Empty<TSource>());
return source.Catch(Observable.Empty<TSource>());
}
}

Expand Down
8 changes: 8 additions & 0 deletions Bonsai.Core.Tests/TestWorkflow.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Linq.Expressions;
using Bonsai.Dag;
using Bonsai.Expressions;

Expand Down Expand Up @@ -97,5 +98,12 @@ public ExpressionBuilderGraph ToInspectableGraph()
{
return Workflow.ToInspectableGraph();
}

public IObservable<T> BuildObservable<T>()
{
var expression = Workflow.Build();
var observableFactory = Expression.Lambda<Func<IObservable<T>>>(expression).Compile();
return observableFactory();
}
}
}
23 changes: 21 additions & 2 deletions Bonsai.Core/Expressions/MulticastSubject.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,16 @@ public override Expression Build(IEnumerable<Expression> arguments)
);
}

source = CoerceMethodArgument(typeof(IObservable<>).MakeGenericType(subjectType), source);
observableType = subjectType;
var conversionParameter = Expression.Parameter(observableType);
var conversionBody = Expression.Convert(conversionParameter, subjectType);
var conversion = Expression.Lambda(conversionBody, conversionParameter);
return Expression.Call(
typeof(MulticastSubject),
nameof(Process),
new[] { observableType, subjectType },
source,
subjectExpression,
conversion);
}

return Expression.Call(typeof(MulticastSubject), nameof(Process), new[] { observableType }, source, subjectExpression);
Expand All @@ -79,6 +87,17 @@ static IObservable<TSource> Process<TSource>(IObservable<TSource> source, IObser
return source.Do(subject);
}

static IObservable<TSource> Process<TSource, TSubject>(
IObservable<TSource> source,
IObserver<TSubject> subject,
Func<TSource, TSubject> conversion)
{
return source.Do(
value => subject.OnNext(conversion(value)),
subject.OnError,
subject.OnCompleted);
}

class MulticastSubjectTypeDescriptionProvider : TypeDescriptionProvider
{
readonly MulticastSubjectTypeDescriptor typeDescriptor = new MulticastSubjectTypeDescriptor(null);
Expand Down

0 comments on commit 9089b6e

Please sign in to comment.