Apache Kafka
https://kafka.apache.org/intro
Apache Kafka® is a distributed streaming platform.
A streaming platform has three key capabilities:
- Publish and subscribe to streams of records (topics), similar to a message queue or enterprise messaging system.
- Store streams of records in a fault-tolerant durable way.
- Process streams of records as they occur.
Related concepts: publisher/subscriber, the observer pattern, message queues.
First a few concepts:
- Kafka is run as a cluster on one or more servers that can span multiple datacenters.
- The Kafka cluster stores streams of records in categories called topics.
- Each record consists of a key, a value, and a timestamp.
Kafka has four core APIs:
- The Producer API allows an application to publish a stream of records to one or more Kafka topics.
- The Consumer API allows an application to subscribe to one or more topics and process the stream of records produced to them.
- The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more output topics, effectively transforming the input streams to output streams.
- The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems. For example, a connector to a relational database might capture every change to a table.
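For a first feel of the Producer and Consumer APIs, here is a minimal kafka-python sketch (assumptions: a broker on localhost:9092 and a topic named test, matching the example further down):

from kafka import KafkaProducer, KafkaConsumer

# Producer API: publish one record (key + value; the timestamp is filled in automatically).
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('test', key=b'greeting', value=b'hello')
producer.flush()

# Consumer API: subscribe to the topic and process its stream of records.
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         consumer_timeout_ms=5000)  # stop iterating after 5 s of silence
for record in consumer:
    print(record.topic, record.timestamp, record.key, record.value)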
Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.
How does Kafka's notion of streams compare to a traditional enterprise messaging system? Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each record goes to one of them; in publish-subscribe the record is broadcast to all consumers. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes.
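A hedged kafka-python sketch of the queuing half (topic and group names are illustrative): consumers that share a group_id split the topic's partitions between them, while a consumer in a different group receives every record.

from kafka import KafkaConsumer

# Run this script twice: both instances join the group 'workers' and each is
# assigned a share of the partitions, so every record is processed by exactly
# one of them (queuing). An instance started with a different group_id would
# receive all records instead (publish-subscribe).
consumer = KafkaConsumer('some-topic',
                         bootstrap_servers='localhost:9092',
                         group_id='workers')
for msg in consumer:
    print(msg.partition, msg.offset, msg.value)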
Kafka works well as a replacement for a more traditional message broker, and is comparable to messaging systems such as ActiveMQ or RabbitMQ.
https://www.upsolver.com/blog/kafka-versus-rabbitmq-architecture-performance-use-case
Kafka uses a pull model: the broker waits for the consumer to ask for data.
Kafka preserves message ordering within a partition (a stream rather than an unordered queue).
Kafka is a log: it retains messages for a configurable period (7 days by default), even after they have been consumed.
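The pull model and the log are both visible in kafka-python's poll() call; a small sketch (topic name taken from the example below):

from kafka import KafkaConsumer

# Pull model: the client asks the broker for records; nothing is pushed to it.
# Because the log retains records, auto_offset_reset='earliest' lets a consumer
# that starts late replay messages from the beginning.
consumer = KafkaConsumer('test', bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
batch = consumer.poll(timeout_ms=1000)  # {TopicPartition: [ConsumerRecord, ...]}
for tp, records in batch.items():
    for record in records:
        print(tp.partition, record.offset, record.value)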
JMS client
https://docs.confluent.io/current/clients/kafka-jms-client/index.html
JMS is a widely used messaging API that is included as part of the Java Platform, Enterprise Edition. Confluent JMS Client (kafka-jms-client) is an implementation of the JMS 1.1 provider interface that allows Apache Kafka® or Confluent Platform to be used as a JMS message broker.
Kafka topics can mimic the behavior of either topics or queues in the traditional messaging system sense. Both JMS messaging models are supported: Publish/Subscribe (Topics) and Point-to-Point (Queues).
Example
wget http://mirrors.up.pt/pub/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz
tar xvzf kafka_2.11-2.3.0.tgz
cd kafka_2.11-2.3.0/
# single-node ZooKeeper instance (port 2181)
bin/zookeeper-server-start.sh config/zookeeper.properties
# in a new tab ...
cd kafka_2.11-2.3.0/
bin/kafka-server-start.sh config/server.properties  # listens on port 9092
# create a topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
# list topics
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
# send messages to the topic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>hello
>test
# consume messages
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
# Python client: https://pypi.org/project/kafka-python/
apt install python3-pip  # as root
pip3 install kafka-python
Create a queue "adder" for 2 consumers
- bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 2 --topic adder
The number of partitions matches the number of consumers, so each consumer gets its own share of the messages.
#consumer_adder.py
from kafka import KafkaConsumer
import json
import sys

topic = 'adder'
# Each consumer instance subscribes to its own topic ('adder-0', 'adder-1', ...),
# selected by the first command-line argument.
consumer = KafkaConsumer('%s-%s' % (topic, sys.argv[1]), bootstrap_servers='localhost:9092')
print(consumer.partitions_for_topic(topic))

for msg in consumer:
    vals = json.loads(msg.value)
    print("%s %d %s sum: %d" % (msg.topic, msg.timestamp, msg.value, vals['op1'] + vals['op2']))
#producer_adder.py
from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers='localhost:9092', compression_type='gzip')
topic = 'adder'
parts = producer.partitions_for(topic)
amount_partitions = len(parts)

for i in range(10000):
    vals = {'op1': i, 'op2': i}
    # Round-robin over the consumers. Note that this sends to the separate,
    # auto-created topics 'adder-0' and 'adder-1' (as the ZooKeeper listing
    # below shows), not to the partitions of 'adder'; the partition count of
    # 'adder' is only used as the modulus.
    producer.send('%s-%d' % (topic, i % amount_partitions),
                  value=json.dumps(vals).encode('utf-8'))
producer.flush()  # make sure everything is sent before the script exits
List topics using ZooKeeper
ZooKeeper is a high-performance coordination service for distributed applications. The name space provided by ZooKeeper is much like that of a standard file system.
bin/zookeeper-shell.sh localhost:2181
ls /config/topics
[adder-0, adder, adder-1, test, __consumer_offsets]
quit
- pip install kazoo
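kazoo is a Python ZooKeeper client; a minimal sketch that lists the same znodes programmatically (assuming ZooKeeper on localhost:2181):

from kazoo.client import KazooClient

# Connect to ZooKeeper and list the children of /config/topics,
# the programmatic equivalent of 'ls /config/topics' above.
zk = KazooClient(hosts='localhost:2181')
zk.start()
print(zk.get_children('/config/topics'))
zk.stop()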
Spring Kafka
Dockerfile
FROM eclipse-temurin:17-jdk-alpine
ENV PATH=/opt/java/openjdk/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/root/kafka_2.12-3.4.0/bin/
RUN apk add --update --no-cache curl wget nano vim bash gcompat
RUN cd ~ && wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz && tar xvzf kafka_2.12-3.4.0.tgz
CMD ["/bin/bash","/mnt/start-servers.sh"]
start-servers.sh
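The script itself is not reproduced in these notes; a minimal sketch consistent with the Dockerfile above (which runs it from /mnt, so it must be mounted into the container) would be:

#!/bin/bash
# Sketch only: start a single-node ZooKeeper, then the Kafka broker, from the
# distribution the Dockerfile unpacked under /root.
cd /root/kafka_2.12-3.4.0
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
sleep 5
bin/kafka-server-start.sh config/server.properties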
Build the Kafka container
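The build and run commands are not included either; hypothetical versions (the image name is made up, and the Dockerfile and start-servers.sh are assumed to sit in the current directory):

docker build -t kafka-spring .
docker run -it --rm -p 9092:9092 -v "$PWD":/mnt kafka-spring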
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>3.1.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>17</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
src/main/resources/application.properties
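The file's contents are not included in the notes; a minimal version consistent with the @Value placeholders in the configuration classes below would be (the group id is illustrative):

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=demo-group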
src/main/java/com/example/demo/DemoApplication.java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}
src/main/java/com/example/demo/DemoController.java
package com.example.demo;

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class DemoController {
    private final KafkaTemplate<String, String> kafkaTemplate;

    public DemoController(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @GetMapping("/uppercase/{text}")
    @ResponseBody
    public String uppercase(@PathVariable String text) {
        String message = String.format("text to be sent in uppercase %s", text);
        kafkaTemplate.send(KafkaTopicConfig.TOPIC_TASK, message);
        return message;
    }
}
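With the broker and the application running, a request such as curl http://localhost:8080/uppercase/hello (8080 being Spring Boot's default port) publishes the message to the topicTask topic, where the listener below picks it up.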
src/main/java/com/example/demo/KafkaConsumerConfig.java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${spring.kafka.consumer.group-id}")
    private String groupId;

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
src/main/java/com/example/demo/KafkaMessageListener.java
package com.example.demo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaMessageListener {
    private final Logger logger;

    public KafkaMessageListener() {
        this.logger = LoggerFactory.getLogger(KafkaMessageListener.class);
        this.logger.info("Created KafkaMessageListener");
    }

    @KafkaListener(topics = KafkaTopicConfig.TOPIC_TASK)
    public void listen(String message) {
        System.out.println("Received Message in topicTask: " + message + " in uppercase " + message.toUpperCase());
    }
}
src/main/java/com/example/demo/KafkaProducerConfig.java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {
    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
src/main/java/com/example/demo/KafkaTopicConfig.java
package com.example.demo;

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {
    public static final String TOPIC_TASK = "topicTask";

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topicTask() {
        return new NewTopic(TOPIC_TASK, 1, (short) 1);
    }
}
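On startup, the KafkaAdmin bean creates any NewTopic beans it finds, so topicTask does not have to be created by hand; new NewTopic(TOPIC_TASK, 1, (short) 1) declares one partition with a replication factor of 1, matching the single-broker setup.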