Kafka 教程 | 生产者实例
概要:
当我们发送消息之前,先问几个问题:每条消息都是很关键且不能容忍丢失么?偶尔重复消息可以么?我们关注的是消息延迟还是写入消息的吞吐量?
举个例子,有一个信用卡交易处理系统,当交易发生时会发送一条消息到Kafka,另一个服务来读取消息并根据规则引擎来检查交易是否通过,将结果通过Kafka返回。 对于这样的业务,消息既不能丢失也不能重复,由于交易量大因此吞吐量需要尽可能大,延迟可以稍微高一点。
再举个例子,假如我们需要收集用户在网页上的点击数据,对于这样的场景,少量消息丢失或者重复是可以容忍的,延迟多大都不重要只要不影响用户体验,吞吐则根据实时用户数来决定.
生产者写消息的基本流程:
流程如下:
- 首先,我们需要创建一个ProducerRecord,这个对象需要包含消息的主题(topic)和值(value),可以选择性指定一个键值(key)或者分区(partition)。
- 发送消息时,生产者会对键值和值序列化成字节数组,然后发送到分配器(partitioner)。
- 如果我们指定了分区,那么分配器返回该分区即可;否则,分配器将会基于键值来选择一个分区并返回。
- 选择完分区后,生产者知道了消息所属的主题和分区,它将这条记录添加到相同主题和分区的批量消息中,另一个线程负责发送这些批量消息到对应的Kafka broker。
- 当broker接收到消息后,如果成功写入则返回一个包含消息的主题、分区及位移的RecordMetadata对象,否则返回异常。
- 生产者接收到结果后,对于异常可能会进行重试。
创建Kafka生产者:
创建Kafka生产者有三个基本属性:
属性 | 解释 |
---|---|
bootstrap.servers | 属性值是一个host:port的broker列表。这个属性指定了生产者建立初始连接的broker列表,这个列表不需要包含所有的broker,因为生产者建立初始连接后会从相应的broker获取到集群信息。但建议指定至少包含两个broker,这样一个broker宕机后生产者可以连接到另一个broker。 |
key.serializer | 属性值是类的名称。这个属性指定了用来序列化键值(key)的类。Kafka broker只接受字节数组,但生产者的发送消息接口允许发送任何的Java对象,因此需要将这些对象序列化成字节数组。key.serializer指定的类需要实现org.apache.kafka.common.serialization.Serializer接口,Kafka客户端包中包含了几个默认实现,例如ByteArraySerializer、StringSerializer和IntegerSerializer。 |
value.serializer | 属性值是类的名称。这个属性指定了用来序列化消息记录的类,与key.serializer差不多。 |
下面是一个样例代码:
1
2
3
4
5
6
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
创建完生产者后,我们可以发送消息。Kafka中有三种发送消息的方式:
- 只发不管结果(fire-and-forget) : 只调用接口发送消息到Kafka服务器,但不管成功写入与否。由于Kafka是高可用的,因此大部分情况下消息都会写入,但在异常情况下会丢消息。
- 同步发送(Synchronous send) : 调用send()方法返回一个Future对象,我们可以使用它的get()方法来判断消息发送成功与否。
- 异步发送(Asynchronous send) :调用send()时提供一个回调方法,当接收到broker结果后回调此方法。
发送消息到Kafka:
最简单的发送消息方式如下:
1
2
3
4
5
6
7
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
说明:
- 我们创建了一个ProducerRecord,并且指定了Topic以及消息的key/value。Topic总是字符串类型的,但key/value则可以是任意类型,在本例中也是字符串。需要注意的是,这里的key/value的类型需要与serializer和生产者的类型匹配。
- 使用send()方法来发送消息,该方法会返回一个RecordMetadata的Future对象,但由于我们没有跟踪Future对象,因此并不知道发送结果。如前所述,这种方式可能会丢失消息。
- 虽然我们忽略了发送消息到broker的异常,但是我们调用send()方法时仍然可能会遇到一些异常,例如序列化异常、发送缓冲区溢出异常等等。
同步发送消息:
同步发送方式可以简单修改如下:
1
2
3
4
5
6
7
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
try {
producer.send(record).get();
} catch (Exception e) {
e.printStackTrace();
}
注意到,这里使用了Future.get()来获取发送结果,如果发送消息失败则会抛出异常,否则返回一个RecordMetadata对象。发送失败异常包含:1)broker返回不可恢复异常,生产者直接抛出该异常;2)对于broker其他异常,生产者会进行重试,如果重试超过一定次数仍不成功则抛出异常。
可恢复异常指的是,如果生产者进行重试可能会成功,例如连接异常;不可恢复异常则是进行重试也不会成功的异常,例如消息内容过大。
异步发送消息:
首先了解下什么场景下需要异步发送消息。假如生产者与broker之间的网络延时为10ms,我们发送100条消息,发送每条消息都等待结果,那么需要1秒的时间。 而如果我们采用异步的方式,几乎没有任何耗时,而且我们还可以通过回调知道消息的发送结果。
异步发送消息的样例如下:
1
2
3
4
5
6
7
8
9
10
11
12
private class DemoProducerCallback implements Callback {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
e.printStackTrace();
}
}
}
ProducerRecord<String, String> record = new ProducerRecord<String, String>("CustomerCountry", "Precision Products", "France");
producer.send(record, new DemoProducerCallback());
异步回调的类需要实现org.apache.kafka.clients.producer.Callback接口,这个接口只有一个onCompletion方法。当Kafka返回异常时,异常值不为null,代码中只是简单的打印,但我们可以采取其他处理方式。
生产者的配置:
上面我们只配置了bootstrap.servers和序列化类,其实生产者还有很多配置,上面只是使用了默认值。下面来看下这些配置参数。
- acks:
acks控制多少个副本必须写入消息后生产者才能认为写入成功,这个参数对消息丢失可能性有很大影响。这个参数有三种取值:
acks=0:生产者把消息发送到broker即认为成功,不等待broker的处理结果。这种方式的吞吐最高,但也是最容易丢失消息的。
acks=1:生产者会在该分区的群首(leader)写入消息并返回成功后,认为消息发送成功。如果群首写入消息失败,生产者会收到错误响应并进行重试。这种方式能够一定程度避免消息丢失,但如果群首宕机时该消息没有复制到其他副本,那么该消息还是会丢失。另外,如果我们使用同步方式来发送,延迟会比前一种方式大大增加(至少增加一个网络往返时间);如果使用异步方式,应用感知不到延迟,吞吐量则会受异步正在发送中的数量限制。
acks=all:生产者会等待所有副本成功写入该消息,这种方式是最安全的,能够保证消息不丢失,但是延迟也是最大的。
- buffer.memory:
这个参数设置生产者缓冲发送的消息的内存大小,如果应用调用send方法的速度大于生产者发送的速度,那么调用会阻塞或者抛出异常,具体行为取决于block.on.buffer.full(这个参数在0.9.0.0版本被max.block.ms代替,允许抛出异常前等待一定时间)参数。
- compresstion.type:
默认情况下消息是不压缩的,这个参数可以指定使用消息压缩,参数可以取值为snappy、gzip或者lz4。 snappy压缩算法由Google研发,这种算法在性能和压缩比取得比较好的平衡; 相比之下,gzip消耗更多的CPU资源,但是压缩效果也是最好的。 通过使用压缩,我们可以节省网络带宽和Kafka存储成本。
- retries:
当生产者发送消息收到一个可恢复异常时,会进行重试,这个参数指定了重试的次数。 在实际情况中,这个参数需要结合retry.backoff.ms(重试等待间隔)来使用,建议总的重试时间比集群重新选举群首的时间长,这样可以避免生产者过早结束重试导致失败。
- batch.size:
当多条消息发送到一个分区时,生产者会进行批量发送,这个参数指定了批量消息的大小上限(以字节为单位)。当批量消息达到这个大小时,生产者会一起发送到broker; 但即使没有达到这个大小,生产者也会有定时机制来发送消息,避免消息延迟过大。
- linger.ms:
这个参数指定生产者在发送批量消息前等待的时间,当设置此参数后,即便没有达到批量消息的指定大小,到达时间后生产者也会发送批量消息到broker。 默认情况下,生产者的发送消息线程只要空闲了就会发送消息,即便只有一条消息。设置这个参数后,发送线程会等待一定的时间,这样可以批量发送消息增加吞吐量,但同时也会增加延迟。
- client.id:
这个参数可以是任意字符串,它是broker用来识别消息是来自哪个客户端的。在broker进行打印日志、衡量指标或者配额限制时会用到。
- max.in.flight.requests.per.connection:
这个参数指定生产者可以发送多少消息到broker并且等待响应,设置此参数较高的值可以提高吞吐量,但同时也会增加内存消耗。 另外,如果设置过高反而会降低吞吐量,因为批量消息效率降低。 设置为1,可以保证发送到broker的顺序和调用send方法顺序一致,即便出现失败重试的情况也是如此。
- timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms:
这些参数控制生产者等待broker的响应时间。 request.timeout.ms指定发送数据的等待响应时间,metadata.fetch.timeout.ms指定获取元数据(例如获取分区的群首信息)的等待响应时间。 timeout.ms则指定broker在返回结果前等待其他副本(与acks参数相关)响应的时间,如果时间到了但其他副本没有响应结果,则返回消息写入失败。
- max.block.ms
这个参数指定应用调用send方法或者获取元数据方法(例如partitionFor)时的阻塞时间,超过此时间则抛出timeout异常。
- max.request.size:
这个参数限制生产者发送数据包的大小,数据包的大小与消息的大小、消息数相关。 如果我们指定了最大数据包大小为1M,那么最大的消息大小为1M,或者能够最多批量发送1000条消息大小为1K的消息。 另外,broker也有message.max.bytes参数来控制接收的数据包大小。 在实际中,建议这些参数值是匹配的,避免生产者发送了超过broker限定的数据大小。
- receive.buffer.bytes, send.buffer.bytes:
这两个参数设置用来发送/接收数据的TCP连接的缓冲区,如果设置为-1则使用操作系统自身的默认值。如果生产者与broker在不同的数据中心,建议提高这个值,因为不同数据中心往往延迟比较大。
最后讨论下顺序保证。Kafka保证分区的顺序,也就是说,如果生产者以一定的顺序发送消息到Kafka的某个分区,那么Kafka在分区内部保持此顺序,而且消费者也按照同样的顺序消费。 但是,应用调用send方法的顺序和实际发送消息的顺序不一定是一致的。 举个例子,如果retries参数不为0,而max.in.flights.requests.per.session参数大于1,那么有可能第一个批量消息写入失败,但是第二个批量消息写入成功,然后第一个批量消息重试写入成功,那么这个顺序乱序的。 因此,如果需要保证消息顺序,建议设置max.in.flights.requests.per.session为1,这样可以在第一个批量消息发送失败重试时,第二个批量消息需要等待。
序列化:
上面提到了Kafka自带的序列化类,现在来看下如何使用其他的序列化策略。
自定义序列化:
如果我们发送的消息不是整数或者字符串时,我们需要自定义序列化策略或者使用通用的Avro、Thrift或者Protobuf这些序列化方案。 下面来看下如何使用自定义的序列化方案,以及存在的问题。
假如我们要发送的消息对象是这么一个Customer:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Customer {
private int customerID;
private String customerName;
public Customer(int ID, String name) {
this.customerID = ID;
this.customerName = name;
}
public int getID() {
return customerID;
}
public String getName() {
return customerName;
}
}
那么,自定义的序列化类实现样例如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
@Override
public void configure(Map configs, boolean isKey) {
// nothing to configure
}
@Override
/**
We are serializing Customer as:
4 byte int representing customerId
4 byte int representing length of customerName in UTF-8 bytes (0 if name is
Null)
N bytes representing customerName in UTF-8
*/
public byte[] serialize(String topic, Customer data) {
try {
byte[] serializedName;
int stringSize;
if (data == null)
return null;
else {
if (data.getName() != null) {
serializeName = data.getName().getBytes("UTF-8");
stringSize = serializedName.length;
} else {
serializedName = new byte[0];
stringSize = 0;
}
}
ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
buffer.putInt(data.getID());
buffer.putInt(stringSize);
buffer.put(serializedName);
return buffer.array();
} catch (Exception e) {
throw new SerializationException("Error when serializing Customer to byte[] " + e);
}
}
@Override
public void close() {
// nothing to close
}
}
分区:
我们创建消息的时候,必须要提供Topic和消息的内容,而消息的key是可选的,当不指定key时默认为null。 消息的key有两个重要的作用: 1)提供描述消息的额外信息; 2)用来决定消息写入到哪个分区,所有具有相同key的消息会分配到同一个分区中。
如果key为null,那么生产者会使用默认的分配器,该分配器使用轮询(round-robin)算法来将消息均衡到所有分区。
如果key不为null而且使用的是默认的分配器,那么生产者会对key进行哈希并根据结果将消息分配到特定的分区。 注意的是,在计算消息与分区的映射关系时,使用的是全部的分区数而不仅仅是可用的分区数。 这也意味着,如果某个分区不可用(虽然使用复制方案的话这极少发生),而消息刚好被分配到该分区,那么将会写入失败。另外,如果需要增加额外的分区,那么消息与分区的映射关系将会发生改变,因此尽量避免这种情况。
自定义分配器:
现在来看下如何自定义一个分配器,下面将key为Banana的消息单独放在一个分区,与其他的消息进行分区隔离:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;
public class BananaPartitioner implements Partitioner {
public void configure(Map<String, ?> configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if ((keyBytes == null) || (!(key instanceOf String)))
throw new InvalidRecordException("We expect all messages to have customer name as key")
if (((String) key).equals("Banana"))
return numPartitions; // Banana will always go to last partition
// Other records will get hashed to the rest of the partitions
return (Math.abs(Utils.murmur2(keyBytes)) % (numPartitions - 1))
}
public void close() {}
}
SimpleProducer应用程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer"
public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value
if(args.length == 0){
System.out.println("Enter topic name");
return;
}
//Assign topicName to string variable
String topicName = args[0].toString();
// create instance for properties to access producer configs
Properties props = new Properties();
//Assign localhost id
props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests.
props.put("acks", “all");
//If the request fails, the producer can automatically retry,
props.put("retries", 0);
//Specify buffer size in config
props.put("batch.size", 16384);
//Reduce the no of requests less than 0
props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering.
props.put("buffer.memory", 33554432);
props.put("key.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer",
"org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer
<String, String>(props);
for(int i = 0; i < 10; i++)
producer.send(new ProducerRecord<String, String>(topicName,
Integer.toString(i), Integer.toString(i)));
System.out.println(“Message sent successfully");
producer.close();
}
}