Skip to content

Commit 779ff9c

Browse files
committed
kafka send email
1 parent f4ae33a commit 779ff9c

File tree

28 files changed

+655
-275
lines changed

28 files changed

+655
-275
lines changed

Pre-request-Script.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ const postRequest = {
1010
mode: 'raw',
1111
raw: JSON.stringify({
1212
"username" : "admin",
13-
"password" : "1234567"
13+
"password" : "abc123"
1414
})
1515
}
1616
};

api-gateway/src/main/resources/application.properties

+3-3
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ spring.cloud.gateway.routes[1].id=inventory-service
2121
spring.cloud.gateway.routes[1].uri=lb://INVENTORY-SERVICE
2222
spring.cloud.gateway.routes[1].predicates[0]=Path=/api/inventory/**
2323

24-
spring.cloud.gateway.routes[2].id=notify-service-send-email
25-
spring.cloud.gateway.routes[2].uri=lb://NOTIFY-SERVICE-SEND-EMAIL
24+
spring.cloud.gateway.routes[2].id=notification-service
25+
spring.cloud.gateway.routes[2].uri=lb://NOTIFICATION-SERVICE
2626
spring.cloud.gateway.routes[2].predicates[0]=Path=/api/email/**
2727

2828
spring.cloud.gateway.routes[3].id=order-service-carts
@@ -55,7 +55,7 @@ spring.cloud.gateway.routes[9].predicates[0]=Path=/api/auth/**
5555

5656
spring.cloud.gateway.routes[10].id=user-service-information
5757
spring.cloud.gateway.routes[10].uri=lb://USER-SERVICE
58-
spring.cloud.gateway.routes[10].predicates[0]=Path=/api/information/**
58+
spring.cloud.gateway.routes[10].predicates[0]=Path=/api/role/**
5959

6060
spring.cloud.gateway.routes[11].id=user-service-manager
6161
spring.cloud.gateway.routes[11].uri=lb://USER-SERVICE

api-gateway/src/main/resources/application.yml

+4-4
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ spring:
2020
predicates:
2121
- Path=/api/inventory/**
2222

23-
# NOTIFY-SERVICE-SEND-EMAIL
24-
- id: notify-service-send-email
25-
uri: lb://NOTIFY-SERVICE-SEND-EMAIL
23+
# NOTIFY-SERVICE
24+
- id: notification-service
25+
uri: lb://NOTIFICATION-SERVICE
2626
predicates:
2727
- Path=/api/email/**
2828

@@ -69,7 +69,7 @@ spring:
6969
- id: user-service-information
7070
uri: lb://USER-SERVICE
7171
predicates:
72-
- Path=/api/information/**
72+
- Path=/api/role/**
7373

7474
- id: user-service-manager
7575
uri: lb://USER-SERVICE

notification-service/pom.xml

+49
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@
2222
<groupId>org.springframework.boot</groupId>
2323
<artifactId>spring-boot-starter-web</artifactId>
2424
</dependency>
25+
<dependency>
26+
<groupId>org.springframework.boot</groupId>
27+
<artifactId>spring-boot-starter-webflux</artifactId>
28+
</dependency>
2529
<dependency>
2630
<groupId>org.springframework.boot</groupId>
2731
<artifactId>spring-boot-devtools</artifactId>
@@ -52,6 +56,51 @@
5256
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
5357
</dependency>
5458

59+
60+
<dependency>
61+
<groupId>com.hoangtien2k3</groupId>
62+
<artifactId>commonservice</artifactId>
63+
<version>0.0.1-SNAPSHOT</version>
64+
</dependency>
65+
<dependency>
66+
<groupId>org.springframework.kafka</groupId>
67+
<artifactId>spring-kafka-test</artifactId>
68+
<scope>test</scope>
69+
</dependency>
70+
<dependency>
71+
<groupId>org.springframework.cloud</groupId>
72+
<artifactId>spring-cloud-starter-bootstrap</artifactId>
73+
</dependency>
74+
<dependency>
75+
<groupId>org.springframework.cloud</groupId>
76+
<artifactId>spring-cloud-starter-circuitbreaker-reactor-resilience4j</artifactId>
77+
<version>3.0.0</version>
78+
</dependency>
79+
<dependency>
80+
<groupId>com.google.code.gson</groupId>
81+
<artifactId>gson</artifactId>
82+
<version>2.10.1</version>
83+
</dependency>
84+
<dependency>
85+
<groupId>org.json</groupId>
86+
<artifactId>json</artifactId>
87+
<version>20220924</version>
88+
</dependency>
89+
<dependency>
90+
<groupId>org.springframework.kafka</groupId>
91+
<artifactId>spring-kafka</artifactId>
92+
</dependency>
93+
<dependency>
94+
<groupId>io.projectreactor.kafka</groupId>
95+
<artifactId>reactor-kafka</artifactId>
96+
</dependency>
97+
<dependency>
98+
<groupId>org.apache.kafka</groupId>
99+
<artifactId>kafka-clients</artifactId>
100+
<version>2.8.0</version>
101+
</dependency>
102+
103+
55104
</dependencies>
56105

57106
<dependencyManagement>
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package com.hoangtien2k3.notificationservice.api;
22

3-
import com.hoangtien2k3.notificationservice.entity.EmailDetails;
3+
import com.hoangtien2k3.notificationservice.dto.EmailDetails;
44
import com.hoangtien2k3.notificationservice.service.EmailService;
55
import org.springframework.beans.factory.annotation.Autowired;
66
import org.springframework.web.bind.annotation.*;
77
import org.springframework.web.multipart.MultipartFile;
8+
import reactor.core.publisher.Mono;
89

910
@RestController
1011
@RequestMapping("/api/email")
@@ -13,26 +14,23 @@ public class EmailController {
1314
@Autowired
1415
private EmailService emailService;
1516

16-
// Sending a simple Email
17-
@PostMapping("/sendMail")
18-
public String sendMail(@RequestBody EmailDetails details) {
17+
@PostMapping("/sendSimpleMail")
18+
public Mono<String> sendSimpleMail(@RequestBody EmailDetails details) {
1919
return emailService.sendSimpleMail(details);
2020
}
2121

22-
// Sending email with attachment
2322
@PostMapping("/sendMailWithAttachment")
24-
public String sendMailWithAttachment(@RequestBody EmailDetails details) {
23+
public Mono<String> sendMailWithAttachment(@RequestBody EmailDetails details) {
2524
return emailService.sendMailWithAttachment(details);
2625
}
2726

28-
@PostMapping("/send")
29-
public String sendMail(@RequestParam(value = "file", required = false) MultipartFile[] file,
30-
String to,
31-
String[] cc,
32-
String subject,
33-
String body) {
34-
35-
return emailService.sendMail(file, to, cc, subject, body);
27+
@PostMapping("/sendMail")
28+
public Mono<String> sendMail(@RequestParam(value = "file", required = false) MultipartFile[] files,
29+
@RequestParam String to,
30+
@RequestParam String[] cc,
31+
@RequestParam String subject,
32+
@RequestParam String body) {
33+
return emailService.sendMail(files, to, cc, subject, body);
3634
}
3735

3836
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.hoangtien2k3.notificationservice.config;
2+
3+
public class KafkaConstant {
4+
public static final String PROFILE_ONBOARDING_TOPIC = "profileOnboarding";
5+
public static final String PROFILE_ONBOARDED_TOPIC = "profileOnboarded";
6+
7+
public static final String STATUS_PROFILE_PENDING = "PENDING";
8+
public static final String STATUS_PROFILE_ACTIVE = "ACTIVE";
9+
10+
public static final String STATUS_PAYMENT_CREATING = "CREATING";
11+
public static final String STATUS_PAYMENT_REJECTED = "REJECTED";
12+
public static final String STATUS_PAYMENT_PROCESSING = "PROCESSING";
13+
public static final String STATUS_PAYMENT_SUCCESSFUL = "SUCCESSFUL";
14+
15+
public static final String PAYMENT_REQUEST_TOPIC = "paymentRequest";
16+
public static final String PAYMENT_CREATED_TOPIC = "paymentCreated";
17+
public static final String PAYMENT_COMPLETED_TOPIC = "paymentCompleted";
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
package com.hoangtien2k3.notificationservice.config.kafka;
2+
3+
import org.apache.kafka.clients.consumer.ConsumerConfig;
4+
import org.apache.kafka.clients.producer.ProducerConfig;
5+
import org.apache.kafka.common.serialization.StringDeserializer;
6+
import org.apache.kafka.common.serialization.StringSerializer;
7+
import org.springframework.beans.factory.annotation.Autowired;
8+
import org.springframework.context.annotation.Bean;
9+
import org.springframework.context.annotation.Configuration;
10+
import reactor.kafka.receiver.ReceiverOptions;
11+
import reactor.kafka.sender.KafkaSender;
12+
import reactor.kafka.sender.SenderOptions;
13+
14+
import java.util.HashMap;
15+
import java.util.Map;
16+
17+
@Configuration
18+
public class CommonConfiguration {
19+
@Autowired
20+
private ReactiveKafkaAppProperties reactiveKafkaAppProperties;
21+
22+
@Bean
23+
KafkaSender<String,String> kafkaSender(){
24+
Map<String, Object> props = new HashMap<>();
25+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, reactiveKafkaAppProperties.bootstrapServers);
26+
props.put(ProducerConfig.ACKS_CONFIG, "all");
27+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
28+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
29+
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
30+
return KafkaSender.create(senderOptions);
31+
}
32+
33+
@Bean
34+
ReceiverOptions<String,String> receiverOptions(){
35+
Map<String, Object> propsReceiver = new HashMap<>();
36+
propsReceiver.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, reactiveKafkaAppProperties.bootstrapServers);
37+
propsReceiver.put(ConsumerConfig.GROUP_ID_CONFIG, reactiveKafkaAppProperties.consumerGroupId);
38+
propsReceiver.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
39+
propsReceiver.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
40+
41+
return ReceiverOptions.create(propsReceiver);
42+
}
43+
44+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.hoangtien2k3.notificationservice.config.kafka;
2+
3+
import org.springframework.beans.factory.annotation.Value;
4+
import org.springframework.context.annotation.Configuration;
5+
6+
@Configuration
7+
public class ReactiveKafkaAppProperties {
8+
@Value("${kafka.bootstrap.servers}")
9+
String bootstrapServers;
10+
11+
@Value("${payment.kafka.consumer-group-id}")
12+
String consumerGroupId;
13+
}

notification-service/src/main/java/com/hoangtien2k3/notificationservice/entity/EmailDetails.java notification-service/src/main/java/com/hoangtien2k3/notificationservice/dto/EmailDetails.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.hoangtien2k3.notificationservice.entity;
1+
package com.hoangtien2k3.notificationservice.dto;
22

33
import lombok.AllArgsConstructor;
44
import lombok.Data;

notification-service/src/main/java/com/hoangtien2k3/notificationservice/entity/EmailSender.java notification-service/src/main/java/com/hoangtien2k3/notificationservice/dto/EmailSender.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package com.hoangtien2k3.notificationservice.entity;
1+
package com.hoangtien2k3.notificationservice.dto;
22

33
import org.springframework.web.multipart.MultipartFile;
44

Original file line numberDiff line numberDiff line change
@@ -1,4 +1,42 @@
11
package com.hoangtien2k3.notificationservice.event;
22

3+
import com.google.gson.Gson;
4+
import com.hoangtien2k3.notificationservice.config.KafkaConstant;
5+
import com.hoangtien2k3.notificationservice.dto.EmailDetails;
6+
import com.hoangtien2k3.notificationservice.service.EmailService;
7+
import lombok.extern.slf4j.Slf4j;
8+
import org.springframework.beans.factory.annotation.Autowired;
9+
import org.springframework.stereotype.Service;
10+
import reactor.kafka.receiver.KafkaReceiver;
11+
import reactor.kafka.receiver.ReceiverOptions;
12+
import reactor.kafka.receiver.ReceiverRecord;
13+
14+
import java.util.Collections;
15+
16+
@Service
17+
@Slf4j
318
public class EventConsumer {
19+
Gson gson = new Gson(); // convert Json -> DTO
20+
21+
@Autowired
22+
private EmailService emailService;
23+
24+
@Autowired
25+
EventProducer eventProducer;
26+
27+
public EventConsumer(ReceiverOptions<String, String> receiverOptions) {
28+
KafkaReceiver.create(receiverOptions.subscription(Collections.singleton(KafkaConstant.PROFILE_ONBOARDING_TOPIC)))
29+
.receive()
30+
.subscribe(this::sendEmailKafkaOnboarding);
31+
}
32+
33+
public void sendEmailKafkaOnboarding(ReceiverRecord<String, String> receiverRecord) {
34+
log.info("USER-SERVICE Onboarding event send email on notification service.");
35+
EmailDetails emailDetails = gson.fromJson(receiverRecord.value(), EmailDetails.class);
36+
37+
emailService.sendSimpleMail(emailDetails).subscribe(email -> {
38+
log.info("send email successfully -> user-service change password.");
39+
eventProducer.send(KafkaConstant.PROFILE_ONBOARDED_TOPIC, gson.toJson(emailDetails)).subscribe();
40+
});
41+
}
442
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package com.hoangtien2k3.notificationservice.event;
2+
3+
import lombok.extern.slf4j.Slf4j;
4+
import org.apache.kafka.clients.producer.ProducerRecord;
5+
import org.springframework.beans.factory.annotation.Autowired;
6+
import org.springframework.stereotype.Service;
7+
import reactor.core.publisher.Mono;
8+
import reactor.kafka.sender.KafkaSender;
9+
import reactor.kafka.sender.SenderRecord;
10+
11+
@Service
12+
@Slf4j
13+
public class EventProducer {
14+
@Autowired
15+
private KafkaSender<String, String> sender;
16+
17+
public Mono<String> send(String topic, String message){
18+
return sender
19+
.send(Mono.just(SenderRecord.create(new ProducerRecord<>(topic,message),message)))
20+
.then()
21+
.thenReturn("OK");
22+
}
23+
24+
}
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
package com.hoangtien2k3.notificationservice.service;
22

3-
import com.hoangtien2k3.notificationservice.entity.EmailDetails;
3+
import com.hoangtien2k3.notificationservice.dto.EmailDetails;
44
import org.springframework.web.multipart.MultipartFile;
5+
import reactor.core.publisher.Mono;
56

67
public interface EmailService {
78
// To send a simple email
8-
String sendSimpleMail(EmailDetails details);
9+
Mono<String> sendSimpleMail(EmailDetails details);
910

1011
// To send an email with attachment
11-
String sendMailWithAttachment(EmailDetails details);
12+
Mono<String> sendMailWithAttachment(EmailDetails details);
1213

1314
// send email file
14-
String sendMail(MultipartFile[] file, String to, String[] cc, String subject, String body);
15+
Mono<String> sendMail(MultipartFile[] file, String to, String[] cc, String subject, String body);
1516
}

0 commit comments

Comments
 (0)