Kafka-之消息传输保障(传输语意、幂等、事务以及代码⽰例)Kafka-之消息传输保障(传输语意、幂等、事务以及代码⽰例) 1 消息传输语义(保障)
不管是Kafka还是其它的消息队列,对消息传输的保障都是基于不同层级的语义
At most once
⾄多⼀次,消息很可能会丢失,但是绝不会出现重复传输
At least once
⾄少⼀次,消息不可能丢失,但是可能会出现重复传输
exactly once
精准⼀次,消息不可能丢失,也不可能重复传输
对于⽣产者⽽⾔,⼀旦消息被成功提交到kafka的⽇志⽂件,由于多副本机制的存在,那么这条消息就不会丢失,但是如果⽣产者将消息提交到kafka之后由于⽹络问题导致通信中断,造成消费者⽆法判断消息 是否提交,那么在重试的时候就可能会造成数据的 重复写⼊,所以在这⾥kafka提供的消息传输保障为at least once。
对于消费者⽽⾔,传输保障是基于offset的commit顺序,如果消费者拉取完消息之后,应⽤逻辑先处理消息再提交offset,如果在处理完消息之后,offset提交之前消费者宕机,那么消费者下次消费就会将未提交offset的消息重复进⾏消费,这样的传输保障属于at least once;假如在拉取完消息后,应⽤在处理消息之前提交offset,假如offset提交成功,消息还未来得及消费,此时消费者宕机,那么此时的传输保障就属于at most once。
kafka从0.11.0.0版本引⼊了幂等和事物⽀持这2个特性,以此来实现EOS(exactly once semantics)。
2 幂等
所谓幂等:简单的来说就是多次调⽤接⼝产⽣的结果与调⽤⼀次接⼝产⽣的结果是⼀致的,打个⽐⽅,在数据库中操作set name="sf" where id = 1;这个SQL不管执⾏多少次,结果都是⼀样的,这种操作能保证幂等性,⽽set age=age+1 where id =1 ;这个操作每次执⾏的结果都不⼀样,所以不能保证幂等性。
⽣产者进⾏重试的时候可能会重复写⼊消息,但是引⼊幂等之后就能避免这种情况,我们可以通过以下配置参数开启⽣产者传输消息幂等Properties prop =new Properties()
prop.set(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true)
//或者
prop.set("enable.idempotence",true)
⽣产者实现幂等还需要关注这些参数联动,⼀般使⽤默认配置就⾏
retries在指定重试次数的时候,这个参数必须⼤于0,默认为Integer.MAX_VALUE;
max.tion,这个参数必须<=5,默认为5;
acks,必须指定为-1(all),不然也会报错。
2.1 那么Kafka是如何保证⽣产者的幂等性的呢?
⾸先得先明确⼏个类
RecordBatch
这是⼀个容器,可以容纳多个ProducerRecord对象,但是在V0,V1版本的时候,假如指定压缩格式C
ompressType,那么通常会存储多个Record记录,V2版本的Kafka不管是否指定压缩,都会存储多个Record记录;
kafka为了保证⽣产者的幂等,引⼊了producerid,sequenceid的概念,分别对应RecordBatch的producer id,first
sequence,其中producerid代表⽣产者id,firstsequence代表递增的序列号,针对每个<PID,分区ID>维护⼀个序列号,这个序列号从0开始单调递增,⽣产者每发送⼀条消息,下⼀条消息的序列号就增加1,最终发送发送到Broker的消息的内容如下:
<<PID,分区ID>,SN_NEW_ID>
衣架钩同时在Broker的内存中也维护着已接受消息的序列号信息**<<PID,分区ID>,SN_OLD_ID>**,当下⼀条消息<<PID,分区
ID>,SN_NEW_ID>来了之后
如果SN_NEW_ID=SN_OLD_ID+1,那么Broker才会接收消息;
如果SN_NEW_ID<SN_OLD_ID+1,说明消息被重复写⼊,Broker可以丢弃它
如果SN_NEW_ID>SN_OLD_ID+1,说明消息传输丢失,此时会抛出OutOfOrderSequenceException,这是很严重的异常,往后的send,beginTransaction()等⽅法也会接⼆连三的抛出该异常。
注意:Kafka的幂等也只是保证同⼀个send()⽅法后的当次消息session的幂等
ProducerRecord<> record =new ProducerRecord<>();
producer.send(record)//在当次会话中能保证幂等,不管因⽹络故障重试多少次,序列号不变保证幂等
producer.send(record)//再发送⼀次该消息,kafka将其作为⼀条新的消息,会重新分配⼀个序列号,具体分配哪个序列号,具体取决RecordBatch封装时分配的
ProducerBatch
通常是对RecordBatch进⼀步的封装,每个ProducerBatch中对应⼀个RecordBatch;
ProducerBatch存放在双端队列Deque<Producerbatch>中,每个分区维护⼀个队列,所有的队列都存放在
RecordAccumulator中,这是⼀个消息收集器。
3 事务(transaction)
幂等性只能保证单个⽣产者-单个分区-单个Session的EOS(Exactly once semantics),kafka引⼊的事务很好的弥补了这个缺陷。在事务的ACID的特性中,主要是⽤到了其Atomic原⼦性,将⽣产、消费、位移提交操作放在⼀个事务中,要么全部成功,要么全部失败,⽽⽣产、消费、提交都会涉及多个分区。
为了实现事务,每个应⽤程序必须提供⼀个唯⼀的transactionId,可以通过ProducerConfig.TRANSACTION_ID_CONFIG配置,与幂等中的PID不同,PID是Kafka内部分配的,⽽TransactionID是我们⼿动指定的。
prop.put(ProducerConfig.TRANSACTION_ID_CONFIG,"transactionId")
//or
prop.put("transactional.id","transactionId")
在流式应⽤(Stream Application Process)中,consume-transform-produce模式是经常存在的,尤其在实时数仓中,通常有从topic-A中消费数据,进⾏转换处理之后再写⼊topic-B,⽣产者可能会因为⽹络重试⽽重复⽣产数据,消费者可能会因为offset提交问题产⽣重复消费。
3.1 开启事务的前提条件
⽣产者Producer必须开启幂等,如果不⼿动指定,只要开启了事务,那么幂等⾃动开启,如果⼿动指定false,那么会报异常。
enable.idempotence=true磨具制造
org.fig.ConfigException: Cannot set a transactional.id without also enabling dempotence
3.2 transactionId与PID的关系与区别
transactionId与PID是⼀⼀对应的,transactionId是对应⼀个Map<PID,TopicPartitionID>(PID->tp1,PID->tp2,…);
transactionId是在ProducerConfig中⼿动指定的,⽽PID是由kafka内部分配的;
为了保证使⽤同样的transactionId的新⽣产者启动,旧的⽣产者能⽴即失效,每个⽣产者通过transactionId获取⼀个producer epoch(对应**KafkaProducer.initTransaction()**⽅法);
如果以同样的transactionId启动2个Producer,那么前⼀个启动的Producer会报错
Caused by: org.s.ProducerFencedException: Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker.
3.3 ⽣产者⾓度论事务
事务能保证跨⽣产者的幂等性
相同transactionId的producer启动后,之前的producer将不再⼯作。
事务能保证跨⽣产者会话的事务恢复
当旧的⽣产者宕机后,新的producer启动后能保证未完成的旧的事务要么被commit,要么被abort,如此可以使新的⽣产者从⼀个正常的状态开始⼯作
3.3.1 ⽣产者事务相关的⽅法
kafka⽣产者提供了5个事务相关的⽅法
void initTransaction() :初始化事务,前提条件是⼿动指定transactionId
void beginTransaction():开始事务
void sendOffsetToTransaction():为消费者提供事务内的消费位移提交的操作
void commitTransaction():提交事务
void abortTransaction():终⽌事务,类似于回滚
3.3.2 典型的⽣产者事务发送操作代码
//该⽅法为获取⽣产者参数配置的⽅法
Properties prop = TestProducerTransaction.initProp(1);
短址KafkaProducer<String, String> producer =new KafkaProducer<String, String>(prop);
//1、开启事务功能,事先得开启幂等,配置transactionId丝光机
producer.initTransactions();
producer.beginTransaction();
try{
ProducerRecord<String, String> record =
new ProducerRecord<>("topic1","hello world"+ System.currentTimeMillis())
producer.send(record);
//2、提交事务
}catch(ProducerFencedException e){
e.printStackTrace();
//3、如果发⽣异常,终⽌事务
producer.abortTransaction();
}
producer.close();
3.4 消费者⾓度论事务
3.4.1 消费者消费事务的隔离级别
与常规的事务隔离级别不同,在MySQL中的隔离级别有4种
read_uncommitted
允许读取开启但未提交事务的数据
read_committed
只允许读取已提交事务的数据
假如⽣产者在commit之前发送了msg1,msg2,msg3三条消息,此时kafkaConsumer是消费不到这些消息的,但是它会缓存这些消息。
repeatable_read
易出现幻读
serializable
最强级别,但是效率低
但是在kafka的消费者读取broker数据的隔离级别只有前2种,通常通过以下参数进⾏配置
//⽅式1
prop.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG,IsolationLevel.READ_COMMITTED)
//⽅式2
prop.put("isolation_level","read_uncommitted")
在⽇志中不仅包含消息⽇志,还包含事务⽇志,事务⽇志的消息控制(ControlBatch)类型分2种:COMMIT、ABORT,分别表⽰事务提交与终⽌,有点类似于Flink中的WaterMark,ControlBatch如下图。
3.4.2 consumer-transform-prosuce流式架构事务代码⽰例
常见的从⼀个topic到另外⼀个topic的流式架构,需要经过消费者消费消息转换,然后通过⽣产者将转换之后的消息发送到另⼀个topic,在这架构在实时⿏仓中使⽤⽐较⼴泛。
具体的流程图如下:
为了保证该架构的EOS语意,我们需要将者3个操作放在同⼀个事务中,具体的代码⽰例如下
package ansaction;
import org.apache.sumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.TopicPartition;
import org.serialization.StringDeserializer;
import org.serialization.StringSerializer;
KKS689import java.time.Duration;
import java.util.*;
import urrent.atomic.AtomicBoolean;
public class ConsumeTransformProduceStream {
// public static final String BROKER_LIST = "localhost:9092";
public static final AtomicBoolean isRunning =new AtomicBoolean(true);
public static Properties getProducerConfig(){
Properties prop =new Properties();
prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, Name());
prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, Name());
prop.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transactionId1002");
//如果需要开启事务,必须开启幂等,如果不⼿动指定会⾃动指定,如果显⽰指定enable.idempotence=false,那么会报异常 prop.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);
System.out.println("⽣产者配置完毕");
return prop;
}
public static Properties getConsumerConfig(){
Properties prop =new Properties();
prop.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);制氮气
prop.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Name());
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, Name());
prop.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
prop.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
prop.put(ConsumerConfig.GROUP_ID_CONFIG,"GROUP-001");
System.out.println("消费者配置完毕");
return prop;
}
public static void main(String[] args){
//1、初始化消费者和⽣产者
KafkaConsumer<String, String> consumer =
new KafkaConsumer<>(ConsumerConfig());
//1.1、订阅source topic:topic1
consumer.subscribe(Arrays.asList("topic1"));
KafkaProducer<String, String> producer =
new KafkaProducer<>(ProducerConfig());
//2、初始化事务
producer.initTransactions();
//3、开始消费
()){
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
/
/如果消息不为空,开始transform
if(!records.isEmpty()){
//定义⼀个容器存储消费位移
HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataHashMap =new HashMap<>();
//4、开启事务
producer.beginTransaction();
try{
Set<TopicPartition> topicPartitionSet = records.partitions();
for(TopicPartition topicPartition : topicPartitionSet){
List<ConsumerRecord<String, String>> tpRecords = ds(topicPartition);
for(ConsumerRecord<String, String> record : tpRecords){
//这⾥可以对消息做⼀些逻辑处理或者转换
//假设已经做了转换,开始通过⽣产者发送消息到下⼀个topic:topic2
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>("topic2", record.key(), record.value());
//发送消息
producer.send(producerRecord);
}