Dapr(三) Dapr核心组件的使用一

news/2024/5/19 23:24:21 标签: 后端, .netcore, redis, 分布式, 架构

结合前两期 Dapr(一) 基于云原生了解Dapr(Dapr(一) 基于云原生了解Dapr-CSDN博客) Dapr(二) 分布式应用运行时搭建及服务调用(Dapr(二) 分布式应用运行时搭建及服务调用-CSDN博客)

下篇推出dapr服务注册与发现,dapr组件绑定,dapr Actor功能。

目录

1.0 Dapr状态管理

1.1 Dapr状态组件配置文件

1.2 状态控制器

1.3 切换其它状态存储

1.4 工作原理

2.0 发布订阅

2.1 什么是发布订阅

2.2 设置发布订阅组件

2.3 控制器代码

2.3.1 发布控制器

2.3.2 订阅控制器

2.4 修改文件Program.cs

2.5 切换组件 

2.6 工作原理

总结:


1.0 Dapr状态管理

Dapr的状态管理允许应用程序保存和检索键值对数据,具有可插拔的存储、配置的行为和额外的安全特性。以下是主要特点:

  1. 可插拔状态存储:Dapr支持多种数据存储,比如MySQL、Redis、Azure CosmosDB等,可以在不修改代码的情况下切换。

  2. 配置存储行为:你可以指定并发控制和一致性级别。默认是最终一致性,但也支持强一致性。

  3. 并发控制:通过ETags实现乐观并发控制(OCC)。写操作需要匹配当前的ETag值,防止冲突。

  4. 自动加密:预览功能,支持应用程序状态的自动加密和密钥轮换。

  5. 一致性选项:可以选择强一致性的写入,等待所有副本确认,或者默认的最终一致性。

  6. 批量操作:支持一次性处理多条状态记录。

1.1 Dapr状态组件配置文件

Dapr默认使用的Redis进行存储。

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"

1.2 状态控制器

 public class StateController : ControllerBase
    {
        private readonly ILogger<StateController> _logger;
        private readonly DaprClient _daprClient;
        public StateController(ILogger<StateController> logger, DaprClient daprClient)
        {
            _logger = logger;
            _daprClient = daprClient;
        }

        // 获取一个值
        [HttpGet]
        public async Task<ActionResult> GetAsync()
        {
            var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
            return Ok(result);
        }

        //保存一个值
        [HttpPost]
        public async Task<ActionResult> PostAsync()
        {
            await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
            return Ok("done");
        }

        //删除一个值
        [HttpDelete]
        public async Task<ActionResult> DeleteAsync()
        {
            await _daprClient.DeleteStateAsync("statestore", "guid");
            return Ok("done");
        }

        //通过tag防止并发冲突,保存一个值
        [HttpPost("withtag")]
        public async Task<ActionResult> PostWithTagAsync()
        {
            var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            await _daprClient.TrySaveStateAsync("statestore", "guid", Guid.NewGuid().ToString(), etag);
            return Ok("done");
        }

        //通过tag防止并发冲突,删除一个值
        [HttpDelete("withtag")]
        public async Task<ActionResult> DeleteWithTagAsync()
        {
            var (_, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
        }


        // 从绑定获取一个值,健值name从路由模板获取
        [HttpGet("frombinding/{name}")]
        public ActionResult GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            return Ok(state.Value);
        }


        // 根据绑定获取并修改值,健值name从路由模板获取
        [HttpPost("withbinding/{name}")]
        public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            state.Value = Guid.NewGuid().ToString();
            return Ok(await state.TrySaveAsync());
        }


        // 获取多个个值
        [HttpGet("list")]
        public async Task<ActionResult> GetListAsync()
        {
            var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            return Ok(result);
        }

        // 删除多个个值
        [HttpDelete("list")]
        public async Task<ActionResult> DeleteListAsync()
        {
            var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            var removeList = new List<BulkDeleteStateItem>();
            foreach (var item in data)
            {
                removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
            }
            await _daprClient.DeleteBulkStateAsync("statestore", removeList);
            return Ok("done");
        }
    }

1.3 切换其它状态存储

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.mysql
  version: v1
  metadata:
  - name: connectionString
    value: "root:123456@tcp(192.168.157.157:3306)/?allowNativePasswords=true"

切换状态为MySql进行存储。

1.4 工作原理

应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用**可配置的状态存储组件**来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。

2.0 发布订阅

2.1 什么是发布订阅

发布订阅(Publish-Subscribe)是一种通信模式,允许发布者发送消息到一个中心节点(通常是消息代理或主题),而不关心具体哪些订阅者会接收到这些消息。订阅者则注册他们感兴趣的特定类型的消息,当匹配的消息发布时,他们会收到通知。这种模式的特点在于解耦了发布者和订阅者,提高了系统的灵活性和可扩展性。

关键元素包括:

  1. 发布者 (Publisher): 生产消息的实体,它向主题或消息代理发送消息,无需了解谁会接收这些消息。
  2. 订阅者 (Subscriber): 对特定消息感兴趣并希望接收通知的实体,它们通过订阅主题或消息代理来表达兴趣。
  3. 主题 或 消息代理 (Topic or Message Broker): 中间媒介,接收并分发消息,确保消息从发布者到达正确的订阅者。

一个简单的示例是新闻系统,其中发布者发布新闻到特定类别,而订阅者选择关注他们感兴趣的类别。发布者不直接通知订阅者,而是通过消息代理进行,这样订阅者仅接收与其订阅相匹配的新闻。

发布订阅模式的应用场景通常涉及异步通信、事件驱动的系统或需要解耦组件的场景。

2.2 设置发布订阅组件

Dapr默认使用的Redis进行发布订阅

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""

2.3 控制器代码

2.3.1 发布控制器

    [ApiController]
    [Route("[controller]")]
    public class PubsubController : ControllerBase
    {
        private DaprClient _daprClient;
        private ILogger<PubsubController> _logger;

        public PubsubController(DaprClient daprClient, ILogger<PubsubController> logger)
        {
            _daprClient = daprClient;
            _logger = logger;
        }

        /// <summary>
        /// 发布消息的方法
        /// </summary>
        /// <returns></returns>
        [HttpPost]
        [Route("pub")]
        public async Task<IActionResult> PublishMessage()
        {
            _logger.LogInformation("***发布消息***");
            var data = new UserInfo(10001,"操作员",19);
            await _daprClient.PublishEventAsync("pubsub", "topic",data);

            return Ok("***发布消息成功***");
        }
    }

2.3.2 订阅控制器

    [ApiController]
    [Route("[controller]")]
    public class SubController : ControllerBase
    {
        private ILogger<SubController> _logger;

        public SubController(ILogger<SubController> logger)
        {
            _logger = logger;
        }


        [HttpPost("sub")]
        [Topic("pubsub", "topic")]
        public IActionResult ConsumerMessage(UserInfo user)
        {
            _logger.LogInformation("***消费消息***");
            Console.WriteLine($"userId:{user.UserId} userName:{user.UserName}");
            return Ok();
        }
    }

2.4 修改文件Program.cs

app.UseCloudEvents();
app.MapSubscribeHandler();

2.5 切换组件 

切换为RabbitMQ

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://123:123@192.168.157.157:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: deliveryMode
    value: "0"
  - name: requeueInFailure
    value: "false"
  - name: prefetchCount
    value: "0"
  - name: reconnectWait
    value: "0"
  - name: concurrencyMode
    value: parallel
  - name: backOffPolicy
    value: "exponential"
  - name: backOffInitialInterval
    value: "100"
  - name: backOffMaxRetries
    value: "16"

2.6 工作原理

Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。服务将消息发布到指定主题, 业务服务订阅主题以使用消息。服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。

总结:

Dapr的发布订阅功能使得在分布式系统中实现发布/订阅消息模式变得更加简单。主要解决了不同消息产品之间实施复杂性和功能差异的问题。你可以通过Dapr的Sidecar API使用HTTP或gRPC来发布和订阅消息。以下是关键操作的概述:

  1. 发布(Publish)消息

    • 使用http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic> URL,其中 <dapr-port> 是Dapr Sidecar监听的端口,<pub-sub-name> 是选择的发布/订阅组件名,而 <topic> 是消息的目标主题。
  2. 订阅(Subscribe)消息

    • 应用程序在启动时,通过http://localhost:<appPort>/dapr/subscribe指定其订阅,其中 <appPort> 是应用程序监听的端口。
    • 订阅者处理消息后返回非错误响应,Dapr认为消息传递成功。
    • 支持订阅者通过响应负载中的状态进行精细化控制,比如指示重试(RETRY)或丢弃(DROP)消息。

Dapr的状态管理提供了一种跨服务持久化数据的方法,支持多种存储后端。关键特性包括:

  1. 原子性操作:支持原子性的读写操作,保证一致性。

  2. 版本控制:允许跟踪状态更改的历史版本,便于回滚。

  3. 事件驱动:状态变化可触发回调函数,实现基于状态变化的自动化操作。

  4. 过期策略:可设置状态项的过期时间。

  5. 备份与恢复:提供状态备份和恢复机制,确保高可用性。 


http://www.niftyadmin.cn/n/5475075.html

相关文章

tdesign坑之EnhancedTable树形结构默认展开所有行

⚠️在官方实例中&#xff0c;树形结构的表格提供了2种方法控制展开全部节点&#xff1a; 一是通过配置属性tree.defaultExpandAll为true代表默认展开全部节点&#xff08;仅默认情况有效&#xff09;&#xff1b; 二是使用组件实例方法expandAll()可以自由控制树形结构的展开…

vscode 安装vim插件配置ctrl + c/v功能

搜索Vim插件 插件介绍部分有提示操作 首先安装该插件&#xff0c;然后按照下述步骤设置ctrl相关的快捷键&#xff0c;以便于脱离im快捷键而愉快的敲代码。 1.在“设置”搜索框内搜索vim.handleKeys&#xff0c;选择 Edit in settings.json 2. 设置ctrl-c,ctrl-v等快捷键置为fa…

AI大模型下的策略模式与模板方法模式对比解析

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL应用》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 &#x1f680; 转载自热榜文章&#xff1a;设计模式深度解析&#xff1a;AI大模型下…

1、java语法入门(找工作版)

文章目录 一、Java简介二、Java常量与变量1、标识符2、关键字3、变量4、类的命名规则5、数据类型6、基本数据类型字面值7、变量的定义与初始化8、ASCII码和Unicode编码9、转义字符10、类型转换11、常量 三、Java运算符1、算术运算符2、赋值运算符3、关系运算符4、逻辑运算符5、…

TiDB 组件 GC 原理及常见问题

本文详细介绍了 TiDB 的 Garbage Collection&#xff08;GC&#xff09;机制及其在 TiDB 组件中的实现原理和常见问题排查方法。 TiDB 底层使用单机存储引擎 RocksDB&#xff0c;并通过 MVCC 机制&#xff0c;基于 RocksDB 实现了分布式存储引擎 TiKV&#xff0c;以支持高可用分…

IJKPLAYER源码分析-AudioTrack播放

前言 AudioTrack是Android SDK所提供的播放PCM音频的技术&#xff0c;与mediacodec类似&#xff0c;IJKPLAYER对此使用的是以native层反射到Java层的播放能力。 关于AudioTrack的官方文档&#xff0c;请参考AudioTrack官方文档 接口 pipeline IJKFF_Pipeline结构体是对Androi…

外包干了6天,技术明显进步

先说一下自己的情况&#xff0c;本科生&#xff0c;2019年我通过校招踏入了南京一家软件公司&#xff0c;开始了我的职业生涯。那时的我&#xff0c;满怀热血和憧憬&#xff0c;期待着在这个行业中闯出一片天地。然而&#xff0c;随着时间的推移&#xff0c;我发现自己逐渐陷入…

利用Leaflet + React:构建WEBGIS

React是 Facebook 开发的一个开源库&#xff0c;用于构建用户界面。就其本身而言&#xff0c;Leaflet是一个用于将地图发布到网络的JavaScript 库。这两个工具的组合很简单&#xff0c;允许您创建动态网络地图。在本文中&#xff0c;我们将看到这种组合的一些特征以及一些简单的…