.Net Core `RabbitMQ`封装

news/2024/5/19 20:57:26 标签: java-rabbitmq, rabbitmq, .netcore

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

rabbitmq">然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:
    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup>
	<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
    <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;

public class EventsBusOptions
{
    /// <summary>
    /// 接收时异常事件
    /// </summary>
    public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;

public interface IEventsBusHandle<in TEto> where TEto : class
{
    Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;

public interface ILoadEventBus
{
    /// <summary>
    /// 发布事件
    /// </summary>
    /// <param name="eto"></param>
    /// <typeparam name="TEto"></typeparam>
    /// <returns></returns>
    Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;

[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{
    public readonly string Name;

    public EventsBusAttribute(string name)
    {
        Name = name;
    }
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Extensions.DependencyInjection;

public static class EventsBusRabbitMQExtensions
{
    public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddSingleton<RabbitMQFactory>();
        services.AddSingleton(typeof(RabbitMQEventsManage<>));
        services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
        services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
        
        return services;
    }
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;

namespace EventsBus.RabbitMQ.Options;

public class RabbitMQOptions
{
    /// <summary>
    /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
    /// 指示应使用的协议的缺省值。
    /// </summary>
    public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;

    /// <summary>
    /// 地址
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// 账号
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 密码
    /// </summary>
    public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace EventsBus.RabbitMQ;

public class RabbitMQEventsManage<TEto> where TEto : class
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
        _ = Task.Run(Start);
    }

    private void Start()
    {
        var channel = _rabbitMqFactory.CreateRabbitMQ();
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        channel.QueueDeclare(name, false, false, false, null);
        var consumer = new EventingBasicConsumer(channel); //消费者
        channel.BasicConsume(name, true, consumer); //消费消息
        consumer.Received += async (model, ea) =>
        {
            var bytes = ea.Body.ToArray();
            try
            {
                // 这样就可以实现多个订阅
                var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
                foreach (var handle in events)
                {
                    await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
                }
            }
            catch (Exception e)
            {
                EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
            }
        };
    }
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace EventsBus.RabbitMQ;

public class RabbitMQFactory : IDisposable
{
    private readonly RabbitMQOptions _options;
    private readonly ConnectionFactory _factory;
    private IConnection? _connection;

    public RabbitMQFactory(IOptions<RabbitMQOptions> options)
    {
        _options = options?.Value;
        // 将Options中的参数添加到ConnectionFactory
        _factory = new ConnectionFactory
        {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            Port = _options.Port
        };
    }

    public IModel CreateRabbitMQ()
    {
        // 当第一次创建RabbitMQ的时候进行链接
        _connection ??= _factory.CreateConnection();

        return _connection.CreateModel();
    }

    public void Dispose()
    {
        _connection?.Dispose();
    }
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;

namespace EventsBus.RabbitMQ;

public class RabbitMQLoadEventBus : ILoadEventBus
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
    }

    public async Task PushAsync<TEto>(TEto eto) where TEto : class
    {

        //创建一个通道
        //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
        using var channel = _rabbitMqFactory.CreateRabbitMQ();
        
        // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        
        // 使用获取的名称创建一个通道
        channel.QueueDeclare(name, false, false, false, null);
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 1;
        // 将数据序列号,然后发布
        channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
        // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
        var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
        
        await Task.CompletedTask;
    }
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包


http://www.niftyadmin.cn/n/365485.html

相关文章

基于three.js实现的点击盒子消除游戏

一.项目背景 大学时期参加了机器人协会&#xff0c;并有幸成为了视觉组组长&#xff0c;所以在新一届社团招新上做了一款趣味小游戏来吸引新生的眼球&#xff0c;让大家知道协会的视觉组。 二.代码展示 <!DOCTYPE html> <html> <head> <style> body…

深度探索:使用FFmpeg实现视频Logo的添加与移除

深度探索&#xff1a;使用FFmpeg实现视频Logo的添加与移除 前言一、FFmpeg简介&#xff08;Introduction to FFmpeg&#xff09;1.1 FFmpeg的定义&#xff08;Definition of FFmpeg&#xff09;1.2 FFmpeg的功能&#xff08;Functions of FFmpeg&#xff09;1.3 FFmpeg的安装&a…

【Nginx】实战应用(服务器端集群搭建、下载站点、用户认证模块)

文章目录 Nginx实现服务器端集群搭建Nginx与Tomcat部署环境准备(Tomcat)环境准备(Nginx) Nginx实现动静分离需求分析动静分离实现步骤 Nginx实现Tomcat集群搭建 Nginx高可用解决方案KeepalivedVRRP环境搭建Keepalived配置文件介绍访问测试keepalived之vrrp_script Nginx制作下载…

华为OD机试真题B卷 Java 实现【内存资源分配】

一、题目描述 有一个简易内存池&#xff0c;内存按照大小粒度分类&#xff0c;每个粒度有若干个可用内存资源&#xff0c;用户会进行一系列内存申请&#xff0c;需要按需分配内存池中的资源&#xff0c;返回申请结果成功失败列表。 分配规则如下&#xff1a; 分配的内存要大…

MyCat|Shardingsphere-proxy:jdbc连接MySQL8.0.33的query_cache_size异常解决方案

当前版本&#xff1a;MySQL 8.0.33 &#xff0c;Mycat-server-1.6.7.6-release-20220524173810-win&#xff0c;apache-shardingsphere-5.3.2-shardingsphere-proxy-bin&#xff0c;jdk 1.8 1. 问题的主要背景 MySQL 8.0.33版本&#xff0c;搭建了主从复制&#xff0c;需要借…

javaEE基于springboot的小区社区文化活动报名系统jsp生活服务网站

社区文化宣传网站采用的开发框架为springboot框架&#xff0c;开发工具采用Eclipse&#xff0c;idea 服务器用的是Tomcat。编码语言是Java&#xff0c;数据库采用Mysql数据库。 本社区文化宣传网站&#xff0c;主要服务的用户是社区附近的居民&#xff0c;为居民展示最新的新闻…

纯js实现在线文字识别,从图片中提取文本信息

当你需要将图片中的文字内容提取出来时&#xff0c;你可能想到了手动输入或者使用OCR技术。而当你需要进行在线文字识别时&#xff0c;一个纯JavaScript实现的OCR工具可能会成为你的优选方案。 纯JavaScript&#xff0c;使得在浏览器内部进行文字识别变得可能。 此外&#x…

CentOS7.4安装OpenVPN

系统环境 [rootvpn ~]# cat /etc/redhat-release CentOS Linux release 7.4.1708 (Core) 一. 准备工作 [rootvpn ~]# yum -y install openssl-devel openssl pam pam-devel lzo lzo-devel pkcs11-helper pkcs11-helper-devel 二. 安装OpenVPN服务 1. 下载openvpn源码包 [r…