购买服务器需要多少钱,温州网站优化关键词,微网站开发需求,医疗器械分为哪三类目录 消息中间件
RabbitMQ
消息不丢失
生产者确认机制
消息持久化
交换机持久化
队列持久化
消息持久化
消费者确认
消息重复消费
出现的场景
解决方案
每条消息设置一个唯一的标识id
幂等方案#xff1a;【 分布式锁、数据库锁#xff08;悲观锁、乐观锁#…目录 消息中间件
RabbitMQ
消息不丢失
生产者确认机制
消息持久化
交换机持久化
队列持久化
消息持久化
消费者确认
消息重复消费
出现的场景
解决方案
每条消息设置一个唯一的标识id
幂等方案【 分布式锁、数据库锁悲观锁、乐观锁 】
延迟队列
死信交换机
TTL
实现方式
使用插件
DLX 实现延迟队列
消息堆积
惰性队列
高可用机制
普通集群
镜像集群
仲裁队列
Kafka
高性能设计
消息不丢失
生产者发送消息到Brocker丢失
消息在Brocker中存储丢失 消费者从Brocker接收消息丢失
保证消息的顺序性
应用场景
解决方案
指定分区
设置业务的key
高可用机制
集群模式
分区备份机制
数据存储与清理
Kafka文件存储机制
数据清理机制
零拷贝 常见面试题 消息中间件
RabbitMQ
消息不丢失
可能导致消息丢失的情况 生产者发送消息丢失消息队列宕机消费者服务宕机未接收到消息 生产者确认机制
该机制解决了生产者发送消息有可能丢失的问题。
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后会返回一个结果给发送者表示消息是否处理成功。 消息失败之后如何处理呢 回调方法即时重发记录日志保存到数据库然后定时重发成功发送后即刻删除表中的数据 消息持久化
该机制解决了消息队列宕机的问题。
MQ默认是内存存储消息开启持久化功能可以确保缓存在MQ中的消息不丢失。
具体内容
交换机持久化
Bean
public DirectExchange simpleExchange(){// 三个参数交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange(simple.direct, true, false);
}队列持久化
Bean
public Queue simpleQueue(){// 使用QueueBuilder构建队列durable就是持久化的return QueueBuilder.durable(simple.queue).build();
}消息持久化
SpringAMQP中的的消息默认是持久的可以通过MessageProperties中的DeliveryMode来指定的
Message msg MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体.setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化.build();消费者确认
该机制解决了消费者服务宕机未接收到消息的问题。
RabbitMQ支持消费者确认机制即消费者处理消息后可以向MQ发送ack回执MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式 manual手动ack需要在业务代码结束后调用api发送ack。auto自动ack由spring监测listener代码是否出现异常没有异常则返回ack抛出异常则返回nacknone关闭ackMQ假定消费者获取消息后会成功处理因此消息投递后立即被删除 如果消费者接收消息失败
我们可以利用Spring的retry机制在消费者出现异常时利用本地重试设置重试次数当次数达到了以后如果消息依然失败将消息投递到异常交换机交由人工处理。 消息重复消费
出现的场景
首先我们来看一下消息的传输流程。消息生产者--》MQ--》消息消费者消息生产者发送消息到MQ服务器MQ服务器存储消息消息消费者监听MQ的消息发现有消息就消费消息。
所以消息重复也就出现在两种场景 1、生产者多发送了消息给MQ2、MQ的一条消息被消费者消费了多次。 第一种场景很好控制只要保证消息生成者不重复发送消息给MQ即可。我们着重来看一下第二个场景。
在保证MQ消息不重复的情况下消费者消费消息成功后在给MQ发送消息确认的时候出现了网络异常(或者是服务中断)MQ没有接收到确认此时MQ不会将发送的消息删除为了保证消息被消费当消费者网络稳定后MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。
解决方案
每条消息设置一个唯一的标识id
为了保证消息不被重复消费首先要保证每个消息是唯一的所以可以给每一个消息携带一个全局唯一的id流程如下 消费者监听到消息后获取id先去查询这个id是否存中如果不存在则正常消费消息并把消息的id存入 数据库或者redis中下面的编码示例使用redis如果存在则丢弃此消息 幂等方案【 分布式锁、数据库锁悲观锁、乐观锁 】
延迟队列
延迟队列死信交换机TTL生存时间
死信交换机
当一个队列中的消息满足下列情况之一时可以成为死信dead letter 消费者使用basic.reject或 basic.nack声明消费失败并且消息的requeue参数设置为false。消息是一个过期消息超时无人消费。要投递的队列消息堆积满了最早的消息可能成为死信。 如果该队列配置了dead-letter-exchange属性指定了一个交换机那么队列中的死信就会投递到这个交换机中而这个交换机称为死信交换机Dead Letter Exchange简称DLX。 TTL
TTL也就是Time-To-Live。如果一个队列中的消息TTL结束仍未消费则会变为死信ttl超时分为两种情况 消息所在的队列设置了存活时间消息本身设置了存活时间 具体以短的存活时间为准。 如上图所示存活时间就为5000。
实现方式
使用插件
首先我们需要下载 rabbitmq_delayed_message_exchange 插件这是一个 GitHub 上的开源项目我们直接下载即可
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
配置文件
spring.rabbitmq.hostlocalhost
spring.rabbitmq.passwordguest
spring.rabbitmq.usernameguest
spring.rabbitmq.virtual-host/RabbitMQ 的配置类
Configuration
public class RabbitConfig {public static final String QUEUE_NAME javaboy_delay_queue;public static final String EXCHANGE_NAME javaboy_delay_exchange;public static final String EXCHANGE_TYPE x-delayed-message;BeanQueue queue() {return new Queue(QUEUE_NAME, true, false, false);}BeanCustomExchange customExchange() {MapString, Object args new HashMap();args.put(x-delayed-type, direct);return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);}BeanBinding binding() {return BindingBuilder.bind(queue()).to(customExchange()).with(QUEUE_NAME).noargs();}
}这里主要是交换机的定义有所不同小伙伴们需要注意。
这里我们使用的交换机是 CustomExchange这是一个 Spring 中提供的交换机创建 CustomExchange 时有五个参数含义分别如下 交换机名称。交换机类型这个地方是固定的。交换机是否持久化。如果没有队列绑定到交换机交换机是否删除。其他参数。 最后一个 args 参数中指定了交换机消息分发的类型这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种用了哪种类型将来交换机分发消息就按哪种方式来。
DLX 实现延迟队列
这个实现就是上面所讲的DLX死信交换机TTL消息超时时间。
消息堆积
当生产者发送消息的速度超过了消费者处理消息的速度就会导致队列中的消息堆积直到队列存储消息达到上限。之后发送的消息就会成为死信可能会被丢弃这就是消息堆积问题。
解决消息堆积有三种种思路 增加更多消费者提高消费速度在消费者内开启线程池加快消息处理速度扩大队列容积提高堆积上限 惰性队列 接收到消息后直接存入磁盘而非内存消费者要消费消息时才会从磁盘中读取并加载到内存支持数百万条的消息存储 高可用机制
通过搭建集群来提高可用性
普通集群
普通集群或者叫标准集群classic cluster具备下列特征 会在集群的各个节点间共享部分数据包括交换机、队列元信息。不包含队列中的消息。当访问集群某节点时如果队列不在该节点会从数据所在节点传递到当前节点并返回队列所在节点宕机队列中的消息就会丢失 镜像集群
镜像集群本质是主从模式具备下面的特征 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。创建队列的节点被称为该队列的主节点备份到的其它节点叫做该队列的镜像节点。一个队列的主节点可能是另一个队列的镜像节点所有操作都是主节点完成然后同步给镜像节点主宕机后镜像节点会替代成新的主 仲裁队列
仲裁队列仲裁队列是3.8版本以后才有的新功能用来替代镜像队列具备下列特征 与镜像队列一样都是主从模式支持主从数据同步使用非常简单没有复杂的配置主从同步基于Raft协议强一致 Kafka
高性能设计
消息不丢失
生产者发送消息到Brocker丢失
设置异步发送
//同步发送
RecordMetadata recordMetadata kafkaProducer.send(record).get();
//异步发送
kafkaProducer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if (e ! null) {System.out.println(消息发送失败 | 记录日志);}long offset recordMetadata.offset();int partition recordMetadata.partition();String topic recordMetadata.topic();}
});消息重试
//设置重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);消息在Brocker中存储丢失
发送到Kafka的消息会存储在不同的分区中接收到消息首先会存储到leader分区中然后再同步到follower分区中为确保消息在Brocker中存储不丢失Kafka提供了一个发送确认机制acks。 acks不同的值对应了消息在broker中的不同的保存状态
确认机制说明acks0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险但是速度最快acks1默认值只要集群首领节点收到消息生产者就会收到一个来自服务器的成功响应acksall只有当所有参与赋值的节点全部收到消息时生产者才会收到一个来自服务器的成功响应 消费者从Brocker接收消息丢失 Kafka 中的分区机制指的是将每个主题划分成多个分区Partitiontopic分区中消息只能由消费者组中的唯一一个消费者处理不同的分区分配给不同的消费者同一个消费者组 Kafka提供了消息偏移量的机制来保证消费者不会丢失从Broker中接收到的消息。
消息偏移量就是用于标识消费者在特定分区中已经读取的消息位置。每个分区都有自己的偏移量消费者可以使用偏移量来确定要从哪里开始读取消息。
消费者默认是自动按期提交已经消费的偏移量默认是每隔5s提交一次 如果出现重平衡的情况可能会重复消费或丢失数据。
重平衡在消费者组中新增或删除消费者时系统会重新分配分区给消费者重新分配后就可能会重复消费或丢失数据。 禁用自动提交偏移量改为手动 同步提交异步提交同步异步组合提交 同步异步组合提交代码如下 保证消息的顺序性
应用场景 即时消息中的单对单聊天和群聊保证发送方消息发送顺序与接收方的顺序一致充值转账两个渠道在同一个时间进行余额变更短信通知必须要有顺序 解决方案
topic分区中消息只能由消费者组中的唯一一个消费者处理所以消息肯定是按照先后顺序进行处理的。但是它也仅仅是保证Topic的一个分区顺序处理不能保证跨分区的消息先后处理顺序。 所以如果你想要顺序的处理Topic的所有消息那就只提供一个分区。
指定分区
可以将需要按顺序的消息放到指定的分区中在分区中消费的消息是按顺序的。
设置业务的key
每一个分区指定唯一的key相同的业务可以指定相同的key 高可用机制
集群模式 Kafka 的服务器端由被称为 Broker 的服务进程构成即一个 Kafka 集群由多个 Broker 组成这样如果集群中某一台机器宕机其他机器上的 Broker 也依然能够对外提供服务。这其实就是 Kafka 提供高可用的手段之一 分区备份机制
某一个topic中有三个分区P0、P1、P2 一个topic有多个分区每个分区有多个副本其中有一个leader其余的是follower副本存储在不同的broker中所有的分区副本的内容是都是相同的如果leader发生故障时会自动将其中一个follower提升为leader 具体的选择哪个follower提升为leader的规则如下 ISRin-sync replica需要同步复制保存的follower。普通中的follower则是异步复制保存。
如果leader失效后需要选出新的leader选举的原则如下 第一选举时优先从ISR中选定因为这个列表中follower的数据是与leader同步的第二如果ISR列表中的follower都不行了就只能从其他follower中选取 具体的副本个数可以通过配置文件设置 数据存储与清理
Kafka文件存储机制
数据存储结构如下 每个分区的数据由多个日志文件段组成就是上面的Segment每个段对应了三个文件 .index 索引文件 .log 数据文件 .timeindex 时间索引文件
为什么要分段 删除无用文件方便提高磁盘利用率查找数据便捷 数据清理机制
日志的清理策略有两个
根据消息的保留时间当消息在kafka中保存的时间超过了指定的时间就会触发清理过程 根据topic存储的数据大小当topic所占的日志文件大小大于一定的阈值则开始删除最久的消息。需手动开启 零拷贝
结合linux的IO模型 在Kafka中零拷贝Zero Copy是一种技术旨在减少数据在内存之间的复制次数从而提高性能和降低资源消耗。在消息传输过程中传统上会涉及将数据从一个缓冲区复制到另一个缓冲区而零拷贝技术可以避免这种数据复制直接在数据的来源和目的地之间进行传输减少了不必要的内存拷贝操作提高了数据传输效率。
在Kafka中零拷贝技术主要体现在生产者和消费者之间的消息传输过程中以及Broker节点内部的数据处理过程中。具体来说Kafka中的零拷贝技术包括以下几个方面 生产者发送消息当生产者发送消息到Kafka Broker时Kafka可以采用零拷贝技术避免在发送消息之前将数据从应用程序的缓冲区复制到网络发送缓冲区而是直接使用应用程序的缓冲区进行网络传输。 消费者接收消息消费者从Broker接收消息时Kafka也可以利用零拷贝技术避免将数据从网络接收缓冲区复制到应用程序的缓冲区而是直接将数据传递给消费者应用程序。 Broker节点内部数据处理在Broker节点内部当进行数据复制、写入磁盘等操作时Kafka也会尽可能地使用零拷贝技术以减少数据在不同缓冲区之间的复制次数。 通过使用零拷贝技术Kafka可以提高消息传输的效率降低系统的内存和CPU开销从而更好地支持高吞吐量和低延迟的数据处理需求。 常见面试题 面试官RabbitMQ-如何保证消息不丢失 候选人 嗯我们当时MYSQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的这里面就要求了消息的高可用性我们要保证消息的不丢失。主要从三个层面考虑 第一个是开启生产者确认机制确保生产者的消息能到达队列如果报错可以先记录到日志中再去修复数据 第二个是开启持久化功能确保消息未消费前在队列中不会丢失其中的交换机、队列、和消息都要做持久化 第三个是开启消费者确认机制为auto由spring确认消息处理成功后完成ack当然也需要设置一定的重试次数我们当时设置了3次如果重试3次还没有收到消息就将失败后的消息投递到异常交换机交由人工处理 面试官RabbitMQ消息的重复消费问题如何解决的 候选人 嗯这个我们还真遇到过是这样的我们当时消费者是设置了自动确认机制当服务还没来得及给MQ确认的时候服务宕机了导致服务重启之后又消费了一次消息。这样就重复消费了 因为我们当时处理的支付订单|业务唯一标识它有一个业务的唯一标识我们再处理消息时先到数据库查询一下这个数据是否存在如果不存在说明没有处理过这个时候就可以正常处理这个消息了。如果已经存在这个数据了就说明消息重复消费了我们就不需要再消费了 面试官那你还知道其他的解决方案吗 候选人 嗯我想想~ 其实这个就是典型的幂等的问题比如redis分布式锁、数据库的锁都是可以的 面试官RabbitMQ中死信交换机 ? RabbitMQ延迟队列有了解过嘛 候选人 嗯了解过 我们当时的xx项目有一个xx业务需要用到延迟队列其中就是使用RabbitMQ来实现的。 延迟队列就是用到了死信交换机和TTL消息存活时间实现的。 如果消息超时未消费就会变成死信在RabbitMQ中如果消息成为死信队列可以绑定一个死信交换机在死信交换机上可以绑定其他队列在我们发消息的时候可以按照需求指定TTL的时间这样就实现了延迟队列的功能了。 我记得RabbitMQ还有一种方式可以实现延迟队列在RabbitMQ中安装一个死信插件这样更方便一些我们只需要在声明交互机的时候指定这个就是死信交换机然后在发送消息的时候直接指定超时时间就行了相对于死信交换机TTL要省略了一些步骤 面试官如果有100万消息堆积在MQ , 如何解决 ? 候选人 我在实际的开发中没遇到过这种情况不过如果发生了堆积的问题解决方案也所有很多的 第一:提高消费者的消费能力 ,可以使用多线程消费任务 第二增加更多消费者提高消费速度 使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息 第三扩大队列容积提高堆积上限 可以使用RabbitMQ惰性队列惰性队列的好处主要是 ①接收到消息后直接存入磁盘而非内存 ②消费者要消费消息时才会从磁盘中读取并加载到内存 ③支持数百万条的消息存储 面试官RabbitMQ的高可用机制有了解过嘛 候选人 嗯熟悉的~ 我们当时项目在生产环境下使用的集群当时搭建是镜像模式集群使用了3台机器。 镜像队列结构是一主多从所有操作都是主节点完成然后同步给镜像节点如果主节点宕机后镜像节点会替代成新的主节点不过在主从同步完成前主节点就已经宕机可能出现数据丢失 面试官那出现丢数据怎么解决呢 候选人 我们可以采用仲裁队列与镜像队列一样都是主从模式支持主从数据同步主从同步基于Raft协议强一致。 并且使用起来也非常简单不需要额外的配置在声明队列的时候只要指定这个是仲裁队列即可 面试官Kafka是如何保证消息不丢失 候选人 嗯这个保证机制很多在发送消息到消费者接收消息在每个阶段都有可能会丢失消息所以我们解决的话也是从多个方面考虑 第一个是生产者发送消息的时候可以使用异步回调发送如果消息发送失败我们可以通过回调获取失败后的消息信息可以考虑重试或记录日志后边再做补偿都是可以的。同时在生产者这边还可以设置消息重试有的时候是由于网络抖动的原因导致发送不成功就可以使用重试机制来解决 第二个在broker中消息有可能会丢失我们可以通过kafka的复制机制来确保消息不丢失在生产者发送消息的时候可以设置一个acks就是确认机制。我们可以设置参数为all这样的话当生产者发送消息到了分区之后不仅仅只在leader分区保存确认在follwer分区也会保存确认只有当所有的副本都保存确认以后才算是成功发送了消息所以这样设置就很大程度了保证了消息不会在broker丢失 第三个有可能是在消费者端丢失消息kafka消费消息都是按照offset进行标记消费的消费者默认是自动按期提交已经消费的偏移量默认是每隔5s提交一次如果出现重平衡的情况可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量改为手动提交当消费成功以后再报告给broker消费的位置这样就可以避免消息丢失和重复消费了 面试官Kafka中消息的重复消费问题如何解决的 候选人 kafka消费消息都是按照offset进行标记消费的消费者默认是自动按期提交已经消费的偏移量默认是每隔5s提交一次如果出现重平衡的情况可能会重复消费或丢失数据。我们一般都会禁用掉自动提价偏移量改为手动提交当消费成功以后再报告给broker消费的位置这样就可以避免消息丢失和重复消费了 为了消息的幂等我们也可以设置唯一主键来进行区分或者是加锁数据库的锁或者是redis分布式锁都能解决幂等的问题 面试官Kafka是如何保证消费的顺序性 候选人 kafka默认存储和消费消息是不能保证顺序性的因为一个topic数据可能存储在不同的分区中每个分区都有一个按照顺序的存储的偏移量如果消费者关联了多个分区不能保证顺序性 如果有这样的需求的话我们是可以解决的把消息都存储同一个分区下就行了有两种方式都可以进行设置第一个是发送消息时指定分区号第二个是发送消息时按照相同的业务设置相同的key因为默认情况下分区也是通过key的hashcode值来选择分区的hash值如果一样的话分区肯定也是一样的 面试官Kafka的高可用机制有了解过嘛 候选人 嗯主要是有两个层面第一个是集群第二个是提供了复制机制 kafka集群指的是由多个broker实例组成即使某一台宕机也不耽误其他broker继续对外提供服务 复制机制是可以保证kafka的高可用的一个topic有多个分区每个分区有多个副本有一个leader其余的是follower副本存储在不同的broker中所有的分区副本的内容是都是相同的如果leader发生故障时会自动将其中一个follower提升为leader保证了系统的容错性、高可用性 面试官解释一下复制机制中的ISR 候选人 ISR的意思是in-sync replica就是需要同步复制保存的follower 其中分区副本有很多的follower分为了两类一个是ISR与leader副本同步保存数据另外一个普通的副本是异步同步数据当leader挂掉之后会优先从ISR副本列表中选取一个作为leader因为ISR是同步保存数据数据更加的完整一些所以优先选择ISR副本列表 面试官Kafka数据清理机制了解过嘛 候选人 嗯了解过~~ Kafka中topic的数据存储在分区上分区如果文件过大会分段存储segment 每个分段都在磁盘上以索引(xxxx.index)和日志文件(xxxx.log)的形式存储这样分段的好处是第一能够减少单个文件内容的大小查找数据方便第二方便kafka进行日志清理。 在kafka中提供了两个日志的清理策略 第一根据消息的保留时间当消息保存的时间超过了指定的时间就会触发清理默认是168小时 7天 第二是根据topic存储的数据大小当topic所占的日志文件大小大于一定的阈值则开始删除最久的消息。这个默认是关闭的 这两个策略都可以通过kafka的broker中的配置文件进行设置 面试官Kafka中实现高性能的设计有了解过嘛 候选人 Kafka 高性能是多方面协同的结果包括宏观架构、分布式存储、ISR 数据同步、以及高效的利用磁盘、操作系统特性等。主要体现有这么几点 消息分区不受单台服务器的限制可以不受限的处理更多的数据 顺序读写磁盘顺序读写提升读写效率 页缓存把磁盘中的数据缓存到内存中把对磁盘的访问变为对内存的访问 零拷贝减少上下文切换及数据拷贝 消息压缩减少磁盘IO和网络IO 分批发送将消息打包批量发送减少网络开销