From c530376068f1ecd6a72dc2887c9f305d46b4f153 Mon Sep 17 00:00:00 2001 From: esmailian Date: Mon, 28 Apr 2025 08:48:48 +0330 Subject: [PATCH] add kafka and producer change style of packaging of class to cqrs styles in future --- pom.xml | 9 ++++++ .../KafkaProducerConfig.java | 32 +++++++++++++++++++ .../mmad/testapp/query/UserQueryHandler.java | 12 +++++++ 3 files changed, 53 insertions(+) create mode 100755 src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java diff --git a/pom.xml b/pom.xml index 09a5ded..9b00362 100755 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,15 @@ org.springframework.cloud spring-cloud-starter-config + + org.apache.kafka + kafka-clients + 3.4.0 + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java b/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java new file mode 100755 index 0000000..c1230bf --- /dev/null +++ b/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java @@ -0,0 +1,32 @@ +package com.example.mmad.testapp.kafkaProducerConfig; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.core.DefaultKafkaProducerFactory; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Configuration +public class KafkaProducerConfig { + + @Bean + public ProducerFactory> producerFactory() { + Map configProps = new HashMap<>(); + configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Replace with your Kafka broker address + configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + return new DefaultKafkaProducerFactory<>(configProps); + } + + @Bean + public KafkaTemplate> kafkaTemplate() { + return new KafkaTemplate<>(producerFactory()); + } +} diff --git a/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java b/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java index a01f7bb..097e4e2 100644 --- a/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java +++ b/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java @@ -1,15 +1,27 @@ package com.example.mmad.testapp.query; import com.example.mmad.testapp.event.UserCreatedEvent; +import com.example.mmad.testapp.kafkaProducerConfig.KafkaProducerConfig; import org.springframework.context.event.EventListener; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; +import java.util.List; + @Component public class UserQueryHandler { + private static final String kafkaTopic = "user-commanded"; + private final KafkaProducerConfig kafkaProducer; + + public UserQueryHandler(KafkaProducerConfig kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + @Async("taskExecutor") @EventListener public void handelUserCreated(UserCreatedEvent event) { + kafkaProducer.kafkaTemplate().send(kafkaTopic, (List) event); + // persist in query data base System.out.println("Received user created event" + event.getUserId()); } }