Intro to Kafka link
Kafka install
https://towardsdatascience.com/tagged/kafka
http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html
http://cloudurable.com/blog/kafka-tutorial/index.html
https://kafka.apache.org/quickstart
https://www.javainuse.com/misc/apache-kafka-hello-world
https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/
https://www.devglan.com/apache-kafka/apache-kafka-java-example
kafka consumer java example
https://www.dataneb.com/post/kafka-producer-consumer-example-java
https://dev.to/thegroo/spring-kafka-producer-and-consumer-41oc
https://docs.spring.io/spring-kafka/reference/html/
https://www.baeldung.com/spring-kafka
kafka consumer KafkaTemplate
https://programming.vip/docs/spring-boot-integration-kafka-spring-kafka-in-depth-exploration.html
https://codenotfound.com/spring-kafka-consumer-producer-example.html
code: https://github.com/code-not-found/spring-kafka/tree/master/spring-kafka-hello-world/src/main/java/com/codenotfound/kafka
pom.xml key dependencies:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>
For sending messages we use the KafkaTemplate, which wraps a Producer and provides convenience methods to send data to Kafka topics.
The template provides asynchronous send methods which return a ListenableFuture (in the 2.x versions documented here; Spring for Apache Kafka 3.0 and later return a CompletableFuture instead).
https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#kafka-template
- ListenableFuture<SendResult<K, V>> send(String topic, V data);
kafkaTemplate.send("helloworld.t", payload);
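The asynchronous contract can be sketched without a broker. The following is a minimal, JDK-only illustration, not Spring Kafka code: `fakeSend` is a hypothetical stand-in for `kafkaTemplate.send`, and a plain `CompletableFuture` stands in for the future the template returns. It shows the two usual ways to consume such a result: registering a callback, or blocking until the result arrives.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// JDK-only sketch of an asynchronous send; nothing here talks to Kafka.
final class AsyncSendSketch {

    // Hypothetical stand-in for kafkaTemplate.send(topic, data): the work runs
    // on another thread and the caller immediately gets a future back.
    static CompletableFuture<String> fakeSend(String topic, String payload) {
        return CompletableFuture.supplyAsync(() -> "sent '" + payload + "' to " + topic);
    }

    public static void main(String[] args) {
        AtomicReference<String> seenByCallback = new AtomicReference<>();

        // Option 1: register a callback that runs once the send completes.
        CompletableFuture<String> future = fakeSend("helloworld.t", "Hello")
                .whenComplete((result, error) -> {
                    if (error == null) {
                        seenByCallback.set(result);
                    }
                });

        // Option 2: block until the result (and the callback) has completed.
        System.out.println(future.join());
        System.out.println("callback saw: " + seenByCallback.get());
    }
}
```

With the real template the same choice applies: fire and forget, attach a callback to the returned future, or block on it when the caller must know the send succeeded.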
In the Sender class, the KafkaTemplate is autowired, since its creation is handled further below in a separate SenderConfig class.
- @Autowired
private KafkaTemplate<String, String> kafkaTemplate;
Note: The Kafka broker's default settings cause it to auto-create a topic when a request for an unknown topic is received.
The creation of the KafkaTemplate and Sender is handled in the SenderConfig class:
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
@Bean
public Sender sender() {
    return new Sender();
}
In order to be able to use the Spring Kafka template, we need to configure a ProducerFactory and provide it in the template’s constructor.
@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}
The producer configuration sets the 'BOOTSTRAP_SERVERS_CONFIG' property, which specifies a list of host:port pairs used for establishing the initial connections to the Kafka cluster:
@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    // keys and values are both Strings, so both use the StringSerializer
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    return props;
}
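The ProducerConfig constants are plain string keys, so the same settings can be written out by name. The sketch below is JDK-only and assumes nothing beyond the documented property names `bootstrap.servers`, `key.serializer` and `value.serializer`; `parseBootstrapServers` is a hypothetical helper (not part of Kafka or Spring) that only illustrates the comma-separated host:port format, and the server list in `main` is an example value.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// JDK-only sketch: the producer settings keyed by their literal property names.
final class ProducerPropsSketch {

    // Same three settings as producerConfigs(), without the ProducerConfig constants.
    static Map<String, Object> producerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // Hypothetical helper: split "host1:9092,host2:9092" into host/port pairs.
    static List<InetSocketAddress> parseBootstrapServers(String value) {
        List<InetSocketAddress> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0) {
                throw new IllegalArgumentException("expected host:port but got '" + trimmed + "'");
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            // createUnresolved avoids a DNS lookup; this is only a format check.
            result.add(InetSocketAddress.createUnresolved(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        String servers = "localhost:9092, broker2:9093"; // example value, not from the tutorial
        System.out.println(producerProps(servers));
        System.out.println(parseBootstrapServers(servers));
    }
}
```

Only the initial connection uses this list; listing more than one broker mainly guards against a single listed broker being down at startup.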
A message in Kafka is a key-value pair with a small amount of associated metadata. Because Kafka stores and transports byte arrays, we need to specify how the key and value are serialized.
In this example we send a String as payload, so we specify the StringSerializer class, which takes care of the conversion to bytes.
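What the String serializer and deserializer do can be approximated in a few lines of plain Java. This is a sketch rather than Kafka's own classes: it assumes the default UTF-8 encoding and passes null through unchanged, and the class and method names are illustrative.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// JDK-only sketch of a String <-> byte[] round trip, as a serializer pair would do.
final class StringSerdeSketch {

    // String to bytes, as done on the producer side before the record is sent.
    static byte[] serialize(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // Bytes back to String, as done on the consumer side after the record is fetched.
    static String deserialize(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = serialize("Hello Spring Kafka!");
        System.out.println(Arrays.toString(wire));
        System.out.println(deserialize(wire));
    }
}
```

The producer's serializer and the consumer's deserializer have to agree on the format; a mismatch shows up as garbled payloads or deserialization errors on the consumer side.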
Other configs: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html
As with any messaging-based application, you need to create a receiver that will handle the published messages. The Receiver is nothing more than a simple POJO that defines a method for receiving messages.
@KafkaListener(topics = "helloworld.t")
public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
    latch.countDown();
}
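The `latch.countDown()` call is presumably there so that a test can wait until a message has actually arrived. A minimal JDK-only sketch of that pattern follows; it assumes a latch with a count of 1 and replaces the listener container with a plain thread, so the class name, the `getLatch` accessor and the 10-second timeout are illustrative choices rather than the tutorial's code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// JDK-only sketch: a receiver that signals arrival of a message through a latch.
final class LatchReceiverSketch {

    // Count of 1: the latch opens after the first received message.
    private final CountDownLatch latch = new CountDownLatch(1);

    CountDownLatch getLatch() {
        return latch;
    }

    // Mirrors the receive method above, minus the @KafkaListener annotation and logger.
    void receive(String payload) {
        System.out.println("received payload='" + payload + "'");
        latch.countDown();
    }

    public static void main(String[] args) throws InterruptedException {
        LatchReceiverSketch receiver = new LatchReceiverSketch();

        // Simulate the listener container delivering a record on another thread.
        Thread delivery = new Thread(() -> receiver.receive("Hello"));
        delivery.start();

        // A test would block here until the message arrives or the timeout expires.
        boolean arrived = receiver.getLatch().await(10, TimeUnit.SECONDS);
        System.out.println("message arrived in time: " + arrived);
    }
}
```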
The @KafkaListener annotation above causes a ConcurrentMessageListenerContainer to be created behind the scenes for each annotated method.
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html :
Annotation that marks a method to be the target of a Kafka message listener on the specified topics.
To make this happen, a factory bean named kafkaListenerContainerFactory is expected; it is configured as follows:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
The @EnableKafka annotation on the configuration class enables detection of the @KafkaListener annotation that was used on the Receiver class.
The bootstrap servers are defined in the application properties file and injected here:
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
We also specify a 'GROUP_ID_CONFIG', which identifies the group this consumer belongs to. Messages are load balanced over consumer instances that have the same group id; in this example the group id is "helloworld".
Finally, we set 'AUTO_OFFSET_RESET_CONFIG' to "earliest". When the group has no committed offset yet, this makes the consumer read from the beginning of the topic, even if some messages were already sent before it was able to start up.
@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
    // automatically reset the offset to the earliest offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return props;
}
https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html
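The effect of the offset reset setting can be illustrated with a small decision function. This is a simplified JDK-only model, not Kafka's implementation: it covers only the case where the group has no committed offset (the real setting also applies when a committed offset is out of range), and `startingOffset` and its parameters are hypothetical names.

```java
// JDK-only toy model of how auto.offset.reset picks a starting position.
final class OffsetResetSketch {

    // committed is null when the consumer group has no stored offset for the partition.
    static long startingOffset(Long committed, String autoOffsetReset,
                               long beginningOffset, long endOffset) {
        if (committed != null) {
            // A committed offset always wins; the reset policy is not consulted.
            return committed;
        }
        switch (autoOffsetReset) {
            case "earliest":
                return beginningOffset; // replay everything still retained in the partition
            case "latest":
                return endOffset;       // only see records produced from now on
            default:
                // Models "none": the consumer gets an error instead of a silent reset.
                throw new IllegalStateException(
                        "no committed offset and auto.offset.reset=" + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Partition currently holds offsets 0..41, so the end offset is 42.
        System.out.println(startingOffset(null, "earliest", 0, 42));
        System.out.println(startingOffset(null, "latest", 0, 42));
        System.out.println(startingOffset(7L, "earliest", 0, 42));
    }
}
```

This is why "earliest" only helps a brand-new group: once the "helloworld" group has committed offsets, a restarted consumer resumes from them regardless of this setting.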