diff --git a/pom.xml b/pom.xml index 9566843..b5896b9 100755 --- a/pom.xml +++ b/pom.xml @@ -87,6 +87,15 @@ org.springframework.cloud spring-cloud-starter-config + + org.apache.kafka + kafka-clients + 3.4.0 + + + org.springframework.kafka + spring-kafka + diff --git a/src/main/java/com/example/mmad/testapp/Listner/UserEventListener.java b/src/main/java/com/example/mmad/testapp/Listner/UserEventListener.java new file mode 100644 index 0000000..e35a873 --- /dev/null +++ b/src/main/java/com/example/mmad/testapp/Listner/UserEventListener.java @@ -0,0 +1,27 @@ +package com.example.mmad.testapp.Listner; + +import com.example.mmad.testapp.entity.UserEntity; +import com.example.mmad.testapp.event.UserCreatedEvent; +import com.example.mmad.testapp.repository.UserSnapshotRepository; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.stereotype.Component; + +@Component +public class UserEventListener { + private final UserSnapshotRepository userSnapshotRepository; + + public UserEventListener(UserSnapshotRepository userSnapshotRepository) { + this.userSnapshotRepository = userSnapshotRepository; + } + + @KafkaListener(topics = "user-commanded", groupId = "") + public void updatedUserSnapshot(UserCreatedEvent user) { + UserEntity userSnapshot = new UserEntity(); + //Todo write an mapper + userSnapshot.setId(userSnapshot.getId()); + userSnapshot.setUserName(user.getUsername()); + userSnapshot.setEmail(user.getEmail()); + userSnapshotRepository.save(userSnapshot); + } + +} diff --git a/src/main/java/com/example/mmad/testapp/kafkaConsumerConfig/KafkaConsumerConfig.java b/src/main/java/com/example/mmad/testapp/kafkaConsumerConfig/KafkaConsumerConfig.java new file mode 100755 index 0000000..a774f7c --- /dev/null +++ b/src/main/java/com/example/mmad/testapp/kafkaConsumerConfig/KafkaConsumerConfig.java @@ -0,0 +1,52 @@ +package com.example.mmad.testapp.kafkaConsumerConfig; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.support.serializer.JsonDeserializer; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@EnableKafka +@Configuration +@ConfigurationProperties(prefix = "spring.kafka.consumer") +public class KafkaConsumerConfig { + + private String bootstrapServers; + + private String groupId; + + + @Bean + public DefaultKafkaConsumerFactory> consumerFactory() { + JsonDeserializer> deserializer = + new JsonDeserializer<>(new TypeReference>() { + }); + deserializer.setRemoveTypeHeaders(false); + deserializer.addTrustedPackages("*"); + deserializer.setUseTypeMapperForKey(true); + + Map config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer); + + return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory> kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory> factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + return factory; + } +} diff --git a/src/main/java/com/example/mmad/testapp/repository/UserSnapshotRepository.java b/src/main/java/com/example/mmad/testapp/repository/UserSnapshotRepository.java new file mode 100644 index 0000000..b370e37 --- /dev/null +++ b/src/main/java/com/example/mmad/testapp/repository/UserSnapshotRepository.java @@ -0,0 +1,7 @@ +package com.example.mmad.testapp.repository; + +import com.example.mmad.testapp.entity.UserEntity; +import org.springframework.data.jpa.repository.JpaRepository; + +public interface UserSnapshotRepository extends JpaRepository { +}