分布式-消息与事务

消息与事务处理

Posted by Kang on September 15, 2019

  既然使用了消息,那么其实对整体的一致性要求不是很高,在业务上只要保证最终一致性即可。

消息发送方式分类

  • 非持久化消息:以异步的方式发送到消息中心;
  • 持久化事务型消息:以异步的方式发送到消息中心;
  • 持久化非事务型消息:以同步的方式发送到消息中心;
在使用消息中间件时,一般要允许消息积压量处理,以防止生产者产生速率远远大于消费者将broker冲垮。

调优点

  • 多久/多大后发送消息;
  • 多久/多大后批量确认消息;
  • 每次消费时,取多少条消息进行消费(kafka pull模式);
       消息中间件中,消费者每次拉取待消费消息的数量&每次消费后ACK方式(单个/批量),是优化消息中间件的一个很重要点。同时,要均衡这两个参数与集群均衡负载的关系,比如,在消费者处理比较慢的情况下就需要更多的机器参与而不能让一台机器中拉取了所有待处理消息。

消息调试

  可以通过设置手动方式确认消息但不调用commit来不停的消费Broker中的消息,这样能一直使用该消息进行开发调试。

消息异常处理

  • 发送端:对于非流程失败型消息,若发送失败,则可落本地消息库,定时做消息补偿;
  • 消费端:commit消息确认要在业务做完后做手动确认处理,这样可以在消费过程中产生异常后重复消费。

事务型消息

事务型消息特点

  事务型消息只能保障发起方在业务处理成功的情况下消息被消息中心接收到,在事务发起方业务处理失败情况下业务方能撤销/删除已发送消息。解决了是先处理任务后发消息还是先发消息后处理任务的难题。
  直接使用事务型消息框架时,事务开启方不能感知到消费者处理过程(成功/失败/异常),若希望感知,则需要发起方自己处理,本地存储已发送消息,消费方处理后回发处理结果消息。

事务型消息中间件过程

  1. 开启本地事务,发送事务型消息;
  2. 消息中心接收到消息后落库;
  3. 本地事务结束,发送消息给消息中心;
  4. 若为提交消息,则消息中心将消息投递到消费者,若为回滚消息,则消息中心将对应的事务消息删除掉;

异常处理过程

  为了防止消息发起方的提交或者回滚消息丢失,一般上事务的发起方还需要实现一个消息回查的接口,当消息中心在一段时间内没有收到事务的发起方的通知的时候,消息中心会主动回查发起方,主动咨询发起方对应的事务的状态,根据主动拿到的状态来决定消息是要发送还是删除。
  消息的回查接口,其实实现的是查询本地某个业务(比如某个账户扣钱)是否完成,若完成则表示该事务消息需要提交。

需要注意的问题

  事务型消息在提交等待超时后,将回查是否提交回查的时候,需要依赖业务方实现接口进行业务check,这种情况下当业务方处理内容过多超过中间件回查时间时,这种情况下将产生异常可能导致消息确认失败。

消费者处理异常-人工介入

Q:发送端把消息成功放入了队列(允许消费),但消费端消费失败怎么办?

  • 消费失败了,重试,还一直失败怎么办?是不是要自动回滚整个流程?
      答案是人工介入。从工程实践角度讲,这种整个流程自动回滚的代价是非常巨大的,不但实现复杂,还会引入新的问题。比如自动回滚失败,又怎么处理? 对应这种极低概率的case,采取人工处理,会比实现一个高复杂的自动化回滚系统,更加可靠,也更加简单。

事务型消息示例


非事务型消息

  消息中心接收到消息后立即进行投递。
  若想通过非事务型消息来实现事务消息的功能,其实也就是业务发起方在本地添加一个message表(事务型消息自己包了这个表),该表与业务表开启事务一起进行操作。然后开启守护任务不停的将可发送但未发送消息投递到消息中心,消费方消费消息时进行消息幂等处理(通常也不会要求消息顺序性),若发送方希望感知处理结果,则同样处理往回发处理结果消息。

异步消息

  Activemq通过使用窗口尺寸来约束在发送异步消息时Productor端允许积压的(尚未ACK)消息大小。
  通过全局设置Productor或单个目的地设置window.size来控制可积压消息大小;
  在异步发送时,先检查memoryUsage(持久消息)/tempUsage(非持久化消息,若有使用)中是否有足够的空间,若不够,则阻塞,否则正常发送并增加对应发送消息的尺寸。在每次broker返回productorAck后,则memoryUsage尺寸减少相应的消息大小。