1515 */
1616package com .greglturnquist .learningspringboot .chat ;
1717
18- import org .slf4j .Logger ;
19- import org .slf4j .LoggerFactory ;
2018import reactor .core .publisher .Flux ;
2119import reactor .core .publisher .FluxSink ;
2220import reactor .core .publisher .Mono ;
2321
22+ import org .slf4j .Logger ;
23+ import org .slf4j .LoggerFactory ;
2424import org .springframework .cloud .stream .annotation .EnableBinding ;
2525import org .springframework .cloud .stream .annotation .StreamListener ;
2626import org .springframework .cloud .stream .messaging .Sink ;
2727import org .springframework .stereotype .Service ;
2828import org .springframework .web .reactive .socket .WebSocketHandler ;
2929import org .springframework .web .reactive .socket .WebSocketSession ;
30-
3130import com .fasterxml .jackson .core .JsonProcessingException ;
3231import com .fasterxml .jackson .databind .ObjectMapper ;
3332
3433/**
3534 * @author Greg Turnquist
3635 */
37- // tag::code[]
36+ // tag::code-1 []
3837@ Service
3938@ EnableBinding (Sink .class )
4039public class CommentService implements WebSocketHandler {
4140
4241 private final static Logger log =
4342 LoggerFactory .getLogger (CommentService .class );
43+ // end::code-1[]
4444
45+ // tag::code-2[]
4546 private ObjectMapper mapper ;
4647 private Flux <Comment > flux ;
4748 private FluxSink <Comment > webSocketCommentSink ;
@@ -54,7 +55,9 @@ public class CommentService implements WebSocketHandler {
5455 .publish ()
5556 .autoConnect ();
5657 }
58+ // end::code-2[]
5759
60+ // tag::broadcast[]
5861 @ StreamListener (Sink .INPUT )
5962 public void broadcast (Comment comment ) {
6063 if (webSocketCommentSink != null ) {
@@ -63,7 +66,9 @@ public void broadcast(Comment comment) {
6366 webSocketCommentSink .next (comment );
6467 }
6568 }
69+ // end::broadcast[]
6670
71+ // tag::handle[]
6772 @ Override
6873 public Mono <Void > handle (WebSocketSession session ) {
6974 return session .send (this .flux
@@ -79,6 +84,7 @@ public Mono<Void> handle(WebSocketSession session) {
7984 .log ("wrap-as-websocket-message" ))
8085 .log ("publish-to-websocket" );
8186 }
87+ // end::handle[]
8288
8389}
8490// end::code[]
0 commit comments