|
17 | 17 |
|
18 | 18 | import org.slf4j.Logger; |
19 | 19 | import org.slf4j.LoggerFactory; |
| 20 | +import reactor.core.publisher.Flux; |
| 21 | +import reactor.core.publisher.FluxSink; |
| 22 | +import reactor.core.publisher.Mono; |
20 | 23 | import org.springframework.cloud.stream.annotation.EnableBinding; |
21 | 24 | import org.springframework.cloud.stream.annotation.StreamListener; |
22 | | -import org.springframework.cloud.stream.messaging.Sink; |
23 | | -import org.springframework.messaging.simp.SimpMessagingTemplate; |
24 | 25 | import org.springframework.stereotype.Service; |
| 26 | +import org.springframework.web.reactive.socket.WebSocketSession; |
| 27 | + |
| 28 | +import com.fasterxml.jackson.core.JsonProcessingException; |
| 29 | +import com.fasterxml.jackson.databind.ObjectMapper; |
25 | 30 |
|
26 | 31 | /** |
27 | 32 | * @author Greg Turnquist |
28 | 33 | */ |
29 | 34 | // tag::code[] |
30 | 35 | @Service |
31 | | -@EnableBinding(Sink.class) |
32 | | -public class CommentService { |
| 36 | +@EnableBinding(ChatServiceStreams.class) |
| 37 | +public class CommentService extends AuthorizedWebSocketHandler { |
33 | 38 |
|
34 | | - private final static Logger log = LoggerFactory.getLogger(CommentService.class); |
| 39 | + private final static Logger log = |
| 40 | + LoggerFactory.getLogger(CommentService.class); |
35 | 41 |
|
36 | | - private final SimpMessagingTemplate simpMessagingTemplate; |
| 42 | + private ObjectMapper mapper; |
| 43 | + private Flux<Comment> flux; |
| 44 | + private FluxSink<Comment> webSocketCommentSink; |
37 | 45 |
|
38 | | - public CommentService(SimpMessagingTemplate simpMessagingTemplate) { |
39 | | - this.simpMessagingTemplate = simpMessagingTemplate; |
| 46 | + CommentService(ObjectMapper mapper) { |
| 47 | + this.mapper = mapper; |
| 48 | + this.flux = Flux.<Comment>create( |
| 49 | + emitter -> this.webSocketCommentSink = emitter, |
| 50 | + FluxSink.OverflowStrategy.IGNORE) |
| 51 | + .publish() |
| 52 | + .autoConnect(); |
40 | 53 | } |
41 | 54 |
|
42 | | - @StreamListener(Sink.INPUT) |
| 55 | + @StreamListener(ChatServiceStreams.NEW_COMMENTS) |
43 | 56 | public void broadcast(Comment comment) { |
44 | | - log.info("Publishing " + comment.toString() + " to websocket..."); |
45 | | - simpMessagingTemplate.convertAndSend("/topic/comments.new", comment); |
| 57 | + if (webSocketCommentSink != null) { |
| 58 | + log.info("Publishing " + comment.toString() + |
| 59 | + " to websocket..."); |
| 60 | + webSocketCommentSink.next(comment); |
| 61 | + } |
46 | 62 | } |
47 | 63 |
|
| 64 | + @Override |
| 65 | + public Mono<Void> doHandle(WebSocketSession session) { |
| 66 | + return session.send(this.flux |
| 67 | + .map(comment -> { |
| 68 | + try { |
| 69 | + return mapper.writeValueAsString(comment); |
| 70 | + } catch (JsonProcessingException e) { |
| 71 | + throw new RuntimeException(e); |
| 72 | + } |
| 73 | + }) |
| 74 | + .log("encode-as-json") |
| 75 | + .map(session::textMessage) |
| 76 | + .log("wrap-as-websocket-message")) |
| 77 | + .log("publish-to-websocket"); |
| 78 | + } |
48 | 79 | } |
49 | 80 | // end::code[] |
0 commit comments