Introdução
Se você já trabalha com async/await em Rust, sabe que Future resolve um único valor. Mas e quando precisa processar uma sequência contínua de dados — mensagens de WebSocket, linhas de um arquivo enorme, eventos de sensores IoT ou updates de uma API? É aí que entram os async streams.
Async streams são a versão assíncrona dos iteradores: em vez de bloquear a thread para cada item, eles suspendem a task e liberam o runtime para fazer outras coisas. Em 2026, com o ecossistema do Tokio maduro e a trait Stream consolidada, dominar async streams é essencial para qualquer dev Rust que trabalhe com I/O.
O que é um Async Stream?
Um async stream implementa a trait Stream do crate futures-core:
pub trait Stream {
type Item;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>>;
}
Parece complexo, mas na prática você raramente implementa poll_next manualmente. Assim como usamos async fn em vez de implementar Future::poll, existem ferramentas que criam streams de forma ergonômica.
A analogia é direta:
| Síncrono | Assíncrono |
|---|---|
Iterator | Stream |
next() | next().await |
for item in iter | while let Some(item) = stream.next().await |
Criando Streams com tokio-stream
O crate tokio-stream oferece adaptadores prontos. Adicione ao seu Cargo.toml:
[dependencies]
tokio = { version = "1", features = ["full"] }
tokio-stream = "0.1"
futures = "0.3"
Stream a partir de um intervalo
use tokio_stream::StreamExt;
use tokio::time::{interval, Duration};
use tokio_stream::wrappers::IntervalStream;
#[tokio::main]
async fn main() {
let stream = IntervalStream::new(interval(Duration::from_secs(1)));
// Pegar apenas os 5 primeiros ticks
let mut stream = stream.take(5);
while let Some(tick) = stream.next().await {
println!("Tick em: {:?}", tick);
}
println!("Stream finalizado!");
}
O IntervalStream emite um item a cada intervalo. O .take(5) limita a 5 elementos — exatamente como faria com um iterador síncrono.
Stream a partir de um canal
Canais mpsc do Tokio se integram naturalmente com streams:
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel(100);
let mut stream = ReceiverStream::new(rx);
// Produtor: simula dados chegando de uma API
tokio::spawn(async move {
for i in 1..=10 {
tx.send(format!("evento_{}", i)).await.unwrap();
tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
}
});
// Consumidor: processa cada evento conforme chega
while let Some(evento) = stream.next().await {
println!("Recebido: {}", evento);
}
}
Esse padrão é a base de praticamente qualquer pipeline de processamento em tempo real: um produtor envia dados e o consumidor processa sob demanda.
Transformando Streams com StreamExt
A trait StreamExt (de tokio-stream ou futures) adiciona combinadores poderosos, análogos aos de iteradores:
use tokio_stream::StreamExt;
use tokio_stream::iter;
#[tokio::main]
async fn main() {
let dados = vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
let resultado: Vec<i32> = iter(dados)
.filter(|x| x % 2 == 0) // só pares
.map(|x| x * x) // eleva ao quadrado
.take(3) // apenas 3 primeiros
.collect()
.await;
println!("Resultado: {:?}", resultado); // [4, 16, 36]
}
Combinadores mais úteis
| Combinador | Descrição |
|---|---|
.map(f) | Transforma cada item |
.filter(f) | Filtra itens por predicado |
.take(n) | Limita a N itens |
.skip(n) | Pula os N primeiros |
.throttle(dur) | Emite no máximo 1 item por intervalo |
.timeout(dur) | Erro se nenhum item chegar no prazo |
.merge(outro) | Combina dois streams em um |
.chain(outro) | Concatena dois streams |
Criando Streams Customizados com async_stream
Para streams com lógica mais complexa, o crate async-stream permite usar a macro stream! com sintaxe natural:
use async_stream::stream;
use tokio_stream::StreamExt;
use std::time::Duration;
fn dados_sensor(sensor_id: &'static str) -> impl tokio_stream::Stream<Item = f64> {
stream! {
let mut leitura = 20.0;
loop {
// Simula leitura de sensor com variação
leitura += (rand::random::<f64>() - 0.5) * 2.0;
yield leitura;
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}
#[tokio::main]
async fn main() {
let mut stream = dados_sensor("temp_01").take(10);
while let Some(temp) = stream.next().await {
println!("Temperatura: {:.1}°C", temp);
}
}
A palavra-chave yield dentro de stream! emite um item, e o stream suspende automaticamente nos pontos .await.
Exemplo Prático: Pipeline de Logs em Tempo Real
Vamos montar um pipeline realista que lê logs, filtra por severidade e agrega métricas:
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use std::collections::HashMap;
#[derive(Debug, Clone)]
struct LogEntry {
nivel: String,
mensagem: String,
timestamp: u64,
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<LogEntry>(1000);
let stream = ReceiverStream::new(rx);
// Produtor: simula logs chegando
tokio::spawn(async move {
let niveis = ["INFO", "WARN", "ERROR", "DEBUG", "ERROR"];
for (i, nivel) in niveis.iter().cycle().take(50).enumerate() {
let entry = LogEntry {
nivel: nivel.to_string(),
mensagem: format!("Evento #{} processado", i),
timestamp: i as u64,
};
if tx.send(entry).await.is_err() {
break;
}
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
});
// Pipeline: filtra erros e acumula contagem
let mut contagem: HashMap<String, usize> = HashMap::new();
let mut stream = stream
.filter(|log| log.nivel == "ERROR" || log.nivel == "WARN")
.take(20);
while let Some(log) = stream.next().await {
*contagem.entry(log.nivel.clone()).or_insert(0) += 1;
println!("[{}] {}", log.nivel, log.mensagem);
}
println!("\n--- Resumo ---");
for (nivel, total) in &contagem {
println!("{}: {} ocorrências", nivel, total);
}
}
Esse padrão se aplica diretamente a cenários de produção: ingestão de métricas, monitoramento de infraestrutura, processamento de eventos de microsserviços.
Streams vs Channels vs Iteradores: Quando Usar Cada Um
- Iterador síncrono: dados já estão na memória, processamento CPU-bound
- Canal (mpsc): comunicação entre tasks, quando produtor e consumidor têm lógica separada
- Async stream: processamento sequencial de dados assíncronos com transformações compostas (filter, map, throttle)
Na prática, canais e streams se complementam: você cria um canal para receber dados e envolve o Receiver em um ReceiverStream para usar os combinadores.
Tratamento de Erros em Streams
Streams podem emitir Result como item, e existem combinadores específicos:
use tokio_stream::StreamExt;
use tokio_stream::iter;
#[tokio::main]
async fn main() {
let dados: Vec<Result<i32, String>> = vec![
Ok(1), Ok(2), Err("falha na rede".into()), Ok(4), Ok(5),
];
// Processar apenas os Ok, parando no primeiro erro
let mut stream = iter(dados);
while let Some(item) = stream.next().await {
match item {
Ok(valor) => println!("Processado: {}", valor),
Err(e) => {
eprintln!("Erro no stream: {}. Reconectando...", e);
break;
}
}
}
}
Para cenários de produção, combine com tratamento de erros robusto usando thiserror e anyhow.
Performance: Backpressure e Buffering
Async streams no Tokio têm backpressure natural: se o consumidor está lento, o produtor automaticamente suspende. Isso é uma vantagem enorme sobre modelos push-based.
Para ajustar performance:
use tokio_stream::StreamExt;
// Buffer: acumula itens antes de processar em lote
let mut chunks = stream.chunks_timeout(100, Duration::from_secs(5));
while let Some(lote) = chunks.next().await {
processar_lote(&lote).await;
}
O chunks_timeout acumula até 100 itens ou espera no máximo 5 segundos — o que vier primeiro. Perfeito para batch inserts em bancos de dados como PostgreSQL.
Comparação com Outras Linguagens
Se você vem de Go, async streams são como channels com a ergonomia de range loops mais combinadores funcionais. Em Go você faria for msg := range ch, em Rust é while let Some(msg) = stream.next().await — mas com .filter(), .map(), .throttle() compostos.
Em Python, o equivalente são async generators (async for item in gen). A diferença é que Rust garante zero-cost abstractions — sem overhead de runtime para cada yield.
E em Kotlin, o conceito mais próximo são Flows do coroutines, que também suportam backpressure e combinadores. O modelo é bastante similar ao Rust.
Conclusão
Async streams são a ferramenta certa quando você precisa processar sequências de dados assíncronos com composição elegante. Em 2026, o ecossistema está maduro:
- tokio-stream para adaptadores e wrappers
- async-stream para criar streams com
yield - StreamExt para combinadores poderosos
Combinados com o runtime do Tokio e o sistema de tipos do Rust, você consegue pipelines de dados em tempo real que são seguros, rápidos e legíveis.
Leia Também
- Async/Await em Profundidade — fundamentos de programação assíncrona em Rust
- Tokio: Guia Completo — o runtime assíncrono padrão
- Async Rust: Ecossistema em 2026 — panorama completo do async em Rust
- Rayon e Paralelismo de Dados — quando usar paralelismo em vez de async
- Zig Brasil — outra linguagem de sistemas com abordagem diferente para async I/O