|
|
|
|
@ -0,0 +1,173 @@ |
|
|
|
|
using Common.Shared.Application.MQTT; |
|
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
|
using MQTTnet; |
|
|
|
|
using MQTTnet.Client; |
|
|
|
|
using MQTTnet.Protocol; |
|
|
|
|
using System.Collections.Concurrent; |
|
|
|
|
using System.Text; |
|
|
|
|
using WeiCloud.Utils.Common; |
|
|
|
|
|
|
|
|
|
namespace Common.Shared.DomainService.MqttClient |
|
|
|
|
{ |
|
|
|
|
public class MqttClientListService |
|
|
|
|
{ |
|
|
|
|
private readonly ILogger<MqttClientListService> _logger; |
|
|
|
|
|
|
|
|
|
/// <summary> |
|
|
|
|
/// mqttclient集合 |
|
|
|
|
/// </summary> |
|
|
|
|
public static ConcurrentDictionary<MQTTParamDto, IMqttClient> _mqttClientList = new ConcurrentDictionary<MQTTParamDto, IMqttClient>(); |
|
|
|
|
|
|
|
|
|
private MqttFactory _factory = new MqttFactory(); |
|
|
|
|
|
|
|
|
|
/// <summary> |
|
|
|
|
/// DI |
|
|
|
|
/// </summary> |
|
|
|
|
/// <param name="logger"></param> |
|
|
|
|
/// <param name="weiCloudDBRobotContext"></param> |
|
|
|
|
/// <param name="redisHashService"></param> |
|
|
|
|
/// <param name="configuration"></param> |
|
|
|
|
/// <exception cref="ArgumentNullException"></exception> |
|
|
|
|
public MqttClientListService(ILogger<MqttClientListService> logger) |
|
|
|
|
{ |
|
|
|
|
_logger = logger; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// <summary> |
|
|
|
|
/// 创建mqtt客户端 |
|
|
|
|
/// </summary> |
|
|
|
|
/// <param name="mQTT"></param> |
|
|
|
|
/// <returns></returns> |
|
|
|
|
public async Task<IMqttClient> CreateMqttClient(MQTTParamDto mQTT) |
|
|
|
|
{ |
|
|
|
|
try |
|
|
|
|
{ |
|
|
|
|
var key = _mqttClientList.Keys.Where(x => x.MqttClientMark == mQTT.MqttClientMark).FirstOrDefault(); |
|
|
|
|
if (key != null) |
|
|
|
|
{ |
|
|
|
|
return _mqttClientList[key]; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (string.IsNullOrWhiteSpace(mQTT.ClientId)) |
|
|
|
|
{ |
|
|
|
|
mQTT.ClientId = "robot_mqtt_" + UidGenerator.Uid().ToString(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var mqttClient = _factory.CreateMqttClient(); |
|
|
|
|
var options = new MqttClientOptionsBuilder() |
|
|
|
|
.WithTcpServer(mQTT.HostIp, mQTT.HostPort) |
|
|
|
|
.WithCredentials(mQTT.UserName, mQTT.Password) |
|
|
|
|
.WithCleanSession(false) |
|
|
|
|
.WithClientId(mQTT.ClientId) |
|
|
|
|
.Build(); |
|
|
|
|
|
|
|
|
|
mqttClient.ApplicationMessageReceivedAsync += delegate (MqttApplicationMessageReceivedEventArgs args) |
|
|
|
|
{ |
|
|
|
|
//var key = _mqttClientList.Keys.Where(x => x.ClientId == ).FirstOrDefault(); |
|
|
|
|
Console.WriteLine($"Topic {args.ApplicationMessage.Topic}"); |
|
|
|
|
Console.WriteLine($"Payload {Encoding.UTF8.GetString(args.ApplicationMessage.Payload)}"); |
|
|
|
|
Console.WriteLine($"Qos {args.ApplicationMessage.QualityOfServiceLevel}"); |
|
|
|
|
//进行业务处理 |
|
|
|
|
return Task.CompletedTask; |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
|
mqttClient.DisconnectedAsync += async e => |
|
|
|
|
{ |
|
|
|
|
await Task.Delay(TimeSpan.FromSeconds(5)); |
|
|
|
|
try |
|
|
|
|
{ |
|
|
|
|
await mqttClient.ConnectAsync(options); |
|
|
|
|
//订阅 |
|
|
|
|
await Subscribe(mqttClient, mQTT.SubscribeTopics, mQTT.Level); |
|
|
|
|
} |
|
|
|
|
catch (Exception ex) |
|
|
|
|
{ |
|
|
|
|
Console.WriteLine($"重新连接服务器异常 {ex.Message}"); |
|
|
|
|
_logger.LogError($"Init 重新连接服务器异常{ex.Message}"); |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
await mqttClient.ConnectAsync(options); |
|
|
|
|
await Subscribe(mqttClient, mQTT.SubscribeTopics, mQTT.Level); |
|
|
|
|
_mqttClientList.TryAdd(mQTT, mqttClient); |
|
|
|
|
return mqttClient; |
|
|
|
|
} |
|
|
|
|
catch (Exception ex) |
|
|
|
|
{ |
|
|
|
|
_logger.LogError($"create mqttclient error {ex.Message}"); |
|
|
|
|
return null; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// <summary> |
|
|
|
|
/// 订阅主题 |
|
|
|
|
/// </summary> |
|
|
|
|
/// <param name="mqttClient"></param> |
|
|
|
|
/// <param name="topics"></param> |
|
|
|
|
/// <param name="level"></param> |
|
|
|
|
/// <returns></returns> |
|
|
|
|
private async Task Subscribe(IMqttClient mqttClient, List<string>? topics, short level) |
|
|
|
|
{ |
|
|
|
|
var mqttSubscribeOptions = _factory.CreateSubscribeOptionsBuilder(); |
|
|
|
|
if (topics != null) |
|
|
|
|
{ |
|
|
|
|
foreach (var topic in topics) |
|
|
|
|
{ |
|
|
|
|
mqttSubscribeOptions.WithTopicFilter( |
|
|
|
|
f => |
|
|
|
|
{ |
|
|
|
|
f.WithTopic(topic); |
|
|
|
|
if (level == 1) |
|
|
|
|
{ |
|
|
|
|
f.WithAtLeastOnceQoS(); |
|
|
|
|
} |
|
|
|
|
else if (level == 0) |
|
|
|
|
{ |
|
|
|
|
f.WithAtMostOnceQoS(); |
|
|
|
|
} |
|
|
|
|
else if (level == 2) |
|
|
|
|
{ |
|
|
|
|
f.WithExactlyOnceQoS(); |
|
|
|
|
} |
|
|
|
|
}); |
|
|
|
|
} |
|
|
|
|
var response = await mqttClient.SubscribeAsync(mqttSubscribeOptions.Build(), CancellationToken.None); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/// <summary> |
|
|
|
|
/// 发布主题数据 |
|
|
|
|
/// </summary> |
|
|
|
|
/// <param name="mqttClientMark"></param> |
|
|
|
|
/// <param name="msgs"></param> |
|
|
|
|
/// <returns></returns> |
|
|
|
|
public async Task Publish(string mqttClientMark, List<string> msgs) |
|
|
|
|
{ |
|
|
|
|
IMqttClient mqttClient = null; |
|
|
|
|
var key = _mqttClientList.Keys.Where(x => x.MqttClientMark == mqttClientMark).FirstOrDefault(); |
|
|
|
|
if (key != null) |
|
|
|
|
{ |
|
|
|
|
if (key.PublishTopics == null) |
|
|
|
|
{ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
mqttClient = _mqttClientList[key]; |
|
|
|
|
} |
|
|
|
|
if (mqttClient == null) |
|
|
|
|
{ |
|
|
|
|
return; |
|
|
|
|
} |
|
|
|
|
foreach (var msg in msgs) |
|
|
|
|
{ |
|
|
|
|
foreach (var topic in key.PublishTopics) |
|
|
|
|
{ |
|
|
|
|
var applicationMessage = new MqttApplicationMessageBuilder() |
|
|
|
|
.WithTopic(topic) |
|
|
|
|
.WithPayload(msg) |
|
|
|
|
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtMostOnce) |
|
|
|
|
.Build(); |
|
|
|
|
await mqttClient.PublishAsync(applicationMessage, CancellationToken.None); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |