|
58 | 58 | import org.springframework.context.ApplicationContextAware;
|
59 | 59 | import org.springframework.context.SmartLifecycle;
|
60 | 60 | import org.springframework.core.MethodParameter;
|
| 61 | +import org.springframework.core.ResolvableType; |
61 | 62 | import org.springframework.messaging.Message;
|
62 | 63 | import org.springframework.messaging.MessageHeaders;
|
63 | 64 | import org.springframework.messaging.converter.MessageConversionException;
|
|
66 | 67 | import org.springframework.messaging.support.MessageBuilder;
|
67 | 68 | import org.springframework.util.Assert;
|
68 | 69 | import org.springframework.util.MimeTypeUtils;
|
| 70 | +import org.springframework.util.ReflectionUtils; |
69 | 71 |
|
70 | 72 | @SuppressWarnings("WeakerAccess")
|
71 | 73 | public class DefaultRocketMQListenerContainer implements InitializingBean,
|
@@ -553,61 +555,36 @@ private Object doConvertMessage(MessageExt messageExt) {
|
553 | 555 |
|
554 | 556 | private MethodParameter getMethodParameter() {
|
555 | 557 | Class<?> targetClass;
|
| 558 | + Class<?> consumerInterface; |
556 | 559 | if (rocketMQListener != null) {
|
557 | 560 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
|
| 561 | + consumerInterface = RocketMQListener.class; |
558 | 562 | } else {
|
559 | 563 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
|
| 564 | + consumerInterface = RocketMQReplyListener.class; |
560 | 565 | }
|
561 |
| - Type messageType = this.getMessageType(); |
562 |
| - Class clazz = null; |
563 |
| - if (messageType instanceof ParameterizedType && messageConverter instanceof SmartMessageConverter) { |
564 |
| - clazz = (Class) ((ParameterizedType) messageType).getRawType(); |
565 |
| - } else if (messageType instanceof Class) { |
566 |
| - clazz = (Class) messageType; |
567 |
| - } else { |
568 |
| - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); |
569 |
| - } |
570 |
| - try { |
571 |
| - final Method method = targetClass.getMethod("onMessage", clazz); |
572 |
| - return new MethodParameter(method, 0); |
573 |
| - } catch (NoSuchMethodException e) { |
574 |
| - e.printStackTrace(); |
575 |
| - throw new RuntimeException("parameterType:" + messageType + " of onMessage method is not supported"); |
576 |
| - } |
| 566 | + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); |
| 567 | + Class<?> methodParameterType = resolvableType.getGeneric().resolve(); |
| 568 | + Method onMessage = ReflectionUtils.findMethod(targetClass, "onMessage", methodParameterType); |
| 569 | + return MethodParameter.forExecutable(onMessage, 0); |
577 | 570 | }
|
578 | 571 |
|
| 572 | + |
579 | 573 | private Type getMessageType() {
|
580 | 574 | Class<?> targetClass;
|
| 575 | + Class<?> consumerInterface; |
581 | 576 | if (rocketMQListener != null) {
|
582 | 577 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQListener);
|
| 578 | + consumerInterface = RocketMQListener.class; |
583 | 579 | } else {
|
584 | 580 | targetClass = AopProxyUtils.ultimateTargetClass(rocketMQReplyListener);
|
| 581 | + consumerInterface = RocketMQReplyListener.class; |
585 | 582 | }
|
586 |
| - Type matchedGenericInterface = null; |
587 |
| - while (Objects.nonNull(targetClass)) { |
588 |
| - Type[] interfaces = targetClass.getGenericInterfaces(); |
589 |
| - if (Objects.nonNull(interfaces)) { |
590 |
| - for (Type type : interfaces) { |
591 |
| - if (type instanceof ParameterizedType && |
592 |
| - (Objects.equals(((ParameterizedType) type).getRawType(), RocketMQListener.class) || Objects.equals(((ParameterizedType) type).getRawType(), RocketMQReplyListener.class))) { |
593 |
| - matchedGenericInterface = type; |
594 |
| - break; |
595 |
| - } |
596 |
| - } |
597 |
| - } |
598 |
| - targetClass = targetClass.getSuperclass(); |
599 |
| - } |
600 |
| - if (Objects.isNull(matchedGenericInterface)) { |
601 |
| - return Object.class; |
602 |
| - } |
603 |
| - |
604 |
| - Type[] actualTypeArguments = ((ParameterizedType) matchedGenericInterface).getActualTypeArguments(); |
605 |
| - if (Objects.nonNull(actualTypeArguments) && actualTypeArguments.length > 0) { |
606 |
| - return actualTypeArguments[0]; |
607 |
| - } |
608 |
| - return Object.class; |
| 583 | + ResolvableType resolvableType = ResolvableType.forClass(targetClass).as(consumerInterface); |
| 584 | + return resolvableType.getGeneric().getType(); |
609 | 585 | }
|
610 | 586 |
|
| 587 | + |
611 | 588 | private void initRocketMQPushConsumer() throws MQClientException {
|
612 | 589 | if (rocketMQListener == null && rocketMQReplyListener == null) {
|
613 | 590 | throw new IllegalArgumentException("Property 'rocketMQListener' or 'rocketMQReplyListener' is required");
|
|
0 commit comments