alpakka-kafka(6)
了解了kafka原理之后,对kafka的的应用场景有了一些想法。在下面的一系列讨论中把最近一个项目中关于kafka的应用介绍一下。
先介绍一下使用kafka的起因:任何进销存系统,销售开单部分都应该算是主体部分了。简单的说,这是一个包括销售数据录入、库存数扣减两个动作的过程。销售项目录入与库存扣减之间的联系可以是单向的,如录入一个销售商品导致该商品库存扣减、也可以是双向的,即商品销售需要参考当前库存数量。依照具体的业务需求,销售开单过程对当前库存的依赖决定了更新库存的方式:当前库存数从作为参考值到必备条件分别代表事后批次更新或过程中实时更新。当然,从业务方面考虑,录入销售项目、立即扣减库存并作为下一笔录入的参考数是最理想的了。但在现实中,一个多用户的环境里,大量线程同时对一个商品的库存数进行更新,这时必须对数据库表进行锁定,那么由锁表造成的问题就无可避免了:轻者造成数据的遗失、重复、偏差,或者拖慢进程,重者锁死整个系统。这是经典进销存业务系统普遍面临的问题。也是促使许多商业软件开发人员纷纷转向新的分布式大数据模式去寻求解决方案的主要原因。
如果我们再实际点,能够容许些微的数据更新延迟,比如说:毫秒级的,那么就可以把销售项目录入和库存扣减两个动作拆分到两个相互独立的过程里。就像DDD模式里的两个聚合根(aggregate root), 分别在两个独立业务域中实现这两个动作。独立的域之间是松散耦合,互不影响的,所以,两个独立域的计算模式可以是不同的。例如:销售项目录入必须是多人操作,多线程高并发的,而库存扣减却可以设计成单线程或者限定线程数量的。这可以是一种典型的读写分离CQRS模式:扣减库存作为一项数据更新动作可以在另外一个模块,甚至另外一个软件里,在一个可控的、限定线程的环境里独立运算,和销售数据录入部分不发生任何关系。当然,数据录入完成到库存更新出结果之间一定会存在延迟。这种延迟不单只是与库存更新算法和运算效率有着直接关系,它也和两个独立域之间的数据交换速度有莫大关系。kafka,作为一个高效率、高吞吐量、高可用性的消息队列系统,足够担负起独立域与域之间的数据交换任务。而且kafka的消息是持久性的,有重复消费控制机制可以实现数据状态的重新计算,是事件源event-sourcing模式的一项理想工具选择。这就是我选择kafka的原因。
好了,说说这个案例的具体业务需求:这是一个零售业POS软件云租赁平台。初步规划上千独立门店及上万级的门店业务操作终端,包括收银终端、查询终端、业务管理终端。可想而知,系统应该容许上万用户同时进行信息录入操作。高并发、高频率的数据录入部分(特别是收银终端商品条码扫描销售)已经通过event-sourcing,CQRS等模式实现了。接着需要后端的数据处理部分,特别是当前库存状态更新。因为零售店其它业务,如:添订货、收发货、配退货等都需要及时、准确库存数据的支持。我们把这个库存更新功能的实现作为典型的kafka应用案例来介绍,然后再在过程中对akka系列alpakka-kafka的使用进行讲解和示范。
首先,后端业务功能与前端数据采集是松散耦合的。特别是后端数据处理应该是所有前端系统共享的业务功能。这个容易理解,我们不可能给每个门店运行一个后端,那样就需要几千个后端系统同时运行了。所以,可以把这个库存更新功能做成一个独立的库存管理平台,为所有业务模块,或者第三方业务软件提供库存状态支持。
现在我们可以把这个独立的库存管理平台作为一个典型的kafka应用示范。简单来讲,kafka就是一个消息队列MQ,从一端写入消息(produce)、另一端按写入顺序读出消息(consume),中间是一堆复杂的机制去保证集群节点协调、消息输出顺序、消息持久化及消息重复消费等等。在我们的案例里,以库存管理平台为核心,一端通过kafka连接所有的平台用户。这些分布在各处的应用通过kafka的集群功能同时向kafka的写入端写入消息。这些消息实际是序列化的库存更新指令。平台再通过kafka消费端读取这些指令,反序列化解析后按顺序执行这些更新库存命令。值得注意的是:平台此时可以在一个单线程里按发出的顺序,逐个执行指令,避免了多线程产生的不确定因素。
从kafka角度描述:库存管理平台用户即消息发布者producer,这种消息发布必须是高并发、高吞吐量的。简单讲就是同时集中大批量的向kafka写入数据。对平台各用户来讲,就是一种写完就了fire-and-go模式,实现起来比较简单。alpakka-kafka提供了很多类型的sink来实现写produce功能。下面是一个实际的例子:
def writeToKafka(posTxn: PosTxns)(implicit producerKafka: ProducerKafka) = { val doc = BizDoc.fromPosTxn(posTxn) if (producerKafka.producerSettings.isDefined) { implicit val producer = producerKafka.akkaClassicSystem.get SendProducer(producerKafka.producerSettings.get) .send(new ProducerRecord[String, String](producerKafka.publisherSettings.topic, doc.shopId, toJson(doc))) } else FastFuture.successful(Completed) }