Kafka Producer Components Illustrated

Basic Setup

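import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

Properties kafkaProps = new Properties(); // <= start with a Properties object
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092"); // <= initial brokers to connect to (host:port list)

kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // <= built-in String serializer for keys
kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // <= and for values

KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps); // <= key/value types must match the serializers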

Sending Messages

There are three primary ways to send messages:

The Simplest Way (fire-and-forget)
ProducerRecord<String, String> record =
	new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); // <= there are several constructors; this simple one takes the topic name, key and value
	
try {
	producer.send(record); // <= returns a Future of RecordMetadata, which we ignore here
} catch (Exception e) {
	e.printStackTrace(); // <= only errors raised before the record is sent are caught here, e.g. SerializationException, BufferExhaustedException and InterruptException
}
Synchronous Send
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); 
	
try {
	producer.send(record).get(); // <= Future.get() blocks until the broker replies (or the send fails)
} catch (Exception e) {
	e.printStackTrace(); // <= any error before or while sending to Kafka; failures from get() arrive wrapped in an ExecutionException
}
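Because send() returns a java.util.concurrent.Future, a failed synchronous send surfaces from get() as an ExecutionException whose cause is the actual error. The sketch below shows that unwrapping on the JDK alone: a plain CompletableFuture stands in for the producer's future, and fakeSend is a hypothetical helper, not a Kafka API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class SyncSendSketch {
    // Hypothetical stand-in for producer.send(record): an already-completed future,
    // holding either a fake offset or the given error.
    static Future<Long> fakeSend(boolean fail) {
        CompletableFuture<Long> future = new CompletableFuture<>();
        if (fail) {
            future.completeExceptionally(new IllegalStateException("broker rejected the record"));
        } else {
            future.complete(42L);
        }
        return future;
    }

    // Blocks on the future, as producer.send(record).get() does, and unwraps the real cause.
    static String sendSync(boolean fail) {
        try {
            long offset = fakeSend(fail).get();
            return "ok at offset " + offset;
        } catch (ExecutionException e) {
            // The send error is the cause; the ExecutionException is only a wrapper.
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(sendSync(false)); // ok at offset 42
        System.out.println(sendSync(true));  // failed: broker rejected the record
    }
}
```

With a real producer the same pattern applies: catch ExecutionException around send(record).get() and inspect getCause() to decide whether the failure is worth handling.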
Asynchronous Send with a Callback
private class DemoProducerCallback implements Callback { // <= must implement org.apache.kafka.clients.producer.Callback
	@Override
	public void onCompletion(RecordMetadata recordMetadata, Exception e) {
		if (e != null) {
			e.printStackTrace(); // <= if Kafka returns an error, onCompletion receives a non-null exception
		}
	}
}

ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France"); 
producer.send(record, new DemoProducerCallback()); // <= pass a Callback along with the record; callbacks run on the producer's I/O thread, not the main thread, so keep them fast
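To make the threading point concrete, here is a JDK-only sketch of the asynchronous pattern. SendCallback is a hypothetical mirror of Kafka's Callback, fakeSend stands in for producer.send(record, callback), and a single daemon thread named fake-io-thread plays the role of the producer's I/O thread. The caller returns immediately and the callback later runs on that other thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncSendSketch {
    // Hypothetical mirror of Kafka's Callback: a single abstract method, so a lambda fits.
    interface SendCallback {
        void onCompletion(Long offset, Exception e);
    }

    // Stand-in for the producer's background I/O thread (daemon, so it never blocks JVM exit).
    static final ExecutorService ioThread = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-io-thread");
        t.setDaemon(true);
        return t;
    });

    // Hypothetical stand-in for producer.send(record, callback): returns at once,
    // and the callback runs later on the "I/O" thread rather than the caller's thread.
    static void fakeSend(boolean fail, SendCallback callback) {
        ioThread.execute(() -> {
            if (fail) {
                callback.onCompletion(null, new IllegalStateException("send failed"));
            } else {
                callback.onCompletion(7L, null);
            }
        });
    }

    // Sends once, waits (up to 5 s) for the callback, and reports what it saw and where it ran.
    static String sendAndAwait(boolean fail) {
        String[] seen = new String[1];
        CountDownLatch done = new CountDownLatch(1);
        fakeSend(fail, (offset, e) -> {
            String result = (e != null) ? "error: " + e.getMessage() : "offset " + offset;
            seen[0] = result + " on " + Thread.currentThread().getName();
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(sendAndAwait(false)); // offset 7 on fake-io-thread
        System.out.println(sendAndAwait(true));  // error: send failed on fake-io-thread
    }
}
```

Since Kafka's Callback also has a single method, real code can pass a lambda instead of a named class, e.g. producer.send(record, (metadata, e) -> { if (e != null) e.printStackTrace(); });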

Configurations

Core Configurations

With a lower, less reliable acks setting (acks=0 or acks=1), the producer can send records faster: you trade reliability for producer latency. However, end-to-end latency is measured from the time a record is produced until it is available for consumers to read, and it is identical for all three options (acks=0, acks=1 and acks=all). The reason is that, to maintain consistency, Kafka will not let consumers read records until they have been written to all in-sync replicas. Therefore, if you care about end-to-end latency rather than just producer latency, there is no trade-off to make: you get the same end-to-end latency with the most reliable option.
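The acks choice is just another producer property. This sketch builds producer settings with plain java.util.Properties and rejects values the producer would not accept ("-1" is an alias for "all"); the broker addresses are the same placeholders used in Basic Setup, and producerProps is an illustrative helper, not a Kafka API.

```java
import java.util.Properties;
import java.util.Set;

class AcksConfigSketch {
    // Values the producer accepts for acks; "-1" means the same as "all".
    static final Set<String> VALID_ACKS = Set.of("0", "1", "all", "-1");

    static Properties producerProps(String acks) {
        if (!VALID_ACKS.contains(acks)) {
            throw new IllegalArgumentException("unsupported acks value: " + acks);
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092,broker2:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // acks=0: don't wait for the broker; acks=1: wait for the partition leader;
        // acks=all: wait until all in-sync replicas have the record.
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        // End-to-end latency is the same for all three, so pick the most reliable one.
        System.out.println(producerProps("all").getProperty("acks")); // all
    }
}
```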


Message Delivery Time
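delivery.timeout.ms bounds the total time from send() returning until success or failure is reported: time spent waiting in a batch (linger.ms), awaiting the broker's response (request.timeout.ms), and any retries. Kafka's documentation states that delivery.timeout.ms should be at least linger.ms + request.timeout.ms. The sketch below checks that constraint and, under a deliberately simplified worst case (every attempt waits the full request.timeout.ms, with a fixed retry.backoff.ms between attempts), estimates how many attempts fit in the budget. The arithmetic is an illustration, not the producer's exact retry logic.

```java
class DeliveryTimeoutSketch {
    // Documented constraint: delivery.timeout.ms >= linger.ms + request.timeout.ms.
    static boolean isValid(long deliveryTimeoutMs, long lingerMs, long requestTimeoutMs) {
        return deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
    }

    // Simplified worst case with a fixed backoff: n attempts need
    // n * requestTimeoutMs + (n - 1) * retryBackoffMs <= deliveryTimeoutMs - lingerMs.
    static long worstCaseAttempts(long deliveryTimeoutMs, long lingerMs,
                                  long requestTimeoutMs, long retryBackoffMs) {
        long budget = deliveryTimeoutMs - lingerMs;
        return (budget + retryBackoffMs) / (requestTimeoutMs + retryBackoffMs);
    }

    public static void main(String[] args) {
        System.out.println(isValid(120_000, 5, 30_000)); // true
        System.out.println(isValid(20_000, 5, 30_000));  // false
        System.out.println(worstCaseAttempts(120_000, 5, 30_000, 100)); // 3
    }
}
```

This is why the recommended approach is to size delivery.timeout.ms and leave retries alone: the time budget, not a retry count, ends up limiting how often the producer retries.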


Others

Serializers

For details on data serialization strategies, see Designing Data-Intensive Applications (DDIA), Chapter 4: Encoding and Evolution.

The Schema Registry pattern is used here: the idea is to store all the schemas used to write data to Kafka in the registry. Then we simply store the identifier for the schema in the record we produce to Kafka. Consumers can then use the identifier to pull the schema out of the Schema Registry and deserialize the data. The key is that all this work is done in the serializers and deserializers.
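A JDK-only sketch of the idea: the producer side prefixes each payload with a schema id, and the consumer side reads the id back and looks the schema up. The layout (magic byte 0, then a 4-byte big-endian schema id, then the encoded data) follows the format Confluent's serializers use; the in-memory maps are a hypothetical stand-in for the registry service, and the payload bytes are placeholders rather than real Avro.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class SchemaRegistrySketch {
    // Hypothetical in-memory "registry": schema text <-> numeric id.
    static final Map<String, Integer> idsBySchema = new HashMap<>();
    static final Map<Integer, String> schemasById = new HashMap<>();

    // Registers a schema once and returns its id (re-registering returns the same id).
    static int register(String schema) {
        Integer existing = idsBySchema.get(schema);
        if (existing != null) {
            return existing;
        }
        int id = schemasById.size() + 1;
        idsBySchema.put(schema, id);
        schemasById.put(id, schema);
        return id;
    }

    // Producer side: [magic byte 0][4-byte big-endian schema id][encoded payload].
    static byte[] serialize(String schema, byte[] payload) {
        int id = register(schema);
        return ByteBuffer.allocate(1 + 4 + payload.length)
                .put((byte) 0)
                .putInt(id)
                .put(payload)
                .array();
    }

    // Consumer side: read the schema id from the record header bytes.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buf.getInt();
    }

    static String schemaFor(byte[] message) {
        return schemasById.get(schemaIdOf(message));
    }

    static byte[] payloadOf(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }

    public static void main(String[] args) {
        String customerSchema = "{\"type\":\"record\",\"name\":\"Customer\",\"fields\":[]}"; // illustrative
        byte[] payload = "not-real-avro".getBytes(StandardCharsets.UTF_8); // placeholder bytes
        byte[] message = serialize(customerSchema, payload);
        System.out.println(schemaIdOf(message));                       // 1
        System.out.println(schemaFor(message).equals(customerSchema)); // true
        System.out.println(new String(payloadOf(message), StandardCharsets.UTF_8)); // not-real-avro
    }
}
```

Only the small id travels with every record; the full schema is fetched once and can be cached by the consumer.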

Here is an example of how to produce generated Avro objects to Kafka

Properties props = new Properties();

props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer"); // <= use KafkaAvroSerializer to serialize our objects with Avro
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl); // <= where the serializer registers and looks up schemas

String topic = "customerContacts";

Producer<String, Customer> producer = new KafkaProducer<>(props);

// We keep producing new events until someone presses Ctrl-C
while (true) {
	Customer customer = CustomerGenerator.getNext();
	System.out.println("Generated customer " + customer.toString());
	ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getName(), customer);
	producer.send(record);
}

Partitioners

The importance of keys: all messages with the same key go to the same partition of a topic, as long as the number of partitions does not change.
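A small JDK-only sketch of why this holds: the partition is a pure function of the serialized key and the partition count. Kafka's default partitioner hashes keys with murmur2; this sketch swaps in Arrays.hashCode purely for illustration, so the partition numbers it produces will not match Kafka's.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

class KeyPartitioningSketch {
    // hash(key) mod numPartitions; Arrays.hashCode is a stand-in for Kafka's murmur2.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int positiveHash = Arrays.hashCode(keyBytes) & 0x7fffffff; // clear the sign bit
        return positiveHash % numPartitions;
    }

    public static void main(String[] args) {
        // Same key and same partition count -> same partition, every time.
        System.out.println(partitionFor("Precision Products", 6) == partitionFor("Precision Products", 6)); // true
        // Changing the partition count can move a key to a different partition.
        System.out.println(partitionFor("Precision Products", 6) + " vs " + partitionFor("Precision Products", 8));
    }
}
```

The modulo by the partition count is also why adding partitions to a topic breaks the key-to-partition mapping for existing keys.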

You can implement a custom partitioning strategy, as in the example below:

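import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {
	@Override
	public void configure(Map<String, ?> configs) {}

	@Override
	public int partition(String topic, Object key, byte[] keyBytes,
						 Object value, byte[] valueBytes,
						 Cluster cluster) {
		List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
		int numPartitions = partitions.size();

		if ((keyBytes == null) || (!(key instanceof String)))
			throw new InvalidRecordException("We expect all messages to have customer name as key");

		if (numPartitions == 1)
			return 0; // nothing to split; also avoids a modulo by zero below

		if (((String) key).equals("Banana"))
			return numPartitions - 1; // Banana will always go to last partition

		// Other records will get hashed to the rest of the partitions
		// (Utils.toPositive instead of Math.abs, which stays negative for Integer.MIN_VALUE)
		return Utils.toPositive(Utils.murmur2(keyBytes)) % (numPartitions - 1);
	}

	@Override
	public void close() {}
}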
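A custom partitioner is picked up through the producer's partitioner.class property. The sketch below shows that setting (the com.example package is hypothetical) together with a Kafka-free mirror of the BananaPartitioner routing rules, so they can be checked without a cluster. Arrays.hashCode stands in for Utils.murmur2, so non-Banana partition numbers differ from the real class; the routing rules are the same.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Properties;

class BananaRoutingSketch {
    // Mirror of BananaPartitioner.partition() without Kafka types.
    static int partition(String key, int numPartitions) {
        if (key == null) {
            throw new IllegalArgumentException("We expect all messages to have customer name as key");
        }
        if (numPartitions == 1) {
            return 0;
        }
        if (key.equals("Banana")) {
            return numPartitions - 1; // Banana always lands on the last partition
        }
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8)) & 0x7fffffff;
        return hash % (numPartitions - 1); // everyone else shares partitions 0..n-2
    }

    // Registers the custom partitioner with the producer configuration.
    static Properties withPartitioner(Properties props) {
        props.put("partitioner.class", "com.example.BananaPartitioner"); // hypothetical package
        return props;
    }

    public static void main(String[] args) {
        System.out.println(partition("Banana", 4)); // 3
        for (String key : new String[] {"Apple", "Cherry", "Precision Products"}) {
            System.out.println(key + " -> " + partition(key, 4)); // always in 0..2
        }
    }
}
```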

Headers, Interceptors, Quotas and Throttling

Headers

Additional metadata about a Kafka record (e.g. the source of the data, or information for routing or tracing).
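Kafka headers are an ordered collection of key/value pairs with String keys and byte[] values, and a key may appear more than once. With the real client you would write something like record.headers().add("privacy-level", "YOLO".getBytes(StandardCharsets.UTF_8)) and read with headers().lastHeader(key). The JDK-only sketch below models that shape; HeadersSketch is hypothetical and not Kafka's Headers class, and the header names and values are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class HeadersSketch {
    // Ordered (String key, byte[] value) pairs; duplicate keys are allowed.
    final List<Map.Entry<String, byte[]>> headers = new ArrayList<>();

    HeadersSketch add(String key, String value) {
        headers.add(Map.entry(key, value.getBytes(StandardCharsets.UTF_8)));
        return this;
    }

    // Like Headers.lastHeader(key): the most recently added value for key, or null.
    String last(String key) {
        for (int i = headers.size() - 1; i >= 0; i--) {
            if (headers.get(i).getKey().equals(key)) {
                return new String(headers.get(i).getValue(), StandardCharsets.UTF_8);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        HeadersSketch h = new HeadersSketch()
                .add("source", "billing-service")
                .add("trace-id", "abc123")
                .add("source", "gateway");
        System.out.println(h.last("source"));  // gateway
        System.out.println(h.headers.size());  // 3
        System.out.println(h.last("missing")); // null
    }
}
```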

Interceptors

Interceptors are useful for capturing monitoring and tracing information, enhancing messages with standard headers, and redacting sensitive information. Example code snippet:

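import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CountingProducerInterceptor implements ProducerInterceptor {
	ScheduledExecutorService executorService = 
		Executors.newSingleThreadScheduledExecutor();
	static AtomicLong numSent = new AtomicLong(0);
	static AtomicLong numAcked = new AtomicLong(0);

	@Override
	public void configure(Map<String, ?> map) {
		// the window size is read from the producer configuration
		Long windowSize = Long.valueOf(
			(String) map.get("counting.interceptor.window.size.ms"));
		executorService.scheduleAtFixedRate(CountingProducerInterceptor::run, windowSize, windowSize, TimeUnit.MILLISECONDS);
	}

	@Override
	public ProducerRecord onSend(ProducerRecord producerRecord) {
		numSent.incrementAndGet(); // called from send(), before the record is serialized
		return producerRecord;     // returned unchanged
	}

	@Override
	public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
		numAcked.incrementAndGet(); // counts successful and failed sends alike
	}

	@Override
	public void close() {
		executorService.shutdownNow();
	}

	public static void run() {
		// just print out the sent and ack counts in a separate thread
		// reset the counts in each time window
		System.out.println(numSent.getAndSet(0));
		System.out.println(numAcked.getAndSet(0));
	}
}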

Producer interceptors can be applied without any changes to the client code; only the deployment configuration changes. To use the preceding interceptor:

  1. Add your jar to the classpath:
    export CLASSPATH=$CLASSPATH:./target/CountProducerInterceptor-1.0-SNAPSHOT.jar
    
  2. Create a config file (producer.config) that includes:
    interceptor.classes=com.shapira.examples.interceptors.CountingProducerInterceptor
    counting.interceptor.window.size.ms=100000
    
  3. Run the application as you normally would, but make sure to include the configuration that you created:
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic interceptor-test --producer.config producer.config
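An application that builds its producer from a file like producer.config picks up the interceptor with no code change. This JDK-only sketch parses such a file's contents (inlined as a string, using the CountingProducerInterceptor class name from the code above) with java.util.Properties; in a real application the resulting Properties would be passed to new KafkaProducer<>(props). The custom counting.interceptor.window.size.ms key is passed through to the interceptor's configure(Map) method, which is how the window size above reaches it.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class InterceptorConfigSketch {
    // Contents of producer.config, inlined here instead of read from disk.
    static final String PRODUCER_CONFIG =
            "interceptor.classes=com.shapira.examples.interceptors.CountingProducerInterceptor\n"
            + "counting.interceptor.window.size.ms=100000\n";

    // Parses the text the same way any .properties file is parsed.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e); // a StringReader does not actually fail
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(PRODUCER_CONFIG);
        System.out.println(props.getProperty("interceptor.classes"));
        System.out.println(Long.parseLong(props.getProperty("counting.interceptor.window.size.ms"))); // 100000
    }
}
```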
    
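Quotas and Throttling

Kafka brokers can limit the rate at which clients produce messages; a client that exceeds its quota is throttled (the broker delays its responses). Static producer quotas can be set in the broker configuration, e.g. a default for all clients plus higher limits for specific client IDs. In newer Kafka versions these static settings are deprecated, and quotas are usually set dynamically with kafka-configs.sh.

quota.producer.default=2M
quota.producer.override="clientA:4M,clientB:10M"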
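A simplified model of how throttling enforces a byte-rate quota: if a client sent B bytes during a window of W ms and its quota is Q bytes/s, those bytes "should" have taken B/Q seconds, so the broker delays the client by the difference, bringing its average rate over (window + delay) back down to Q. This is an illustrative approximation, not the broker's exact algorithm, and it treats "2M" as 2,000,000 bytes/s purely for round numbers.

```java
class QuotaThrottleSketch {
    // delay = max(0, B / Q - W): just long enough to average back down to the quota.
    static long throttleMs(long bytesInWindow, long windowMs, long quotaBytesPerSec) {
        double timeAtQuotaMs = bytesInWindow * 1000.0 / quotaBytesPerSec;
        return Math.max(0L, (long) Math.ceil(timeAtQuotaMs - windowMs));
    }

    public static void main(String[] args) {
        long twoMB = 2_000_000L; // "2M", read as 2,000,000 bytes/s for this example
        System.out.println(throttleMs(4_000_000L, 1_000, twoMB)); // 1000: twice the quota in 1 s -> wait 1 s
        System.out.println(throttleMs(1_000_000L, 1_000, twoMB)); // 0: under quota, no delay
    }
}
```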

Summary as Anki Cards

💡 What are the three ways to send a message from the producer to a Kafka broker?

Fire-and-forget, synchronous send, and asynchronous send with a callback.

💡 Can a producer object be used by multiple threads to send messages in a Kafka producer application?

Yes, the producer object is thread-safe.

💡 What is the recommended Kafka producer timeout and retry configuration, and why?

Set delivery.timeout.ms and leave retries at its default. delivery.timeout.ms bounds the time from when the record is ready to be sent (send() has returned) until a response is received from the broker, including retries. This caps the total waiting plus in-flight time and lets the producer retry as many times as it can within that limit.

💡 What are the five steps in the send() process in the Kafka producer?

send() → batching → await send → retries → inflight

💡 Which component in the Kafka producer is responsible for serialization, and how do you configure it?

The serializer, e.g. props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

💡 What are the commonly used serialization strategies in a Kafka producer?

Avro, Protobuf, JSON

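💡 The key in the message is used to select the partition. If a key is null, what strategy will be used to select the partition?

A round-robin strategy spreads such records across the available partitions. Since Kafka 2.4 the default partitioner makes it sticky: it fills a batch for one partition before moving on to the next.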

💡 What are some ready-to-use partitioners that can replace the default partitioner?

RoundRobinPartitioner and UniformStickyPartitioner

💡 How can you write your own partitioner?

Implement the ‘Partitioner’ interface

💡 How can we add more metadata to a message in the Kafka producer?

Use headers

💡 How can we limit the Kafka producer quota?

quota.producer.default=2M
quota.producer.override="clientA:4M,clientB:10M"