Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

作者:网友投稿 时间:2019-01-04 21:55

字号

有的时候,我们对于同一通道中的消息处理,会通过判断头信息或者消息内容来做一些差异化处理,比如:可能在消息头信息中带入消息版本号,然后通过if判断来执行不同的处理逻辑,其代码结构可能是这样的:

Spring Cloud Stream同一通道根据消息内容分发不同的消费逻辑

@StreamListener(value = TestTopic.INPUT) 

public void receiveV1(String payload, @Header("version") String version) { 

    if("1.0".equals(version)) { 

        // Version 1.0 

    } 

    if("2.0".equals(version)) { 

        // Version 2.0 

    } 

那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,可以用来优化这样的处理结构。

动手试试

下面通过编写一个简单的例子来具体体会一下这个属性的用法:

@EnableBinding(TestApplication.TestTopic.class) 

@SpringBootApplication 

public class TestApplication { 

 

    public static void main(String[] args) { 

        SpringApplication.run(TestApplication.class, args); 

    } 

 

    @RestController 

    static class TestController { 

 

        @Autowired 

        private TestTopic testTopic; 

 

        /** 

         * 消息生产接口 

         * 

         * @param message 

         * @return 

         */ 

        @GetMapping("/sendMessage"

        public String messageWithMQ(@RequestParam String message) { 

            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version""1.0").build()); 

            testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version""2.0").build()); 

            return "ok"

        } 

 

    } 

 

    /** 

     * 消息消费逻辑 

     */ 

    @Slf4j 

    @Component 

    static class TestListener { 

 

        @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'"

        public void receiveV1(String payload, @Header("version") String version) { 

责任编辑:CQITer新闻报料:400-888-8888   本站原创,未经授权不得转载
继续阅读
热新闻
推荐
关于我们联系我们免责声明隐私政策 友情链接