Reactive Websocket Client with Spring

3 minute read Published:

Demonstrating the Spring WebSocket Client

This Demo

This demo client will connect and receive events from any URL that emits an open websocket stream. We have an existing server to stand up that can supply the socket events. A reactive client means that we can respond to backpressure, and weild the Observer pattern to our client connections.

The Client (SANS web)

We can use our favorite Spring Application Initializr start dot spring dot io to generate the application. To get started, any project must have the following dependencies:

dependencies_for_webflux.

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

For this client, we will use the ReactorNettyWebSocketClient request and handle our WebSocket connection. The execute(…​) method lets us pass in a WebSocketHandler to manage the session. Connecting to to the session’s receive() stream will let us handle server source events, and produce our custom stream logic.

Note

Be aware though, browsers do not support custom headers in ws:// websocket connections.

websocket_client.

    Mono<Void> wsConnectNetty(int id) {
        URI uri = getURI("ws://localhost:8080/ws/feed");
        return new ReactorNettyWebSocketClient()
                        .execute(uri, session -> session
                        .receive()
                        .map(WebSocketMessage::getPayloadAsText)
                        .take(MAX_EVENTS)
                        .doOnNext(txt -> log.info(id + ".IN: " + txt))
                        .filter(txt -> is_prime(Long.valueOf(txt)))
                        .flatMap(txt -> session.send(Mono.just(session.textMessage(txt))))
                        .doOnSubscribe(subscriber -> log.info(id + ".OPEN"))
                        .doFinally(signalType -> log.info(id + ".CLOSE"))
                        .then() // only handle stream signals (term, complete, etc..)
                )
    }

Now we can setup the subscription method. Since our client execution returnd a type of Mono<Void>, we will compose a stream to emit many client instances that will survive on a single threaded event loop. Using CountDownLatch, we can avoid using the deadly block() operator in Flux.

a_reactive_ws_stream.

    @Bean
    ApplicationRunner appRunner() {
        return args -> {
            final CountDownLatch latch = new CountDownLatch(5);
            Flux.merge(
                    Flux.range(0, 5)
                            .subscribeOn(Schedulers.single())
                            .map(this::wsConnectNetty)
                            .flatMap(sp -> sp.doOnTerminate(latch::countDown))
                            .parallel()
            )
                    .subscribe();
            latch.await(20, TimeUnit.SECONDS);
        };
    }

Finally, to make this application runnable, there is a fancy @SpringBootApplication code :)

ws_client_main.

@SpringBootApplication
@Slf4j
public class SocketClientApp {
    // rest of the code goes here

    public static void main(String[] args) throws Exception {
        SpringApplication app = new SpringApplication(SocketClientApp.class);
                app.setWebApplicationType(WebApplicationType.NONE);
                app.run(args);
    }
}

The spring.main.web-environment property variable has been deprecated in Spring 5.0. The recommended method is now to programatically setup WebApplicationType at initialization. I removed the application.properties for one property (and kept configuration condensed) :)

execute.

$ mvn spring-boot:run

Execute this application and you see the following output on the client side.

output.

INFO 5292 --- [           main] c.example.socketclient.SocketClientApp   : Started SocketClientApp in 1.735 seconds (JVM running for 2.464)
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.OPEN
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.OPEN
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.IN: 0
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.IN: 0
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.IN: 1
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.IN: 1
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.IN: 2
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.IN: 2
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.IN: 3
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.IN: 3
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.IN: 4
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.IN: 4
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp   : 0.CLOSE
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp   : 1.CLOSE
INFO 5292 --- [       Thread-6] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6e75aa0d: startup date []; root of context hierarchy

Server Side should see similar output:

output_server.

INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp      : Started.6fa56d2b
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp      : Started.35994779
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp      : Prime#: 2
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp      : Prime#: 2
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp      : Prime#: 3
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp      : Prime#: 3
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp      : Complete.35994779
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp      : Complete.6fa56d2b

Notes

Hope you find this demonstration useful. Send your feedback to me via email twitter, or Carrier Pigeon.

References/Readling List