伊成博客
伊成博客

org.springframework.amqp.support.converter.MessageConversionExcep异常

背景

最近,在用Spring Boot+rabbitMQ整合的过程中,测试生产者发送单条字符串消息到消费者消费这个过程没出现任何问题。

可是在实际应用中,往往在生产中不可能只生产字符串的,更多时候需要生产一个对象发送到队列,消费者从队列里面获取一个对象进行消费。

准备工作

先定义一个对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Order implements Serializable {

private String id;
private String name;

public Order() {
}
public Order(String id, String name) {
super();
this.id = id;
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

从代码中,可以发现定义对象的时候已经做了序列化了。

生产者代码:

1
2
3
4
5
6
7
8
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("0987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", order, correlationData);
}

测试生产者发送代码:

1
2
3
4
5
6
7
@Test
public void testSender2() throws Exception {
Order order = new Order("001", "第一个订单");
rabbitSender.sendOrder(order);
//防止资源提前关闭,ConfirmCallback异步回调失败
Thread.sleep(2000);
}

消费者代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
durable="${spring.rabbitmq.listener.order.queue.durable}"),
exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
durable="${spring.rabbitmq.listener.order.exchange.durable}",
type= "${spring.rabbitmq.listener.order.exchange.type}",
ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
key = "${spring.rabbitmq.listener.order.key}"
)
)
@RabbitHandler
public void onOrderMessage(@Payload Order order,
Channel channel,
@Headers Map<String, Object> headers) throws Exception {
System.err.println("--------------------------------------");
System.err.println("消费端order: " + order.getId());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
//手工ACK
channel.basicAck(deliveryTag, false);
}

当生产者有数据发送到队列的时候,消费者这端代码报了一个异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.rabbit.producer.RabbitProducer.receiver.OrderRecevier.onOrderMessage(com.rabbit.producer.RabbitProducer.entity.Order,com.rabbitmq.client.Channel,java.util.Map<java.lang.String, java.lang.Object>) throws java.lang.Exception]
Bean [com.rabbit.producer.RabbitProducer.receiver.OrderRecevier@600b7b3d]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:185) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:120) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1414) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1337) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1324) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1303) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:817) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:801) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:77) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1042) [spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.rabbit.Springboot4RabbitMQ.entity.Order] to [com.rabbit.producer.RabbitProducer.entity.Order] for GenericMessage [payload=Order [id=RabbitMQTestId0002, name=HelloWorld, messageId=1538919928275$0836e0e7-4976-457e-92fb-44b937255855], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=order.ABC, amqp_receivedExchange=order-exchange, amqp_deliveryTag=1, amqp_consumerQueue=order-queue, amqp_redelivered=false, id=0ffe4dcd-048f-f274-bca9-5550f9ecebb1, amqp_consumerTag=amq.ctag-82Oo3kl1I138E2pvVRsczA, contentType=application/x-java-serialized-object, timestamp=1538919929083}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109) ~[spring-messaging-5.0.8.RELEASE.jar:5.0.8.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:51) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:182) ~[spring-rabbit-2.0.5.RELEASE.jar:2.0.5.RELEASE]
... 10 common frames omitted

从异常信息中可以看到是消费者对消息反序列化的时候失败了。虽然两个项目中的Order类是完全一样的,但在进行反序列化的时候还是失败了!

于是找了一番解决方案··

第一种
消费者引用生产者项目中的消息体即Order.java

在消费者项目上【右键】->【Bulid Path】->【Configure Build Path】->【Projects】->【Add】 选择生产者项目,然后消费者项目就可以引用生产者项目中类,这样完全保证了两个项目中JavaBean是一致的,所以能解决反序列失败的问题。

第二种
生产者在发送消息前将消息体转换为JSONObject,消费者以JSONObject接收消息,再转换为对应的实体类。

生产者的代码

1
2
3
4
5
6
7
8
//发送消息方法调用: 构建自定义对象消息
public void sendOrder(Order order) throws Exception {
rabbitTemplate.setConfirmCallback(confirmCallback);
rabbitTemplate.setReturnCallback(returnCallback);
//id + 时间戳 全局唯一
CorrelationData correlationData = new CorrelationData("0987654321");
rabbitTemplate.convertAndSend("exchange-2", "springboot.def", FastJsonConvertUtil.toJsonObject(order), correlationData);
}

消费者的代码

1
2
3
4
5
6
7
8
9
10
11
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable = "${spring.rabbitmq.listener.order.queue.durable}"), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable = "${spring.rabbitmq.listener.order.exchange.durable}", type = "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationeExceptions}"), key = "${spring.rabbitmq.listener.order.key}"))
public void onOrderMessage(@Payload JSONObject object, Channel channel, @Headers Map<String, Object> headers)
throws Exception
{
System.err.println("----------------------------------");
Order order = JsonConvertUtils.convertJSONToObject(object);
System.err.println("消费端Order: " + order.toString());
Long deliveryTag = (Long)headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);

}

支付宝打赏 微信打赏

如果本文对你有所帮助,请打赏 1元就足够感动我 :)