Trabajando con cualquier número de futures
En la sección anterior, cuando pasamos de usar dos futures a tres, tuvimos que cambiar
join
por join3
. Tener que usar una función diferente cada vez que cambiamos la
cantidad de futures que queremos combinar sería poco práctico. Afortunadamente, existe
una versión en macro de join
que nos permite pasar un número arbitrario de argumentos
y, además, se encarga de esperar (await) cada future automáticamente.
Así, podríamos reescribir el código del Listado 17-13 para usar join!
en lugar de
join3
, como se muestra en Listado 17-14:
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
Esto sin duda es una gran mejora en comparación con tener que alternar entre join
,
join3
y join4
y así sucesivamente! Sin embargo, incluso esta versión con macro solo
funciona cuando conocemos de antemano el número de futures que queremos combinar.
En Rust del mundo real, es muy común agregar futures a una colección y luego esperar
a que algunos o todos ellos se completen.
Para manejar todos los futures dentro de una colección, necesitamos iterar sobre ellos y
unirlos (join). La función trpl::join_all
acepta cualquier tipo que implemente el rasgo
(trait) Iterator
, que aprendimos en el Capítulo 13, por lo que parece la solución
ideal. Probemos colocando nuestros futures en un vector y reemplazando
join!
con join_all
.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
Desafortunadamente, esto no compila. En su lugar, obtenemos este error:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
Esto puede resultar sorprendente. Después de todo, ninguno de los futures devuelve un
valor, por lo que cada bloque produce un Future<Output = ()>
. Sin embargo, Future
es un trait, no
tipo concreto. Los tipos concretos (concrete types) son las estructuras de datos individuales que
el compilador genera para los bloques async. No se pueden colocar dos estructuras diferentes escritas
a mano dentro de un Vec
, y lo mismo ocurre con las diferentes
estructuras generadas por el compilador.
Para solucionar esto, necesitamos usar trait objects, al igual que hicimos en la sección “Refactorizando
para mejorar la modularidad y el manejo de errores” en el Capítulo 12. (Cubriremos los trait objects
en detalle en el Capítulo 18.) Usar objetos de trait nos permite tratar cada
uno de los futures anónimos producidos por estos tipos como si fueran del mismo tipo,
ya que todos implementan el trait Future
.
Nota: En el capitulo 8, discutimos otra forma de incluir múltiples tipos en un
Vec
: usar un enum para representar cada uno de los diferentes tipos que pueden aparecer en el vector. Sin embargo, en este caso no podemos hacer eso. Pues, no tenemos forma de nombrar los diferentes tipos, ya que son anónimos. Además, la razón por la que recurrimos a un vector yjoin_all
en primer lugar es porque queremos trabajar con una colección dinámica de futures, donde no sabemos cuáles serán hasta el tiempo de ejecución.
Empezaremos envolviendo cada uno de los futuros de vec!
en una Box::new
, como se
muestra en el Listado 17-16.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Desafortunadamente, esto aún no compila. De hecho, seguimos teniendo el mismo error
básico que antes, pero ahora aparece tanto en la segunda como en la tercera llamada a Box::new
.
Además, también vemos nuevos errores relacionados con el trait Unpin
. Volveremos a los errores
de Unpin
en un momento. Primero, solucionemos los errores de tipo en las llamadas a
Box::new
, anotando explícitamente el tipo de la variable futures
:
extern crate trpl; // required for mdbook test
use std::{future::Future, time::Duration};
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
El tipo que tuvimos que escribir aquí es un poco complejo, así que desglosémoslo paso a paso:
- El tipo más interno es el futuro en sí. Indicamos explícitamente que su valor de salida
es el tipo unitario
()
escribiendoFuture<Output = ()>
. - Luego, usamos
dyn
para marcar el trait como dinámico. - Toda la referencia al trait se envuelve dentro de un
Box
. - Finalmente, especificamos explícitamente que
futures
es unVec
contiene estos elementos.
Esto ya supone una gran diferencia. Ahora, al ejecutar el compilador, solo vemos los
errores relacionados con Unpin
. Aunque hay tres de ellos, todos son bastante
similares en su contenido.
error[E0308]: mismatched types
--> src/main.rs:46:46
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0308]: mismatched types
--> src/main.rs:46:64
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
30 | let tx_fut = async move {
| ---------- the found `async` block
...
46 | vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
| -------- ^^^^^^ expected `async` block, found a different `async` block
| |
| arguments to this function are incorrect
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:30:22: 30:32}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
note: associated function defined here
--> file:///home/.rustup/toolchains/1.82/lib/rustlib/src/rust/library/alloc/src/boxed.rs:255:12
|
255 | pub fn new(x: T) -> Self {
| ^^^
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:24
|
48 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:9
|
48 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `{async block@src/main.rs:10:23: 10:33}` cannot be unpinned
--> src/main.rs:48:33
|
48 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `{async block@src/main.rs:10:23: 10:33}`, which is required by `Box<{async block@src/main.rs:10:23: 10:33}>: Future`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<{async block@src/main.rs:10:23: 10:33}>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
Eso es mucho para asimilar, así que desglosémoslo. La primera parte del mensaje nos
indica que el primer bloque async (src/main.rs:8:23: 20:10
) no implementa
el trait Unpin
, y sugiere usar pin!
o Box::pin
para solucionarlo.
Más adelante en el capítulo, profundizaremos en más detalles sobre Pin
y
Unpin
. Por ahora, sin embargo, podemos simplemente seguir el consejo del compilador para salir
del apuro! En el Listado 17-18, comenzamos actualizando la anotación de tipo para
futures
, agregamos un Pin
que envuelva cada Box
. Luego, utilizamos Box::pin
para fijar (pin)
los futures en sí mismos.
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
Si compilamos y ejecutamos esto, finalmente obtenemos la salida que esperábamos:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
¡Uf!
Todavía hay más que podemos explorar aquí. Por un lado, usar Pin<Box<T>>
introduce un pequeño costo adicional debido a la asignación en el heap con
Box
—y en realidad solo lo estamos haciendo para alinear los tipos. No necesitamos realmente
la asignación en el heap allocation, después de todo: estos futures son locales a esta función
en particular. Como mencionamos antes, Pin
en sí mismo es un tipo contenedor, por lo que podemos
obtener el beneficio de tener un solo tipo en el Vec
—la razón original por la que usamos
Box
—sin necesidad de una asignación en el heap allocation. En su lugar, podemos usar Pin
directamente con cada future, utilizando el macro std::pin::pin
.
Sin embargo, aún debemos ser explícitos sobre el tipo de la referencia fijada (pinned reference);
de lo contrario, Rust no sabrá interpretarlos como objetos de trait dinámicos,
que es lo que necesitamos en el Vec
. Por lo tanto, usamos pin!
en cada future
al definirlo y definimos futures
como un Vec
que contiene referencias mutables fijadas
a Future
, como se muestra en el Listado 17-19.
extern crate trpl; // required for mdbook test use std::{ future::Future, pin::{pin, Pin}, time::Duration, }; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Hemos llegado hasta aquí ignorando el hecho de que podríamos tener diferentes tipos de Output
.
Por ejemplo, en el Listado 17-20, el future anónimo para a
implementa
Future<Output = u32>
, el de b
implementa Future<Output = &str>
, y el de c
implementa Future<Output = bool>
.
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true }; let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); }); }
Podemos usar trpl::join!
para esperarlos (await), ya que permite pasar múltiples futures
de diferentes tipos y produce una tupla con esos tipos. Lo que no podemos hacer es usar
trpl::join_all
, porque requiere que todos los futures tengan el mismo tipo. Recordemos
que ese error fue el que nos llevó a esta aventura con Pin
!
Este es un compromiso fundamental: podemos manejar un número dinámico de futures
con join_all
, siempre que todos tengan el mismo tipo, o podemos manejar un número
fijo de futures con las funciones join
o el macro join!
,
incluso si tienen tipos diferentes. Sin embargo, esto es lo mismo que ocurre con cualquier
otro tipo en Rust. Los futures no son especiales en este sentido, aunque tengamos una
sintaxis conveniente para trabajar con ellos, ¡y eso es algo bueno!
Carrera de futuros
Cuando usamos join
y sus variantes, esperamos que todas las futures terminen
antes de continuar. Sin embargo, a veces solo necesitamos que alguna de ellas
termine antes de seguir adelante—algo así como hacer
que compitan entre sí.
En el Listado 17-21, usamos trpl::race
para hacer competir dos futuros, slow
y
fast
. Cada una imprime un mensaje al iniciar, espera un tiempo determinado
con sleep
, e imprime otro mensaje al terminar. Luego, pasamos ambas a trpl::race
y esperamos a que una de ellas termine.
(El resultado no será muy sorprendente: fast
gana)
A diferencia de cuando usamos race
en Futures y la sintaxis async, aquí
simplemente ignoramos la instancia de Either
que devuelve, porque todo lo interesante
sucede dentro de los bloques async.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
Observa que si inviertes el orden de los argumentos en race
, el orden de los mensajes
“started” cambia, aunque el futuro fast
siempre se completa
primero. Esto se debe que la implementación de la función race
en particular
no es justa. Siempre ejecuta los futuros en el orden en que se pasan como
argumentos. Otras implementaciones sí son justas y eligen aleatoriamente qué
futuro evaluar primero. De todas maneras, independientemente de si la implementación de
race
que estamos usando es justa o no, uno de los futuros se ejecutará hasta el primer
await
antes de que otra tarea pueda comenzar.
Repasemos sobre Futures y la sintaxis async donde en cada await
,
Rust le da la oportunidad al runtime de pausar la tarea y cambiar a otra si el futuro
que se está esperando aún no está listo. Lo contrario también es cierto: Rust solo pausa
los bloques asíncronos y devuelve el control al runtime en un punto de await. Todo lo que
ocurre entre puntos de await es síncrono.
Esto significa que si realizas una gran cantidad de trabajo dentro de un bloque asíncrono sin un await, ese futuro bloqueará el progreso de otros futuros. A veces, esto se conoce como un futuro dejando sin recursos a otros futuros. En algunos casos, esto puede no ser un gran problema. Sin embargo, si estás realizando una configuración costosa o una tarea de larga duración, o si tienes un futuro que seguirá ejecutando una tarea indefinidamente, necesitarás pensar en cuándo y dónde devolver el control al runtime.
Del mismo modo, si tienes operaciones bloqueantes de larga duración, async puede ser una herramienta útil para permitir que diferentes partes del programa se relacionen entre sí.
Pero ¿cómo podrías devolver el control al runtime en esos casos?
Cediendo el Control
Simulemos una operación de larga duración. En el Listado 17-22 se introduce la función slow
.
Esta usa std::thread::sleep
en lugar de trpl::sleep
, por lo que al llamar a
slow
se bloqueará el hilo actual durante un cierto número de milisegundos. Podemos
usar slow
como sustituto de operaciones reales que son tanto de larga duración
como bloqueante.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
En el Listado 17-23, usamos slow
para emular ese tipo de trabajo con la CPU limitada trabajando en
un par de futuros. Para empezar, cada futuro en este código solo devuelve el control del runtime
después de terminar un motón de operaciones lentas.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Si ejecutas esto, verás esta salida:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
Al igual que con el ejemplo anterior, race
finaliza tan pronto como a
termina.
Sin embargo, no hay intercalación entre los dos futuros. El futuro a
realiza todo su
trabajo hasta que se espera la llamada a trpl::sleep
, es entonces donde el futuro b
realiza todo su
trabajo hasta que se espera su propia llamada a trpl::sleep
, y solo después el futuro a
termina. Para permitir que ambos futuros avancen entre sus tareas lentas, necesitamos
puntos de espera (con await) para poder devolver el control al runtime. ¡Eso
significa que necesitamos algo que podamos esperar!
Ya podemos ver este tipo de transferencia de control en el Listado 17-23: si elimináramos
la llamada a trpl::sleep
al final del futuro a
, este se completaría sin que el futuro
b
se ejecute en absoluto. Tal vez podríamos usar la función sleep
como punto de partida?
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 35); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
En el Listado 17-24, añadimos llamadas a trpl::sleep
con await
entre cada llamada a
slow
. Ahora los dos futuros trabajan de forma intercalada:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
El futuro a
sigue ejecutándose un poco antes de ceder el control a b
, porque llama
a slow
antes de hacer cualquier llamada a trpl::sleep
, pero después de eso, los futuros se
intercambian cada vez que uno de ellos alcanza un punto de await. En este caso, hemos
hecho eso después de cada llamada a slow
, pero podríamos dividir el trabajo de la
manera que tenga más sentido para nosotros.
Sin embargo, realmente no queremos esperar aquí: queremos avanzar lo más rápido posible.
Solo necesitamos devolver el control al runtime. Podemos hacer eso directamente,
usando la función yield_now
. En el Listado 17-25, reemplazamos todas esas llamadas
a sleep
con yield_now
.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 35); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Esto no solo deja más claro el propósito real, sino que también puede ser
significativamente más rápido que usar sleep
, ya que los temporizadores como el
que usa sleep
suelen tener limitaciones en su granularidad. La versión de sleep
que estamos usando, por ejemplo, siempre dormirá al menos un milisegundo, incluso si le pasamos un
Duration
de un nanosegundo. Y de nuevo, las computadoras modernas son rápidas:
pueden hacer mucho en un solo milisegundo.
Puedes comprobarlo tú mismo configurando un pequeño benchmark, como el del Listado 17-26.
(No es una forma especialmente rigurosa de hacer pruebas de rendimiento, pero es
suficiente para mostrar la diferencia en este caso). Aquí omitimos toda la
impresión de estado, pasamos un Duration
de un nanosegundo a trpl::sleep
, y dejamos
que cada futuro se ejecute por sí solo, sin alternar entre ellos. Luego,
ejecutamos 1,000 iteraciones y comparamos cuánto tiempo toma el futuro que usa
trpl::sleep
en comparación con el que usa trpl::yield_now
.
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
Esta versión con yield_now
es mucho más rápido!
Esto significa que async puede ser útil incluso para tareas que consumen muchos recursos de cómputo, dependiendo de lo que haga el resto del programa. Proporciona una herramienta útil para estructurar las relaciones entre distintas partes del código. Esto es un tipo de multitarea cooperativa, donde cada futuro decide cuándo ceder el control a través de puntos de await. Por lo tanto, también es responsabilidad de cada futuro evitar bloquearse por demasiado tiempo. De hecho, en algunos sistemas operativos embebidos basados en Rust, ¡esta es la única forma de realizar multitarea!
Por supuesto en código real, no estarás alternando llamadas a funciones con await en cada línea. Aunque ceder el control de esta manera es relativamente barato, no es gratuito. En muchos casos, intentar dividir una tarea intensiva en cómputo podría hacerla significativamente más lenta, así que a veces es mejor, en términos de rendimiento general, permitir que una operación bloquee brevemente. Siempre es recomendable medir el rendimiento para identificar los cuellos de botella reales en tu código. Sin embargo, si notas que muchas tareas están ejecutándose en serie cuando esperabas que fueran concurrentes, este concepto es clave para entender qué está ocurriendo.
Construyendo Nuestras Propias Abstracciones Asíncronas
También podemos componer futuros para crear nuevos patrones. Por ejemplo, podemos construir
una función timeout
utilizando los bloques asíncronos que ya tenemos. Al final,
obtendremos otro bloque de construcción que podremos reutilizar para crear
más abstracciones asíncronas aún más avanzadas.
En el Listado 17-27 muestra cómo esperamos que timeout
funcione con un futuro
lento.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_millis(100)).await;
"I finished!"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
¡Vamos a implementarlo! Para empezar, pensemos en la API de timeout
:
- Debe ser una función asíncrona para que podamos esperarla con
await
. - Su primer parámetro debe ser un futuro a ejecutar. Podemos hacerlo genérico para que funcione con cualquier futuro.
- Su segundo parámetro será el tiempo máximo de espera. Si usamos un
Duration
, será fácil pasarlo atrpl::sleep
. - Debe devolver un
Result
. Si el futuro se completa exitosamente, elResult
seráOk
con el valor producido por el futuro. Si el tiempo de espera se agota primero, elResult
será unErr
con la duración que esperó antes de expirar.
El Listado 17-28 muestra esta declaración.
extern crate trpl; // required for mdbook test
use std::{future::Future, time::Duration};
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
Eso satisface nuestros objetivos en cuanto a los tipos. Ahora pensemos en el comportamiento
que necesitamos: queremos hacer competir el futuro que recibimos con el tiempo límite.
Porque, podemos usar trpl::sleep
para crear un futuro temporizador a partir de la duración otorgada,
y usar trpl::race
para ejecutar ese temporizador junto con el futuro que nos pasaron.
Como sabemos race
no es justo y evalúa los argumentos en el orden en que se
pasan. Por lo tanto, pasamos future_to_try
primero a race
para que tenga la oportunidad de
completarse incluso si max_time
es un tiempo muy corto. Si future_to_try
termina primero,
race
devolverá Left
con el resultado del future
. Si
timer
finaliza antes, race
devolverá Right
con la salida del temporizador, que es
()
.
En el Listado 17-29, hacemos un match sobre el resultado que se espera en trpl::race
. Si
future_to_try
se completa con éxito y obtenemos un Left(output)
, retornamos un Ok(output)
.
Si en cambio el temporizador se agota y obtenemos un Right(())
, ignoramos el ()
con _
y devolvemos Err(max_time)
.
extern crate trpl; // required for mdbook test use std::{future::Future, time::Duration}; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
Con eso, tenemos un timeout
que funciona, construido a partir de otros dos ayudantes asíncronos (async helpers). Si
ejecutamos nuestro código, imprimirá el modo de fallo después del tiempo de espera:
Failed after 2 seconds
Dado que los futuros pueden componerse con otros futuros, es posible construir herramientas muy poderosas utilizando pequeños bloques asíncronos. Por ejemplo, este mismo enfoque puede usarse para combinar tiempos de espera con reintentos y, a su vez, aplicar esto a tareas como llamadas de red, uno de los ejemplos del inicio del capítulo.
En la práctica, trabajarás directamente con async
y await
, y
en segundo lugar con funciones y macros como join
, join_all
, race
, entre otras.
Solo necesitarás recurrir a pin ocasionalmente para utilizarlas con estas
APIs.
Hasta ahora, hemos visto varias formas de trabajar con múltiples futuros al mismo tiempo. A continuación, exploraremos cómo manejar múltiples futuros en secuencia a lo largo del tiempo usando streams. Antes de continuar, aquí hay un par de cosas que podrías considerar:
-
Usamos un
Vec
conjoin_all
para esperar a que todos los futuros en un grupo terminaran. ¿Cómo podrías usar unVec
para procesar un grupo de futuros en secuencia en su lugar? ¿Cuáles serían las ventajas y desventajas de hacerlo? -
Échale un vistazo al tipo
futures::stream::FuturesUnordered
del crate defutures
. ¿En qué se diferenciaría su uso con respecto a unVec
? (No te preocupes por el hecho de que provenga de la parte destream
del crate; funciona perfectamente con cualquier colección de futuros).