alpakka-kafka(7)
上篇描述的kafka案例是个库存管理平台。是一个公共服务平台,为其它软件模块或第三方软件提供库存状态管理服务。当然,平台管理的目标必须是共享的,即库存是作为公共资源开放的。这个库存管理平台是一个Kafka消费端独立运行的软件。kafka的生产方即平台的服务对象通过kafka生产端producer从四面八方同时、集中将消息写入kafka。库存管理平台在kafka消费端不间断监控kafka里新的未读过的消息并及时读取,解析消息获取发布者对库存管理的指令,然后按指令更新库存状态。
设计这个库存管理平台最主要的目的先是为了保证库存状态的时效性、准确性,然后才是库存更新的效率。由于库存更新指令的产生是在一个高并发、异类系统、分布式环境里,上篇已经提到多线程环境下更新共享数据会产生的问题。不过通过kafka把并发产生的指令转换成队列然后按顺序单线程逐句执行就能解决主要问题了。现在,平台的数据来源变成kafka消费端口上的一个数据流了,数据的读取和消费自然也变成了逐条的。kafka提供了某种游标机制来记录数据读取的最新位置,防止数据消费过程中的遗漏、重复。记录当前读取位置offset的方式就是所谓数据消费模式代表数据消费不同程度的安全/效率比例,安全系数越高,流量越低。具体读取位置offset可以存放在kafka内部,或者保存在某种数据库表里。简单来讲,数据消费模式分三种:至多一次at-most-once,至少一次at-least-once,只此一次exactly-once。
从由kafka中读出指令到成功完成执行指令整个消息消费过程可能经历多个步骤。每个步骤都可能有失败的可能,从而中断过程影响数据消费结果。保存offset即offset-commit的时间点代表了三种消费模式的特性:
1、至多一次at-most-once:读出数据立即commit-offset,然后才开始消费数据。无论消费过程中发生异常与否,下次都会从新的位置开始读取,过去不再。如果一条数据在消费过程中发生事故中断了过程,那这条数据就没有发生应有的作用,就等于遗失了。
2、至少一次at-least-once:读出数据、消费数据、然后才commit-offset。如果消费过程出现问题中断,那么offset就得不到保存,下次再读取时还是从原先位置重新开始。所以,一条数据有可能被多次读取,造成重复消费的效果。
3、只此一次exactly-once:把保存offse和消费过程放到同一个事务transaction里。这种模式需要数据库事物处理支持,也就是说offset-commit和数据处理都必须在同一种提供事物处理支持的数据库环境里进行。offset-commit只会在确保消费过程成功完成后才进行。
at-most-once和at-least-once都使用kafka内部commit机制保存offset。at-least-once可以利用kafka的自动commit机制实现offset保存,只要通过kafka配置就可以了。下面是这个配置的示范:
val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer) .withBootstrapServers(bootstrapServers) .withGroupId(group) .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset) .withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs.toString)