Best practices for implementing a thread to do fast, bulk, and continuous reading in C#?
2020-11-21

How should the reading of bulk data from a device in C# be handled in .NET 4.0? Specifically, I need to read quickly from a USB HID device that emits reports spread over 26 packets whose order must be preserved.

I've tried doing this in a BackgroundWorker thread. It reads one packet from the device at a time and processes it before reading more. This gives reasonably good response times, but it is liable to lose a packet here and there, and the overhead cost of reading single packets adds up.

while (!( sender as BackgroundWorker ).CancellationPending)
{
    // read a single packet
    // check for header or footer
    // process packet data
}

What is the best practice in C# for reading a device like this?


Background:

My USB HID device continuously reports a large amount of data. The data is split over 26 packets, and I must preserve their order. Unfortunately, the device only marks the first and the last packets in each report, so I need to be able to catch all the packets in between.
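For illustration only (this type is not part of the original question), the code in the answers below assumes a packet shape roughly along these lines; only the IsLastPacket flag is actually referenced later on:

// Hypothetical packet shape, for illustration only; the real layout depends on
// the device's HID report descriptor. Only IsLastPacket is used in the answers below.
class Packet
{
    public bool IsFirstPacket;   // set only on the first packet of a report
    public bool IsLastPacket;    // set only on the last packet of a report
    public byte[] Data;          // raw payload bytes of this packet
}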

Talk1:
What version of .net? Answer will depend on it.
Talk2:
I'm targeting .NET 4.0, but it would be awesome if an answer explains the difference with other versions, too.
Talk3:
HID data rates are very low, 8 KB/sec at most. You can't write code that cannot keep up with that, no "best practice" is required.
Talk4:
It's a wee bit higher than that, at 64 bytes per millisecond, but the question isn't necessarily to do with USB or HID. I'm just curious about how something like this should be handled.
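(For context, 64 bytes every millisecond works out to 64 × 1,000 = 64,000 bytes per second, i.e. roughly 64 KB/s.)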
Solutions1

For .NET 4 you can use a BlockingCollection to provide a thread-safe queue that can be used by a producer and a consumer. The BlockingCollection.GetConsumingEnumerable() method provides an enumerator which automatically terminates once the queue has been marked as complete for adding via CompleteAdding() and is empty.

Here's some sample code. The payload is an array of ints in this example, but of course you would use whatever data type you need.

Note that for your specific example, you can use the overload of GetConsumingEnumerable() which accepts an argument of type CancellationToken.

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BlockingCollection<int[]>();

            Task.Factory.StartNew(() => produce(queue));

            consume(queue);

            Console.WriteLine("Finished.");
        }

        private static void consume(BlockingCollection<int[]> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming " + item[0]);
                Thread.Sleep(25);
            }
        }

        private static void produce(BlockingCollection<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Add(payload);
                Thread.Sleep(20);
            }

            queue.CompleteAdding();
        }
    }
}
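Here is a minimal sketch (not part of the original answer) of the cancellation-aware variant mentioned above. The token would come from a CancellationTokenSource owned by the caller, and GetConsumingEnumerable(CancellationToken) throws an OperationCanceledException when that token is cancelled:

private static void consume(BlockingCollection<int[]> queue, CancellationToken token)
{
    try
    {
        // Ends when CompleteAdding() has been called and the queue is empty,
        // or throws OperationCanceledException if the token is cancelled first.
        foreach (var item in queue.GetConsumingEnumerable(token))
        {
            Console.WriteLine("Consuming " + item[0]);
            Thread.Sleep(25);
        }
    }
    catch (OperationCanceledException)
    {
        // Cancelled by the caller; stop consuming.
    }
}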

For .NET 4.5 and later, you could use the higher-level TPL Dataflow classes (the System.Threading.Tasks.Dataflow library built on top of Microsoft's Task Parallel Library), which have a wealth of functionality and can be somewhat daunting at first sight.

Here's the same example using TPL DataFlow:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BufferBlock<int[]>();

            Task.Factory.StartNew(() => produce(queue));
            consume(queue).Wait();

            Console.WriteLine("Finished.");
        }

        private static async Task consume(BufferBlock<int[]> queue)
        {
            while (await queue.OutputAvailableAsync())
            {
                var payload = await queue.ReceiveAsync();
                Console.WriteLine("Consuming " + payload[0]);
                await Task.Delay(25);
            }
        }

        private static void produce(BufferBlock<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Post(payload);
                Thread.Sleep(20);
            }

            queue.Complete();
        }
    }
}
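As a side note (not shown in the original answer), Dataflow also makes it easy to bound the queue and hand consumption to a dedicated block. A rough sketch, assuming the same int[] payload and the same using directives as the sample above:

// Rough sketch: a bounded BufferBlock linked to an ActionBlock consumer.
// BoundedCapacity makes SendAsync apply back-pressure instead of letting the
// queue grow without limit; PropagateCompletion lets Complete() flow downstream.
var buffer = new BufferBlock<int[]>(new DataflowBlockOptions { BoundedCapacity = 100 });
var consumer = new ActionBlock<int[]>(payload => Console.WriteLine("Consuming " + payload[0]));
buffer.LinkTo(consumer, new DataflowLinkOptions { PropagateCompletion = true });

// Producer side: await buffer.SendAsync(payload), then call buffer.Complete() when done.
// Shutdown: wait on consumer.Completion to know every queued item was processed.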
Talk1:
I was about to post a very similar answer; the only difference was that my produce and consume took a CancellationToken, so I could recreate the behavior of the OP's (!( sender as BackgroundWorker ).CancellationPending).
Talk2:
I put enough work into it that I decided to post my answer anyway.
Talk3:
Great answer, thanks for posting an example for 4.5 too.
Solutions2

If missing packets is a concern, do not do your processing and your reading on the same thread. Starting with .NET 4.0, the System.Collections.Concurrent namespace makes this very easy to do. All you need is a BlockingCollection, which behaves as a queue for your incoming packets.

BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());

void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
    while (!( sender as BackgroundWorker ).CancellationPending) 
    {
       Packet packet = GetPacket();
       _queuedPackets.Add(packet);
    }        
    _queuedPackets.CompleteAdding();
}

void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
    List<Packet> report = new List<Packet>();
    foreach(var packet in _queuedPackets.GetConsumingEnumerable())
    {
        report.Add(packet);
        if(packet.IsLastPacket)
        {
            ProcessReport(report);
            report = new List<Packet>();
        }
    }
}

While _queuedPackets is empty, _queuedPackets.GetConsumingEnumerable() will block the thread without consuming any resources. As soon as a packet arrives, it will unblock and perform the next iteration of the foreach.

When you call _queuedPackets.CompleteAdding(), the foreach on your processing thread will run until the collection is empty and then exit the loop. If you don't want it to "finish up the queue" when you cancel, you can easily change it to quit early. I am also going to switch to using Tasks instead of BackgroundWorkers, because that makes passing in parameters much easier.

void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
    while (!token.IsCancellationRequested) 
    {
       Packet packet = GetPacket();
       queue.Add(packet);
    }        
    queue.CompleteAdding();
}

void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
    List<Packet> report = new List<Packet>();

    try
    {
        foreach(var packet in queue.GetConsumingEnumerable(token))
        {
            report.Add(packet);
            if(packet.IsLastPacket)
            {
                ProcessReport(report);
                report = new List<Packet>();
            }
        }
    }
    catch(OperationCanceledException)
    {
        //Do nothing, we don't care that it happened.
    }
}

//This would replace your backgroundWorker.RunWorkerAsync() calls;
private void StartUpLoops()
{
    var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());    
    var cancelRead = new CancellationTokenSource();
    var cancelProcess = new CancellationTokenSource();
    Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token));
    Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token));

    //You can stop each loop independently by calling cancelRead.Cancel() or cancelProcess.Cancel()
}
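One possible shutdown sequence (hypothetical, not from the original answer) would keep the Task references returned by StartNew so the caller can cancel the reader, let the processing loop drain whatever is left in the queue once CompleteAdding() has been called, and then wait for both tasks:

// Hypothetical shutdown helper; readTask and processTask are the tasks returned by
// Task.Factory.StartNew above. Note the reader only observes the token after the
// current GetPacket() call returns.
private void ShutDownLoops(Task readTask, Task processTask, CancellationTokenSource cancelRead)
{
    cancelRead.Cancel();                 // reader exits its loop and calls CompleteAdding()
    Task.WaitAll(readTask, processTask); // processing loop ends once the queue drains
}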
Talk1:
Thanks for deciding to post this, it is very informative +1
Reposted from: https://stackoverflow.com/questions/28361718/best-practices-for-implementing-a-thread-to-do-fast-bulk-and-continuous-readin

