This Demo
This demo stands up a Spring application that exposes a WebSocket endpoint through Spring 5’s reactive WebSocket API. For inter-service communication you would typically use a more traditional messaging system, such as a message broker like Kafka or RabbitMQ, to deliver and consume events between services. WebSocket is an IETF standard (RFC 6455) with a W3C browser API, and it provides a convenient way to publish and consume data streams across the Internet. It is mainly used at the edge, where data is requested from outside your service boundaries.
[Figure: a sample graph of services, and of the mix of clients that connect]
WebSocket Server
Spring’s reactive WebSocket API is new in Spring 5 and lets you build services in a functional reactive programming style. We will introduce a single WebSocket handler and show what is needed to get started with Spring’s reactive WebSocketService support.
To get started, wire in a WebSocketHandlerAdapter to handle the WebSocket handshake, upgrade, and other connection details.
socket_handler_adapter.
// import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
@Bean
WebSocketHandlerAdapter socketHandlerAdapter() {
    return new WebSocketHandlerAdapter();
}
Sending events down to a client
Next, fill in the WebSocket session handler. The simplest way to do this is to use the functional interface WebSocketHandler and implement its handle() method to manage the response stream. Here we use the Flux.interval operator and push its output to the session for the client to consume. Map each emitted Long to a String, then wrap it in a WebSocketMessage with session.textMessage(String), or with one of the other message factory methods (such as binaryMessage), depending on the content of the response.
socket_session_handler.
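// Needs reactor.core.publisher.Flux, java.time.Duration, and WebSocketHandler / WebSocketMessage
// from org.springframework.web.reactive.socket; `log` is assumed to be a class-level logger.
WebSocketHandler webSocketHandler() {
    return session ->
        // Outbound: one tick per second, sent to the client as a text message.
        session.send(
            Flux.interval(Duration.ofSeconds(1))
                .map(n -> n.toString())
                .map(session::textMessage)
        // Inbound: log whatever the client sends back.
        ).and(session.receive()
            .map(WebSocketMessage::getPayloadAsText)
            .doOnNext(msg -> log.info("Prime#: " + msg))
            .doOnSubscribe(sub -> log.info("Started." + session.getId()))
            .doFinally(sig -> log.info("Complete." + session.getId()))
        );
}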
Composing additional session-oriented operations
The session.send() call returns a reactive Mono<Void>, which supports the and() operator for joining another publisher’s termination signal. We expect the client to respond with the numbers it finds to be prime, so we compose an extra step with the session.receive() publisher, which emits client-originated messages that we log to the console. Additionally, we make use of the do...() hooks (here doOnSubscribe and doFinally) to react to session events, so we can do things like clean up resources after the connection is over.
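The server only logs what the client reports; the prime test itself happens on the client, which this section does not show. As a minimal sketch of what that check might look like, assuming plain trial division and a hypothetical helper class named PrimeCheck (neither comes from the original demo):

```java
// Hypothetical helper: the original demo does not show how the client tests for primes.
class PrimeCheck {

    // Trial division: n is prime if it is greater than 1 and has no divisor in [2, sqrt(n)].
    static boolean isPrime(long n) {
        if (n < 2) return false;       // 0 and 1 (the first two interval ticks) are not prime
        if (n < 4) return true;        // 2 and 3
        if (n % 2 == 0) return false;  // other even numbers
        // d <= n / d is the overflow-safe form of d * d <= n
        for (long d = 3; d <= n / d; d += 2) {
            if (n % d == 0) return false;
        }
        return true;
    }

    // Parses one text payload from the feed; returns true only if it is a prime number.
    static boolean isPrimePayload(String payload) {
        try {
            return isPrime(Long.parseLong(payload.trim()));
        } catch (NumberFormatException e) {
            return false;              // non-numeric payloads are simply ignored
        }
    }
}
```

A client could call PrimeCheck.isPrimePayload(text) on each incoming message and send the text back only when it returns true, which is what the "Prime#: " log line on the server expects to receive.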
Web Friendly - URL Paths, CORS
Create an instance of SimpleUrlHandlerMapping and add mappings from URL paths to WebSocketHandler instances. A similar configuration option exists for CORS. Because this is a simple program, we register a single permissive default configuration under the * pattern. CORS configuration depends heavily on the needs of each service, and the specific variations are outside the scope of this document.
uri_handler_mapping.
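// Needs java.util.Collections, org.springframework.web.cors.CorsConfiguration,
// org.springframework.web.reactive.HandlerMapping, and
// org.springframework.web.reactive.handler.SimpleUrlHandlerMapping.
@Bean
HandlerMapping webSocketURLMapping() {
    SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
    simpleUrlHandlerMapping.setUrlMap(
        Collections.singletonMap("/ws/feed", webSocketHandler()));
    simpleUrlHandlerMapping.setCorsConfigurations(
        Collections.singletonMap("*", new CorsConfiguration().applyPermitDefaultValues()));
    // Assumption: without an explicit order this mapping is consulted last, so other
    // mappings may claim the path first; the Spring reference example uses -1.
    simpleUrlHandlerMapping.setOrder(-1);
    return simpleUrlHandlerMapping;
}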
Finally, we will execute this server app:
server_app_main.
public static void main(String[] args) {
    SpringApplication.run(WebSocketConfiguration.class, args);
}
Start the application:
$ mvn clean spring-boot:run
...
INFO 10671 --- [ main] o.s.b.web.embedded.netty.NettyWebServer : Netty started on port(s): 8080
Now we are ready to implement the client and talk to this service.
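Before building a full client, it can help to confirm that the feed is reachable. The following is a rough sketch that uses the JDK's built-in java.net.http.WebSocket client (Java 11 or later) rather than a Spring client. The class name FeedSmokeTest and its methods are hypothetical, and the host and port are assumed to match the startup log above:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;

// Hypothetical smoke-test client; not part of the original demo.
class FeedSmokeTest {

    // Builds the feed URI from the path registered in webSocketURLMapping().
    static URI feedUri(String host, int port) {
        return URI.create("ws://" + host + ":" + port + "/ws/feed");
    }

    // Prints every complete text message pushed by the server.
    static class PrintingListener implements WebSocket.Listener {
        private final StringBuilder buffer = new StringBuilder();

        @Override
        public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
            buffer.append(data);       // a message may arrive in several fragments
            if (last) {
                System.out.println("Received: " + buffer);
                buffer.setLength(0);
            }
            webSocket.request(1);      // ask for the next message
            return null;               // null means the data has been fully processed
        }
    }

    // Opens the connection and blocks until the handshake completes (or fails).
    static WebSocket connect(URI uri) {
        return HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(uri, new PrintingListener())
                .join();
    }
}
```

With the server running, FeedSmokeTest.connect(FeedSmokeTest.feedUri("localhost", 8080)) should start printing one number per second for as long as the calling thread is kept alive. Calling sendText("7", true) on the returned WebSocket sends a message that the server's receive() branch logs with the "Prime#: " prefix.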