header

Spring Boot Server-sent Events Client

In this technical post we will see how to consume events using Sever-sent events in a Spring Webflux application. Please read this previous Spring Boot Server-sent Events before conitnue with this information. Then, let’s create a new Spring Boot project with Webflux and Lombok as dependencies:

spring init --dependencies=webflux,lombok --language=java --build=gradle spring-boot-sse-client

Here is the complete build.gradle file generated:

plugins {
  id 'org.springframework.boot' version '2.3.4.RELEASE'
  id 'io.spring.dependency-management' version '1.0.10.RELEASE'
  id 'java'
}

group = 'com.jos.dem.springboot.sse.client'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '13'

configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}

repositories {
  mavenCentral()
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-webflux'
  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation('org.springframework.boot:spring-boot-starter-test') {
    exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
  }
  testImplementation 'io.projectreactor:reactor-test'
  testCompileOnly 'org.projectlombok:lombok'
  testAnnotationProcessor "org.projectlombok:lombok"
}

test {
  useJUnitPlatform()
}

Let’s start by creating a service to handle data consumption

package com.jos.dem.springboot.sse.client.service;

import reactor.core.publisher.Flux;

import org.springframework.http.codec.ServerSentEvent;

public interface ServerSentEventsConsumerService {

  Flux<ServerSentEvent<String>> consume();

}

This is the service implementation

package com.jos.dem.springboot.sse.client.service.impl;

import com.jos.dem.springboot.sse.client.service.ServerSentEventsConsumerService;
import lombok.RequiredArgsConstructor;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Flux;

@Service
@RequiredArgsConstructor
public class ServerSentEventsConsumerServiceImpl implements ServerSentEventsConsumerService {

  private final WebClient webCLient;

  private ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<>() {
  };

  public Flux<ServerSentEvent<String>> consume(){
    return webCLient.get()
      .uri("/")
      .retrieve()
      .bodyToFlux(type);
  }

}

Here we are using ParameterizedTypeReference as generic type. Let’s also create a WebClient bean definition.

package com.jos.dem.springboot.sse.client.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.reactive.function.client.WebClient;

@Configuration
public class ApplicationConfig {
    @Bean
    WebClient webClient() {
        return WebClient.create("http://localhost:8080/");
    }
}

We are using CommandRunner, so that we can start the stream consumption. CommandLineRunner is a call back interface, when Spring Boot starts will call it and pass in args through a run() internal method.

package com.jos.dem.springboot.sse.client;

import com.jos.dem.springboot.sse.client.service.ServerSentEventsConsumerService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;

import java.time.LocalTime;

@Slf4j
@SpringBootApplication
public class ServerSentEventsClientApplication {

  public static void main(String[] args) {
    SpringApplication.run(ServerSentEventsClientApplication.class, args);
  }

  @Bean
  CommandLineRunner start(ServerSentEventsConsumerService service) {
    return args -> {
      Flux<ServerSentEvent<String>> eventStream = service.consume();

      eventStream.subscribe(ctx ->
              log.info("Current time: {}, content[{}] ", LocalTime.now(), ctx.data()));
    };
  }

}

We are subscribing to the emiter in order to keep receiving information. That’s it a client subscribes to a stream from a server and the server will send messages type event-stream to the client until the server or the client closes the stream. It is up to the server to decide when and what to send the client. For knowing more about Server-sent events technology please go here. Do not forget to run this client in different port using the following specification in our application.yml file

server:
  port: 8081

Finally, here you can find our test case to validate this functionality, StepVerifier is a tool from reactor that help us to do our testing in the reactive world.

package com.jos.dem.springboot.sse.client;


import com.jos.dem.springboot.sse.client.service.ServerSentEventsConsumerService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.codec.ServerSentEvent;
import reactor.test.StepVerifier;

import java.time.Duration;

@Slf4j
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@RequiredArgsConstructor(onConstructor_ = @Autowired)
class ServerSentEventsClientApplicationTest {

    private final ServerSentEventsConsumerService service;

    @Test
    @DisplayName("Consume Server Sent Event")
    public void shouldConsumeServerSentEvents(TestInfo testInfo) throws Exception {
        log.info("Running: {}", testInfo.getDisplayName());

        StepVerifier.create(service.consume())
                .expectNextMatches(event -> event.data().contains("josdem"))
                .thenAwait(Duration.ofSeconds(5))
                .expectNextCount(1)
                .thenCancel()
                .verify();
    }

}

To run the project:

gradle bootRun

To test the project:

gradle test

Then you should see something like this:

2019-05-13 21:41:36.088 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:36.087150, content[{"nickname":"josdem","text":"Guten Tag","timestamp":"2019-05-14T01:41:36.031100Z"}]
2019-05-13 21:41:37.034 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:37.034186, content[{"nickname":"josdem","text":"Zdravstvuyte","timestamp":"2019-05-14T01:41:37.030950Z"}]
2019-05-13 21:41:38.034 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:38.034517, content[{"nickname":"josdem","text":"Bonjour","timestamp":"2019-05-14T01:41:38.030778Z"}]
2019-05-13 21:41:39.031 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:39.031638, content[{"nickname":"josdem","text":"Salve","timestamp":"2019-05-14T01:41:39.029399Z"}]
2019-05-13 21:41:40.033 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:40.033601, content[{"nickname":"josdem","text":"Hola","timestamp":"2019-05-14T01:41:40.030511Z"}]

To browse the project go here, to download the project:

git clone git@github.com:josdem/spring-boot-sse-client.git

Return to the main article

comments powered by Disqus