-
Notifications
You must be signed in to change notification settings - Fork 931
接收消息
rongtong edited this page Jan 22, 2021
·
4 revisions
修改application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
注意:
请将上述示例配置中的
127.0.0.1:9876
替换成真实RocketMQ的NameServer地址与端口
编写代码
@SpringBootApplication
public class ConsumerApplication{
public static void main(String[] args){
SpringApplication.run(ConsumerApplication.class, args);
}
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-1", consumerGroup = "my-consumer_test-topic-1")
public class MyConsumer1 implements RocketMQListener<String>{
public void onMessage(String message) {
log.info("received message: {}", message);
}
}
@Slf4j
@Service
@RocketMQMessageListener(topic = "test-topic-2", consumerGroup = "my-consumer_test-topic-2")
public class MyConsumer2 implements RocketMQListener<OrderPaidEvent>{
public void onMessage(OrderPaidEvent orderPaidEvent) {
log.info("received orderPaidEvent: {}", orderPaidEvent);
}
}
}
从RocketMQ Spring 2.2.0开始,RocketMQ Srping支持Pull模式消费
修改application.properties
## application.properties
rocketmq.name-server=127.0.0.1:9876
rocketmq.consumer.group=my-group1
rocketmq.consumer.topic=test
编写代码
@SpringBootApplication
public class ConsumerApplication implements CommandLineRunner {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Resource(name = "extRocketMQTemplate")
private RocketMQTemplate extRocketMQTemplate;
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
//This is an example of pull consumer using rocketMQTemplate.
List<String> messages = rocketMQTemplate.receive(String.class);
System.out.printf("receive from rocketMQTemplate, messages=%s %n", messages);
//This is an example of pull consumer using extRocketMQTemplate.
messages = extRocketMQTemplate.receive(String.class);
System.out.printf("receive from extRocketMQTemplate, messages=%s %n", messages);
}
}