header

Spring Webflux WebSockets

WebFlux contains support for reactive HTTP and WebSocket clients. WebSockets allow communication in both directions, and this can happen simultaneously; therefore, WebSockets are the best way to deliver real-time, bi-directional data using either text or binary content. This time we will review how to build a WebFlux application using WebSockets on both the client and the server. If you want to know more about how to create a Spring Webflux application, please go to my previous post, getting started with Spring Webflux, here. Then, let’s create a new Spring Boot project with Webflux and Lombok as dependencies:

spring init --dependencies=webflux,lombok --build=gradle --language=java webflux-websocket-workshop

Here is the complete build.gradle file generated:

// Gradle build for the project: Spring Boot 2.2.4.RELEASE with WebFlux and Lombok.
plugins {
	id 'org.springframework.boot' version '2.2.4.RELEASE'
	id 'io.spring.dependency-management' version '1.0.9.RELEASE'
	id 'java'
}

group = 'com.jos.dem.webflux.websocket'
version = '1.0.0-SNAPSHOT'
sourceCompatibility = '12'

// compileOnly inherits everything declared as annotationProcessor.
configurations {
	compileOnly {
		extendsFrom annotationProcessor
	}
}

repositories {
	mavenCentral()
}

dependencies {
	implementation 'org.springframework.boot:spring-boot-starter-webflux'
	// Lombok is declared both as compile-only and as an annotation processor.
	compileOnly 'org.projectlombok:lombok'
	annotationProcessor 'org.projectlombok:lombok'
	// The JUnit vintage engine is excluded from the test starter.
	testImplementation('org.springframework.boot:spring-boot-starter-test') {
		exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
	}
	testImplementation 'io.projectreactor:reactor-test'
}

// Run the tests on the JUnit Platform.
test {
	useJUnitPlatform()
}

In order to implement WebSockets we need to create a WebSocketHandlerAdapter and a HandlerMapping so that we can map each WebSocketHandler we are planning to use to a URL.

package com.jos.dem.webflux.websocket;

import lombok.RequiredArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.Ordered;
import org.springframework.web.reactive.HandlerMapping;
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;

import java.util.HashMap;
import java.util.Map;

/**
 * Entry point of the WebSocket server application.
 *
 * <p>Besides starting Spring Boot, it declares the two beans needed to serve WebSockets:
 * a {@link HandlerMapping} that binds the injected {@link WebSocketHandler} to a URL and a
 * {@link WebSocketHandlerAdapter}.
 */
@SpringBootApplication
@RequiredArgsConstructor
public class WebsocketApplication {

  /** Handler bound to the {@code /channel} path; supplied through the generated constructor. */
  private final WebSocketHandler webSocketHandler;

  /**
   * Starts the Spring Boot application.
   *
   * @param args command line arguments, passed through to Spring Boot
   */
  public static void main(String[] args) {
    SpringApplication.run(WebsocketApplication.class, args);
  }

  /**
   * Declares the adapter bean for WebSocket handlers.
   *
   * @return a new {@link WebSocketHandlerAdapter}
   */
  @Bean
  public WebSocketHandlerAdapter handlerAdapter() {
    return new WebSocketHandlerAdapter();
  }

  /**
   * Declares the URL mapping for the WebSocket handler.
   *
   * @return a {@link SimpleUrlHandlerMapping} ordered with {@link Ordered#HIGHEST_PRECEDENCE}
   *     that routes {@code /channel} to the injected handler
   */
  @Bean
  public HandlerMapping handlerMapping() {
    SimpleUrlHandlerMapping urlMapping = new SimpleUrlHandlerMapping();
    urlMapping.setUrlMap(handlersByPath());
    urlMapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
    return urlMapping;
  }

  /** Builds the path-to-handler table used by {@link #handlerMapping()}. */
  private Map<String, WebSocketHandler> handlersByPath() {
    Map<String, WebSocketHandler> handlers = new HashMap<>();
    handlers.put("/channel", webSocketHandler);
    return handlers;
  }
}

And here is our ReactiveWebSocketHandler, which is responsible for managing our WebSocket session.

package com.jos.dem.webflux.websocket.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jos.dem.webflux.websocket.model.Event;
import com.jos.dem.webflux.websocket.util.MessageGenerator;
import java.time.Duration;
import java.time.Instant;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/**
 * Server-side WebSocket handler.
 *
 * <p>For every session it sends one JSON event per tick of a one-second interval and logs
 * the text payloads it receives.
 */
@Component
@RequiredArgsConstructor
public class ReactiveWebSocketHandler implements WebSocketHandler {

  /** Stream of JSON events; assigned in {@link #setup()}. */
  private Flux<String> intervalFlux;
  private final ObjectMapper mapper = new ObjectMapper();
  private final MessageGenerator messageGenerator;

  /** Builds the event stream: one freshly generated event per one-second tick. */
  @PostConstruct
  private void setup(){
    Flux<Long> ticks = Flux.interval(Duration.ofSeconds(1));
    intervalFlux = ticks.map(tick -> getEvent());
  }

  /**
   * Sends the event stream as text messages and logs every incoming payload as text.
   *
   * @param session the WebSocket session to write to and read from
   * @return a {@code Mono} combining the send operation with the logged inbound stream
   */
  @Override
  public Mono<Void> handle(WebSocketSession session) {
    Mono<Void> sending = session.send(intervalFlux.map(session::textMessage));
    Flux<String> incoming = session.receive().map(WebSocketMessage::getPayloadAsText).log();
    return sending.and(incoming);
  }

  /**
   * Creates an {@code Event} from a generated message and the current instant.
   *
   * @return the event converted to a JSON tree and rendered as a string
   */
  private String getEvent(){
    Event event = new Event(messageGenerator.generate(), Instant.now());
    JsonNode json = mapper.valueToTree(event);
    return json.toString();
  }
}

Where:

  • Flux.interval simulates data streaming by emitting a new event every second
  • handle method is used here to send several texts as WebSocketMessage and receive a payload text back
  • messageGenerator Generates a random message

Here is our message generator class

package com.jos.dem.webflux.websocket.util;

import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;
import java.util.Random;

/**
 * Supplies a greeting picked at random from a fixed list.
 */
@Component
public class MessageGenerator {

  /** Fixed pool of greetings to choose from. */
  private final List<String> messages =
      Arrays.asList("Bonjour", "Hola", "Zdravstvuyte", "Salve", "Guten Tag", "Hello");

  // Deliberately unseeded: the argument of Random(long) is a seed, not an upper bound, and a
  // constant seed would make every run of the application emit the same sequence of messages.
  private final Random random = new Random();

  /**
   * Picks one of the greetings at random.
   *
   * @return a greeting from the fixed list
   */
  public String generate() {
    return messages.get(random.nextInt(messages.size()));
  }
}

So, now if you execute our Spring Boot application:

gradle bootRun

Then, once you also run our client application, which we will see in a moment, you should see this output on the server side:

2020-02-27 15:16:10.905  INFO 33056 --- [           main] c.j.d.w.websocket.WebsocketApplication   : Started WebsocketApplication in 0.925 seconds (JVM running for 1.135)
2020-02-27 15:16:18.895  INFO 33056 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
2020-02-27 15:16:18.897  INFO 33056 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(unbounded)
2020-02-27 15:16:18.944  INFO 33056 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"start","timestamp":{"nano":887893000,"epochSecond":1582834578}})

WebSockets Client Side

Now, let’s create a WebSocket client so that we will be able to connect to the server and exchange messages.

package com.jos.dem.webflux.websocket.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;
import org.springframework.web.reactive.socket.client.WebSocketClient;

/** Configuration class that exposes the WebSocket client as a bean. */
@Configuration
public class ReactiveWebSocketConfig {

  /**
   * Creates the WebSocket client bean.
   *
   * @return a new {@link ReactorNettyWebSocketClient}, exposed through the
   *     {@link WebSocketClient} interface
   */
  @Bean
  WebSocketClient webSocketClient() {
    final WebSocketClient nettyClient = new ReactorNettyWebSocketClient();
    return nettyClient;
  }
}

That’s it, we are returning a ReactorNettyWebSocketClient bean, which is a WebSocketClient implementation. Here is our client-side ReactiveWebSocketHandler:

package com.jos.dem.webflux.websocket.handler;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.jos.dem.webflux.websocket.model.Event;
import java.time.Instant;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.WebSocketMessage;
import org.springframework.web.reactive.socket.WebSocketSession;
import reactor.core.publisher.Mono;

/**
 * Client-side WebSocket handler.
 *
 * <p>It sends a single "start" event as a text message and logs every text payload it
 * receives.
 */
@Component
public class ReactiveWebSocketHandler implements WebSocketHandler {

  private final ObjectMapper mapper = new ObjectMapper();

  /**
   * Sends the start event and logs every incoming payload as text.
   *
   * @param session the WebSocket session to write to and read from
   * @return a {@code Mono} combining the send operation with the logged inbound stream
   */
  @Override
  public Mono<Void> handle(WebSocketSession session) {
    WebSocketMessage startMessage = session.textMessage(getEvent());
    Mono<Void> sending = session.send(Mono.just(startMessage));
    return sending.and(session.receive().map(WebSocketMessage::getPayloadAsText).log());
  }

  /**
   * Creates the "start" {@code Event} stamped with the current instant.
   *
   * @return the event converted to a JSON tree and rendered as a string
   */
  private String getEvent(){
    Event startEvent = new Event("start", Instant.now());
    JsonNode json = mapper.valueToTree(startEvent);
    return json.toString();
  }
}

Where:

  • handle method is used here to send a text as WebSocketMessage and receive several payloads as text back
  • getEvent method formats an event bean as Json

Finally, we have our WebSocketClientApplication to start the conversation:

package com.jos.dem.webflux.websocket;

import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.reactive.socket.WebSocketHandler;
import org.springframework.web.reactive.socket.client.WebSocketClient;
import reactor.core.publisher.Mono;

/**
 * Entry point of the WebSocket client application.
 *
 * <p>On startup it connects to the server channel and lets the injected
 * {@link WebSocketHandler} drive the conversation.
 */
@SpringBootApplication
public class WebsocketClientApplication {

  // Bound to the class literal rather than this.getClass(): at runtime the instance is a
  // CGLIB-enhanced subclass, which would otherwise end up as the logger name.
  private static final Logger log = LoggerFactory.getLogger(WebsocketClientApplication.class);

  /**
   * Starts the Spring Boot application.
   *
   * @param args command line arguments, passed through to Spring Boot
   */
  public static void main(String[] args) {
    SpringApplication.run(WebsocketClientApplication.class, args);
  }

  /**
   * Declares the runner that opens the WebSocket connection once the application has started.
   *
   * @param client the WebSocket client used to connect
   * @param webSocketHandler the handler that manages the session
   * @return a runner that connects to {@code ws://localhost:8080/channel} and logs the outcome
   */
  @Bean
  CommandLineRunner run(WebSocketClient client, WebSocketHandler webSocketHandler) {
    return args -> {
      Mono<Void> subscriber = client.execute(URI.create("ws://localhost:8080/channel"), webSocketHandler);

      subscriber.subscribe(
          content -> log.info("content"),
          // The throwable goes last without a placeholder so SLF4J logs it with its stack
          // trace instead of leaving a literal "{}" in the message.
          error -> log.error("Error at receiving events", error),
          () -> log.info("complete"));
    };
  }
}

So, now if you execute our Spring Boot application:

gradle bootRun

You should see this output from client side:

2020-02-27 15:16:18.702  INFO 33092 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8081
2020-02-27 15:16:18.705  INFO 33092 --- [           main] c.j.d.w.w.WebsocketClientApplication     : Started WebsocketClientApplication in 0.841 seconds (JVM running for 1.078)
2020-02-27 15:16:18.940  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onSubscribe(FluxMap.MapSubscriber)
2020-02-27 15:16:18.941  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : request(unbounded)
2020-02-27 15:16:19.934  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Hola","timestamp":{"nano":898843000,"epochSecond":1582834579}})
2020-02-27 15:16:20.897  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Bonjour","timestamp":{"nano":896173000,"epochSecond":1582834580}})
2020-02-27 15:16:21.897  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Zdravstvuyte","timestamp":{"nano":896219000,"epochSecond":1582834581}})
2020-02-27 15:16:22.898  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Bonjour","timestamp":{"nano":896291000,"epochSecond":1582834582}})
2020-02-27 15:16:23.899  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Hola","timestamp":{"nano":898594000,"epochSecond":1582834583}})
2020-02-27 15:16:24.897  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Hello","timestamp":{"nano":895673000,"epochSecond":1582834584}})
2020-02-27 15:16:25.900  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Hello","timestamp":{"nano":896273000,"epochSecond":1582834585}})
2020-02-27 15:16:26.896  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Guten Tag","timestamp":{"nano":895445000,"epochSecond":1582834586}})
2020-02-27 15:16:27.897  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Guten Tag","timestamp":{"nano":896101000,"epochSecond":1582834587}})
2020-02-27 15:16:28.900  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onNext({"message":"Hello","timestamp":{"nano":899134000,"epochSecond":1582834588}})
2020-02-27 15:16:29.565  INFO 33092 --- [ctor-http-nio-2] reactor.Flux.Map.1                       : onComplete()
2020-02-27 15:16:29.566  INFO 33092 --- [ctor-http-nio-2] ication$$EnhancerBySpringCGLIB$$76238f86 : complete

Note: Do not forget to add server.port=8081 to the application properties file, so we can run our client on a different port. Here you can find another example about how to create a chatbot application using WebFlux, WebSocket and JavaScript: WebFlux-WebSocket-Chatbot

Using Maven

You can do the same using Maven, the only difference is that you need to specify --build=maven parameter in the spring init command line:

spring init --dependencies=webflux,lombok --build=maven --language=java webflux-websocket-workshop

This is the pom.xml file generated

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<!-- Maven build for the project: Spring Boot 2.2.4.RELEASE parent with WebFlux and Lombok. -->
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.2.4.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.jos.dem.webflux.websocket</groupId>
	<artifactId>server</artifactId>
	<version>1.0.0-SNAPSHOT</version>
	<name>Spring Webflux WebSockets</name>
	<description>This project shows how to use Websocket with Spring Webflux</description>

	<properties>
		<java.version>12</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>

		<!-- Lombok is declared as an optional dependency. -->
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<!-- Test starter, with the JUnit vintage engine excluded. -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
			<exclusions>
				<exclusion>
					<groupId>org.junit.vintage</groupId>
					<artifactId>junit-vintage-engine</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>io.projectreactor</groupId>
			<artifactId>reactor-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<!-- Spring Boot plugin; provides the spring-boot:run goal. -->
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>

To run the project with Gradle:

gradle bootRun

To run the project with Maven:

mvn spring-boot:run

To browse the project go here, to download the project:

git clone git@github.com:josdem/webflux-websocket-workshop.git

Return to the main article

comments powered by Disqus