1+ package com .didispace .stream ;
2+
3+ import lombok .extern .slf4j .Slf4j ;
4+ import org .springframework .beans .factory .annotation .Autowired ;
5+ import org .springframework .boot .SpringApplication ;
6+ import org .springframework .boot .autoconfigure .SpringBootApplication ;
7+ import org .springframework .cloud .stream .annotation .EnableBinding ;
8+ import org .springframework .cloud .stream .annotation .Input ;
9+ import org .springframework .cloud .stream .annotation .Output ;
10+ import org .springframework .cloud .stream .annotation .StreamListener ;
11+ import org .springframework .integration .support .MessageBuilder ;
12+ import org .springframework .messaging .MessageChannel ;
13+ import org .springframework .messaging .SubscribableChannel ;
14+ import org .springframework .stereotype .Component ;
15+ import org .springframework .web .bind .annotation .GetMapping ;
16+ import org .springframework .web .bind .annotation .RequestParam ;
17+ import org .springframework .web .bind .annotation .RestController ;
18+
19+
20+ @ EnableBinding (TestApplication .TestTopic .class )
21+ @ SpringBootApplication
22+ public class TestApplication {
23+
24+ public static void main (String [] args ) {
25+ SpringApplication .run (TestApplication .class , args );
26+ }
27+
28+ @ Slf4j
29+ @ RestController
30+ static class TestController {
31+
32+ @ Autowired
33+ private TestTopic testTopic ;
34+
35+ /**
36+ * 消息生产接口
37+ *
38+ * @param message
39+ * @return
40+ */
41+ @ GetMapping ("/sendMessage" )
42+ public String messageWithMQ (@ RequestParam String message ) {
43+ log .info ("Send: " + message );
44+ testTopic .output ().send (MessageBuilder .withPayload (message ).setHeader ("x-delay" , 5000 ).build ());
45+ return "ok" ;
46+ }
47+
48+ }
49+
50+ /**
51+ * 消息消费逻辑
52+ */
53+ @ Slf4j
54+ @ Component
55+ static class TestListener {
56+
57+ @ StreamListener (TestTopic .INPUT )
58+ public void receive (String payload ) {
59+ log .info ("Received: " + payload );
60+ }
61+
62+ }
63+
64+ interface TestTopic {
65+
66+ String OUTPUT = "example-topic-output" ;
67+ String INPUT = "example-topic-input" ;
68+
69+ @ Output (OUTPUT )
70+ MessageChannel output ();
71+
72+ @ Input (INPUT )
73+ SubscribableChannel input ();
74+
75+ }
76+
77+ }
0 commit comments