Skip to content

Commit

Permalink
ISubject -> IObservable
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisTTian667 committed Apr 6, 2023
1 parent 086c682 commit cab4fbc
Show file tree
Hide file tree
Showing 5 changed files with 25 additions and 54 deletions.
3 changes: 2 additions & 1 deletion Knx.sln.DotSettings
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=IP/@EntryIndexedValue">IP</s:String></wpf:ResourceDictionary>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=IP/@EntryIndexedValue">IP</s:String>
<s:Boolean x:Key="/Default/UserDictionary/Words/=Hpai/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
6 changes: 3 additions & 3 deletions Knx/ExtendedMessageInterface/KnxMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,13 @@ public bool IsPositiveConfirmation
/// Gets or sets the source (device-) address.
/// </summary>
/// <value>The source address.</value>
public KnxDeviceAddress? SourceAddress { get; set; } = "0/0/0";
public KnxDeviceAddress SourceAddress { get; set; } = "0/0/0";

/// <summary>
/// Gets or sets the destination address (device or individual/logical address).
/// </summary>
/// <value>The destination address.</value>
public KnxAddress? DestinationAddress { get; set; }
public KnxAddress DestinationAddress { get; set; } = (KnxLogicalAddress)"0/0/0";

/// <summary>
/// Gets the hop count, a message did, till it was received.
Expand Down Expand Up @@ -255,4 +255,4 @@ public static KnxMessage Deserialize(byte[] bytes)
MessageType = msgType
};
}
}
}
5 changes: 2 additions & 3 deletions Knx/KnxNetIp/IKnxNetIpClient.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
using System;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

namespace Knx.KnxNetIp;

public interface IKnxNetIpClient :
IAsyncDisposable,
ISubject<KnxNetIpMessage>,
ISubject<IKnxMessage>
IObservable<KnxNetIpMessage>,
IObservable<IKnxMessage>
{
Task ConnectAsync();

Expand Down
44 changes: 16 additions & 28 deletions Knx/KnxNetIp/KnxNetIpRoutingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@ namespace Knx.KnxNetIp;
/// <summary>
/// Used to connect to the Knx Bus via KnxNetIpRouting protocol.
/// </summary>
public sealed class KnxNetIpRoutingClient : IKnxNetIpClient
public sealed class KnxNetIpRoutingClient : IKnxNetIpClient, IObservable<KnxHpai>
{
private readonly KnxDeviceAddress _deviceAddress;
private readonly IPEndPoint _localEndpoint;
private readonly ILogger<KnxNetIpRoutingClient> _logger;
private readonly CancellationTokenSource _receivingMessagesCancellationTokenSource;
private readonly IPEndPoint _remoteEndPoint;
private UdpClient? _udpClient;

private readonly Subject<KnxNetIpMessage> _knxNetIpMessageSubject;
private readonly Subject<IKnxMessage> _knxMessageSubject;
private readonly Subject<KnxHpai> _knxHpaiSubject;

/// <summary>
/// Creates a new instance of the <see cref="KnxNetIpRoutingClient" /> class.
Expand All @@ -48,6 +50,7 @@ public KnxNetIpRoutingClient(

_knxNetIpMessageSubject = new Subject<KnxNetIpMessage>();
_knxMessageSubject = new Subject<IKnxMessage>();
_knxHpaiSubject = new Subject<KnxHpai>();
}

public async ValueTask DisposeAsync()
Expand Down Expand Up @@ -97,15 +100,14 @@ public async Task ConnectAsync()
await Task.CompletedTask;
}

IDisposable IObservable<KnxNetIpMessage>.Subscribe(IObserver<KnxNetIpMessage> observer)
{
throw new NotImplementedException();
}
IDisposable IObservable<KnxNetIpMessage>.Subscribe(IObserver<KnxNetIpMessage> observer) =>
_knxNetIpMessageSubject.Subscribe(observer);

IDisposable IObservable<IKnxMessage>.Subscribe(IObserver<IKnxMessage> observer)
{
throw new NotImplementedException();
}
IDisposable IObservable<IKnxMessage>.Subscribe(IObserver<IKnxMessage> observer) =>
_knxMessageSubject.Subscribe(observer);

IDisposable IObservable<KnxHpai>.Subscribe(IObserver<KnxHpai> observer) =>
_knxHpaiSubject.Subscribe(observer);

~KnxNetIpRoutingClient()
{
Expand All @@ -122,6 +124,7 @@ private ValueTask Dispose(bool disposing)

_knxNetIpMessageSubject.Dispose();
_knxMessageSubject.Dispose();
_knxHpaiSubject.Dispose();
}

return ValueTask.CompletedTask;
Expand Down Expand Up @@ -230,24 +233,9 @@ private void InvokeKnxMessageReceived(IKnxMessage knxMessage)
_knxMessageSubject.OnNext(knxMessage);
}

private void InvokeKnxDeviceDiscovered(KnxHpai hostProtocolAddressInformation) =>
private void InvokeKnxDeviceDiscovered(KnxHpai hostProtocolAddressInformation)
{
KnxDeviceDiscovered?.Invoke(this, hostProtocolAddressInformation);

void IObserver<KnxNetIpMessage>.OnCompleted() =>
_knxNetIpMessageSubject.OnCompleted();

void IObserver<IKnxMessage>.OnError(Exception error) =>
_knxMessageSubject.OnError(error);

void IObserver<IKnxMessage>.OnNext(IKnxMessage value) =>
_knxMessageSubject.OnNext(value);

void IObserver<IKnxMessage>.OnCompleted() =>
_knxMessageSubject.OnCompleted();

void IObserver<KnxNetIpMessage>.OnError(Exception error) =>
_knxNetIpMessageSubject.OnError(error);

void IObserver<KnxNetIpMessage>.OnNext(KnxNetIpMessage value) =>
_knxNetIpMessageSubject.OnNext(value);
_knxHpaiSubject.OnNext(hostProtocolAddressInformation);
}
}
21 changes: 2 additions & 19 deletions Knx/KnxNetIp/KnxNetIpTunnelingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,8 @@ private void OnKnxNetIpMessageReceived(KnxNetIpMessage message)
}

KnxNetIpMessageReceived?.Invoke(this, message);
((ISubject<KnxNetIpMessage>)this).OnNext(message);

_knxNetIpMessageSubject.OnNext(message);
}

private async Task DisposeAsync(bool disposing)
Expand Down Expand Up @@ -512,22 +513,4 @@ private void SetSequenceCount(TunnelingMessageBody knxTunnelingMessageBody)
_sequenceCounter++;
}
}

void IObserver<KnxNetIpMessage>.OnCompleted() =>
_knxNetIpMessageSubject.OnCompleted();

void IObserver<IKnxMessage>.OnError(Exception error) =>
_knxMessageSubject.OnError(error);

void IObserver<IKnxMessage>.OnNext(IKnxMessage value) =>
_knxMessageSubject.OnNext(value);

void IObserver<IKnxMessage>.OnCompleted() =>
_knxMessageSubject.OnCompleted();

void IObserver<KnxNetIpMessage>.OnError(Exception error) =>
_knxNetIpMessageSubject.OnError(error);

void IObserver<KnxNetIpMessage>.OnNext(KnxNetIpMessage value) =>
_knxNetIpMessageSubject.OnNext(value);
}

0 comments on commit cab4fbc

Please sign in to comment.