header

Spring Webflux Server-sent Events el lado del Cliente

En este post técnico te mostraremos como consumir un stream de eventos usando Sever-sent events en una aplicación Spring Webflux. Por favor lee mis previo post Spring Boot Server-sent Events antes de continuar con esta información. Entonces, creamos un nuevo proyecto Spring Boot con Webflux y Lombok como dependencias:

spring init --dependencies=webflux,lombok --language=java --build=gradle spring-boot-sse-client

Aquí está el build.gradle generado:

plugins {
  id 'org.springframework.boot' version '2.1.5.RELEASE'
  id 'java'
}

apply plugin: 'io.spring.dependency-management'

group = 'com.jos.dem.springboot.sse.client'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'

configurations {
  compileOnly {
    extendsFrom annotationProcessor
  }
}

repositories {
  mavenCentral()
}

dependencies {
  implementation 'org.springframework.boot:spring-boot-starter-webflux'
  compileOnly 'org.projectlombok:lombok'
  annotationProcessor 'org.projectlombok:lombok'
  testImplementation 'org.springframework.boot:spring-boot-starter-test'
  testImplementation 'io.projectreactor:reactor-test'
}

Ahora por favor agrega la dependencias Junit 5 al archivo build.gradle:

testCompile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion"
testRuntime "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion"

Empecemos por crear un servicio para manejar el consumo de datos.

package com.jos.dem.springboot.sse.client.service;

import reactor.core.publisher.Flux;

import org.springframework.http.codec.ServerSentEvent;

public interface ServerSentEventsConsumerService {

  Flux<ServerSentEvent<String>> consume();

}

Esta es la implementación del servicio.

package com.jos.dem.springboot.sse.client.service.impl;

import reactor.core.publisher.Flux;

import org.springframework.stereotype.Service;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.reactive.function.client.WebClient;

import com.jos.dem.springboot.sse.client.service.ServerSentEventsConsumerService;

@Service
public class ServerSentEventsConsumerServiceImpl implements ServerSentEventsConsumerService {

  @Autowired
  private WebClient webCLient;

  private ParameterizedTypeReference<ServerSentEvent<String>> type = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

  public Flux<ServerSentEvent<String>> consume(){
    return webCLient.get()
      .uri("/")
      .retrieve()
      .bodyToFlux(type);
  }

}

El valor ParameterizedTypeReference es usado para regresar valores de tipo genéricos. Aquí tu puedes ver como definimos nuestro WebClient como un bean de Spring.

package com.jos.dem.springboot.sse.client;

import org.springframework.context.annotation.Bean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.web.reactive.function.client.WebClient;

@SpringBootApplication
public class ServerSentEventsClientApplication {

  public static void main(String[] args) {
    SpringApplication.run(ServerSentEventsClientApplication.class, args);
  }

  @Bean
  WebClient webClient() {
    return WebClient.create("http://localhost:8080/");
  }

}

Ahora definimos un controlador, de esta manera podemos iniciar el consumo del stream.

package com.jos.dem.springboot.sse.client.controller;

import java.time.LocalTime;

import reactor.core.publisher.Flux;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.codec.ServerSentEvent;

import com.jos.dem.springboot.sse.client.service.ServerSentEventsConsumerService;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RestController
public class ServerSentEventsConsumerController {

  @Autowired
  private ServerSentEventsConsumerService service;

  private Logger log = LoggerFactory.getLogger(this.getClass());

  @GetMapping("/")
  public String index() {
    Flux<ServerSentEvent<String>> eventStream = service.consume();

    eventStream.subscribe(ctx ->
      log.info("Current time: {}, content[{}] ", LocalTime.now(), ctx.data()));

    return "Starting to consume events...";
  }

}

El método subscribe mantiene recibiendo información tanto como el servidor siga enviando datos. Así es un cliente se suscribe a un stream del servidor y el servidor envía mensajes de tipo event-stream al cliente hasta el servidor o el cliente cierra el stream. Es el control del servidor decidir cuando y como enviar al cliente. Para saner más acerca de la tgecnología Server-send events por favor ve here. No olvides correr este cliente en un puerto diferente usando la siguiente especificación en el archbivo application.properties.

server.port=8081

Ahora, para correr el proyecto:

gradle bootRun

Y al consultar el end-point:

curl http://localhost:8081/

Deberías poder ver algo parecido a esto:

2019-05-13 21:41:36.088 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:36.087150, content[{"nickname":"josdem","text":"Guten Tag","timestamp":"2019-05-14T01:41:36.031100Z"}]
2019-05-13 21:41:37.034 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:37.034186, content[{"nickname":"josdem","text":"Zdravstvuyte","timestamp":"2019-05-14T01:41:37.030950Z"}]
2019-05-13 21:41:38.034 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:38.034517, content[{"nickname":"josdem","text":"Bonjour","timestamp":"2019-05-14T01:41:38.030778Z"}]
2019-05-13 21:41:39.031 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:39.031638, content[{"nickname":"josdem","text":"Salve","timestamp":"2019-05-14T01:41:39.029399Z"}]
2019-05-13 21:41:40.033 INFO 9103 [ctor-http-nio-6] : Current time: 21:41:40.033601, content[{"nickname":"josdem","text":"Hola","timestamp":"2019-05-14T01:41:40.030511Z"}]

Para explorar el proyecto, por favor ve aquí, para descargar el proyecto:

git clone git@github.com:josdem/spring-boot-sse-client.git

Regresar al artículo principal

comments powered by Disqus