add order command query for test in project and add consumer kfka for test event driven read model design pattern
dont forget change implementation of kafka consumer to list of values
This commit is contained in:
@ -14,14 +14,15 @@ public class UserEventListener {
|
|||||||
this.userSnapshotRepository = userSnapshotRepository;
|
this.userSnapshotRepository = userSnapshotRepository;
|
||||||
}
|
}
|
||||||
|
|
||||||
@KafkaListener(topics = "user-commanded", groupId = "")
|
@KafkaListener(topics = "user-created", containerFactory = "kafkaListenerContainerFactory", groupId = "test")
|
||||||
public void updatedUserSnapshot(UserCreatedEvent user) {
|
public void updatedUserSnapshot(UserCreatedEvent user) {
|
||||||
UserEntity userSnapshot = new UserEntity();
|
UserEntity userSnapshot = new UserEntity();
|
||||||
//Todo write an mapper
|
//Todo write an mapper
|
||||||
userSnapshot.setId(userSnapshot.getId());
|
userSnapshot.setId(user.getUserId());
|
||||||
userSnapshot.setUserName(user.getUsername());
|
userSnapshot.setUserName(user.getUsername());
|
||||||
userSnapshot.setEmail(user.getEmail());
|
userSnapshot.setEmail(user.getEmail());
|
||||||
userSnapshotRepository.save(userSnapshot);
|
System.out.println("snapshot is updated" + user.getUserId());
|
||||||
|
// userSnapshotRepository.save(userSnapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,13 @@
|
|||||||
|
package com.example.mmad.testapp.command;
|
||||||
|
|
||||||
|
import lombok.Getter;
|
||||||
|
import lombok.Setter;
|
||||||
|
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
public class CreateOrderCommand {
|
||||||
|
|
||||||
|
private Long orderId;
|
||||||
|
private String orderName;
|
||||||
|
private String description;
|
||||||
|
}
|
@ -1,13 +0,0 @@
|
|||||||
package com.example.mmad.testapp.command;
|
|
||||||
|
|
||||||
import lombok.Getter;
|
|
||||||
import lombok.Setter;
|
|
||||||
|
|
||||||
@Getter
|
|
||||||
@Setter
|
|
||||||
public class CreateUserCommand {
|
|
||||||
|
|
||||||
private Long userId;
|
|
||||||
private String userName;
|
|
||||||
private String email;
|
|
||||||
}
|
|
@ -0,0 +1,33 @@
|
|||||||
|
package com.example.mmad.testapp.command;
|
||||||
|
|
||||||
|
import com.example.mmad.testapp.entity.OrderEntity;
|
||||||
|
import com.example.mmad.testapp.event.OrderCreatedEvent;
|
||||||
|
import com.example.mmad.testapp.repository.UserRepository;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.springframework.context.ApplicationEventPublisher;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class OrderCommandHandler {
|
||||||
|
@Autowired
|
||||||
|
private ApplicationEventPublisher applicationEventPublisher;
|
||||||
|
private final UserRepository userRepository;
|
||||||
|
|
||||||
|
public OrderCommandHandler(UserRepository userRepository) {
|
||||||
|
this.userRepository = userRepository;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void createUser(CreateOrderCommand createUserCommand) {
|
||||||
|
OrderEntity orderEntity = new OrderEntity();
|
||||||
|
orderEntity.setId(createUserCommand.getOrderId());
|
||||||
|
orderEntity.setOrderName(createUserCommand.getOrderName());
|
||||||
|
orderEntity.setDescription(createUserCommand.getDescription());
|
||||||
|
// userRepository.save(orderEntity);
|
||||||
|
OrderCreatedEvent event = new OrderCreatedEvent(
|
||||||
|
orderEntity.getId(),
|
||||||
|
orderEntity.getOrderName(),
|
||||||
|
orderEntity.getDescription()
|
||||||
|
);
|
||||||
|
applicationEventPublisher.publishEvent(event);
|
||||||
|
}
|
||||||
|
}
|
@ -1,33 +0,0 @@
|
|||||||
package com.example.mmad.testapp.command;
|
|
||||||
|
|
||||||
import com.example.mmad.testapp.entity.UserEntity;
|
|
||||||
import com.example.mmad.testapp.event.UserCreatedEvent;
|
|
||||||
import com.example.mmad.testapp.repository.UserRepository;
|
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
|
||||||
import org.springframework.context.ApplicationEventPublisher;
|
|
||||||
import org.springframework.stereotype.Service;
|
|
||||||
|
|
||||||
@Service
|
|
||||||
public class UserCommandHandler {
|
|
||||||
@Autowired
|
|
||||||
private ApplicationEventPublisher applicationEventPublisher;
|
|
||||||
private final UserRepository userRepository;
|
|
||||||
|
|
||||||
public UserCommandHandler(UserRepository userRepository) {
|
|
||||||
this.userRepository = userRepository;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void createUser(CreateUserCommand createUserCommand) {
|
|
||||||
UserEntity userEntity = new UserEntity();
|
|
||||||
userEntity.setId(createUserCommand.getUserId());
|
|
||||||
userEntity.setUserName(createUserCommand.getUserName());
|
|
||||||
userEntity.setEmail(createUserCommand.getEmail());
|
|
||||||
// userRepository.save(userEntity);
|
|
||||||
UserCreatedEvent event = new UserCreatedEvent(
|
|
||||||
userEntity.getId(),
|
|
||||||
userEntity.getUserName(),
|
|
||||||
userEntity.getEmail()
|
|
||||||
);
|
|
||||||
applicationEventPublisher.publishEvent(event);
|
|
||||||
}
|
|
||||||
}
|
|
@ -1,7 +1,7 @@
|
|||||||
package com.example.mmad.testapp.controller;
|
package com.example.mmad.testapp.controller;
|
||||||
|
|
||||||
import com.example.mmad.testapp.command.CreateUserCommand;
|
import com.example.mmad.testapp.command.CreateOrderCommand;
|
||||||
import com.example.mmad.testapp.command.UserCommandHandler;
|
import com.example.mmad.testapp.command.OrderCommandHandler;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.http.HttpStatus;
|
import org.springframework.http.HttpStatus;
|
||||||
import org.springframework.http.ResponseEntity;
|
import org.springframework.http.ResponseEntity;
|
||||||
@ -13,10 +13,10 @@ import javax.validation.Valid;
|
|||||||
@RequestMapping("/user")
|
@RequestMapping("/user")
|
||||||
public class UserController {
|
public class UserController {
|
||||||
|
|
||||||
private UserCommandHandler userCommandHandler;
|
private OrderCommandHandler userCommandHandler;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
public void setPersonService(UserCommandHandler userCommandHandler) {
|
public void setPersonService(OrderCommandHandler userCommandHandler) {
|
||||||
this.userCommandHandler = userCommandHandler;
|
this.userCommandHandler = userCommandHandler;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -28,7 +28,7 @@ public class UserController {
|
|||||||
|
|
||||||
@CrossOrigin(origins = "http://localhost:3001")
|
@CrossOrigin(origins = "http://localhost:3001")
|
||||||
@PostMapping("/create")
|
@PostMapping("/create")
|
||||||
public ResponseEntity<?> create(@Valid @RequestBody CreateUserCommand userCommand) {
|
public ResponseEntity<?> create(@Valid @RequestBody CreateOrderCommand userCommand) {
|
||||||
userCommandHandler.createUser(userCommand);
|
userCommandHandler.createUser(userCommand);
|
||||||
return ResponseEntity.ok(200);
|
return ResponseEntity.ok(200);
|
||||||
}
|
}
|
||||||
|
25
src/main/java/com/example/mmad/testapp/entity/OrderEntity.java
Executable file
25
src/main/java/com/example/mmad/testapp/entity/OrderEntity.java
Executable file
@ -0,0 +1,25 @@
|
|||||||
|
package com.example.mmad.testapp.entity;
|
||||||
|
|
||||||
|
import jakarta.persistence.*;
|
||||||
|
import lombok.*;
|
||||||
|
|
||||||
|
import static jakarta.persistence.GenerationType.SEQUENCE;
|
||||||
|
|
||||||
|
@Entity
|
||||||
|
@Getter
|
||||||
|
@Setter
|
||||||
|
@Table(schema = "testapp", name = "order")
|
||||||
|
@Builder
|
||||||
|
@AllArgsConstructor
|
||||||
|
@NoArgsConstructor
|
||||||
|
public class OrderEntity {
|
||||||
|
|
||||||
|
@Id
|
||||||
|
@SequenceGenerator(name = "seqTest", sequenceName = "testapp.order_seq", allocationSize = 1)
|
||||||
|
@GeneratedValue(strategy = SEQUENCE, generator = "seqTest")
|
||||||
|
private Long id;
|
||||||
|
@Column(name = "order_name")
|
||||||
|
private String orderName;
|
||||||
|
@Column(name = "description")
|
||||||
|
private String description;
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.example.mmad.testapp.event;
|
||||||
|
|
||||||
|
public class OrderCreatedEvent {
|
||||||
|
private final Long orderId;
|
||||||
|
private final String orderName;
|
||||||
|
private final String description;
|
||||||
|
|
||||||
|
public OrderCreatedEvent(Long orderId, String orderName, String description) {
|
||||||
|
this.orderId = orderId;
|
||||||
|
this.orderName = orderName;
|
||||||
|
this.description = description;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Long getOrderId() {
|
||||||
|
return orderId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getOrderName() {
|
||||||
|
return orderName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDescription() {
|
||||||
|
return description;
|
||||||
|
}
|
||||||
|
}
|
@ -1,25 +1,14 @@
|
|||||||
package com.example.mmad.testapp.event;
|
package com.example.mmad.testapp.event;
|
||||||
|
|
||||||
public class UserCreatedEvent {
|
import lombok.Getter;
|
||||||
private final Long userId;
|
import lombok.Setter;
|
||||||
private final String username;
|
|
||||||
private final String email;
|
|
||||||
|
|
||||||
public UserCreatedEvent(Long userId, String username, String email) {
|
import java.io.Serializable;
|
||||||
this.userId = userId;
|
|
||||||
this.username = username;
|
|
||||||
this.email = email;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Long getUserId() {
|
@Getter
|
||||||
return userId;
|
@Setter
|
||||||
}
|
public class UserCreatedEvent implements Serializable {
|
||||||
|
private Long userId;
|
||||||
public String getUsername() {
|
private String username;
|
||||||
return username;
|
private String email;
|
||||||
}
|
|
||||||
|
|
||||||
public String getEmail() {
|
|
||||||
return email;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,9 @@
|
|||||||
package com.example.mmad.testapp.kafkaConsumerConfig;
|
package com.example.mmad.testapp.kafkaConsumerConfig;
|
||||||
|
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.example.mmad.testapp.event.UserCreatedEvent;
|
||||||
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
import org.apache.kafka.clients.consumer.ConsumerConfig;
|
||||||
import org.apache.kafka.common.serialization.StringDeserializer;
|
import org.apache.kafka.common.serialization.StringDeserializer;
|
||||||
|
import org.springframework.beans.factory.annotation.Value;
|
||||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||||
import org.springframework.context.annotation.Bean;
|
import org.springframework.context.annotation.Bean;
|
||||||
import org.springframework.context.annotation.Configuration;
|
import org.springframework.context.annotation.Configuration;
|
||||||
@ -12,7 +13,6 @@ import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
|
|||||||
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
import org.springframework.kafka.support.serializer.JsonDeserializer;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
@EnableKafka
|
@EnableKafka
|
||||||
@ -20,32 +20,29 @@ import java.util.Map;
|
|||||||
@ConfigurationProperties(prefix = "spring.kafka.consumer")
|
@ConfigurationProperties(prefix = "spring.kafka.consumer")
|
||||||
public class KafkaConsumerConfig {
|
public class KafkaConsumerConfig {
|
||||||
|
|
||||||
|
@Value("${spring.kafka.consumer.bootstrap-servers}")
|
||||||
private String bootstrapServers;
|
private String bootstrapServers;
|
||||||
|
|
||||||
|
@Value("${spring.kafka.consumer.group-id}")
|
||||||
private String groupId;
|
private String groupId;
|
||||||
|
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public DefaultKafkaConsumerFactory<String, List<?>> consumerFactory() {
|
public DefaultKafkaConsumerFactory<String, UserCreatedEvent> consumerFactory() {
|
||||||
JsonDeserializer<List<?>> deserializer =
|
JsonDeserializer<UserCreatedEvent> deserializer = new JsonDeserializer<>(UserCreatedEvent.class);
|
||||||
new JsonDeserializer<>(new TypeReference<List<?>>() {
|
|
||||||
});
|
|
||||||
deserializer.setRemoveTypeHeaders(false);
|
|
||||||
deserializer.addTrustedPackages("*");
|
deserializer.addTrustedPackages("*");
|
||||||
deserializer.setUseTypeMapperForKey(true);
|
|
||||||
|
|
||||||
Map<String, Object> config = new HashMap<>();
|
Map<String, Object> config = new HashMap<>();
|
||||||
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
|
||||||
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
|
||||||
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
|
||||||
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer);
|
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
|
||||||
|
|
||||||
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
|
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Bean
|
@Bean
|
||||||
public ConcurrentKafkaListenerContainerFactory<String, List<?>> kafkaListenerContainerFactory() {
|
public ConcurrentKafkaListenerContainerFactory<String, UserCreatedEvent> kafkaListenerContainerFactory() {
|
||||||
ConcurrentKafkaListenerContainerFactory<String, List<?>> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
ConcurrentKafkaListenerContainerFactory<String, UserCreatedEvent> factory = new ConcurrentKafkaListenerContainerFactory<>();
|
||||||
factory.setConsumerFactory(consumerFactory());
|
factory.setConsumerFactory(consumerFactory());
|
||||||
return factory;
|
return factory;
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,6 @@
|
|||||||
package com.example.mmad.testapp.query;
|
package com.example.mmad.testapp.query;
|
||||||
|
|
||||||
import com.example.mmad.testapp.event.UserCreatedEvent;
|
import com.example.mmad.testapp.event.OrderCreatedEvent;
|
||||||
import org.springframework.context.event.EventListener;
|
import org.springframework.context.event.EventListener;
|
||||||
import org.springframework.scheduling.annotation.Async;
|
import org.springframework.scheduling.annotation.Async;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
@ -9,7 +9,7 @@ import org.springframework.stereotype.Component;
|
|||||||
public class UserQueryHandler {
|
public class UserQueryHandler {
|
||||||
@Async("taskExecutor")
|
@Async("taskExecutor")
|
||||||
@EventListener
|
@EventListener
|
||||||
public void handelUserCreated(UserCreatedEvent event) {
|
public void handelUserCreated(OrderCreatedEvent event) {
|
||||||
System.out.println("Received user created event" + event.getUserId());
|
System.out.println("Received user created event" + event.getOrderId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
package com.example.mmad.testapp.repository;
|
package com.example.mmad.testapp.repository;
|
||||||
|
|
||||||
import com.example.mmad.testapp.entity.UserEntity;
|
import com.example.mmad.testapp.entity.OrderEntity;
|
||||||
import org.springframework.data.jpa.repository.JpaRepository;
|
import org.springframework.data.jpa.repository.JpaRepository;
|
||||||
import org.springframework.stereotype.Repository;
|
import org.springframework.stereotype.Repository;
|
||||||
|
|
||||||
@Repository
|
@Repository
|
||||||
public interface UserRepository extends JpaRepository<UserEntity, Long> {
|
public interface UserRepository extends JpaRepository<OrderEntity, Long> {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
spring.application.name=service-a
|
spring.application.name=service-b
|
||||||
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
|
eureka.client.service-url.defaultZone=http://localhost:8761/eureka
|
||||||
spring.config.import=configserver:http://localhost:8888/
|
spring.config.import=configserver:http://localhost:8888/
|
||||||
eureka.client.register-with-eureka=true
|
eureka.client.register-with-eureka=true
|
||||||
|
Reference in New Issue
Block a user