diff --git a/Txgy.FilesWatcher/ViewModels/MainViewModel.cs b/Txgy.FilesWatcher/ViewModels/MainViewModel.cs index aeed41d..611ef28 100644 --- a/Txgy.FilesWatcher/ViewModels/MainViewModel.cs +++ b/Txgy.FilesWatcher/ViewModels/MainViewModel.cs @@ -25,6 +25,10 @@ using Prism.Events; using ImTools; using static System.Windows.Forms.Design.AxImporter; using HandyControl.Controls; +using MQTTnet.Extensions.ManagedClient; +using MQTTnet; +using MQTTnet.Client.Options; +using System.Windows.Markup; namespace Txgy.FilesWatcher.ViewModels { @@ -118,15 +122,39 @@ namespace Txgy.FilesWatcher.ViewModels get { return isUploadMQTT; } set { SetProperty(ref isUploadMQTT, value); } } - private ObservableCollection dataList = new ObservableCollection(); + private ObservableCollection dataList = new ObservableCollection(); - public ObservableCollection DataList + public ObservableCollection DataList { get { return dataList; } set { SetProperty(ref dataList, value); } } + private ObservableCollection realTimeDataList = new ObservableCollection(); + public ObservableCollection RealTimeDataList + + { + get { return realTimeDataList; } + set { SetProperty(ref realTimeDataList, value); } + } + + private ObservableCollection postDataList = new ObservableCollection(); + + public ObservableCollection PostDataList + + { + get { return postDataList; } + set { SetProperty(ref postDataList, value); } + } + private ObservableCollection mqttDataList = new ObservableCollection(); + + public ObservableCollection MQTTDataList + + { + get { return mqttDataList; } + set { SetProperty(ref mqttDataList, value); } + } private DateTime startTime; public DateTime StartTime { get => startTime; set => SetProperty(ref startTime, value); } private string runTime; @@ -230,37 +258,42 @@ namespace Txgy.FilesWatcher.ViewModels } else if (para == "MseedPath") { - - var name = Path.GetDirectoryName(filePath); + //替换主目录; + var name = filePath.Replace(MainPath+"\\", ""); string path = Path.Combine(MainPath, name); if (!Directory.Exists(path)) { MessageBox.Show($"{path} 不存在!"); return; } + MseedPath = name; _systemConfig.mseedpath = name; } else if (para == "RealtimePath") { - string path = Path.Combine(MainPath, RealtimePath); + var name = filePath.Replace(MainPath + "\\", ""); + string path = Path.Combine(MainPath, name); if (!Directory.Exists(path)) { MessageBox.Show($"{path} 不存在!"); return; } - watcherArray[0].Path = Path.Combine(MainPath, RealtimePath); + RealtimePath=name; _systemConfig.realtimepath = RealtimePath; + watcherArray[0].Path = path; } else if (para == "PostPath") { - string path = Path.Combine(MainPath, PostPath); + var name = filePath.Replace(MainPath + "\\", ""); + string path = Path.Combine(MainPath, name); if (!Directory.Exists(path)) { MessageBox.Show($"{path} 不存在!"); return; } - watcherArray[1].Path = Path.Combine(MainPath, PostPath); + PostPath=name; _systemConfig.postpath = PostPath; + watcherArray[1].Path = path; } UpdateJsonConfig(_systemConfig); } @@ -275,7 +308,7 @@ namespace Txgy.FilesWatcher.ViewModels IntervalTimesSource.Add(30); IntervalTimesSource.Add(60); SelectedIndex = 1; - + foreach (var watcher in watcherArray) { //初始化监听 @@ -306,9 +339,37 @@ namespace Txgy.FilesWatcher.ViewModels watcher.EnableRaisingEvents = false; watcher.EndInit(); } - } - - + + //初始化mqtt + mqttClient = new MqttFactory().CreateManagedMqttClient(); + mqttClient.UseDisconnectedHandler(ee => + { + Debug.WriteLine($">>> 服务器断开连接,{ee}"); + }); + mqttClient.UseApplicationMessageReceivedHandler(ee => + { + try + { + // ResolveDatas(ee.ApplicationMessage.Topic, ee.ApplicationMessage.Payload); + } + catch (Exception ex) + { + MessageBox.Show($">>>数据解析出错,{ex}"); + } + }); + + mqttClient.UseConnectedHandler(ee => + { + Debug.WriteLine(">>> MQTT 连接到服务!"); + // mqttClient.SubscribeAsync(CmdData.STREMACTRL + "#", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce); + // mqttClient.SubscribeAsync(CmdData.STREMARES + "#", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce); + // Debug.WriteLine($">>>订阅:{CmdData.STREMARES} #"); + // mqttClient.SubscribeAsync(CmdData.STREMAUP + "vpn45", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce); + // WriteLog($">>>成功订阅:{CmdData.STREMAUP} #"); + // mqttClient.SubscribeAsync(CmdData.STREMAROUTERRES + "#", MQTTnet.Protocol.MqttQualityOfServiceLevel.AtMostOnce); + }); + } + private void UpdateJsonConfig(SystemConfig systemConfig) { @@ -335,7 +396,17 @@ namespace Txgy.FilesWatcher.ViewModels Debug.WriteLine($"**********当前时间:{currentT},监控路径:{path}"); if (Directory.Exists(path) && IsUploadDB&& IsUploadMseedPath) { - UploadMseedFile.UploadMSeedOnce(path, WorkAreaId); + UploadMseedFile.UploadMSeedOnce((a=> + { + timer1.Dispatcher.Invoke(() => + { + DataList.Add(new FileModel + { + CreateTime = DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss"), + Data = $"{DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss")}{a}" + }); + }); + }), path, WorkAreaId); } } /// @@ -364,7 +435,6 @@ namespace Txgy.FilesWatcher.ViewModels return result; } - private string DateDiff(DateTime DateTime1, DateTime DateTime2) { string dateDiff = null; @@ -402,32 +472,64 @@ namespace Txgy.FilesWatcher.ViewModels Debug.WriteLine($"最后修改时间:{lastWriteTime},文件路径:{watch.Path}"); if (watch != null && watch.Path == watcherArray[0].Path) { + watcherArray[0].EnableRaisingEvents = false; - if (IsUploadDB&& isUploadRealtimePath) - UploadRealtimeFile.UploadRealtimeFileOnce(watcherArray[0].Path, lastLine, IsUploadMQTT, WorkAreaId); + if (IsUploadDB && isUploadRealtimePath) + { + UploadRealtimeFile.UploadRealtimeFileOnce((a, b) => + { + timer1.Dispatcher.Invoke(() => + { + RealTimeDataList.Add(new FileModel + { + CreateTime = DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss"), + Data = $"{DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss")}事件上传成功。" + }); + }); + if (IsUploadMQTT) + { + MQPublish(a, b); + timer1.Dispatcher.Invoke(() => + { + MQTTDataList.Add(new FileModel + { + CreateTime = DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss"), + Data = $"{DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss")}事件上传成功。" + }); + }); + } + }, watcherArray[0].Path, lastLine, WorkAreaId); + } } else if (watch != null && watch.Path == watcherArray[1].Path) { watcherArray[1].EnableRaisingEvents = false; - if (IsUploadDB&& IsUploadPostPath) - UploadPostproFile.UploadPostproFileOnce(watcherArray[1].Path, lastLine, WorkAreaId); - } - var str= dataList.Where(f => f.Data == lastLine).FirstOrDefault(); - if (!string.IsNullOrWhiteSpace(lastLine) - && str==null) - { - System.Windows.Application.Current.Dispatcher.BeginInvoke(new Action(() => + if (IsUploadDB && IsUploadPostPath) { - DataList.Add(new WatcherFileModel + UploadPostproFile.UploadPostproFileOnce((a, b) => { - CreateTime = DateTime.Now.ToString(), - ChangeType = e.ChangeType, - Name = e.Name, - FullPath = e.FullPath, - Data = lastLine, - IsSend = false - }); - })); + timer1.Dispatcher.Invoke(() => + { + PostDataList.Add(new FileModel + { + CreateTime = DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss"), + Data = $"{DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss")}事件上传成功。" + }); + }); + if (IsUploadMQTT) + { + MQPublish(a, b); + timer1.Dispatcher.Invoke(() => + { + MQTTDataList.Add(new FileModel + { + CreateTime = DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss"), + Data = $"{DateTime.Now.ToString("yyyy-MM-dd T HH:mm:ss")}事件上传成功。" + }); + }); + } + }, watcherArray[1].Path, lastLine, WorkAreaId); + } } if (watch != null && watch.Path == watcherArray[0].Path) { @@ -438,7 +540,6 @@ namespace Txgy.FilesWatcher.ViewModels watcherArray[1].EnableRaisingEvents = true; } } - private void Watcher_Renamed(object sender, RenamedEventArgs e) { System.Windows.Application.Current.Dispatcher.BeginInvoke(new Action(() => { @@ -490,11 +591,54 @@ namespace Txgy.FilesWatcher.ViewModels // Data=tmp //}); } - + + private void StartConnectMQ() + { + // 向服务端开放的端口进行服务请求 + // string clientID = Guid.NewGuid().ToString(); + var mqttClientOptions = new MqttClientOptionsBuilder() + .WithClientId(_systemConfig.mQTTConfig.clientID)// 传入ClientID参数 + .WithTcpServer(_systemConfig.mQTTConfig.serverIP, _systemConfig.mQTTConfig.port)//指定TCP连接的IP和Port + .WithCredentials(_systemConfig.mQTTConfig.userName, _systemConfig.mQTTConfig.password);//指定连接的用户名和密码 + + var options = new ManagedMqttClientOptionsBuilder() + .WithAutoReconnectDelay(TimeSpan.FromSeconds(60)) + .WithClientOptions(mqttClientOptions.Build()) + .Build(); + mqttClient.StartAsync(options); + startTime = DateTime.Now; + } + private void StopMQ() + { + mqttClient?.StopAsync(); + } + private void MQPublish(string topic, string openCmd) + { + // byte[] cmdByte = Convert.FromHexString(openCmd.Replace(" ", "")); + MqttApplicationMessage message = new MqttApplicationMessageBuilder() + .WithTopic(topic) + .WithPayload(openCmd) + .WithRetainFlag(false) + .Build(); + var res = mqttClient.PublishAsync(message); + string mes = string.Empty; + if (!res.IsFaulted) + { + mes = $">>>数据发送成功:{topic},{openCmd}"; + } + else + { + mes = $">>>数据发送失败:{topic},{openCmd}"; + } + Debug.WriteLine(mes); + } + + private FileSystemWatcher[] watcherArray = new FileSystemWatcher[2]; private DispatcherTimer timer1 = new DispatcherTimer(); private readonly SystemConfig _systemConfig; - // private readonly WebsocketClient _websocketClient; + private IManagedMqttClient mqttClient; + // private readonly WebsocketClient _websocketClient; private readonly IEventAggregator _ea; protected void ShowLoading(string tip = "正在加载....") diff --git a/Txgy.FilesWatcher/Views/MainView.xaml b/Txgy.FilesWatcher/Views/MainView.xaml index 2fae030..af67ff9 100644 --- a/Txgy.FilesWatcher/Views/MainView.xaml +++ b/Txgy.FilesWatcher/Views/MainView.xaml @@ -129,7 +129,7 @@ - + @@ -147,7 +147,7 @@ - + @@ -165,7 +165,7 @@ - + diff --git a/Txgy.FilesWatcher/Views/MainWindow.xaml.cs b/Txgy.FilesWatcher/Views/MainWindow.xaml.cs index 9151e45..8a9c076 100644 --- a/Txgy.FilesWatcher/Views/MainWindow.xaml.cs +++ b/Txgy.FilesWatcher/Views/MainWindow.xaml.cs @@ -10,6 +10,7 @@ namespace Txgy.FilesWatcher.Views public MainWindow() { InitializeComponent(); + Dispatcher.Invoke(() => { }); } } } diff --git a/Txgy.FilesWatcher/model/UploadMseedFile.cs b/Txgy.FilesWatcher/model/UploadMseedFile.cs index db8250c..67de379 100644 --- a/Txgy.FilesWatcher/model/UploadMseedFile.cs +++ b/Txgy.FilesWatcher/model/UploadMseedFile.cs @@ -20,7 +20,7 @@ namespace Txgy.FilesWatcher.model public string FileName { get; set; } - public static void UploadMSeedOnce(string path, int workAreaId=1) + public static void UploadMSeedOnce(Action mseedAction, string path, int workAreaId=1) { try { @@ -28,9 +28,9 @@ namespace Txgy.FilesWatcher.model var Dfiles = new DirectoryInfo(path).GetFiles("*.mseed"); foreach (var DFile in Dfiles) { - string file = DFile.Name; - string tbname = file.Substring(3,10); - tbname=tbname.Replace("-",""); + string file = DFile.Name; + string tbname = file.Substring(3, 10); + tbname = tbname.Replace("-", ""); string uploadedtbname = $"uploaded{tbname}"; string sqlNumber = $"SELECT COUNT(*) FROM {uploadedtbname} WHERE filename = '{file}'"; @@ -49,15 +49,19 @@ namespace Txgy.FilesWatcher.model string WaveTime = Path.GetFileNameWithoutExtension(file).Substring(7); //tbname.Substring(3); WaveTime = WaveTime.Insert(4, "-").Insert(7, "-").Insert(10, "T").Insert(13, ":").Insert(16, ":00"); - sqlNumber = $"INSERT INTO {tbname} (WaveTime, WorkAreaID, WaveData) VALUES('{WaveTime}', '{workAreaId}', @mseedD)"; - var res = conn.Execute(sqlNumber, new { mseedD=mseedDatas}); - + sqlNumber = $"INSERT INTO {tbname} (WaveTime, WorkAreaID, WaveData) VALUES('{WaveTime}', '{workAreaId}', @mseedD)"; + var res = conn.Execute(sqlNumber, new { mseedD = mseedDatas }); //更新上传文件记录 sqlNumber = $"INSERT INTO {uploadedtbname}(filename, uploadtime) VALUES('{file}', '{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}')"; - res = conn.Execute(sqlNumber); + res = conn.Execute(sqlNumber); } } } + // if (Dfiles.Length != 0) + { + string notifyMes = $"共{Dfiles.Length}个数据上传成功,{string.Join(",", Dfiles.Select(f => f.Name.Substring(3, 3)))}"; + mseedAction(notifyMes); + } } catch (Exception ex) { diff --git a/Txgy.FilesWatcher/model/UploadPostproFile.cs b/Txgy.FilesWatcher/model/UploadPostproFile.cs index 788dd51..f4bcd81 100644 --- a/Txgy.FilesWatcher/model/UploadPostproFile.cs +++ b/Txgy.FilesWatcher/model/UploadPostproFile.cs @@ -5,15 +5,19 @@ using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Unicode; using System.Threading.Tasks; using Txgy.FilesWatcher.DBModels; +using static System.Windows.Forms.Design.AxImporter; namespace Txgy.FilesWatcher.model { public class UploadPostproFile { - public static void UploadPostproFileOnce(string path, string eventMessage, int workAreaId = 1) + public static void UploadPostproFileOnce(Action mqAction, string path, string eventMessage, int workAreaId = 1) { try { @@ -52,10 +56,20 @@ namespace Txgy.FilesWatcher.model fs = new FileStream(JsonPath, FileMode.Open, FileAccess.Read); BinaryReader jbr = new BinaryReader(fs); Byte[] jsonDatas = jbr.ReadBytes((int)fs.Length); + string jsonString = System.Text.Encoding.Default.GetString(jsonDatas); + fs.Close(); + var options = new JsonSerializerOptions + { + // 整齐打印 + WriteIndented = true, + //重新编码,解决中文乱码问题 + Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) + }; sqlNumber = $"INSERT INTO {tbname} (EventTime, WorkAreaID, WaveData, JsonFile) VALUES('{EventTime}', '{workAreaId}', @mDatas, @jDatas);"; MySqlCommand mycomm = new MySqlCommand(sqlNumber, conn); res = conn.Execute(sqlNumber, new { mDatas = mseedDatas, jDatas = jsonDatas }); + mqAction($"/watcherdata/post/", JsonSerializer.Serialize(new { jsonFile = jsonString, eventMessage = eventMessage }, options)); sqlNumber = $"INSERT INTO {uploadedtbname}(filename, uploadtime) VALUES('{file}', '{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}')"; res = conn.Execute(sqlNumber); } diff --git a/Txgy.FilesWatcher/model/UploadRealtimeFile.cs b/Txgy.FilesWatcher/model/UploadRealtimeFile.cs index 8c8cb0c..5e6ba2f 100644 --- a/Txgy.FilesWatcher/model/UploadRealtimeFile.cs +++ b/Txgy.FilesWatcher/model/UploadRealtimeFile.cs @@ -4,23 +4,26 @@ using Google.Protobuf.WellKnownTypes; using HandyControl.Controls; using Microsoft.Xaml.Behaviors.Media; using MySql.Data.MySqlClient; +using Org.BouncyCastle.Bcpg.OpenPgp; using Org.BouncyCastle.Pqc.Crypto.Lms; using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; +using System.Text.Encodings.Web; +using System.Text.Json; +using System.Text.Unicode; using System.Threading.Tasks; using System.Windows.Markup; using System.Xml.Linq; using Txgy.FilesWatcher.DBModels; -using static System.Net.WebRequestMethods; namespace Txgy.FilesWatcher.model { public class UploadRealtimeFile { - public static void UploadRealtimeFileOnce(string path, string eventMessage, bool isUploadMQ,int workAreaId= 1) + public static void UploadRealtimeFileOnce(Action mqAction, string path, string eventMessage, int workAreaId= 1) { try { @@ -59,10 +62,20 @@ namespace Txgy.FilesWatcher.model fs = new FileStream(JsonPath, FileMode.Open, FileAccess.Read); BinaryReader jbr = new BinaryReader(fs); Byte[] jsonDatas = jbr.ReadBytes((int)fs.Length); + string jsonString = System.Text.Encoding.Default.GetString(jsonDatas); fs.Close(); + var options = new JsonSerializerOptions + { + // 整齐打印 + WriteIndented = true, + //重新编码,解决中文乱码问题 + Encoder = JavaScriptEncoder.Create(UnicodeRanges.All) + }; + sqlNumber = $"INSERT INTO {tbname} (EventTime, WorkAreaID, WaveData, JsonFile) VALUES('{EventTime}', '{workAreaId}', @mDatas, @jDatas);"; MySqlCommand mycomm = new MySqlCommand(sqlNumber,conn); res = conn.Execute(sqlNumber, new { mDatas = mseedDatas, jDatas = jsonDatas }); + mqAction("$/watcherdata/realtime/", JsonSerializer.Serialize(new { jsonFile = jsonString, eventMessage = eventMessage }, options)); sqlNumber = $"INSERT INTO {uploadedtbname}(filename, uploadtime) VALUES('{file}', '{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}')"; res = conn.Execute(sqlNumber); } diff --git a/Txgy.FilesWatcher/model/WatcherFileModel.cs b/Txgy.FilesWatcher/model/WatcherFileModel.cs index 65221e8..c473404 100644 --- a/Txgy.FilesWatcher/model/WatcherFileModel.cs +++ b/Txgy.FilesWatcher/model/WatcherFileModel.cs @@ -37,4 +37,25 @@ namespace Txgy.FilesWatcher.model set { SetProperty(ref isSend, value); } } } + + public class FileModel : BindableBase + { + private string createTime; + private string data; + private bool isSend; + public string CreateTime + { + get { return createTime; } + set { SetProperty(ref createTime, value); } + } + + public string Data { get => data; set => SetProperty(ref data, value); } + + public bool IsSend + { + get { return isSend; } + set { SetProperty(ref isSend, value); } + } + } + }