【微服务】02-集成事件与MediatR

news/2024/5/20 0:07:50 标签: 微服务, 架构, .netcore, 后端

文章目录

    • 1.集成事件
      • 1.1 定义
      • 1.2 集成事件工作原理
      • 1.3 总结
    • 2.使用RabbitMQ来实现EventBus
      • 2.1 RabbitMQ安装
      • 2.2 CAP框架实现RabbitMQ
        • 2.2.1 CAP框架实现架构
        • 2.2.2 CAP框架实现原理
    • 3.MediatR
      • 3.1 使用Mediator实现命令查询职责分离模式(CQRS)
        • 3.1.1 核心对象
      • 3.2 处理领域事件
        • 3.2.1 核心对象

1.集成事件

1.1 定义

集成事件目的是为了实现系统的集成,主要是用来在系统多个微服务之间相互传递事件,实现方式有两种

  • 发布、订阅通过EventBus方式
  • 通过观察者模式,由观察者将事件发给关注事件的人

1.2 集成事件工作原理

在这里插入图片描述

// 定义集成事件,命名:事件名称+IntergationEvent
public class OrderCreateIntergationEvent
{
	public OrderCreatedIntergrationEvent(long orderId) => OrderId = orderId;
	public long OrderId {get;}
}


// 发布
public class OrderCreateDomainEventHandler : IDomainEventHandler<OrderCreateDomainEvent>
{
	// 发送集成事件的接口框架
	ICapPublisher _capPublisher;
	public OrderCreateDomainEventHandler(ICapPublisher capPublisher)
	{
		_capPublisher = capPublisher;
	}
	
	public async Task Handle(OrderCreatedDomainEvent notification,CancellationToken cancelationToken)
	{
	// 将名称为 "OrderCreated"的集成事件发送出去
		await _capPublisher.PublishAsync("OrderCreated",new OrderCreatedIntergrationEvent(notification.Order.Id))
	}
}

// 订阅服务
public class SubscriberService : ISubscriberService,ICapSubscribe
{
	IMediator _mediator;
	public SubscriberService(IMediator mediator)
	{
		_mediator = mediator;
	}
	
	[CapSubscribe("OrderPaymentSuccessed")]
	public void OrderPaymentSuccessed(OrderPaymentSuccessedIntergrationEvent envent)
	{
		// TODO...
	}

}

1.3 总结

  • 集成事件是跨服务的领域事件
  • 集成事件一般由领域事件驱动触发
  • 不通过事务来处理集成事件(实现最终一致性)
  • 仅在必要的情况下定义和使用集成事件

2.使用RabbitMQ来实现EventBus

2.1 RabbitMQ安装

进入RabbitMQ网站下载
RabbitMQ下载地址

2.2 CAP框架实现RabbitMQ

2.2.1 CAP框架实现架构

在这里插入图片描述

CAP框架实际是实现了OutBox的设计模式
OutBox设计模式是在每一个微服务中,比如微服务A的数据库A中建立两张表。一张publish事件表和一张receive事件表,这两张事件表用来记录微服务A发出和接收到的事件。当需要发出事件时,会把事件的存储逻辑和业务逻辑的事务合并,在同一个事务里提交,也就意味着当业务逻辑提交成功时,事件表里面的事件是一定存在的,它是与业务逻辑的事务是强绑定的,这就保证了所发出的事件是与业务逻辑一致的。接下来就是由组件负责将事件表中的事件全部发送到EventBus中,比如RabbitMQ消息队列中,由接受方订阅。
对于订阅事件,设计模式是同理。当应用程序从消息队列中获取到消息时,就会将这些消息持久化到数据库中的receive事件表中,这样就可以在本地进行事件的处理、失败重试等操作,这都是由CAP框架来完成。

// 演示代码
private IDbContextTransaction _currentTransaction;

public Task<IDbContextTransaction> BeginTransactionAsync()
{
	if(_currentTransaction == null) return null;
	// 将业务存储放在同一个事务中,使得事务提交或回滚时,事件与业务逻辑的存取都是一致的
	_currentTransaction = DataBase.BeginTransaction(_capBus,autoCommit:false);
	return Task.FromResult(_currentTransaction);
}

// 服务配置
public static IServiceCollection AddEventBus(this IServiceCollection services,IConfiguration configuration)
{
	services.AddTransient<ISubscriberService,SubscriberService>();
	services.AddCap(options =>
	{
		// 基于DomainContext实现EventBus
		options.UseEntityFramework<DomainContext>();
		// 使用RabbitMQ作为消息队列的存储
		options.UseRabbitMQ(options =>
		{
			// RabbitMQ配置
			configuration.GetSection("RabbitMQ").Bind(options);
		});
	};
	return services;
}

2.2.2 CAP框架实现原理

  • 事件表
  • 事务控制

3.MediatR

3.1 使用Mediator实现命令查询职责分离模式(CQRS)

3.1.1 核心对象

  • IMediator
  • IRequest、IRequest< T>
  • IRequestHandler< in TRquest,TResponse>
async static Task Main(string[] args)
{
	var services = new ServiceCollection();
	services.AddMediatR(typeof(Program).Assembly);
	var serviceProvider = services.BuildServiceProvider();
	var mediator = serviceProvider.GetService<IMediator>();
	await mediator.Send(new MyCommand{CommandName= "cmd01"});
}

class MyCommand : IRequest<long>
{
	public string CommandName {get;set;}
}

// 命令处理器定义
class MyCommandHandler : IRequestHandler<MyCommand,long>
{
	public Task<long> Handle(MyCommand request,CancellationToken cancelationToken)
	{
		ConsoleWriteLine($@"MyCommandHandler 执行命令{request.CommandName}");
		return Task.FromResult(10L);
	}
}

通过中介者模式(MediatoR),可以将命令的构造和命令的处理分离开

3.2 处理领域事件

3.2.1 核心对象

  • IMediator
  • INotification
  • INotificationHandler< in TNotification>

async static Task Main(string[] args)
{
	var services = new ServiceCollection();
	services.AddMediatR(typeof(Program).Assembly);
	var serviceProvider = services.BuildServiceProvider();
	var mediator = serviceProvider.GetService<IMediator>();
	//
	await mediator.Publish(new MyEvent{EventName= "Event01"});
	Console.WriteLine("Hello World");
}


class MyEvent : INotification
{
	public string EventName {get;set;}
}

internal class MyEventHandler : INotificationHandler<MyEvent>
{
	public Task Handle(MyEvent notification,CancellationToken cancelationToken)
	{
		Console.WriteLine($"MyEventHandle执行:{notification.EventName}");
		return Task.CompletedTask;
	}
}

internal class MyEventHandler2 : INotificationHandler<MyEvent>
{
	public Task Handle(MyEvent notification,CancellationToken cancellationToken)
	{
		Console.WriteLine($"MyEventHandle2执行:{notification.EventName}");
		return Task.CompletedTask;
	}
}

INotification可以注册多个,是一对多的关系,借助此我们可以对领域事件定义多个处理器


http://www.niftyadmin.cn/n/4960287.html

相关文章

为什么说ChatGPT还不是搜索引擎的对手

一 前言 1950年&#xff0c;英国科学家图灵在一篇论文中预言&#xff0c;人类有可能创造出具有真正智能的机器。 著名的「图灵测试」就此诞生&#xff1a;如果一台机器能够与人类展开对话&#xff0c;而不被辨别出其机器身份&#xff0c;那么称这台机器具有智能。 也是从那时…

【Linux】进程间通信原理与Reactor模式

一、用户进程缓冲区和内核缓冲区 缓冲区的目的&#xff0c;是为了减少频繁的系统IO调用。大家都知道&#xff0c;系统调用需要保存之前的进程数据和状态等信息&#xff0c;而结束调用之后回来还需要恢复之前的信息&#xff0c;为了减少这种损耗时间、也损耗性能的系统调用&…

Android OkHttp 源码浅析一

演进之路:原生Android框架不好用 ---- HttpUrlConnect 和 Apache HTTPClient 第一版 底层使用HTTPURLConnect 第二版 Square构建 从Android4.4开始 基本使用: val okhttp OkHttpClient()val request Request.Builder().url("http://www.baidu.com").buil…

2020年9月全国计算机等级考试真题(C语言二级)

2020年9月全国计算机等级考试真题&#xff08;C语言二级&#xff09; 第1题 有下列程序&#xff1a; #include<stdio.h> main() { FILE*fp;int k,n,a[6]{1,2,3,4,5,6}; fpfopen("d2.dat","w"); fprintf(fp,"%d%d%d\n",a[0],…

LightDB sequence支持MAXVALUE最大值与Oracle相同

功能介绍 Oracle数据库在创建sequence的时候可以支持设置maxvalue 为9999999999999999999999999999&#xff0c;这样的SQL在LightDB23.3版本之前都是执行失败的。为了方便Oracle用户迁移到LightDB上&#xff0c;在LightDB23.3版本上&#xff0c;增加了sequence支持maxvalue设置…

矩阵乘法(C++ mpi 并行实现)

矩阵乘法有2种思路&#xff0c;我最先想到的是第一种思路&#xff0c;但是时间、空间复杂度都比较高。后面参考了一些资料&#xff0c;实现了第二种思路。 一、思路1&#xff1a;按行、列分块 矩阵乘法有一个很好的性质&#xff0c;就是结果矩阵的每个元素是不互相依赖的&…

【已解决】Linux中启动docker 出现 ‘ Failed to start docker.service: Unit not found. ’ 错误

启动docker 出现 ‘ Failed to start docker.service: Unit not found. ’ 错误 这是因为缺少 rhel-push-plugin.socket 单元&#xff0c;该单元是rhel-push-plugin软件包的一部分。所以我们执行以下指令就可以成功解决&#xff1a; curl -sSL https://get.docker.com/ | sh 执…

CSS中的display属性有哪些值?它们的作用?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ CSS display 属性的不同取值和作用1. block2. inline3. inline-block4. none5. flex6. grid7. table、table-row、table-cell8. list-item9. inline-table、table-caption、table-column 等 ⭐ 写在最后 ⭐ 专栏简介 前端入门之旅&#x…