Introducción a Apache Flink

2
20198

En este tutorial haremos una pequeña introducción a la motivación por usar Apache Flink, sus ideas principales y un ejemplo de proyecto Maven haciendo uso del framework.

0. Índice de contenidos.

flink_logo


1. ¿Qué es Apache Flink?

Se trata de un motor de procesamiento de streams o flujos de datos que proporciona capacidades de distribución de datos, comunicaciones y, muy importante, tolerancia a fallos a las computaciones.

Cuando hablamos de computación distribuida, lo primero que se nos viene a la cabeza es el celebérrimo Apache Hadoop, del cual tenéis muchos tutoriales en este mismo portal. Lo que hizo muy popular a este framework fue el uso del modelo de programación ideado por Google, MapReduce. El modelo seguido por Apache Flink fue pensado desde el principio como alternativa a MapReduce, lo que no significa que no pueda acceder y hacer uso tanto de HDFS (Hadoop Distributed File System) como de YARN (MapReduce v2).


1.1. Componentes fundamentales

flink_components

El ecosistema de Flink se sustenta sobre el núcleo de Flink o Flink Core en el que se encuentran todas las APIs y librerías de utilidades básicas, entre ellas el optimizador de programas Flink o el constructor de streams.

Las dos APIs principales sobre flink-core que determinan el comportamiento, así como el entorno de trabajo de los programas ejecutados son:

  • DataSet API: Entorno de ejecución en el que las transformaciones se ejecutan sobre conjuntos de datos tomados de fuentes más estáticas (como ficheros o bases de datos locales).
  • DataStream API: Muy parecido a la API de DataSet, pero con la gran diferencia de que los datos son tomados o recolectados desde fuentes dinámicas, como sockets o colas de mensajes.

Las últimas releases de Apache Flink vienen con una serie de librerías adicionales que forman las capacidades más fronterizas del Big Data:

  • FlinkML como librería de machine learning en continuo crecimiento en número de algoritmos implmentados.
  • Gelly como API de creación y análisis de grafos.
  • Table API, en sus versiones tanto por lotes como de flujos, que permiten utilizar expresiones con una sintaxis SQL.


1.2. Arquitectura básica

A la hora de ejecutar un proceso, el sistema de Flink levanta un JobManager, que hace las veces de coordinador de todo el sistema, y uno o más TaskManager, encargados de ejecutar partes del programa en paralelo.

Cuando se envía un programa al sistema es el optimizador el encargado de transformarlo en un DataFlow, ejecutable de forma paralela por los TaskManager, coordinados por el JobManager.

El siguiente esquema muestra esta arquitectura básica:

flink_architecture

2. Apache Flink y Apache Spark

La comparación de Flink con el rey actual de la computación distribuida, Apache Spark (del cual podéis encontrar una guía de primeros pasos con él en éste tutorial), se antoja necesaria.

En la superficie, Flink es muy parecido a Spark, sin embargo, internamente se trata de un modelo de programación enteramente centrado en el procesado de flujos o stream processing, a diferencia de Spark que consigue esta funcionalidad gracias a un enfoque de pequeños procesos por lotes procesados rápidamente y de forma ligera, lo que los desarrolladores llaman micro-batching. En éste enlace podéis encontran más información sobre éste tema.

Otras diferencias fundamentales entre ambos modelos son la abstracción con la que trabaja cada uno de ellos: Spark procesa RDDs o bloques de datos, mientras que Flink realiza un procesado registro a registro en tiempo real.

El hecho de que soporte técnicas de machine learning o procesado de grafos de forma nativa sobre su motor de streaming y que sea más fácil para entornos ya implantados el paso de streaming a batch que al revés, hacen de Apache Flink lo que parece el futuro en éste campo.


2.1. Pero, ¿qué es streaming?

Estamos hablando en estos párrafos continuamente de procesamiento tanto de flujos o streams como de lotes o batchs, pero, ¿qué quieren decir exactamente estos dos conceptos?

El streaming viene a ser una nueva forma de pensar en la infraestructura de datos. Muchas fuentes de datos representan series de eventos que se producen de forma continua (por citar casos, logs, transacciones, sensores, etc.). Tradicionalmente, estos eventos se agrupaban de forma periódica en batchs o series, salvándose para luego ser procesadas en conjunto.

Esta forma de proceder introducía latencias innecesarias entre la generación de los datos y la obtención de resultados, así como asumía de forma implícita que los datos deben estar «completos» en algún momento y, por lo tanto, pueden ser usados en ese instante para realizar predicciones precisas.

A partir de esta forma de procesar por lotes, el siguiente paso lógico en analíticas es aprovechar la naturaleza continua de la generación de datos para procesarlos de la misma manera: como flujos de datos o streams.


3. Nuestra primera aplicación con Flink

Lo primero de todo que necesitamos para comenzar a desarrollar con Flink es el framework completo que nos proporciona la página oficial. Lo podemos mover a la localización que queramos; en todo éste tutorial nos referiremos a dicho path como $FLINK_HOME. Todo este proceso de descarga, relocalización y creación de variables de entorno lo dejamos como ejercicio para el lector.

Nosotros trabajaremos con tres scripts principales dentro de $FLINK_HOME/bin para arrancar servicios de Flink, aunque existen muchos más:

  • $FLINK_HOME/bin/start-local.sh que nos permite arrancar una instancia local de la arquitectura de Apache Flink y que será la encargada de ejecutar nuestras aplicaciones. Se levanta una página web en nuestra máquina local en la dirección localhost:8081.
  • $FLINK_HOME/bin/start-webclient.sh que nos levanta una página web que nos permite desplegar y visualizar los grafos de funcionamiento de las aplicaciones Flink en localhost:8080.

Existen scripts análogos para parar dichos servicios, $FLINK_HOME/bin/stop-local.sh y $FLINK_HOME/bin/stop-webclient.sh, respectivamente.


3.1. Generación del proyecto Maven

Podemos generar el arquetipo ya existente de ‘Primeros Pasos con Flink en Java’ que nos proporciona el propio Apache. Es posible utilizar el modo interactivo de generación de arquetipos de Maven para tener más control en el proceso. La siguiente orden es más directa:

mvn archetype:generate /
    -DarchetypeGroupId=org.apache.flink/
    -DarchetypeArtifactId=flink-quickstart-java /
    -DarchetypeVersion=0.10.0

Sin embargo, vamos a crear nosotros mismos el proyecto Maven con el fin de que nos quede un poco más claro qué es lo que estamos haciendo.

Nosotros trabajamos con el IDE Eclipse, por lo que crearemos un nuevo proyecto Maven sin arquetipo. El contenido interesante del pom.xml que utilizaremos será el siguiente:

pom.xml
[...]
<properties>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<flink.version>0.10.0</flink.version>
	<jdk.version>1.7</jdk.version>
</properties>

<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
	</dependency>
</dependencies>

<build>
	<plugins>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-jar-plugin</artifactId>
			<version>2.5</version>
			<configuration>
				<archive>
					<manifestEntries>
						<Main-Class>com.autentia.tutoriales.flink.flink_intro.StreamingWordCount</Main-Class>
					</manifestEntries>
				</archive>
			</configuration>
		</plugin>

		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.1</version>
			<configuration>
				<source>${jdk.version}</source>
				<target>${jdk.version}</target>
			</configuration>
		</plugin>
	</plugins>
</build>
[...]


3.2. WordCount en streaming

Dentro de Big Data y originalmente en el mundo de data mining, WordCount o conteo de palabras dentro de un texto es el equivalente al HelloWorld en la programación clásica. Con esto jugaremos en ésta introducción a Apache Flink.

Según la documentación de Flink, los pasos básicos a la hora de desarrollar una aplicación con el framework son los siguientes:

  • Instanciar un entorno de ejecución, que puede ser para procesamiento batch con ExecutionEnvironment.getExecutionEnvironment() o para streaming con StreamExecutionEnvironment.getExecutionEnvironment().
  • Obtener el conjunto de datos con el que trabajaremos, en forma de DataSet para batching o DataStream para streaming.
  • Realizar transformaciones sobre el conjunto de datos como filter(), flatMap() o join().

Nuestra implementación de WordCount con Flink en Java es la siguiente:

[...]
public class StreamingWordCount {

    public static void main(String[] args) throws Exception {

        if (args.length != 3) {
            System.err.println("USO:\nbatStreamingWordCount <host> <puerto> <ruta_salida>");
            return;
        }

        String host = args[0];
        Integer puerto = Integer.parseInt(args[1]);
        String rutaSalida = args[2];

        // instanciamos el entorno de ejecución
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // obtenemos el stream de datos del host provisto
        DataStream<String> texto = env.socketTextStream(hostName, port);

        DataStream<Tuple2<String, Integer>> conteo =
        // realizamos un mapeo de las palabras a tuplas de la forma (palabra, 1)
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {

            public void flatMap(String valor, Collector<Tuple2<String, Integer>> salida) throws Exception {
                String[] palabras = valor.toLowerCase().split("\\W+");

                for (String palabra : palabras) {
                    if (palabra.isEmpty() == false) salida.collect(new Tuple2<String, Integer>(palabra, 1));
                }
            }
        })
        // utilizamos la palabra (posición 0) como clave, y sumamos las frecuencias (posición 1)
        .keyBy(0).sum(1);

        conteo.print();

        // execute program
        env.execute("WordCount en Streaming");
    }

}

Como hicimos en el tutorial de Apache Spark, vamos a utilizar el binario netcat para montar un servidor en local que «stremee» por el puerto 9999:

nc -lk 9999

Para esta primera prueba arrancamos la aplicación directamente desde Eclipse, que nos monta un mini-clúster independiente. En los argumentos del programa especificaremos «localhost» como host y «9999» como puerto de escucha. La salida inicial:

11/19/2015 09:27:12	Job execution switched to status RUNNING.
11/19/2015 09:27:12	Source: Socket Stream(1/1) switched to SCHEDULED
11/19/2015 09:27:12	Source: Socket Stream(1/1) switched to DEPLOYING
11/19/2015 09:27:12	Flat Map(1/2) switched to SCHEDULED
11/19/2015 09:27:12	Flat Map(1/2) switched to DEPLOYING
11/19/2015 09:27:12	Flat Map(2/2) switched to SCHEDULED
11/19/2015 09:27:12	Flat Map(2/2) switched to DEPLOYING
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(1/2) switched to SCHEDULED
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(1/2) switched to DEPLOYING
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(2/2) switched to SCHEDULED
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(2/2) switched to DEPLOYING
11/19/2015 09:27:12	Flat Map(2/2) switched to RUNNING
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(1/2) switched to RUNNING
11/19/2015 09:27:12	Flat Map(1/2) switched to RUNNING
11/19/2015 09:27:12	Source: Socket Stream(1/1) switched to RUNNING
11/19/2015 09:27:12	Keyed Aggregation -> Sink: Unnamed(2/2) switched to RUNNING

En el anterior log podemos observar como el JobManager se encarga de levantar dos instancias de TaskManager e ir planificando y desplegando cada una de las operaciones (flatMap o Keyed Aggregation, p.e.) en estos trabajadores.

Si en netcat escribimos, por ejemplo, los primeros párrafos del «Don Quijote» de Cervantes, obtendremos la siguiente salida de nuestra aplicación.

[...]
1> (acordarme,1)
1> (mucho,1)
1> (galgo,1)
1> (corredor,1)
1> (una,1)
1> (olla,1)
1> (duelos,1)
1> (quebrantos,1)
1> (los,2)
1> (lentejas,1)
1> (los,3)
1> (domingos,1)
1> (las,1)
1> (tres,1)
1> (adarga,1)
1> (roc,1)
1> (n,1)
2> (antigua,1)
1> (ca,1)
2> (flaco,1)
1> (carnero,1)
2> (que,2)
1> (salpic,1)
[...]

Es conveniente observar como, por ejemplo, la palabra «los» se repite dos veces con diferentes frecuencias. Este comportamiento es debido a la división del trabajo entre los TaskManager, así como a la frecuencia de muestreo en la lectura del socket.


3.3. Haciendo uso del cliente web de Flink

Vamos a repetir la misma prueba de concepto del WordCount en streaming, pero desplegando la aplicación a través del cliente web que nos proporciona el propio framework.

Como la salida de la aplicación ya no podrá ser por consola al estar siendo ejecutado de forma interna por el cliente de Flink, hemos reemplazado la impresión por pantalla al fichero en la ruta que se nos indique en los argumentos:

// conteo.print();
conteo.writeAsCsv(rutaSalida, WriteMode.OVERWRITE);

Lo primero que debemos hacer es arrancar tanto el framework en standalone como el cliente web:

$FLINK_HOME/bin/start-local.sh
$FLINK_HOME/bin/start-webclient.sh

Dirijámonos ahora a localhost:8080, donde se encuentra levantado el cliente web:

flink_client_start

Subimos nuestro .jar, especificamos host, puerto y ruta al resultado:

flink_client_arguments

Una de las características que hace a este cliente web especial es que es capaz de mostrarnos el grafo DataFlow optimizado de nuestra aplicación:

flink_client_dataflow

Una vez ejecutemos la aplicación, se nos redirige al gestor de trabajos de Apache Flink, donde podemos ver el estado de cada ejecución:

flink_manager

La salida ahora es por el fichero que específicamos en los argumentos:

[...]
por,2
dar,1
una,1
mano,1
de,11
coces,1
al,1
traidor,1
de,12
galal,1
n,5
al,2
ama,1
que,7
ten,2
a,10
y,7
aun,1
a,11
su,3
sobrina,1
de,13
a,12
adidura,1
[...]

No nos olvidemos de parar las instancias levantadas de Flink cuando acabemos de operar con ellas:

$> $FLINK_HOME/bin/stop-local.sh
$> $FLINK_HOME/bin/stop-webclient.sh


4. Conclusiones

Apache Flink está recibiendo muchísima atención desde sus fases más tempranas de desarrollo gracias a esa filosofía de cambio radical en la forma de pensar en el procesado y analíticas en el mundo del Big Data, por lo que conviene estar atentos a su evolución en los próximos tiempos.


5. Referencias

2 COMENTARIOS

  1. hola, voy a hacer unas pruebas de consultas sobre flujos de datos, con spark y Flink, queria preguntarte,
    como genero el data stream? hay algun simulador de estos flujos de datos, o puedo obtener datos reales?
    saludos, espero tu respuesta.
    atte. Sebastian Henao.

DEJA UNA RESPUESTA

Por favor ingrese su comentario!

He leído y acepto la política de privacidad

Por favor ingrese su nombre aquí

Información básica acerca de la protección de datos

  • Responsable:
  • Finalidad:
  • Legitimación:
  • Destinatarios:
  • Derechos:
  • Más información: Puedes ampliar información acerca de la protección de datos en el siguiente enlace:política de privacidad