This Demo
This demo client connects to, and receives events from, any URL that
exposes an open WebSocket stream. We already have a server that we can
stand up to supply the socket events. A reactive client means that we can
respond to backpressure and apply the Observer pattern to our client
connections.
The Client (SANS web)
We can use our favorite application generator, the Spring Initializr at start.spring.io, to create the application. To get started, the project must have the following dependencies:
dependencies_for_webflux.
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
For this client, we will use the ReactorNettyWebSocketClient to request
and handle our WebSocket connection. The execute(…) method lets us pass in
a WebSocketHandler to manage the session. Connecting to the session’s
receive() stream lets us handle the events sent by the server and apply
our custom stream logic.
Note
Be aware, though, that browsers do not support custom headers in ws:// WebSocket connections.
websocket_client.
Mono<Void> wsConnectNetty(int id) {
    URI uri = getURI("ws://localhost:8080/ws/feed");
    return new ReactorNettyWebSocketClient()
            .execute(uri, session -> session
                    .receive()
                    .map(WebSocketMessage::getPayloadAsText)
                    .take(MAX_EVENTS)
                    .doOnNext(txt -> log.info(id + ".IN: " + txt))
                    .filter(txt -> is_prime(Long.valueOf(txt)))
                    // send every prime back to the server
                    .flatMap(txt -> session.send(Mono.just(session.textMessage(txt))))
                    .doOnSubscribe(subscriber -> log.info(id + ".OPEN"))
                    .doFinally(signalType -> log.info(id + ".CLOSE"))
                    .then() // only propagate terminal signals (complete, error)
            );
}
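The listing above calls two helpers, getURI and is_prime, that this article does not show. The following is a minimal sketch of what they might look like, not the original implementation: the bodies, the ClientHelpers class name, and the primesAmongFirst method are assumptions made for illustration. The last method mirrors the take-then-filter order of the reactive pipeline with a plain java.util.stream pipeline, to show that only the first MAX_EVENTS messages are ever tested for primality.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ClientHelpers {

    // Hypothetical stand-in for getURI(...): converts the checked
    // URISyntaxException into an unchecked one so the caller stays terse.
    static URI getURI(String uri) {
        try {
            return new URI(uri);
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Invalid WebSocket URI: " + uri, e);
        }
    }

    // Hypothetical stand-in for is_prime(...): trial division by odd numbers
    // up to sqrt(n). The name follows the listing, not Java naming conventions.
    static boolean is_prime(long n) {
        if (n < 2) return false;
        if (n < 4) return true;          // 2 and 3
        if (n % 2 == 0) return false;
        for (long d = 3; d <= n / d; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Blocking analogue of .take(maxEvents).filter(is_prime): limit first,
    // then filter, so primes beyond the first maxEvents payloads are dropped.
    static List<String> primesAmongFirst(List<String> payloads, int maxEvents) {
        return payloads.stream()
                .limit(maxEvents)
                .filter(txt -> is_prime(Long.parseLong(txt)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> feed = Arrays.asList("0", "1", "2", "3", "4", "5", "6", "7");
        System.out.println(primesAmongFirst(feed, 5)); // prints [2, 3]
    }
}
```

With five events taken from a feed that counts up from 0, only 2 and 3 pass the filter, even though 5 and 7 appear later in the feed. Note that a non-numeric payload would make the number parsing throw a NumberFormatException, which in the reactive version would surface as an error signal on the stream.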
Now we can set up the subscription method. Since our client execution
returns a Mono<Void>, we will compose a stream that launches several
client instances, all subscribed on a single-threaded scheduler. Using a
CountDownLatch, we can keep the application alive until the clients finish
without resorting to the dreaded block() operator.
a_reactive_ws_stream.
@Bean
ApplicationRunner appRunner() {
    return args -> {
        final CountDownLatch latch = new CountDownLatch(5);
        Flux.merge(
                Flux.range(0, 5)
                        .subscribeOn(Schedulers.single())
                        .map(this::wsConnectNetty)
                        .flatMap(sp -> sp.doOnTerminate(latch::countDown))
                        .parallel()
        )
        .subscribe();
        latch.await(20, TimeUnit.SECONDS);
    };
}
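The latch technique used above can be seen in isolation with nothing but the JDK. This is a minimal sketch under stated assumptions: CompletableFuture tasks stand in for the WebSocket client connections, and the LatchDemo class and runClients method are hypothetical names introduced for illustration. Each task counts the latch down when it terminates, whether it succeeded or failed, and the caller waits with a timeout instead of blocking indefinitely.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class LatchDemo {

    // Starts `clients` asynchronous tasks and waits until all of them have
    // terminated or the timeout elapses. Returns true only if every task
    // terminated within the timeout.
    static boolean runClients(int clients, long timeoutMillis, AtomicInteger completed) {
        CountDownLatch latch = new CountDownLatch(clients);
        for (int id = 0; id < clients; id++) {
            CompletableFuture
                    // stand-in for one client connection doing its work
                    .runAsync(completed::incrementAndGet)
                    // count down on success and on failure alike
                    .whenComplete((ok, err) -> latch.countDown());
        }
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        boolean allDone = runClients(5, 2000, completed);
        System.out.println("all done: " + allDone + ", completed: " + completed.get());
    }
}
```

CountDownLatch.await(timeout, unit) returns false when the timeout expires before the count reaches zero. The runner above ignores that return value, so it cannot tell a clean finish from a timeout; checking it, as this sketch does, is one way to surface clients that never terminated.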
Finally, to make this application runnable, we need a fancy
@SpringBootApplication class :)
ws_client_main.
@SpringBootApplication
@Slf4j
public class SocketClientApp {
    // rest of the code goes here
    public static void main(String[] args) throws Exception {
        SpringApplication app = new SpringApplication(SocketClientApp.class);
        app.setWebApplicationType(WebApplicationType.NONE);
        app.run(args);
    }
}
The spring.main.web-environment property was deprecated in Spring Boot 2.0
(the release line built on Spring Framework 5.0) in favor of
spring.main.web-application-type. The same setting can be made
programmatically by setting the WebApplicationType at initialization.
I did that here instead of keeping an application.properties file for a
single property, which keeps the configuration condensed :)
execute.
$ mvn spring-boot:run
Run the application and you should see the following output on the client side.
output.
INFO 5292 --- [ main] c.example.socketclient.SocketClientApp : Started SocketClientApp in 1.735 seconds (JVM running for 2.464)
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.OPEN
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.OPEN
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.IN: 0
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.IN: 0
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.IN: 1
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.IN: 1
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.IN: 2
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.IN: 2
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.IN: 3
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.IN: 3
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.IN: 4
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.IN: 4
INFO 5292 --- [ctor-http-nio-4] c.example.socketclient.SocketClientApp : 0.CLOSE
INFO 5292 --- [ctor-http-nio-2] c.example.socketclient.SocketClientApp : 1.CLOSE
INFO 5292 --- [ Thread-6] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@6e75aa0d: startup date []; root of context hierarchy
The server side should show similar output:
output_server.
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp : Started.6fa56d2b
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp : Started.35994779
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp : Prime#: 2
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp : Prime#: 2
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp : Prime#: 3
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp : Prime#: 3
INFO 5260 --- [ctor-http-nio-1] c.e.socketserver.WebSocketServerApp : Complete.35994779
INFO 5260 --- [ctor-http-nio-4] c.e.socketserver.WebSocketServerApp : Complete.6fa56d2b
Notes
I hope you find this demonstration useful. Send your feedback to me via email, Twitter, or carrier pigeon.
References/Reading List
Spring WebFlux guide
Articles
- Tom Van den Bulck has a nice piece on reactive programming with Spring.
W3C Proposals
Theory