Introducción a la composición de Futures de Akka en Java

0
3071

Akka es un toolkit y un runtime que nos permite desarrollar aplicaciones altamente concurrentes, distribuidas y resilientes, mediante el envío de mensajes entre actores. En este tutorial nos vamos a centrar en sus Futuros y su composición, y en como gracias a ellos podemos realizar de forma sencilla operaciones concurrentes.

Índice de contenidos

Akka Futures logo

1. Introducción

Ya tenemos un par de tutoriales sobre Akka y su sistema de actores:

así que en este tutorial voy a hablar un poquito de cómo funcionan los Futuros y cómo los podemos combinar o componer unos con otros.

Un Futuro no es más que la ejecución de un código que se realiza en paralelo (en otro hilo) y que, en «un futuro», se resolverá y nos retornará un valor. Esto nos permite lanzar varias ejecuciones en paralelo (varios Futuros en paralelo) para que cada uno de ellos resuelva un valor, todos de forma simultánea.

La composición de futuros consiste en combinar varios futuros, o mejor dicho, el resultado de estos futuros, para acabar devolviendo un único valor que será el resultado de la combinación de todos los resultados anteriores.

Este tutorial intenta ser sobre todo práctico y mediante ejemplos de código vamos a ver que opciones tenemos para lanzar Futuros, cómo podemos combinarlos, sincronizarlos, …​

Todo el código del tutorial lo podéis encontrar en este repositorio de GitHub

2. Entorno

El tutorial está escrito usando el siguiente entorno:

  • Hardware: Portátil MacBook Pro 15» (2.5 GHz Intel i7, 16GB 1600 Mhz DDR3, 500GB Flash Storage).
  • AMD Radeon R9 M370X
  • Sistema Operativo: macOS Sierra 10.12.4
  • Oracle Java 1.8.0_121
  • Akka 2.11

3. La aplicación

La aplicación va a ser muy sencilla, tan sencilla que aunque vamos a usar Akka, no vamos a usar Actores. Los Futuros son buenos cuando, dentro de un actor o en cualquier otra parte, queremos realizar un cálculo o proceso en paralelo. De hecho la propia documentación de Akka nos dice que si te pones a crear un pool de UntypedActors sólo por el hecho de realizar un cálculo en paralelo, la forma más sencilla y rápida de hacerlo es mediante Futuros.

public class App {

    public static void main(String[] args) throws InterruptedException {
        final App app = new App();
        app.execute();
    }

    private void execute() throws InterruptedException {
        final ActorSystem system = boot();

        new SimpleFuture(system).runExample();
        new MapFutures(system).runExample();
        new TraverseFutures(system).runExample();
        new SequenceFutures(system).runExample();
        new FoldFutures(system).runExample();
        new ReduceFutures(system).runExample();

        shutdown(system);
    }

    private ActorSystem boot() {
        return ActorSystem.create("actorSystem");
    }

    private void shutdown(ActorSystem system) throws InterruptedException {
        Thread.sleep(3);
        system.terminate();
    }

}

Aquí no hay nada que destacar, simplemente levantamos el sistema de actores y lanzamos los ejemplos.

4. Lanzando un solo Futuro

Este sería el ejemplo más sencillo de Futuro donde no hay ningún tipo de composición.

class SimpleFuture {

    private final LoggingAdapter log;
    private final ExecutionContext ec;

    SimpleFuture(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
    }

    void runExample() {
        log.info("*** Init");

        final Future<String> f1 = Futures.future(
                () -> "Hola" + " mundo del futuro!!!",
                ec
        );

        f1.onSuccess(new PrintResult<>(log), ec);

        log.info("*** End");
    }

}

En esté código cabe destacar la sintaxis para la creación de Futuros en la línea 14, donde usamos Futures.future pasando como primer parámetro una lambda que es la operación que queremos ejecutar en paralelo (en el ejemplo una concatenación de cadenas), y como segundo parámetro un ExecutionContext que es el que va a determinar el pool de hilos que se va a utilizar para poder ejecutar el Futuro. Este pool de hilos es habitual usar el propio Dispatcher del Actor, pero podemos usar el que queramos, como por ejemplo crear uno propio para los Futuros o como en este caso usar el del sistema.

También podemos destacar como en la línea 19 creamos un callback mediante f1.onSuccess. Con esto lo que estamos indicando es que cuando termine de ejecutarse el Futuro f1, si ha ido correctamente (es decir si no se ha lanzado ninguna excepción), se va a ejecutar el código que indiquemos aquí. En el ejemplo simplemente vamos a volcar el resultado en el log mediante la clase PrintResult, una clase de utilidad que nos hemos creado para tal efecto.

En todos los ejemplos he añadido bastantes trazas para ver el orden en el que se ejecutan las cosas y así ver de forma más clara cómo se comportan los Futuros y las distintas operaciones de composición. En este caso la salida sería:

[INFO] [04/08/2017 11:06:09.067] [main] [SimpleFuture] *** Init
[INFO] [04/08/2017 11:06:09.112] [main] [SimpleFuture] *** End
[INFO] [04/08/2017 11:06:09.112] [actorSystem-akka.actor.default-dispatcher-4] [SimpleFuture] ---> Hola mundo del futuro!!!

De esta salida podemos ver dos cosas:

  1. Como el código del ejemplo se ejecuta antes que el del Futuro. Lo vemos porque el *** End está antes que la salida del callback. Es decir, el método runExample termina antes que la ejecución del callback que habíamos definido en la línea 19.

  2. Podemos ver como el futuro se ejecuta en un hilo diferente. El método runExample se ejecuta en el hilo [main], mientras que el futuro de la línea 14 se ejecuta en el hilo [actorSystem-akka.actor.default-dispatcher-4].

5. Concatenación de Futuros con ‘map’

Los Futuros de Akka tienen varios métodos que nos permiten usarlos con un estilo más funcional y así combinarlos para conseguir la composición de Futuros. En este ejemplo vamos a ver el uso del método map que nos permite encadenar varios Futuros de forma que la salida de un futuro es la entrada del siguiente.

class MapFutures {

    private final LoggingAdapter log;
    private final ExecutionContext ec;
    private final Business business;

    MapFutures(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
        business = new Business(log);
    }

    void runExample() {
        log.info("*** Init");

        final Future<String> f1 = Futures.future(business::action1, ec);

        final Future<String> f2 = f1.map(new Mapper<String, String>() {
            @Override
            public String apply(String parameter) {
                return parameter + business.action2();
            }
        }, ec);

        final Future<String> f3 = f2.map(new Mapper<String, String>() {
            @Override
            public String apply(String parameter) {
                return parameter + business.action3();
            }
        }, ec);

        f3.onSuccess(new PrintResult<>(log),ec);

        log.info("*** End");
    }
}

Cabe destacar la línea 18 y 25 donde vemos el uso de map para crear un nuevo Futuro que se ejecutará cuando termine el futuro anterior (en el caso de la línea 18, f2 se empezará a ejecutar cuando termine f1). El método map tiene como parámetro de entrada un objeto de tipo Mapper. Esta clase usa Generics de forma que el primero de ellos representa el tipo del objeto que retorna el primer futuro (en la línea 18 sería el objeto que retorna f1), mientras que el segundo representa el tipo del objeto que retorna el futuro que estamos creando (en la línea 18 sería el objeto que va a retornar f2).

Aunque en el ejemplo simplemente estamos manipulando cadenas para hacer una concatenación y por tanto la entrada y la salida son ambas del mismo tipo String, hay que tener presente que podemos usar los Mappers para manipular los objetos e ir cambiándolos incluso de tipo, lo que lo hace muy conveniente para crear pipelines de manipulación de objetos.

El punto negativo del Mapper es que no es una interfaz funcional de Java, por lo que no podemos usar lambdas, quedando la sintaxis un poco engorrosa.

La salida de este ejemplo sería:

[INFO] [04/09/2017 09:06:20.868] [main] [MapFutures] *** Init
[INFO] [04/09/2017 09:06:20.925] [actorSystem-akka.actor.default-dispatcher-3] [MapFutures] action1 init
[INFO] [04/09/2017 09:06:20.927] [main] [MapFutures] *** End
[INFO] [04/09/2017 09:06:20.929] [actorSystem-akka.actor.default-dispatcher-3] [MapFutures] action1 end
[INFO] [04/09/2017 09:06:20.929] [actorSystem-akka.actor.default-dispatcher-4] [MapFutures] action2 init
[INFO] [04/09/2017 09:06:20.932] [actorSystem-akka.actor.default-dispatcher-4] [MapFutures] action2 end
[INFO] [04/09/2017 09:06:20.932] [actorSystem-akka.actor.default-dispatcher-4] [MapFutures] action3 init
[INFO] [04/09/2017 09:06:20.933] [actorSystem-akka.actor.default-dispatcher-4] [MapFutures] action3 end
[INFO] [04/09/2017 09:06:20.934] [actorSystem-akka.actor.default-dispatcher-4] [MapFutures] ---> Hola mundo del futuro!!!

Donde podemos ver como el hilo [main] es el primero que termina y mientras, en paralelo a este hilo principal, cada futuro se empieza a ejecutar sólo cuando termina el anterior, y el callback se ejecuta al final del todo mostrando como resultado la cadena que hemos ido componiendo a lo largo de los tres futuros f1, f2 y f3.

6. Usando ‘traverse’ para aplicar un Futuro a cada objeto de una colección

En este ejemplo vemos como podemos usar el método Futures.traverse para aplicar distintos futuros a cada uno de los elementos de una colección.

class TraverseFutures {

    private final LoggingAdapter log;
    private final ExecutionContext ec;

    TraverseFutures(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
    }

    void runExample() {
        log.info("*** Init");

        final List<String> listOfWords = asList("Hola", " mundo", " del futuro!!!");

        final Future<Iterable<String>> futureListOfString = Futures.traverse(
                listOfWords,
                word -> Futures.future(() -> {
                    log.info("init");
                    final String upper = word.toUpperCase();
                    log.info("end");
                    return upper;
                    }, ec),
                ec
        );

        futureListOfString.onSuccess(new PrintResult<>(log), ec);

        log.info("*** End");
    }

}

En la línea 16 podemos ver el uso del método traverse que toma como primer parámetro la colección de objetos, como segundo parámetro una lambda que crea un futuro para cada uno de los elementos de la colección (en el ejemplo para cada word), y como tercer parámetro, como siempre, el ExecutionContext.

Destacamos aquí como en este caso el resultado es un Futuro cuyo resultado será un Iterable<String>. Esto representa una colección con el resultado de haber aplicado el Futuro creado por la lambda a cada uno de los objetos de la colección inicial.

El orden de los objetos del resultado se mantiene en el mismo orden que en la colección inicial y no depende en ningún caso del orden de ejecución de los Futuros o de qué Futuro termina primero.

Así la salida de este ejemplo será:

[INFO] [04/09/2017 09:29:21.272] [main] [TraverseFutures] *** Init
[INFO] [04/09/2017 09:29:21.318] [actorSystem-akka.actor.default-dispatcher-2] [TraverseFutures] init
[INFO] [04/09/2017 09:29:21.319] [actorSystem-akka.actor.default-dispatcher-2] [TraverseFutures] end
[INFO] [04/09/2017 09:29:21.319] [actorSystem-akka.actor.default-dispatcher-2] [TraverseFutures] init
[INFO] [04/09/2017 09:29:21.319] [actorSystem-akka.actor.default-dispatcher-3] [TraverseFutures] init
[INFO] [04/09/2017 09:29:21.319] [actorSystem-akka.actor.default-dispatcher-3] [TraverseFutures] end
[INFO] [04/09/2017 09:29:21.319] [actorSystem-akka.actor.default-dispatcher-2] [TraverseFutures] end
[INFO] [04/09/2017 09:29:21.320] [main] [TraverseFutures] *** End
[INFO] [04/09/2017 09:29:21.320] [actorSystem-akka.actor.default-dispatcher-3] [TraverseFutures] ---> [HOLA,  MUNDO,  DEL FUTURO!!!]

Donde se aprecia como los futuros que se aplican a cada objeto de la colección de entrada se ejecutan en paralelo (líneas 4 y 5) y como el resultado es una colección que contiene cada una de las palabras de la entrada convertidas a mayúsculas.

7. Composición de un Futuro con una colección de otros Futuros gracias a ‘sequence’

En esta ocasión como entrada vamos a tener una colección de Futuros, y cuando todos ellos hayan terminado se usará otro Futuro para iterar sobre los resultados de cada uno de esos Futuros y así componer un nuevo resultado.

class SequenceFutures {

    private final LoggingAdapter log;
    private final ExecutionContext ec;
    private final Business business;

    SequenceFutures(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
        business = new Business(log);
    }

    void runExample() {
        log.info("*** Init");

        final Future<String> f1 = Futures.future(business::action1, ec);
        final Future<String> f2 = Futures.future(business::action2, ec);
        final Future<String> f3 = Futures.future(business::action3, ec);

        final Future<Iterable<String>> futureListOfString = Futures.sequence(asList(f1, f2, f3), ec);

        final Future<String> futureComposedString = futureListOfString.map(new Mapper<Iterable<String>, String>() {
            @Override
            public String apply(Iterable<String> futuresResults) {
                final StringBuilder builder = new StringBuilder();
                for (String s : futuresResults) {
                    builder.append(s);
                }
                return builder.toString();
            }
        }, ec);

        futureComposedString.onSuccess(new PrintResult<>(log), ec);

        log.info("*** End");
    }

}

En las líneas de la 16 a la 18 vemos como creamos tres futuros diferentes. En ese mismo momento estos tres futuros ya se empiezan a ejecutar en paralelo.

En la línea 20 usamos el método Futures.sequence para crear un nuevo futuro cuyo resultado es un Futuro de Iterable<String>, es decir, un Futuro cuyo resultado es una colección con los resultados de los futuros f1, f2 y f3, en el mismo orden en el que se han añadido al método Futures.sequence.

En la línea 22 usamos el método map para procesar el resultado del Futuro de la línea 20. Este método map es el que ya hemos visto antes. Así que igual que hacíamos antes, definimos un Mapper para procesar la entrada (un Iterable<String>) y producir un resultado (un String con la concatenación de todas las cadenas de la entrada).

Puede parecer un poco complicado pero para mí es uno de los ejemplos más útiles, ya que vemos como podemos lanzar varios Futuros que se ejecutan en paralelo y luego gracias a sequence podemos «sincronizar» estas ejecuciones en el sentido de que se llamará al Mapper una vez todos estos Futuros hayan terminado sus ejecuciones, y así poder operar sobre sus respectivos resultados.

Además en el ejemplo estamos trabajando con String pero ¿qué pasaría si trabajamos con Object? Pues que esto nos permitiría que los resultados de los Futuros fueran de cualquier tipo y al recuperarlos en el Mapper bastaría con hacerles un casting para convertirlos a su tipo específico. Esto sería posible porque, igual que con traverse, tenemos garantía de que se mantiene el orden de los resultados con respecto al orden en el que hemos añadido los futuros en el método sequence.

La salida de este ejemplo sería:

[INFO] [04/09/2017 10:15:35.184] [main] [SequenceFutures] *** Init
[INFO] [04/09/2017 10:15:35.229] [actorSystem-akka.actor.default-dispatcher-2] [SequenceFutures] action1 init
[INFO] [04/09/2017 10:15:35.229] [actorSystem-akka.actor.default-dispatcher-3] [SequenceFutures] action2 init
[INFO] [04/09/2017 10:15:35.229] [actorSystem-akka.actor.default-dispatcher-4] [SequenceFutures] action3 init
[INFO] [04/09/2017 10:15:35.231] [actorSystem-akka.actor.default-dispatcher-4] [SequenceFutures] action3 end
[INFO] [04/09/2017 10:15:35.232] [actorSystem-akka.actor.default-dispatcher-3] [SequenceFutures] action2 end
[INFO] [04/09/2017 10:15:35.232] [actorSystem-akka.actor.default-dispatcher-2] [SequenceFutures] action1 end
[INFO] [04/09/2017 10:15:35.235] [main] [SequenceFutures] *** End
[INFO] [04/09/2017 10:15:35.236] [actorSystem-akka.actor.default-dispatcher-2] [SequenceFutures] ---> Hola mundo del futuro!!!

Donde vemos como los tres Futuros se están ejecutando en paralelo.

8. Cómo aplicar una operación concreta a los resultados de una secuencia de Futuros con ‘fold’

En este ejemplo vamos a crear, igual que antes, un conjunto de futuros que se ejecutarán en paralelo. Luego gracias al método Futures.fold crearemos un nuevo futuro donde definiremos una operación que se va a realizar sobre un elemento inicial y luego sobre cada uno de los resultados de los Futuros anteriores.

class FoldFutures {

    private final LoggingAdapter log;
    private final ExecutionContext ec;
    private final Business business;

    FoldFutures(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
        business = new Business(log);
    }

    void runExample() {
        log.info("*** Init");

        final Future<String> f1 = Futures.future(business::action1, ec);
        final Future<String> f2 = Futures.future(business::action2, ec);
        final Future<String> f3 = Futures.future(business::action3, ec);

        final Future<String> futureString = Futures.fold(
                "",
                asList(f1, f2, f3),
                (previousResult, param) -> previousResult + param,
                ec
        );

        futureString.onSuccess(new PrintResult<>(log), ec);

        log.info("*** End");
    }

}

Vemos como en la línea 20 creamos el Futuro con el método Futures.fold donde el primer parámetro representa el elemento inicial (en el ejemplo la cadena vacía), el segundo parámetro es la colección de Futuros, el tercer parámetro es la lambda con la operación que se va ejecutar sobre cada elemento (en el ejemplo la concatenación del resultado anterior con el resultado de uno de los Futuros), y como siempre el último parámetro el ExecutionContext.

Destacamos como en la lambda tenemos a nuestra disposición dos parámetros, el primero previousResult es el resultado de haber aplicado la lambda anteriormente, y param es el resultado del Futuro que corresponda en cada momento. De esta forma vamos componiendo el resultado final a base de aplicar la lambda al elemento inicial y luego a cada resultado de forma consecutiva.

Si nos damos cuenta este método es prácticamente el mismo que Futures.sequence. La diferencia es que en sequence nosotros teníamos que iterar sobre la colección con los resultados, dándonos esto mucho control sobre lo que estamos haciendo y a la vez «afeando» un poco el código; mientras que con fold nosotros no tenemos control sobre cómo se itera sobre los resultados de los Futuros, y la lambda será llamada, como siempre, en el orden en el que hemos añadido los Futuros en el método fold.

La salida de este ejemplo sería:

[INFO] [04/09/2017 10:38:05.044] [main] [FoldFutures] *** Init
[INFO] [04/09/2017 10:38:05.101] [actorSystem-akka.actor.default-dispatcher-2] [FoldFutures] action1 init
[INFO] [04/09/2017 10:38:05.102] [actorSystem-akka.actor.default-dispatcher-3] [FoldFutures] action2 init
[INFO] [04/09/2017 10:38:05.103] [actorSystem-akka.actor.default-dispatcher-4] [FoldFutures] action3 init
[INFO] [04/09/2017 10:38:05.104] [actorSystem-akka.actor.default-dispatcher-4] [FoldFutures] action3 end
[INFO] [04/09/2017 10:38:05.105] [actorSystem-akka.actor.default-dispatcher-2] [FoldFutures] action1 end
[INFO] [04/09/2017 10:38:05.105] [actorSystem-akka.actor.default-dispatcher-3] [FoldFutures] action2 end
[INFO] [04/09/2017 10:38:05.113] [main] [FoldFutures] *** End
[INFO] [04/09/2017 10:38:05.113] [actorSystem-akka.actor.default-dispatcher-5] [FoldFutures] ---> Hola mundo del futuro!!!

Donde, igual que en el ejemplo anterior, vemos como todos los Futuros se ejecutan en paralelo.

9. ‘reduce’ es un ‘fold’ sin elemento inicial

Como dice el propio título, este ejemplo es exactamente igual que el anterior salvo que con el método Futures.reduce no tenemos elemento inicial (en el ejemplo anterior era la cadena vacía). De tal forma que la lambda se ejecuta directamente sobre el resultado del primer futuro, y luego en secuencia exactamente igual que con fold.

class ReduceFutures {

    private final LoggingAdapter log;
    private final ExecutionContext ec;
    private final Business business;

    ReduceFutures(ActorSystem system) {
        log = Logging.getLogger(system, getClass());
        ec = system.dispatcher();
        business = new Business(log);
    }

    void runExample() {
        log.info("*** Init");

        final Future<String> f1 = Futures.future(business::action1, ec);
        final Future<String> f2 = Futures.future(business::action2, ec);
        final Future<String> f3 = Futures.future(business::action3, ec);

        final Future<String> futureString = Futures.reduce(
                asList(f1, f2, f3),
                (previousResult, param) -> previousResult + param,
                ec
        );

        futureString.onSuccess(new PrintResult<>(log), ec);

        log.info("*** End");
    }

}

La salida sería:

[INFO] [04/09/2017 10:41:52.703] [main] [ReduceFutures] *** Init
[INFO] [04/09/2017 10:41:52.747] [actorSystem-akka.actor.default-dispatcher-2] [ReduceFutures] action1 init
[INFO] [04/09/2017 10:41:52.747] [actorSystem-akka.actor.default-dispatcher-4] [ReduceFutures] action2 init
[INFO] [04/09/2017 10:41:52.748] [actorSystem-akka.actor.default-dispatcher-3] [ReduceFutures] action3 init
[INFO] [04/09/2017 10:41:52.749] [actorSystem-akka.actor.default-dispatcher-3] [ReduceFutures] action3 end
[INFO] [04/09/2017 10:41:52.750] [actorSystem-akka.actor.default-dispatcher-4] [ReduceFutures] action2 end
[INFO] [04/09/2017 10:41:52.750] [actorSystem-akka.actor.default-dispatcher-2] [ReduceFutures] action1 end
[INFO] [04/09/2017 10:41:52.757] [main] [ReduceFutures] *** End
[INFO] [04/09/2017 10:41:52.757] [actorSystem-akka.actor.default-dispatcher-5] [ReduceFutures] ---> Hola mundo del futuro!!!

Donde vemos, igual que antes, como los Futuros se ejecutan en paralelo.

10. Conclusiones

Sé que el tema de la concurrencia, el multi-hilo, y en este caso concreto los Futuros, no es sencillo; por eso es importante que hagamos pequeños programas de ejemplo, tests, dejemos trazas e incluso hagamos sesiones de depuración, todo en un entorno de laboratorio controlado, para así coger soltura con las distintas construcciones y mecanismos que, en este caso Akka, pone a nuestro alcance. Sólo así conseguiremos sacarle el máximo provecho y podremos aprovechar toda esa potencia en entornos de producción.

Una vez cojáis esa soltura con los distintos métodos para la composición de Futuros veréis que se pueden llegar a hacer cosas realmente interesantes.

Y ya sabéis: Prohibidos Monos & Lagartos, que seamos nosotros los que controlamos a la «máquina» y no al contrario.

11. Sobre el autor

Alejandro Pérez García (@alejandropgarci)
Ingeniero en Informática (especialidad de Ingeniería del Software) y Certified ScrumMaster

Socio fundador de Autentia Real Business Solutions S.L. – «Soporte a Desarrollo»

Socio fundador de ThE Audience Megaphone System, S.L. – TEAMS – «Todo el potencial de tus grupos de influencia a tu alcance»

Alejandro Pérez García
Alejandro es socio fundador de Autentia y nuestro experto en Java EE, Linux y optimización de aplicaciones empresariales. Ingeniero en Informática y Certified ScrumMaster. Seguir @alejandropgarci Si te gusta lo que ves, puedes contratarle para darte ayuda con soporte experto, impartir cursos presenciales en tu empresa o para que realicemos tus proyectos como factoría (Madrid). Puedes encontrarme en Autentia: Ofrecemos servicios de soporte a desarrollo, factoría y formación.

DEJA UNA RESPUESTA

Por favor ingrese su comentario!

He leído y acepto la política de privacidad

Por favor ingrese su nombre aquí

Información básica acerca de la protección de datos

  • Responsable:
  • Finalidad:
  • Legitimación:
  • Destinatarios:
  • Derechos:
  • Más información: Puedes ampliar información acerca de la protección de datos en el siguiente enlace:política de privacidad