.NET Core 使用 System.Threading.Channels消息队列

news/2024/5/19 21:08:18 标签: .netcore

System.Threading.Channels 是 .NET Core 中的一个新的同步通信机制,它提供了一种高效的方法来在多个线程之间共享数据。它比使用锁或信号量等传统同步机制更灵活、更高效,并且可以帮助您避免许多并发问题。下面是一个简单的示例,演示如何使用 Channels 实现生产者-消费者模型。

一、定义数据类和生产者

首先,我们需要定义一个类型来表示我们要在 Channel 中传递的数据。在这个例子中,我们将使用一个简单的整数类型:

public class Data { public int Value; }

接下来,我们需要创建一个 Channel,以便生产者将数据发送到消费者:

var channel = Channel.CreateUnbounded<Data>();    

然后,我们可以创建一个生产者线程,它将不断生成新的数据并将其发送到 Channel 中:

class Producer {
    public void Run() {
        while (true) {
            var data = new Data { Value =Guid.NewGuid().GetHashCode() };
            channel.Writer.TryWrite(data);
            Thread.Sleep(1000);
        }
    }
}

在上面的代码中,我们使用Channel.Writer.TryWrite方法将数据写入 Channel。如果写入成功,则生产者线程将继续执行下一个循环。如果写入失败,则生产者线程将被阻塞,直到有足够的空间可用于写入数据。

二、消费者类

现在,我们可以创建一个消费者线程,它将从 Channel 中读取数据并进行处理:

class Consumer {
    public void Run() {
        while (true) {
            var data = channel.Reader.Read();
            if (data != null) {
                Console.WriteLine($"Received data: {data.Value}");
            }
        }
    }
}

在代码中,我们使用channel.Reader.Read方法从 Channel 中读取数据。如果读取成功,则消费者线程将获得一个包含数据的实例。如果读取失败,则消费者线程将被阻塞,直到有新的数据可用。

三、 模拟数据消费类

最后,我们在后台运行定时任务,以模拟数据的自动消费:

class BackgroundConsumer {
    public void Run() {
        while (true) {
            var data = channel.Reader.ReadTimeout(5000);
            if (data != null) {
                console.WriteLine($"Received background data: {data.Value}");
            }
        }
    }
}

在上面的代码中,我们使用channel.Reader.ReadTimeout方法从 Channel 中读取数据。与Read方法不同,ReadTimeout方法将在指定的时间内阻塞,如果在指定的时间内没有新的数据可用,则将返回null
下面,我们可以启动所有三个线程:

var producer = new Producer();
var consumer = new Consumer();
var backgroundConsumer = new BackgroundConsumer();

Task.Run(producer).Wait();
Task.Run(consumer).Wait();
Task.Run(backgroundConsumer).Wait();

这将在控制台中产生以下输出:

Received data: 1 
Received data: 2
Received data: 3
Received data: 4
Received data: 5
Received background data: 1
Received background data: 2
Received background data: 3
Received background data: 4
Received background data: 5

可以看到,生产者和消费者线程都在正常工作,并且后台定时任务也在自动消费数据。 这就是使用 Channels 的基本示例。

四、总结

Channels 是一种非常强大的工具,可以帮助您管理并发和共享数据,并且可以在许多不同的场景中使用。例如,您可以使用 Channels 实现异步数据处理、任务调度、分布式系统等。
在使用 Channels 时,需要注意以下几点:

  1. 确保正确使用生产者和消费者:生产者应该以稳定的速度生成数据,并且消费者应该以稳定的速度消费数据。如果生产者生成数据的速度过快,消费者将无法及时处理,导致数据堆积。如果消费者消费数据的速度过快,生产者将被阻塞,导致系统性能下降。
  2. 避免死锁:如果生产者和消费者同时尝试访问同一个资源,就可能会导致死锁。为了避免这种情况,您应该确保使用正确的同步机制,例如使用信号量或条件变量来协调访问。
  3. 合理设置缓冲区大小:Chanel 中使用了内存缓冲区来存储数据。如果缓冲区太小,数据将被频繁地刷新,导致系统性能下降。如果缓冲区太大,内存使用量将增加,并且可能导致内存不足错误。因此,您应该根据实际情况合理设置缓冲区大小。

http://www.niftyadmin.cn/n/5146092.html

相关文章

代码随想录训练营第55天 | ● 392.判断子序列 ● 115.不同的子序列

392.判断子序列 题目链接&#xff1a;https://leetcode.com/problems/is-subsequence 解法&#xff1a; 1. 确定dp数组&#xff08;dp table&#xff09;以及下标的含义 dp[i][j] 表示以下标i-1为结尾的字符串s&#xff0c;和以下标j-1为结尾的字符串t&#xff0c;相同子序列的…

第六章 包图组织模型|系统建模语言SysML实用指南学习

仅供个人学习记录 概述 包是容器的一个例子。包中的模型元素称为可封装元素&#xff0c;这些元素可以是包、用例和活动。由于包本身也是可封装元素&#xff0c;因此可以支持包层级。 每个有名称的模型元素也必须是命名空间的一份子&#xff0c;命名空间使得每个元素均能够通过…

Idea快速生成测试类

例如写写完一个功能类,需要对里面方法进行测试 在当前页面 按住CTRLSHFITT 选择你要生成的测试方法 点击OK,就会在test目录下在你对应包下生成对应测试类

基于GEE云平台一种快速修复Landsat影像条带色差的方法

这是之前关于去除遥感影像条带的另一篇文章&#xff0c;因为出版商推迟了一年发布&#xff0c;所以让大家久等了。这篇文章的主要目的是对Landsat系列卫星因为条带拼接或者镶嵌产生的条带来进行的一种在线修复方式。 原文连接 一种快速修复Landsat影像条带色差的方法 题目&a…

hive的工作机制

hive的工作机制 1、在hive中建一个库 ---在hive的元数据库中记录 ---在hdfs的默认路径下/user/hive/warehouse/ 建一个以 "库名.db" 为名字的文件夹 2、在hive的库中建表 ---在hive的元数据库中记录 ---在hdfs的默认路径下 /user/hive/…

机器人控制算法—如何使用C++读取pgm格式的栅格地图并转化为ROS地图格式的data?

1.Introduction 近期正在做全局规划局部动态规划的项目&#xff0c;目前遇到的问题是&#xff0c;我们如何利用C处理pgm地图文件。即将地图信息要与像素点结合起来。所以我们需要知道地图读取和处理的底层原理&#xff0c;这样更好地在非ROS平台下移植。 2.Main 如下几条信息…

Flask三种添加路由的方法

Flask 是一个流行的 Python Web 框架&#xff0c;它提供了多种方法来添加路由。路由是将 URL 映射到特定函数的过程&#xff0c;它是构建 Web 应用程序的基础。本文将介绍 Flask 中几种常用的路由添加方法&#xff0c;并附带代码示例。 方法一&#xff1a;使用装饰器 from flas…

MySQL 全文索引

简述 MySQL 全文索引是一种用于搜索文本内容的索引技术。它允许在 MySQL 数据库中执行高效的全文搜索操作&#xff0c;而不仅仅是简单的精确匹配。 全文索引可以用于在文本字段&#xff08;如 VARCHAR 或 TEXT 类型的字段&#xff09;中查找特定的关键词、短语或表达式。它有助…