Skip to content

Commit

Permalink
Fixed that .Switch() did not propagate errors downstream. (#968)
Browse files Browse the repository at this point in the history
  • Loading branch information
JakenVeina authored Dec 18, 2024
1 parent f30efa2 commit 95b94d3
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 36 deletions.
81 changes: 55 additions & 26 deletions src/DynamicData.Tests/Cache/SwitchFixture.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,54 +10,83 @@

namespace DynamicData.Tests.Cache;

public class SwitchFixture : IDisposable
public class SwitchFixture
{
private readonly ChangeSetAggregator<Person, string> _results;

private readonly ISourceCache<Person, string> _source;

private readonly ISubject<ISourceCache<Person, string>> _switchable;

public SwitchFixture()
{
_source = new SourceCache<Person, string>(p => p.Name);
_switchable = new BehaviorSubject<ISourceCache<Person, string>>(_source);
_results = _switchable.Switch().AsAggregator();
}

[Fact]
public void ClearsForNewSource()
{
using var source = new SourceCache<Person, string>(p => p.Name);
using var switchable = new BehaviorSubject<ISourceCache<Person, string>>(source);
var results = switchable.Switch().AsAggregator();


var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray();
_source.AddOrUpdate(inital);
source.AddOrUpdate(inital);

_results.Data.Count.Should().Be(100);
results.Data.Count.Should().Be(100);

var newSource = new SourceCache<Person, string>(p => p.Name);
_switchable.OnNext(newSource);
switchable.OnNext(newSource);

_results.Data.Count.Should().Be(0);
results.Data.Count.Should().Be(0);

newSource.AddOrUpdate(inital);
_results.Data.Count.Should().Be(100);
results.Data.Count.Should().Be(100);

var nextUpdates = Enumerable.Range(101, 100).Select(i => new Person("Person" + i, i)).ToArray();
newSource.AddOrUpdate(nextUpdates);
_results.Data.Count.Should().Be(200);
results.Data.Count.Should().Be(200);
}

public void Dispose()
[Fact]
public void PoulatesFirstSource()
{
_source.Dispose();
_results.Dispose();
using var source = new SourceCache<Person, string>(p => p.Name);
using var switchable = new BehaviorSubject<ISourceCache<Person, string>>(source);
var results = switchable.Switch().AsAggregator();


var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray();
source.AddOrUpdate(inital);

results.Data.Count.Should().Be(100);
}

[Fact]
public void PoulatesFirstSource()
public void PropagatesOuterErrors()
{
using var source = new SourceCache<Person, string>(p => p.Name);
using var switchable = new BehaviorSubject<ISourceCache<Person, string>>(source);
var results = switchable.Switch().AsAggregator();


var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray();
source.AddOrUpdate(inital);

var error = new Exception("Test");
switchable.OnError(error);

results.Error.Should().Be(error);
}

[Fact]
public void PropagatesInnerErrors()
{
using var source = new SourceCache<Person, string>(p => p.Name);
using var switchable = new BehaviorSubject<IObservable<IChangeSet<Person, string>>>(source.Connect());
var results = switchable.Switch().AsAggregator();


var inital = Enumerable.Range(1, 100).Select(i => new Person("Person" + i, i)).ToArray();
_source.AddOrUpdate(inital);
source.AddOrUpdate(inital);

using var source2 = new BehaviorSubject<IChangeSet<Person, string>>(ChangeSet<Person, string>.Empty);

switchable.OnNext(source2);

var error = new Exception("Test");
source2.OnError(error);

_results.Data.Count.Should().Be(100);
results.Error.Should().Be(error);
}
}
30 changes: 20 additions & 10 deletions src/DynamicData/Cache/Internal/Switch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace DynamicData.Cache.Internal;

Expand All @@ -20,16 +21,25 @@ public IObservable<IChangeSet<TObject, TKey>> Run() => Observable.Create<IChange

var destination = new LockFreeObservableCache<TObject, TKey>();

var errors = new Subject<IChangeSet<TObject, TKey>>();

var populator = Observable.Switch(
_sources.Do(
_ =>
{
lock (locker)
{
destination.Clear();
}
})).Synchronize(locker).PopulateInto(destination);

return new CompositeDisposable(destination, populator, destination.Connect().SubscribeSafe(observer));
_sources
.Synchronize(locker)
.Do(onNext: _ => destination.Clear(),
onError: error => errors.OnError(error)))
.Synchronize(locker)
.Do(onNext: static _ => { },
onError: error => errors.OnError(error))
.PopulateInto(destination);

return new CompositeDisposable(
destination,
errors,
populator,
Observable.Merge(
destination.Connect(),
errors)
.SubscribeSafe(observer));
});
}

0 comments on commit 95b94d3

Please sign in to comment.