Java-API+Kafka实现自定义分区

Java-API+Kafka实现自定义分区

目录章节:

  1.pom.xml导入kafka依赖包;

  2.kafka普通生产者实现方式;

  3.kafka带回调函数的生产者;

  4.生产者自定义分区;

     4.1使用自定义分区

1.pom.xml导入kafka依赖包:

<!--kafka依赖-->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.11.0.0</version>
    </dependency>

PS:kafkaProducer发送数据流程及ACK、重复消费与数据丢失问题:

1.Kafka 的 Producer 发送消息采用的是 异步发送的方式。在消息发送的过程中,涉及到了两个线程 ——main 线程和Sender线程,以及 一个线程共享变量 ——RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取
消息发送到 Kafka broker。
2.异步和ack并不冲突,生产者一直发送数据,不等应答,如果某条数据迟迟没有应答,生产者会再发一次;
3.acks: -1 代表所有处于isr列表中的follower partition都会同步写入消息成功 0 代表消息只要发送出去就行,其他不管 1 代表发送消息到leader partition写入成功就可以;
4.重复消费与数据丢失:
  说明: 已经消费的数据对于kafka来说,会将消费组里面的offset值进行修改,那什么时候进行修改了?是在数据消费 完成之后,比如在控制台打印完后自动提交;
      提交过程:是通过kafka将offset进行移动到下个message所处的offset的位置。拿到数据后,存储到hbase中或者mysql中,如果hbase或者mysql在这个时候连接不上,就会抛出异常,如果在处理数据的时候已经进行了提交,
      那么kafka上的offset值已经进行了修改了,但是hbase或者mysql中没有数据,这个时候就会出现数据丢失。什么时候提交offset值?在Consumer将数据处理完成之后,再来进行offset的修改提交。默认情况下offset是 自动提交,
     需要修改为手动提交offset值。如果在处理代码中正常处理了,但是在提交offset请求的时候,没有连接到kafka或者出现了故障,那么该次修 改offset的请求是失败的,那么下次在进行读取同一个分区中的数据时,会从已经处理掉的offset值再进行处理一 次,
      那么在hbase中或者mysql中就会产生两条一样的数据,也就是数据重复。

PS:数据来源:

/**
     * 获取数据库数据
     * @param
     * @return
     * @throws SQLException
     */
    public static List<KafKaMyImage> getKafKaMyImages() throws SQLException {
        List<KafKaMyImage> kafKaMyImages=new ArrayList<>();
        KafKaMyImage kafKaMyImage=null;
        String sql="select id,loginip,updatetime,username,loginaddr from adminlogin";
        Connection conection = SingleJavaJDBC.getConection();
        PreparedStatement preparedStatement = conection.prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery();
        while (resultSet.next()){
            kafKaMyImage=new KafKaMyImage(Integer.parseInt(resultSet.getString("id")),
                                                resultSet.getString("loginip"),
                                                    resultSet.getString("updatetime"),
                                                        resultSet.getString("username"),
                                                            resultSet.getString("loginaddr"));
            kafKaMyImages.add(kafKaMyImage);
        }
//        SingleJavaJDBC.close(resultSet,preparedStatement,conection);
        return kafKaMyImages;
    }
}
hmoban主题是根据ripro二开的主题,极致后台体验,无插件,集成会员系统
自学咖网 » Java-API+Kafka实现自定义分区