Spring Reactive WebSocket Cold Publisher

3 minute read Published:

Web-Socket that utilizes the Reactive Cold-Publisher pattern.
Table of Contents

This Demo

This demo will stand up Spring application service that exposes a WebSocket service through the Spring 5’s reactive WebSockets API. Typically, for inter-service comms, use a more traditional messaging system to deliver and expect events between services for example - messaging brokers such as Kafka, RabbitMq, etc.. WebSockets is an IETF standard and W3C API that provides a convenient way to issue and consume data streams across the Internet. WebSockets is mainly used at edge consumption where data is being requested outside of your Service boundaries.

#A sample graph of services and then of a mix of clients that connect

WebSocket Server

Spring’s WebSocket API is new in Spring 5, and enables service construction in a Functional Reactive Programing method. We will introduce a single WebSocket handler, and show whats needed to get started using Spring’s reactive WebSocketService support.

To get started, wire in a WebSocketHandlerAdapter to handle our web socket handshake, upgrade, and other connection details.

socket_handler_adapter.

    @Bean
    WebSocketHandlerAdapter socketHandlerAdapter() {
        return new WebSocketHandlerAdapter();
    }

Sending events down to a client

Next, fill in the WebSocket session handler. The simplest way to do this to use the functional interface for WebSocketHandler and implement it’s handle() method to manage our stream response. In this, we will use the Flux.interval operator and push it’s output to the session for the client to consume. Simply transform/map the integer output as a String and send it to the session.textMessage(String) or one of it’s overrides depending on content for response.

socket_session_handler.

    WebSocketHandler webSocketHandler() {
        return session ->
                session.send(
                        Flux.interval(Duration.ofSeconds(1))
                                .map(n -> n.toString())
                                .map(session::textMessage)
                ).and(session.receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .doOnNext(msg -> log.info("Prime#: " + msg))
                        .doOnSubscribe(sub -> log.info("Started." + session.getId()))
                        .doFinally(sig -> log.info("Complete." + session.getId()))
                );
    }

Composing additional session-oriented operations

WebSocket Sessions are wraped within the reactive Mono type that supports the the and() operator for chaining other publisher’s termination signal. We expect a client to respond with numbers it finds as prime. So, compose the extra step with the session.receive() publisher that emits, and logs client-origin messages to console. Additionally, we’ll make use of the do...() handlers to react to session events so we can do things like clean up resources after the connection is over.

WEB Friendly - URL Paths, CORS

Create an instance of SimpleUrlHandlerMapping, and add Mappings for URL to WebSocketHandler exchanges. Thus, a similar configuration option exists for CORS. To accomplish this, compose a simple default map for all URLs * given; this is a simplistic program. CORS configurations are highly sensitive to the service implementor’s nature, and come in specific variances that are outside the scope of this document.

uri_handler_mapping.

    @Bean
    HandlerMapping webSocketURLMapping() {
        SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
        simpleUrlHandlerMapping.setUrlMap(
                Collections.singletonMap("/ws/feed", webSocketHandler()));
        simpleUrlHandlerMapping.setCorsConfigurations(
                Collections.singletonMap("*", new CorsConfiguration().applyPermitDefaultValues()));
        return simpleUrlHandlerMapping;
    }

Finally, we will execute this server app:

server_app_main.

    public static void main(String[] args) {
        SpringApplication.run(WebSocketConfiguration.class, args);
    }

Start the application:

$ mvn clean spring-boot:run
...
INFO 10671 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080

Now we are ready to implement the client, and talk to this service.

References/Readling List