You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

228 lines
8.9 KiB
C#

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

using System;
using System.Collections.Generic;
using System.Data;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Txgy.Microseismic.BaseLib
{
/// <summary>
/// Loads per-station sample files into the sample queues of
/// <see cref="StationModel"/> instances and reads samples back out of them.
/// </summary>
public class StationWorkModel
{
    /// <summary>
    /// Value stored by <see cref="WriteChannelZ"/> in place of any sample below
    /// <see cref="SampleFloor"/>; also used by <see cref="Writer"/> when a line
    /// cannot be parsed and the target queue holds no samples to average.
    /// NOTE(review): presumably the sensor's resting level - confirm.
    /// </summary>
    private const double BaselineSample = 32400;

    /// <summary>Samples below this value are treated as suspect.</summary>
    private const double SampleFloor = 30000;

    // Line offsets of the first sample of each channel inside a station file.
    // NOTE(review): the E and N offsets look tied to the default
    // SingleFileLength of 30000 - confirm against the file format before
    // changing SingleFileLength.
    private const int ChannelZFirstLine = 1;
    private const int ChannelEFirstLine = 30003;
    private const int ChannelNFirstLine = 60004;

    // The station name is taken from characters 3..5 of the file name.
    private const int StationNameStart = 3;
    private const int StationNameLength = 3;

    /// <summary>
    /// Lock shared by every instance (it is static). Held while a batch of
    /// samples is enqueued or dequeued so that batches do not interleave.
    /// </summary>
    public static object _lock = new object();

    /// <summary>
    /// Sampling setting. Not read by any code in this class.
    /// NOTE(review): presumably the sampling rate - confirm.
    /// </summary>
    public int Sampling { get; set; }

    /// <summary>Number of stations; set by <see cref="InitWork"/> to the number of files.</summary>
    public int StationCnt { get; set; }

    /// <summary>
    /// Total number of tasks; set by <see cref="InitWork"/> to the number of files.
    /// </summary>
    public int TaskCnt { get; set; }

    /// <summary>Paths of the station files; must be assigned before <see cref="InitWork"/>.</summary>
    public string[] files { get; set; }

    /// <summary>One entry per file in <see cref="files"/>, built by <see cref="InitWork"/>.</summary>
    public List<StationModel> Stations { get; set; }

    /// <summary>
    /// Length of a single file, expressed as the number of samples read per channel.
    /// </summary>
    public int SingleFileLength = 30000;

    /// <summary>
    /// Builds <see cref="Stations"/> from <see cref="files"/> and sets
    /// <see cref="StationCnt"/> and <see cref="TaskCnt"/> to the file count.
    /// State is only updated once every file name has been processed.
    /// </summary>
    /// <exception cref="InvalidOperationException"><see cref="files"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// A file name (without extension) is shorter than six characters.
    /// </exception>
    public void InitWork()
    {
        if (files == null)
        {
            throw new InvalidOperationException("files must be assigned before calling InitWork.");
        }

        var stations = new List<StationModel>(files.Length);
        foreach (string path in files)
        {
            stations.Add(new StationModel
            {
                Name = ExtractStationName(path),
                FilePath = path
            });
        }

        Stations = stations;
        TaskCnt = StationCnt = stations.Count;
    }

    /// <summary>
    /// For each of the first <see cref="TaskCnt"/> stations, starts a background
    /// task that reads the station's Z channel with <see cref="WriteChannelZ"/>
    /// and appends the samples to the station's <c>cz</c> queue.
    /// Returns immediately without waiting for the tasks.
    /// </summary>
    /// <remarks>
    /// Uses <see cref="Task.Run(Action)"/> rather than delegate
    /// <c>BeginInvoke</c>, which is not supported on .NET Core / .NET 5+.
    /// A failure inside a background task is written to
    /// <see cref="Console.Error"/>.
    /// </remarks>
    public void StartWriter()
    {
        for (int i = 0; i < TaskCnt; i++)
        {
            // Resolve the station on the calling thread so the task reads
            // from and writes to the same instance.
            StationModel station = Stations[i];

            Task.Run(() =>
            {
                ChannelModel channel = WriteChannelZ(station);

                // One lock per batch keeps a file's samples contiguous in the queue.
                lock (_lock)
                {
                    foreach (double sample in channel.datas)
                    {
                        station.cz.Enqueue(sample);
                    }
                }
            }).ContinueWith(
                faulted => Console.Error.WriteLine(faulted.Exception),
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }

    /// <summary>
    /// Placeholder; currently does nothing.
    /// </summary>
    public void StartReader()
    {
    }

    /// <summary>
    /// Reads the Z channel of a station file into a new <see cref="ChannelModel"/>.
    /// </summary>
    /// <param name="sm">Station whose <c>FilePath</c> is read and whose <c>Name</c> is copied.</param>
    /// <returns>
    /// A channel with <see cref="SingleFileLength"/> samples. A line that is
    /// missing, cannot be parsed, or holds a value below 30000 is stored as 32400.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="sm"/> is null.</exception>
    public ChannelModel WriteChannelZ(StationModel sm)
    {
        if (sm == null)
        {
            throw new ArgumentNullException(nameof(sm));
        }

        string[] lines = ReadLines(sm.FilePath);
        double[] samples = new double[SingleFileLength];

        for (int k = 0; k < samples.Length; k++)
        {
            int lineIndex = k + ChannelZFirstLine;

            // A missing or unparsable line counts as 0, which is below the
            // floor and therefore replaced by the baseline.
            double value = 0;
            if (lineIndex < lines.Length && !double.TryParse(lines[lineIndex], out value))
            {
                value = 0;
            }

            samples[k] = value < SampleFloor ? BaselineSample : value;
        }

        return new ChannelModel
        {
            Name = sm.Name,
            datas = samples
        };
    }

    /// <summary>
    /// If the station's <c>cz</c> queue is empty, reads the station file and
    /// enqueues <see cref="SingleFileLength"/> samples into each of the
    /// <c>cz</c>, <c>ce</c> and <c>cn</c> queues, then sleeps for 1 ms.
    /// </summary>
    /// <remarks>
    /// A line that is missing or cannot be parsed as an integer is replaced by
    /// the current average of the target queue, or by 32400 when that queue
    /// is still empty. Z samples below 30000 are printed to the console.
    /// </remarks>
    /// <param name="sm">Station to fill.</param>
    /// <exception cref="ArgumentNullException"><paramref name="sm"/> is null.</exception>
    public void Writer(StationModel sm)
    {
        if (sm == null)
        {
            throw new ArgumentNullException(nameof(sm));
        }

        if (sm.cz.IsEmpty)
        {
            string[] lines = ReadLines(sm.FilePath);

            // Created once, outside the loops, so no delegate is allocated per
            // sample. Each guards Average() against an empty queue.
            Func<double> averageZ = () => sm.cz.Any() ? sm.cz.Average() : BaselineSample;
            Func<double> averageE = () => sm.ce.Any() ? sm.ce.Average() : BaselineSample;
            Func<double> averageN = () => sm.cn.Any() ? sm.cn.Average() : BaselineSample;

            lock (_lock)
            {
                for (int k = 0; k < SingleFileLength; k++)
                {
                    int value = ParseIntSample(lines, k + ChannelZFirstLine, averageZ);
                    if (value < SampleFloor)
                    {
                        Console.WriteLine(value);
                    }
                    sm.cz.Enqueue(value);
                }

                for (int k = 0; k < SingleFileLength; k++)
                {
                    sm.ce.Enqueue(ParseIntSample(lines, k + ChannelEFirstLine, averageE));
                }

                for (int k = 0; k < SingleFileLength; k++)
                {
                    sm.cn.Enqueue(ParseIntSample(lines, k + ChannelNFirstLine, averageN));
                }
            }
        }

        // Kept from the original implementation.
        // NOTE(review): presumably throttles a polling caller - confirm.
        Thread.Sleep(1);
    }

    /// <summary>
    /// Removes up to <paramref name="cnt"/> samples from the station's
    /// <c>cz</c> queue.
    /// </summary>
    /// <param name="sm">Station to read from.</param>
    /// <param name="cnt">Number of samples requested; also the length of the result.</param>
    /// <returns>
    /// An array of length <paramref name="cnt"/>. Slots for which the queue had
    /// no sample are left at whatever <c>TryDequeue</c> writes on failure
    /// (0 for a <c>ConcurrentQueue&lt;double&gt;</c>).
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="sm"/> is null.</exception>
    public double[] Reader(StationModel sm, int cnt)
    {
        if (sm == null)
        {
            throw new ArgumentNullException(nameof(sm));
        }

        double[] samples = new double[cnt];

        lock (_lock)
        {
            for (int i = 0; i < samples.Length; i++)
            {
                sm.cz.TryDequeue(out samples[i]);
            }
        }

        return samples;
    }

    /// <summary>Returns characters 3..5 of the file name (without extension) as the station name.</summary>
    private static string ExtractStationName(string path)
    {
        string baseName = Path.GetFileNameWithoutExtension(path);
        if (baseName == null || baseName.Length < StationNameStart + StationNameLength)
        {
            throw new ArgumentOutOfRangeException(
                nameof(files),
                path,
                "File name is too short to contain a station name at characters 3-5.");
        }

        return baseName.Substring(StationNameStart, StationNameLength);
    }

    /// <summary>Reads the whole file, closes it, and splits the text on '\n'.</summary>
    private static string[] ReadLines(string path)
    {
        using (StreamReader reader = new StreamReader(path))
        {
            return reader.ReadToEnd().Split(new char[] { '\n' });
        }
    }

    /// <summary>
    /// Parses <c>lines[index]</c> as an integer; when the line is missing or
    /// not an integer, returns the truncated result of <paramref name="fallback"/>.
    /// </summary>
    private static int ParseIntSample(string[] lines, int index, Func<double> fallback)
    {
        if (index < lines.Length && int.TryParse(lines[index], out int parsed))
        {
            return parsed;
        }

        return (int)fallback();
    }
}
}