En este post, pretendo explicar como ustedes pueden construir microservicios con streaming en tiempo real usando spring cloud stream y kafka

Para ello me dispongo a crear un mini proyecto que utiliza streaming en tiempo real usando una arquitectura dirigida por eventos (event-driven architecture), Spring Boot, Spring Cloud Stream, Apache Kafka y Lombok.

En el flujo final de la aplicación tendremos:

  1. La aplicación toma un mensaje de un REST API
  2. Escribe ese mensaje en un tópico de kafka
  3. Lee se mensaje del tópico
  4. Escribe el mensaje en la consola.

Antes de comenzar veamos algunas definiciones para aquellos que son nuevos en el tema.

Que es Apache Kafka?

Es una plataforma para hacer transmisiones de información (streaming) de forma distribuida la cual nos permite lograr 3 actividades:

  • Publicar y Subscribir (pub/sub) a transmisiones de registros, de forma similar a como lo hacen las colas de mensajes o sistemas de mensajería empresarial.
  • Almacenar estas transmisiones de registros de forma resilente y durable.
  • Procesar las transmisiones de datos en tiempo real.

Si aun no tienen instalado Apache Kafka, sigan estas instrucciones

Que es Spring Cloud Stream?

Es un framework que se basa en spring-boot para crear microservicios y/o aplicaciones impulsadas por mensajes. Provee una configuración dogmática (como casi todo en spring, para nuestra comodidad)  de brokers de mensajes (como kafka) y los conceptos de pub/sub semantico, grupos de consumidores y particiones a  través de varios proveedores de intermediarios.

Que es Lombok?

Lombok es una librería hecha en java que hace mas magia con las anotaciones para que nosotros hagamos mas con menos código, genera automáticamente para un campo deseado getters, setters, toString, builders y loggers. Todo muy fácil rápido y sin ensuciarse.

Lo siguiente que vamos a hacer es generar nuestro proyecto. Hay un herramienta en linea muy buena para que nos genere toda la estructura del proyecto y un pom.xml, ir a este enlace:

https://start.spring.io/

En la interfaz buscamos y agregamos las siguientes dependencias:

  • Spring Cloud Stream
  • Kafka
  • DevTools (para hacer deploys durante el desarrollo)
  • Actuator (para monitorear la aplicacion)
  • Lombok

Usualmente no soy muy participe de las herramientas que generan código pero esta si que ahorra tiempo, y genera lo estrictamente básico, no es un hola mundo, solo las dependencias la clase para iniciar la aplicación y la estructura de carpetas.

Este es el pom generado:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.ricardogeek</groupId>
	<artifactId>demo-spring-kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>demo-spring-kafka</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.0.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<spring-cloud.version>Finchley.M8</spring-cloud.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-actuator</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-stream-test-support</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>

	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>${spring-cloud.version}</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

	<repositories>
		<repository>
			<id>spring-milestones</id>
			<name>Spring Milestones</name>
			<url>https://repo.spring.io/milestone</url>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
		</repository>
	</repositories>


</project>

Definir transmisiones (streams) con kafka

Vamos, primero que nada, a crear la siguiente interfaz:

package com.ricardogeek.demospringkafka.stream;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;  

public interface SaludoStreams {
    String INPUT = "saludos-in";
    String OUTPUT = "saludos-out";

    @Input(INPUT)
    SubscribableChannel saludoEntrante();

    @Output(OUTPUT)
    MessageChannel saludoSaliente();

}

Para que nuestra aplicación pueda comunicarse correctamente con kafka necesitamos definir primero los streams de salida y de entrada para los mensajes.

Spring Cloud provee una manera simple de lograr esto simplemente definiendo la interfaz antes mencionada con un método para cada stream.

el metodo saludoEntrate sirve para leer los mensajes entrantes, y saludoSaliente sirve para escribir los mensajes salientes hacia el broker de kafka.

Durante la ejecución spring generara el código de esta interfaz y lo inyectara con un @Bean de modo que la interfaz SaludoStreams puede utilizarse en cualquier parte de la aplicación que tenga acceso al contexto y con esto acceder a los dos streams.

Configurar el stream de kafka

Ahora para configurar el stream de kafka necesitamos la siguiente clase:

package com.ricardogeek.demospringkafka.config;

import com.ricardogeek.demospringkafka.stream.SaludoStreams;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableBinding(SaludoStreams.class)
public class StreamsConfig {

}

Ahora renombremos el archivo en src/main/resources/application.properties  a src/main/resources/application.yml y pongamole la siguiente configuración:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        saludos-in:
          destination: saludos
          contentType: application/json
        saludos-out:
          destination: saludos
          contentType: application/json

Estas propiedades configuran la dirección de kafka que estaremos utilizando (asumiendo que ya instalaron kafka) y los nombres de los tópicos que usaremos para leer y escribir streams. Noten que ambos streams in/out utilizan el mismo tópico.

y finalmente contentType le indica a spring el tipo de contenido que vamos a estar escribiendo y leyendo en los streams.

El Objeto Mensaje

En este caso necesitamos un objeto Saludo para poder intercambiar saludos en nuestra aplicación. Aquí viene la magia de Lombok:

package com.ricardogeek.demospringkafka.model;

import lombok.Getter;
import lombok.Setter;
import lombok.Builder;
import lombok.ToString;

@Getter @Setter  @ToString  @Builder
public class Saludos {
    private long timestamp;
    private String message;
}

Es un simple objeto con sus getters y setters, toString y builder autogenerados por Lombok, y que luego como ya vimos en la configuración sera transmitido en kafka en formato json.

Creando una capa de servicio para escribir a kafka

package com.ricardogeek.demospringkafka.service;

import com.ricardogeek.demospringkafka.model.Saludos;
import com.ricardogeek.demospringkafka.stream.SaludoStreams;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.util.MimeTypeUtils;

@Service
@Slf4j
public class SaludoService {

    private final SaludoStreams saludoStreams;

    public SaludoService(final SaludoStreams saludoStreams) {
        this.saludoStreams = saludoStreams;
    }

    public void sendGreeting(final Saludos saludos) {

        log.info("Enviando saludos {}", saludos);
        final MessageChannel messageChannel = saludoStreams.saludoSaliente();
        messageChannel.send(MessageBuilder
                .withPayload(saludos)
                .setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON)
                .build());
    }
}

Aquí creamos el servicio de saludos, donde se escribirá un saludo al tópico saludos de kafka.

La anotación @Service registra un @Bean en el contexto de la aplicación como es de esperarse y de ese modo podemos acceder a el en cualquier parte de la aplicación.

También añadimos la anotación @Slf4j que inyecta una forma fácil y rápida de loggear cosas.

Y en el método enviarSaludos pues hacemos eso, enviamos un saludo al stream de salido de kafka.

Creando el REST API

Ahora que ya tenemos todo el motor de como funcionaría, creamos la interfaz de programación de aplicaciones API. Que no es mas que un controller que maneja todo lo que ya hemos hecho hasta el momento.

package com.ricardogeek.demospringkafka.web;

import com.ricardogeek.demospringkafka.model.Saludos;
import com.ricardogeek.demospringkafka.service.SaludoService;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SaludosController {
    private final SaludoService saludoService;

    public SaludosController(final SaludoService saludoService) {
        this.saludoService = saludoService;
    }

    @GetMapping("/saludos")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void greetings(@RequestParam("message") final String message) {

        final Saludos saludos = Saludos.builder()
                .message(message)
                .timestamp(System.currentTimeMillis())
                .build();

        saludoService.sendGreeting(saludos);

    }
}

Este es un controlador MVC simple, y funciona como lo he explicado en este post. Y como vemos hace uso de el objeto mensaje “Saludos” y el servicio que escribimos para enviar datos a kafka a través del stream de salida.

Escuchando mensajes en el tópico de kafka

package com.ricardogeek.demospringkafka.service;

import com.ricardogeek.demospringkafka.model.Saludos;
import com.ricardogeek.demospringkafka.stream.SaludoStreams;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class SaludosListener {
    @StreamListener(SaludoStreams.INPUT)
    public void manejarSaludos(@Payload final Saludos saludos) {
        log.info("Recibimos Saludos: {}", saludos);
    }
}

La anotación @Component, similarmente a @Service y @RestController, definen un @Bean y lo registran en el contexto.

El SaludosListener tiene un solo metodo manejarSaludos() que sera invocado por spring cloud stream con cada nuevo objeto Saludos entrante en el tópico de kafka (kafka topic). Esto se logra gracas a la anotación @StreamListener que se configura para este método.

Listos para la accion!

Ahora simpmente corremos la aplicacion:

mvn spring-boot:run

Y si acceden a su navegador a esta url:

http://localhost:8080/saludos?message=hola_mundo

podran ver los resultados en la consola!

Conclusion

Hacer que java y kafka interactuen mutuamente, es bastante fácil con spring cloud! solo unas anotaciones por aca y por alla y listo!

Recursos

Pueden encontrar el código de este tutorial en mi github:

https://github.com/RicardoGeek/demo-spring-cloud

Y pueden hacer cualquier pregunta que tengan en los comentarios.

Saludos.

Categorized in:

Tagged in:

, , ,