diff --git a/pom.xml b/pom.xml
index 09a5ded..9b00362 100755
--- a/pom.xml
+++ b/pom.xml
@@ -87,6 +87,15 @@
org.springframework.cloud
spring-cloud-starter-config
+
+ org.apache.kafka
+ kafka-clients
+ 3.4.0
+
+
+ org.springframework.kafka
+ spring-kafka
+
diff --git a/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java b/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java
new file mode 100755
index 0000000..c1230bf
--- /dev/null
+++ b/src/main/java/com/example/mmad/testapp/kafkaProducerConfig/KafkaProducerConfig.java
@@ -0,0 +1,32 @@
+package com.example.mmad.testapp.kafkaProducerConfig;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Configuration
+public class KafkaProducerConfig {
+
+ @Bean
+ public ProducerFactory> producerFactory() {
+ Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Replace with your Kafka broker address
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+ return new DefaultKafkaProducerFactory<>(configProps);
+ }
+
+ @Bean
+ public KafkaTemplate> kafkaTemplate() {
+ return new KafkaTemplate<>(producerFactory());
+ }
+}
diff --git a/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java b/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java
index a01f7bb..097e4e2 100644
--- a/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java
+++ b/src/main/java/com/example/mmad/testapp/query/UserQueryHandler.java
@@ -1,15 +1,27 @@
package com.example.mmad.testapp.query;
import com.example.mmad.testapp.event.UserCreatedEvent;
+import com.example.mmad.testapp.kafkaProducerConfig.KafkaProducerConfig;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
+import java.util.List;
+
@Component
public class UserQueryHandler {
+ private static final String kafkaTopic = "user-commanded";
+ private final KafkaProducerConfig kafkaProducer;
+
+ public UserQueryHandler(KafkaProducerConfig kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
@Async("taskExecutor")
@EventListener
public void handelUserCreated(UserCreatedEvent event) {
+ kafkaProducer.kafkaTemplate().send(kafkaTopic, (List>) event);
+ // persist in query data base
System.out.println("Received user created event" + event.getUserId());
}
}