Saturday, March 14, 2020

Kafkanomics



Intro to Kafka link

Kafka install

https://towardsdatascience.com/tagged/kafka

http://cloudurable.com/blog/kafka-tutorial-kafka-consumer/index.html

http://cloudurable.com/blog/kafka-tutorial/index.html

https://kafka.apache.org/quickstart

https://www.javainuse.com/misc/apache-kafka-hello-world

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://www.devglan.com/apache-kafka/apache-kafka-java-example

Kafka consumer Java example:
https://www.dataneb.com/post/kafka-producer-consumer-example-java

https://dev.to/thegroo/spring-kafka-producer-and-consumer-41oc

https://docs.spring.io/spring-kafka/reference/html/

https://www.baeldung.com/spring-kafka


kafka consumer KafkaTemplate

https://programming.vip/docs/spring-boot-integration-kafka-spring-kafka-in-depth-exploration.html



https://codenotfound.com/spring-kafka-consumer-producer-example.html

code: https://github.com/code-not-found/spring-kafka/tree/master/spring-kafka-hello-world/src/main/java/com/codenotfound/kafka

pom.xml key dependencies:



<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>





For sending messages we will use the KafkaTemplate, which wraps a Producer and provides convenience methods to send data to Kafka topics.



The template provides asynchronous send methods which return a ListenableFuture.



https://docs.spring.io/spring-kafka/docs/2.2.0.RELEASE/reference/html/_reference.html#kafka-template



- ListenableFuture<SendResult<K, V>> send(String topic, V data);



 kafkaTemplate.send("helloworld.t", payload);
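Since the send is asynchronous, the returned ListenableFuture can be used to register callbacks instead of blocking on the result. A minimal sketch against the Spring Kafka 2.x API (the logging statements are illustrative):

```java
// Sketch: register success/failure callbacks on the async send result.
ListenableFuture<SendResult<String, String>> future =
    kafkaTemplate.send("helloworld.t", payload);

future.addCallback(
    result -> LOGGER.info("sent payload='{}' offset={}",
        payload, result.getRecordMetadata().offset()),
    ex -> LOGGER.error("failed to send payload='{}'", payload, ex));
```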



In the Sender class, the KafkaTemplate is auto-wired; its creation is handled further below in a separate SenderConfig class.


@Autowired
  private KafkaTemplate<String, String> kafkaTemplate;



Note: the Kafka broker's default settings cause it to auto-create a topic when a request for an unknown topic is received.
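This behavior is governed by the broker-side auto.create.topics.enable setting (true by default); it can be switched off in the broker's server.properties:

```
# server.properties: do not auto-create topics on first use
auto.create.topics.enable=false
```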


The creation of the KafkaTemplate and Sender is handled in the SenderConfig class:

@Bean
  public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
  }


@Bean
  public Sender sender() {
    return new Sender();
  }


To use the Spring Kafka template, we need to configure a ProducerFactory and provide it in the template's constructor.

@Bean
  public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
  }


The 'BOOTSTRAP_SERVERS_CONFIG' property specifies a list of host:port pairs used for establishing the initial connections to the Kafka cluster:


@Bean
  public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        StringSerializer.class);

    return props;
  }



A message in Kafka is a key-value pair with a small amount of associated metadata. Since Kafka stores and transports byte arrays, we need to specify how the key and value will be serialized.

In this example we are sending a String payload, so we specify the StringSerializer class, which takes care of the needed transformation.
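Under the hood, StringSerializer (and its StringDeserializer counterpart) simply encodes the String as bytes, UTF-8 by default. A plain-Java sketch of that round trip:

```java
import java.nio.charset.StandardCharsets;

// Plain-Java sketch of what StringSerializer/StringDeserializer do by default:
// encode a String payload to UTF-8 bytes, and decode it back.
public class StringPayloadCodec {

    public static byte[] serialize(String data) {
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    public static String deserialize(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```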


Other configs: https://kafka.apache.org/20/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html





As with any messaging-based application, you need to create a receiver that will handle the published messages. The Receiver is nothing more than a simple POJO that defines a method for receiving messages.


@KafkaListener(topics = "helloworld.t")
  public void receive(String payload) {
    LOGGER.info("received payload='{}'", payload);
    latch.countDown();
  }



The @KafkaListener annotation creates a ConcurrentMessageListenerContainer message listener container behind the scenes for each annotated method.
https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html :

Annotation that marks a method to be the target of a Kafka message listener on the specified topics.

To make this happen, a kafkaListenerContainerFactory factory bean is configured:

@Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());

    return factory;
  }

The latch in receive() above is a CountDownLatch, which allows the POJO to signal that a message was received. This is something you are not likely to implement in a production application.
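A plain-Java sketch of that latch hand-off, where a spawned thread stands in for the @KafkaListener method:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: the test thread awaits the latch; the "listener" thread counts it down.
public class LatchHandOff {

    public static boolean messageReceived() {
        CountDownLatch latch = new CountDownLatch(1);

        // stands in for the @KafkaListener thread invoking receive()
        new Thread(latch::countDown).start();

        try {
            // block until the listener has run, or give up after 5 seconds
            return latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```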

The @EnableKafka annotation enables the detection of the @KafkaListener annotation that was used on the Receiver class.

The bootstrap servers are defined in the application properties file:

  @Value("${kafka.bootstrap-servers}")
  private String bootstrapServers;
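The corresponding entry, for example in src/main/resources/application.properties (localhost:9092 is the usual single-broker default):

```
# Kafka broker(s) to connect to; matches the @Value placeholder above
kafka.bootstrap-servers=localhost:9092
```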


@Bean
  public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerConfigs());
  }


We also specify a 'GROUP_ID_CONFIG' (here: "helloworld") which identifies the group this consumer belongs to. Messages will be load-balanced over consumer instances that have the same group id.

'AUTO_OFFSET_RESET_CONFIG' is set to "earliest", which ensures that our consumer reads from the beginning of the topic even if some messages were sent before it was able to start up.

@Bean
  public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    // list of host:port pairs used for establishing the initial connections to the Kafka cluster
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
        bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class);
    // allows a pool of processes to divide the work of consuming and processing records
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "helloworld");
    // automatically reset the offset to the earliest offset
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    return props;
  }

https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html