Concurrencia con Async

En esta sección, aplicaremos async a algunos de los mismos desafíos de concurrencia que abordamos con threads en el capítulo 16. Hemos explorado muchas de las ideas claves en esa sección, aquí se profundizara en las diferencias entre threads and futures.

En muchos casos, las APIs para trabajar con concurrencia usando async son muy similares a las que se usan con threads. En otros casos, terminan teniendo formas bastante diferentes. Incluso cuando las APIs parecen similares entre threads y async, a menudo presentan comportamientos distintos y casi siempre tienen diferentes características de rendimiento.

Conteo

La primera tarea que abordamos en el capítulo 16 fue contar en dos threads separados. Hagamos lo mismo usando async. El crate trpl proporciona una función spawn_task, que se comporta de forma muy similar a la API de thread::spawn, y una función sleep, que es una versión async de thread::sleep. Podemos usarlas juntas para implementar el mismo ejemplo de conteo que con threads, como se muestra en el Listado 17-6.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Como punto de partida, configuramos nuestra función main con trpl::run, de de modo que nuestra función de nivel superior pueda ser async.

Nota: A partir de este punto en el capítulo, cada ejemplo incluirá este mismo código de envoltura con trpl::run en main, así que a menudo lo omitiremos, igual que hacemos con main. ¡No olvides incluirlo en tu código!

Luego, dentro de ese bloque, escribimos dos bucles, cada uno con una llamada a trpl::sleep, que espera medio segundo (500 milisegundos) antes de enviar el siguiente mensaje. Uno de los bucles va dentro de trpl::spawn_task, mientras que el otro se ejecuta en un bucle for de nivel superior. También añadimos un await después de cada llamada a sleep.

El resultado es similar a la versión basada en threads, incluyendo el detalle de que los mensajes podrían aparecer en un orden distinto cada vez que lo ejecutes en tu terminal.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Esta versión se detiene tan pronto como el bucle for dentro del bloque async principal termina, porque la tarea creada con spawn_task se cierra cuando finaliza la función main. Si quieres que el programa siga ejecutándose hasta que la tarea termine por completo, necesitas usar un join handle para esperar a que la primera tarea finalice. Con threads, utilizamos el método join para “bloquear”, la ejecución hasta que el hilo terminara. En el Listado 17-7, podemos hacer lo mismo con await, ya que el handle de la tarea en sí es un future. Su tipo de Output es un Result, por lo que también usamos unwrap después de esperarlo con await.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

Esta versión actualizada se ejecuta hasta que ambos bucles terminan.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Hasta ahora, parece que async y threads nos dan los mismos resultados básicos, solo que con una sintaxis diferente: usamos await en lugar de llamar a join en el join handle, y también esperamos las llamadas a sleep.

La mayor diferencia aquí es que no necesitamos crear otro hilo del sistema operativo para lograrlo. De hecho, ni siquiera es necesario generar una tarea separada. Pues los bloques async se compilan en futures anónimos, podemos colocar cada bucle dentro de un bloque async y dejar que el runtime los ejecute hasta su finalización usando la función trpl::join.

En el capítulo 16, mostramos cómo usar el método join en el tipo JoinHandle que se obtiene al llamar std::thread::spawn. La función trpl::join es similar, pero para futures. Cuando le pasas dos futures, genera un nuevo future cuyo resultado es una tupla con los valores de salida de los futures originales, pero solo cuando ambos han finalizado. Es decir, en Listado 17-8, usamos trpl::join para esperar a que tanto fut1 como fut2 finalicen. En lugar de hacer await sobre fut1 y fut2 por separado, esperamos el nuevo futuro producido por trpl::join. Ignoramos su salida, debido a que solo contiene una tupla con dos valores unitarios.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

Cuando ejecutamos esto, vemos que ambos futuros se ejecutan hasta completarse:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Aquí verás exactamente el mismo orden en cada ejecución, lo cual es muy diferente de lo que ocurría con threads. Esto se debe a que la función trpl::join es justa, lo que significa que revisa cada future con la misma frecuencia, alternando entre ellos y evitando que uno avance más rápido que el otro si ambos están listos. Con threads, el sistema operativo decide qué hilo revisar y cuánto tiempo permitirle ejecutarse. Con async Rust, es el runtime el que decide que tarea revisar. (En la práctica, esto se vuelve más complejo porque un runtime async puede usar threads del sistema operativo en segundo plano para gestionar la concurrencia, lo que hace que garantizar la equidad requiera más trabajo —¡pero sigue siendo posible!). Los runtimes no están obligados a garantizar equidad en todas las operaciones, y muchas veces ofrecen diferentes APIs para que elijas si quieres equidad o no.

Prueba algunas variaciones en la forma de esperar los futures y observa qué sucede:

  • Elimina el bloque async alrededor de uno o ambos bucles.
  • Espera (await) cada bloque async inmediatamente después de definirlo.
  • Envuelve solo el primer bucle en un bloque async y espera el future resultante después del cuerpo del segundo bucle.

Para un desafío extra, intenta predecir la salida en cada caso antes de ejecutar el código.

Paso de Mensajes

Compartir datos entre futures también te resultará familiar: volveremos a usar el paso de mensajes, pero esta vez con versiones async de los tipos y funciones. Seguiremos un enfoque ligeramente diferente al del capítulo 16 para destacar algunas diferencias clave entre la concurrencia basada en threads y la basada en futures. En el Listado 17-9, comenzaremos con un solo bloque async, sin generar una tarea separada como lo hicimos al crear un thread independiente.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("Got: {received}");
    });
}

Aquí usamos trpl::channel, una version de async de la API multiple-producer y single-consumer que usamos con threads en el capítulo 16. La version async de esta API es solo un poco diferente de la versión basada en threads: en lugar de un receptor inmutable, usa un receptor rx mutable, y su método recv produce un future debemos esperar con await en lugar de devolver el valor directamente. Ahora podemos enviar mensajes desde el sender al receiver. Fíjate en que no necesitamos crear un thread separado ni siquiera una tarea; simplemente esperamos la llamada rx.recv.

El método síncrono Receiver::recv en std::mpsc::channel bloquea la ejecución hasta recibir un mensaje. En cambio, el método trpl::Receiver::recv no, porque es async. En lugar de bloquear, devuelve el control al runtime hasta que se recibe un mensaje o se cierra el lado del envío del canal. Por otro lado, no esperamos en la llamada a send, porque esta no bloquea. No es necesario, ya que el canal al que estamos enviando los mensajes es ilimitado.

Nota: Todo este código async se ejecuta dentro de un bloque async dentro de una llamada a trpl::run, lo que permite evitar bloqueos dentro de él. Sin embargo, el código fuera de este bloque sí se bloqueará hasta que run termine. Esa es precisamente la función de trpl::run: te permite eligir en qué parte del código async quieres bloquear la ejecución, definiendo así la transición entre código síncrono y asíncrono. En la mayoría de runtimes async, la función run suele llamarse block_on por esta misma razón.

Hay dos cosas a notar en este ejemplo: Primero, ¡El mensaje llegará de inmediato! Segundo, aunque estamos usando un future, todavía no hay concurrencia. Todo sucede en secuencia, igual que si no hubiera futures involucrados.

Para abordar esto, enviaremos una serie de mensajes con pausas entre ellos, como se muestra en el Listado 17-10:

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Además de enviar los mensajes, también necesitamos recibirlos. En este caso, podríamos hacerlo manualmente llamando a rx.recv().await cuatro veces, ya que sabemos cuántos mensajes llegarán. Sin embargo, en la práctica, normalmente estaremos esperando una cantidad desconocida de mensajes, por lo que necesitamos seguir esperando hasta asegurarnos de que no quedan más.

En el Listado 16-10, usamos un bucle for para procesar todos los elementos recibidos de un canal síncrono. Sin embargo, en Rust aún no tiene una forma de escribir un bucle for sobre una serie de elementos asíncronos. En su lugar, debemos usar un tipo de bucle que aún no hemos visto: el bucle condicional while let. Este while let es la versión en bucle de la construcción if let que vimos en el capítulo 6. El bucle continuará ejecutándose mientras el patrón que especifica coincida con el valor recibido.

La llamada rx.recv produce un Future, que debemos esperar con await. El runtime pausará la ejecución del Future hasta que esté listo. Cuando llegue un mensaje, el future se resolverá en Some(message), tantas veces como lleguen mensajes. Cuando el canal se cierre, sin importar si llegaron mensajes o no, el future se resolverá en en None, lo que indica que no hay más valores y debemos dejar de esperar (es decir, dejar de hacer await).

El bucle while let combina todo esto. Si el resultado de rx.recv().await es Some(message), obtenemos acceso al mensaje y podemos usarlo dentro del cuerpo del bucle, igual que con if let. Si el resultado es None, el bucle termina. Cada vez que el bucle se completa, vuelve a alcanzar un punto de espera (await), por lo que el runtime lo pausa nuevamente hasta que llegue otro mensaje.

Con esto, el código ahora envía y recibe todos los mensajes correctamente. Sin embargo, todavía hay un par de problemas. Por un lado, los mensajes no llegan en intervalos de medio segundo. En su lugar, todos llegan de golpe, dos segundos (2,000 milisegundos) después de que el programa inicia. Además, el programa nunca finaliza: en lugar de cerrarse cuando termina la recepción de mensajes, sigue esperando indefinidamente. Tendrás que cerrarlo manualmente con ctrl-c.

Comencemos por entender por qué los mensajes llegan todos juntos después del retraso total en lugar de llegar con pausas entre ellos. Dentro de un bloque async, el orden en el que aparecen las palabras clave await en el código es el mismo en el que ocurren cuando el programa se ejecuta.

En el Listado 17-10, solo hay un bloque async, por lo que todo se ejecuta de manera lineal. Todavía no hay concurrencia. Primero se ejecutan todas las llamadas a tx.send, intercaladas con las llamadas a trpl::sleep y sus correspondientes awaits. Solo después de eso, el bucle while let puede comenzar a procesar los await en las llamadas a recv.

Para obtener el comportamiento deseado, donde hay un retraso entre la recepción de cada mensaje, necesitamos colocar las operaciones de tx y rx en bloques async separados. Así, el runtime puede ejecutarlas de forma independiente usando trpl::join, igual que en el ejemplo de conteo. Una vez más, esperamos el resultado de trpl::join, no los futures individuales. Si esperáramos los futures uno tras otro, volveríamos a un flujo secuencial —exactamente lo que queremos evitar.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Con el código actualizado en el Listado 17-11, los mensajes ahora se imprimen a intervalos de 500 milisegundos en lugar de llegar todos de golpe después de dos segundos.

Sin embargo, el programa aún no finaliza debido a la forma en que el bucle while let interactúa con trpl::join:

  • El future devuelto por trpl::join solo se completa cuando ambos futures que recibe han terminado.
  • El future de tx se completa cuando termina el último sleep después de enviar el último mensaje en vals.
  • El future de rx no se completará hasta que el bucle while let termine.
  • El bucle while let no terminará hasta que esperar rx.recv devuelva None.
  • rx.recv().await solo devolverá None cuando el otro extremo del canal se cierre.
  • El canal solo se cerrará si llamamos a rx.close o cuando se elimine (drop) el lado del envío tx.
  • No llamamos a rx.close en ninguna parte, y tx no se eliminará hasta que finalice el bloque async externo pasado a trpl::run.
  • El bloque no puede terminar porque está esperando que trpl::join se complete, lo que nos devuelve al inicio de esta lista.

Podríamos cerrar manualmente rx llamando a rx.close en algún punto, pero eso no tendría mucho sentido. Detenernos después de manejar un número arbitrario de mensajes haría que el programa se cerrara, pero podríamos perder mensajes. Necesitamos otra forma de asegurarnos de que tx se elimine (drop) antes del final de la función.

En este momento, el bloque async donde enviamos los mensajes solo toma prestado tx porque enviar un mensaje no requiere propiedad, pero si pudiéramos mover tx dentro de ese bloque async, se eliminaría cuando el bloque terminara. En el capítulo 13, aprendimos a usar la palabra clave move con closures, y en el capítulo 16, vimos que a menudo necesitamos mover datos dentro de closures cuando trabajamos con hilos. La misma lógica se aplica a los bloques async, por lo que move funciona con ellos de la misma manera que con las closures.

En el Listado 17-12, cambiamos el bloque async que envía los mensajes de un simple bloque async a un bloque async move. Cuando ejecutamos esta versión del código, el programa se cierra correctamente después de que se envían y reciben todos los mensajes.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                eprintln!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Este canal asíncrono también admite multiple-producer, por lo que podemos llamar a clone en tx si queremos enviar mensajes desde varios futures. En el Listado 17-13, clonamos tx, creando tx1 fuera del primer bloque async. Luego movemos tx1 dentro de ese bloque, tal como hicimos antes con tx. Más adelante, movemos el tx original a un nuevo bloque async, donde enviamos más mensajes con un pequeño retraso adicional. Colocamos este nuevo bloque async después del bloque de recepción de mensajes, pero podría ir antes sin problema. Lo importante no es el orden en que los futures se crean, sino el orden en que los esperamos (await).

Ambos bloques async para enviar mensajes deben ser async move, de modo que tanto tx y tx1 se eliminen (drop) cuando esos bloques terminen. De lo contrario, volveríamos al mismo bucle infinito del principio. Finalmente, cambiamos de trpl::join a trpl::join3 para manejar el future adicional.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

Ahora vemos todos los mensajes de ambos futures de envío. Como cada uno usa un retraso ligeramente diferente después de enviar, los mensajes también se reciben en esos intervalos distintos.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Este es un buen comienzo, pero nos limita a solo unos pocos futures: dos con join o tres con join3 . Veamos cómo podemos manejar una cantidad mayor de futures.