Java-API+Kafka实现自定义分区
目录章节:
1.pom.xml导入kafka依赖包;
2.kafka普通生产者实现方式;
3.kafka带回调函数的生产者;
4.生产者自定义分区;
4.1使用自定义分区
1.pom.xml导入kafka依赖包:
<!--kafka依赖--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.0</version> </dependency>
PS:kafkaProducer发送数据流程及ACK、重复消费与数据丢失问题:
1.Kafka 的 Producer 发送消息采用的是 异步发送的方式。在消息发送的过程中,涉及到了两个线程 ——main 线程和Sender线程,以及 一个线程共享变量 ——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取 消息发送到 Kafka broker。 2.异步和ack并不冲突,生产者一直发送数据,不等应答,如果某条数据迟迟没有应答,生产者会再发一次;
3.acks: -1 代表所有处于isr列表中的follower partition都会同步写入消息成功 0 代表消息只要发送出去就行,其他不管 1 代表发送消息到leader partition写入成功就可以;
4.重复消费与数据丢失:
说明: 已经消费的数据对于kafka来说,会将消费组里面的offset值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
提交过程:是通过kafka将offset进行移动到下个message所处的offset的位置。拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,
那么kafka上的offset值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。什么时候提交offset值?在Consumer将数据处理完成之后,再来进行offset的修改提交。默认情况下offset是 自动提交,
需要修改为手动提交offset值。如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset值再进行处理一 次,
那么在hbase中或者mysql中就会产生两条一样的数据,也就是数据重复。
PS:数据来源:
/** * 获取数据库数据 * @param * @return * @throws SQLException */ public static List<KafKaMyImage> getKafKaMyImages() throws SQLException { List<KafKaMyImage> kafKaMyImages=new ArrayList<>(); KafKaMyImage kafKaMyImage=null; String sql="select id,loginip,updatetime,username,loginaddr from adminlogin"; Connection conection = SingleJavaJDBC.getConection(); PreparedStatement preparedStatement = conection.prepareStatement(sql); ResultSet resultSet = preparedStatement.executeQuery(); while (resultSet.next()){ kafKaMyImage=new KafKaMyImage(Integer.parseInt(resultSet.getString("id")), resultSet.getString("loginip"), resultSet.getString("updatetime"), resultSet.getString("username"), resultSet.getString("loginaddr")); kafKaMyImages.add(kafKaMyImage); } // SingleJavaJDBC.close(resultSet,preparedStatement,conection); return kafKaMyImages; } }