Skip to content

Commit

Permalink
优化Mqtt推送
Browse files Browse the repository at this point in the history
  • Loading branch information
iioter committed Aug 23, 2022
1 parent 90eac18 commit 81b3d81
Show file tree
Hide file tree
Showing 10 changed files with 366 additions and 380 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public override void DoEdit(bool updateAllFields = false)
{
base.DoEdit(updateAllFields);
var myMqttClient = Wtm.ServiceProvider.GetService(typeof(MyMqttClient)) as MyMqttClient;
myMqttClient.ConnectAsync();
myMqttClient.StartManagedClientAsync().Wait();
}

public override void DoDelete()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
using IoTGateway.Model;
using Microsoft.Extensions.Primitives;
using MQTTnet.Server;
using MQTTnet.Server.Status;
using MQTTnet.Formatter;

namespace IoTGateway.ViewModel.MqttClient.MqttServerVMs
Expand Down Expand Up @@ -45,12 +44,12 @@ public override void AfterDoSearcher()
}
public override void DoSearch()
{
var mqttServer = Wtm.ServiceProvider.GetService(typeof(IMqttServer)) as IMqttServer;
foreach (var client in mqttServer.GetClientStatusAsync().Result)
var mqttServer = Wtm.ServiceProvider.GetService(typeof(MqttServer)) as MqttServer;
foreach (var client in mqttServer.GetClientsAsync().Result)
{
MqttClient_View mqttClient_ = new MqttClient_View
{
ClientId = client.ClientId,
ClientId = client.Id,
BytesReceived = client.BytesReceived,
BytesSent = client.BytesSent,
MqttProtocolVersion = client.ProtocolVersion,
Expand Down
11 changes: 5 additions & 6 deletions IoTGateway/IoTGateway.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@

<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.14.0" />
<PackageReference Include="MQTTnet" Version="3.1.2" />
<PackageReference Include="MQTTnet.AspNetCore" Version="3.1.2" />
<PackageReference Include="MQTTnet.Extensions.ManagedClient" Version="3.1.2" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="3.1.2" />
<PackageReference Include="NLog" Version="5.0.1" />
<PackageReference Include="NLog.Web.AspNetCore" Version="5.1.0" />
<PackageReference Include="MQTTnet" Version="4.1.0.247" />
<PackageReference Include="MQTTnet.AspNetCore" Version="4.1.0.247" />
<PackageReference Include="MQTTnet.Extensions.Rpc" Version="4.1.0.247" />
<PackageReference Include="NLog" Version="5.0.2" />
<PackageReference Include="NLog.Web.AspNetCore" Version="5.1.1" />
<PackageReference Include="System.IO.Ports" Version="6.0.0" />
</ItemGroup>

Expand Down
2 changes: 1 addition & 1 deletion IoTGateway/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet.AspNetCore.Extensions;
using MQTTnet.AspNetCore;
using WalkingTec.Mvvm.Core;
using NLog;
using NLog.Web;
Expand Down
21 changes: 11 additions & 10 deletions IoTGateway/Startup.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
using Microsoft.Extensions.FileProviders;
using Microsoft.Extensions.Options;
using MQTTnet.AspNetCore;
using MQTTnet.AspNetCore.Extensions;
using Plugin;
using WalkingTec.Mvvm.Core;
using WalkingTec.Mvvm.Core.Extensions;
Expand Down Expand Up @@ -66,19 +65,16 @@ public void ConfigureServices(IServiceCollection services)
options.ReloadUserFunc = ReloadUser;
});

//MQTTServer
services.AddHostedMqttServer(mqttServer =>
{
mqttServer.WithoutDefaultEndpoint();
})
//MqttServer
services.AddHostedMqttServer(mqttServer => mqttServer.WithoutDefaultEndpoint())
.AddMqttConnectionHandler()
.AddConnections();


services.AddHostedService<IoTBackgroundService>();
services.AddSingleton<DeviceService>();
services.AddSingleton<DriverService>();
services.AddSingleton<UAService>();
//services.AddSingleton<UAService>();
services.AddSingleton<MyMqttClient>();
services.AddSingleton<ModbusSlaveService>();

Expand Down Expand Up @@ -125,10 +121,15 @@ public void Configure(IApplicationBuilder app, IOptionsMonitor<Configs> configs,

app.UseEndpoints(endpoints =>
{
//MqttServerWebSocket
endpoints.MapConnectionHandler<MqttConnectionHandler>("/mqtt", options =>
//MqttServer
app.UseEndpoints(endpoints =>
{
endpoints.MapMqtt("/mqtt");
});

app.UseMqttServer(server =>
{
options.WebSockets.SubProtocolSelector = MqttSubProtocolSelector.SelectSubProtocol;
// Todo: Do something with the server
});

endpoints.MapControllerRoute(
Expand Down
Binary file modified IoTGateway/iotgateway.db
Binary file not shown.
8 changes: 4 additions & 4 deletions Plugins/Plugin/DeviceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,18 @@ public class DeviceService : IDisposable
public List<DeviceThread> DeviceThreads = new List<DeviceThread>();
private readonly MyMqttClient _myMqttClient;
private readonly UAService _uAService;
private readonly IMqttServer _mqttServer;
private readonly MqttServer _mqttServer;
private readonly string _connnectSetting = IoTBackgroundService.connnectSetting;
private readonly DBTypeEnum _dbType = IoTBackgroundService.DbType;

//UAService? uAService,
public DeviceService(IConfiguration configRoot, DriverService drvierManager, MyMqttClient myMqttClient,
UAService uAService, IMqttServer mqttServer, ILogger<DeviceService> logger)
MqttServer mqttServer, ILogger<DeviceService> logger)
{
if (mqttServer == null) throw new ArgumentNullException(nameof(mqttServer));
_logger = logger;
DrvierManager = drvierManager;
_myMqttClient = myMqttClient;
_uAService = uAService;
//_uAService = uAService;
_mqttServer = mqttServer ?? throw new ArgumentNullException(nameof(mqttServer));
try
{
Expand Down
29 changes: 21 additions & 8 deletions Plugins/Plugin/DeviceThread.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
using PluginInterface;
using System.Reflection;
using System.Text;
using IoTGateway.DataAccess;
using IoTGateway.Model;
using DynamicExpresso;
using MQTTnet.Server;
using Newtonsoft.Json;
using Microsoft.Extensions.Logging;
using MQTTnet;

namespace Plugin
{
Expand All @@ -26,7 +28,7 @@ public class DeviceThread : IDisposable
private bool _lastConnected;

public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClient myMqttClient,
IMqttServer mqttServer, ILogger logger)
MqttServer mqttServer, ILogger logger)
{
_myMqttClient = myMqttClient;
_myMqttClient.OnExcRpc += MyMqttClient_OnExcRpc;
Expand Down Expand Up @@ -119,13 +121,24 @@ public DeviceThread(Device device, IDriver driver, string projectId, MyMqttClien
ret.CookedValue?.ToString())
{
//这是设备变量列表要用的
mqttServer.PublishAsync(
$"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
JsonConvert.SerializeObject(ret));
var msg = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic =
$"internal/v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(
JsonConvert.SerializeObject(ret))
});
mqttServer.InjectApplicationMessage(msg);
//这是在线组态要用的
mqttServer.PublishAsync(
$"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
JsonConvert.SerializeObject(ret.CookedValue));
msg = new InjectedMqttApplicationMessage(
new MqttApplicationMessage()
{
Topic =
$"v1/gateway/telemetry/{Device.DeviceName}/{item.Name}",
Payload = Encoding.UTF8.GetBytes(
JsonConvert.SerializeObject(ret.CookedValue))
});
}

DeviceValues[item.ID] = ret;
Expand Down Expand Up @@ -224,7 +237,7 @@ public void MyMqttClient_OnExcRpc(object? sender, RpcRequest e)
if (!writeResponse.IsSuccess)
{
rpcResponse.Description = writeResponse.Description;
break;
continue;
}
}
else
Expand Down
Loading

0 comments on commit 81b3d81

Please sign in to comment.