Paradigma publish/subscribe con Spring Data Redis

0
7968

Paradigma publish/subscribe con Spring Data Redis.

0. Índice de contenidos.


1. Introducción.

Ya vimos en un tutorial anterior una pequeña introducción al proyecto Spring Data y el soporte de Redis a través de la clase RedisTemplate. En aquel tutorial vimos un ejemplo práctico donde recuperábamos tweets a través del API de Twitter4j y los persistíamos en Redis añadiendo una clave cómun a todos por la que posteriormente los recuperábamos.

Ese código se quedaba un poco cojo y no nos valía para cubrir un caso de uso más complejo. El problema tenía que ver con la sincronización de los datos almacenados. La aplicación levantaba dos beans de Spring, uno que guardaba los tweets en Redis y otro que los recuperaba. El segundo realizaba un único acceso a Redis recuperando todos los tweets que cumplían con los criterios de búsqueda, pero ¿qué pasaba con los nuevos tweets introducidos posteriores a esa búsqueda?. Esos datos no los recuperábamos a menos que hiciéramos sucesivas consultas a Redis. Al buscar mediante un patrón que tenían todas las claves (tweet_) cada vez que hacemos una nueva búsqueda Redis nos devuelve todos los registros por lo que estaríamos repitiendo constantemente los tweets recuperados en sucesivas consultas.

Para solucionar este problema vamos a modificar la forma de introducir y recuperar los tweets en Redis ya que esta base de datos admite el paradigma de mensajería publicador/suscriptor. En nuestro ejemplo la clase encargada de recuperar los tweets los publicará en redis y construiremos una clase con Spring que se suscribirá para recibirlos inmediatamente en cuanto son publicados.

2. Entorno.

El tutorial se ha realizado con el siguiente entorno:

  • MacBook Pro 15′ (2.4 GHz Intel Core i5, 8GB DDR3 SDRAM).
  • Sistema Operativo: Mac OS Mavericks 10.9.5
  • Oracle Java SDK 1.7.0_60
  • Redis 2.8.11

3. Publicar en Redis

A través de nuestra clase Repository disponemos del método send que publica el contenido en un determinado canal para que suscriptores al mismo lo reciban. Vamos a modificar el ejemplo que utilizamos en el tutorial de Redis para publicar los tweets. Simplemente modificaremos la forma de almacenar los tweets en redis (línea 53) utilizando el método send que se encarga de publicarlo en el canal ‘tweets’.

package com.autentia.tutoriales.tweets;

import javax.annotation.PostConstruct;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import twitter4j.Query;
import twitter4j.QueryResult;
import twitter4j.Status;
import twitter4j.Twitter;
import twitter4j.TwitterException;
import twitter4j.TwitterFactory;
import twitter4j.TwitterObjectFactory;

import com.autentia.tutoriales.redis.StringRedisRepository;

@Component
public class TweetIngestor {

	private static final Logger logger = LoggerFactory.getLogger(TweetIngestor.class);

	private final Twitter twitter; 

	public static final String TWEETS_SUBSCRIPTION_CHANNEL = "tweets";
	
	@Autowired
	private StringRedisRepository redisRepository;
	
	public TweetIngestor() {
		this.twitter = new TwitterFactory().getInstance();
	}
	
	@PostConstruct
	public void searchByHashtag() {
		new Thread() {
			@Override
			public void run() {
				try {
					Query query = new Query("redis");

					int numTweets = 0;
					long init = System.currentTimeMillis();
					try {
						QueryResult result;

						do {
							result = twitter.search(query);
							for (Status status : result.getTweets()) {
								if (!status.isRetweet()) {
									redisRepository.send(TWEETS_SUBSCRIPTION_CHANNEL, TwitterObjectFactory.getRawJSON(status));
									numTweets++;
								}
							}

						} while ((query = result.nextQuery()) != null);
					
						logger.info(String.format("%s tweets received in %s millis", numTweets, System.currentTimeMillis() - init));
					} catch (TwitterException e) {
						throw new RuntimeException("Something was wrong retrieving tweets:", e);
					}
					
				} catch (Exception e) {
					logger.error("Error", e);
				}
			}
		}.start();
	}
}

El método send del StringRedisRepository llama al método convertAndSend del StringRedisTemplate

public void send(String key, String value) {
	template.convertAndSend(key, value);
}

4. Suscripción a un canal de Redis.

Con Spring Data Redis es muy sencillo configurar la suscripción a un canal de Redis, bastará con crear una clase suscriptora de los mensajes asociados a un topic y un poco de configuración. Lo haremos todo en nuestra clase Application encargada de levantar el contexto de Spring y de configurar los Beans que necesitamos.

Configuramos un RedisMessageListenerContainer al que añadimos mediante el método addMessageListener un MessageListenerAdapter que recibe una clase suscriptora de los mensajes.

Esta clase debemos implementarla y crear el método receiveTweet que será el que se invocará cuando se publique un mensaje en Redis que cumpla el patrón indicado en la definición del listener: new PatternTopic(TweetIngestor.TWEETS_SUBSCRIPTION_CHANNEL), concretamente el topic es el mismo con el que se publicaron los tweets.

package com.autentia.tutoriales;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

import com.autentia.tutoriales.redis.StringRedisRepository;
import com.autentia.tutoriales.tweets.TweetIngestor;
import com.autentia.tutoriales.tweets.TweetsReceiver;

@Configuration
@EnableAutoConfiguration
@ComponentScan
public class Application {

	@Bean
	RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
		final RedisMessageListenerContainer container = new RedisMessageListenerContainer();
		container.setConnectionFactory(connectionFactory);
		container.addMessageListener(listenerAdapter, new PatternTopic(TweetIngestor.TWEETS_SUBSCRIPTION_CHANNEL));

		return container;
	}

	@Bean
	MessageListenerAdapter messageListenerAdapter(TweetsReceiver tweetsReceiver) {
		return new MessageListenerAdapter(tweetsReceiver, "receiveTweet");
	}

	@Bean
	TweetsReceiver receiver() {
		return new TweetsReceiver();
	}

	@Bean
	StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
		return new StringRedisTemplate(connectionFactory);
	}

	@Bean
	StringRedisRepository stringRedisRepository(StringRedisTemplate template) {
		return new StringRedisRepository(template);
	}
	
	public static void main(String[] args) throws InterruptedException {
		SpringApplication.run(Application.class, args);
	}
}

La clase suscriptora define el método receiveTweets en formato JSON que es como los publicamos en Redis. Posteriormente se parsea y únicamente recuperamos el usuario y el texto del tweet que imprimimos por consola.

package com.autentia.tutoriales.tweets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import twitter4j.Status;
import twitter4j.TwitterException;
import twitter4j.TwitterObjectFactory;

public class TweetsReceiver {
	private static final Logger LOGGER = LoggerFactory.getLogger(TweetsReceiver.class);

	public void receiveTweet(String rawTweet) {
		final Status tweet = parse(rawTweet);

		LOGGER.info(String.format("@%s : %s", tweet.getUser().getName(), tweet.getText()));
	}

	private Status parse(String rawJson) {
		try {
			return TwitterObjectFactory.createStatus(rawJson);
		} catch (TwitterException e) {
			LOGGER.warn("Invalid tweet" + rawJson, e);
			return null;
		}
	}
}

Para probar nuestro código levantamos el servidor de redis (redis-server) y el ejemplo con mvn spring-boot:run y pasados unos segundos aparecerán por consola los tweets que previamente han pasado por Redis. Si queremos suscribrirnos al topic desde el cliente de Redis basta con lanzar el comando desde la consola del redis-cli:

PSUBSCRIBE tweets

5. Referencias

6. Conclusiones.

Hemos visto un ejemplo de uso de la funcionalidad para mensajería publish/subscribe de Redis una más de las muchas que soporta.

A través de las APIs que nos proporcionan un Driver de conexión (como Jedis) ya se nos facilita mucho la tarea para trabajar con Redis pero junto al soporte de Spring Data el uso de Redis parece un juego de niños.

Espero que te haya sido de ayuda.

Un saludo.

Juan

DEJA UNA RESPUESTA

Por favor ingrese su comentario!

He leído y acepto la política de privacidad

Por favor ingrese su nombre aquí

Información básica acerca de la protección de datos

  • Responsable:
  • Finalidad:
  • Legitimación:
  • Destinatarios:
  • Derechos:
  • Más información: Puedes ampliar información acerca de la protección de datos en el siguiente enlace:política de privacidad