Let's start from a real problem: how do we fetch N URLs? The best ideas are the simplest, so we create a worker pool in Rust and distribute the jobs through a queue. There are multiple ways to do it, and not all of them are right. In this post we'll look at two approaches and see why one of them is wrong.
The first idea is to share the channel between the workers, that is, to share the mpsc::Receiver. But Receiver does not implement Clone, so we cannot hand a copy to each worker; the compiler prevents it. We need a workaround to use one mpsc::Receiver across multiple workers.
We start by defining a Job, which has an id and a URL. Its async method process calls a simple HTTP client with the URL.

use std::io;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::sync::{mpsc, Mutex};

#[derive(Debug)]
struct Job {
    id: usize,
    url: String,
}

impl Job {
    // Fetch the URL and return the HTTP status code, or an error message.
    async fn process(self) -> String {
        match async_http_client(&self.url).await {
            Ok(status) => status,
            Err(e) => format!("Job {} failed: {}", self.id, e),
        }
    }
}

Now we define the worker:

async fn worker(
    id: usize,
    job_rx: Arc<Mutex<mpsc::Receiver<Job>>>,
    result_tx: mpsc::Sender<String>,
) {
    loop {
        let job_opt = {
            let mut locked_rx = job_rx.lock().await;
            locked_rx.recv().await
        };
        match job_opt {
            Some(job) => {
                let output = job.process().await;
                let _ = result_tx.send(output).await;
            }
            None => {
                println!("Worker {} exiting", id);
                break;
            }
        }
    }
}

The worker receives an id, a shared receiver wrapped in Arc and Mutex, and a sender for results.
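Note that the lock is acquired inside the block that computes job_opt, not for the whole loop iteration; otherwise it would be held for the entire job processing. This way, while one worker is processing a job, another one can take the lock. It looks nice, but job pickup is still serialized: an idle worker holds the lock while it waits in recv(), and every other idle worker queues on the mutex instead of on the channel. The jobs themselves run concurrently, yet every dequeue goes through one lock.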
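The scoping point can be checked with a minimal sketch. It is an illustration under assumptions, not the code of this post: it swaps tokio's async Mutex and channel for their std counterparts so that it runs without external crates, and take_job and lock_scope_demo are hypothetical names. The guard lives only inside the inner block, so the lock is already free when processing would start.

```rust
use std::sync::mpsc::{self, Receiver};
use std::sync::Mutex;

/// Hypothetical helper: take one job while holding the lock only for the receive.
fn take_job(shared_rx: &Mutex<Receiver<u32>>) -> Option<u32> {
    let job = {
        let guard = shared_rx.lock().unwrap();
        // Non-blocking receive: returns None if the queue is empty or closed.
        guard.try_recv().ok()
    }; // guard is dropped here, so the lock is released before any processing
    job
}

/// Returns the job that was taken and whether the lock was free right afterwards.
fn lock_scope_demo() -> (Option<u32>, bool) {
    let (tx, rx) = mpsc::channel();
    let shared_rx = Mutex::new(rx);
    tx.send(7).unwrap();

    let job = take_job(&shared_rx);
    // try_lock succeeds only if nobody is holding the mutex right now.
    let lock_is_free = shared_rx.try_lock().is_ok();
    (job, lock_is_free)
}
```

Calling lock_scope_demo() yields the queued job together with true, because try_lock succeeds once the guard has gone out of scope. If the guard were kept alive across the processing step, other workers could not dequeue until that job had finished.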
Next, take a look at the functions that write to and read from the two channels.
produce_jobs creates a job for every URL and sends it to the job channel.
collect_results reads the results sent by the workers.

async fn produce_jobs(tx: mpsc::Sender<Job>, urls: Vec<String>) {
    for (id, url) in urls.into_iter().enumerate() {
        tx.send(Job { id, url }).await.unwrap();
    }
}

async fn collect_results(mut rx: mpsc::Receiver<String>) {
    while let Some(result) = rx.recv().await {
        println!("{}", result);
    }
}

In the final part of the code we put it all together.

#[tokio::main]
async fn main() -> io::Result<()> {
    const NUM_WORKERS: usize = 2;
    const QUEUE_CAPACITY: usize = 16;

    let (job_tx, job_rx) = mpsc::channel::<Job>(QUEUE_CAPACITY);
    let (result_tx, result_rx) = mpsc::channel::<String>(QUEUE_CAPACITY);

    // Args is the command-line argument struct, defined elsewhere (not shown)
    let args = Args::parse();
    if args.urls.len() > 1 {
        // Share the single receiver between the workers
        let job_rx = Arc::new(Mutex::new(job_rx));
        for i in 0..NUM_WORKERS {
            let job_rx = Arc::clone(&job_rx);
            let result_tx = result_tx.clone();
            tokio::spawn(worker(i, job_rx, result_tx));
        }
        // Start collecting results
        let collector = tokio::spawn(collect_results(result_rx));
        // Start the job producer; job_tx is dropped when produce_jobs returns,
        // which closes the job channel and lets the workers exit
        produce_jobs(job_tx, args.urls).await;
        // Drop main's own result_tx so the collector stops once the workers are done
        drop(result_tx);
        collector.await.unwrap();
    }
    Ok(())
}

// `url` must be a host:port pair, because it is passed to TcpStream::connect
async fn async_http_client(url: &str) -> io::Result<String> {
    let request = format!(
        "GET / HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
        url
    );
    let mut stream = tokio::net::TcpStream::connect(url).await?;
    stream.write_all(request.as_bytes()).await?;
    stream.flush().await?;
    // The status code is the second token of the first response line
    let mut reader = tokio::io::BufReader::new(stream);
    let mut first_line = String::new();
    reader.read_line(&mut first_line).await?;
    let status_code = first_line.split_whitespace().nth(1).unwrap_or("???");
    Ok(status_code.to_string())
}

main creates the two channels, spawns the workers (each one runs in its own async task), starts the collector, runs the producer, and waits for the collector to finish.
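Why drop(result_tx) matters can be sketched in isolation. The example below is an illustration under assumptions: it uses std::sync::mpsc and threads instead of tokio tasks so that it runs with no dependencies, and collect_until_closed is a hypothetical name. The rule it shows also holds for tokio's mpsc: the receiver sees the end of the stream only after every Sender clone has been dropped.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads that each send one message,
/// then collects messages until the channel is closed.
fn collect_until_closed(workers: usize) -> Vec<String> {
    let (result_tx, result_rx) = mpsc::channel::<String>();

    let mut handles = Vec::new();
    for id in 0..workers {
        let tx = result_tx.clone();
        handles.push(thread::spawn(move || {
            tx.send(format!("worker {} done", id)).unwrap();
            // This clone of the sender is dropped when the thread ends.
        }));
    }

    // Drop the sender kept by the caller. Without this line the iterator
    // below would never finish, because one sender would stay alive.
    drop(result_tx);

    // iter() blocks until a message arrives and ends when all senders are gone.
    let mut results: Vec<String> = result_rx.iter().collect();

    for handle in handles {
        handle.join().unwrap();
    }
    results.sort(); // arrival order depends on scheduling
    results
}
```

With three workers the function returns three messages, one per worker. Commenting out the drop call makes it hang, which is the same failure mode the collector would hit if main kept its result_tx alive.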
mpsc (multi-producer, single-consumer) is single consumer by design, so using it with multiple consumers is a bad choice.
Coming from Go, where you pass channels directly, this felt wrong. In Go the channel is the coordination primitive. In Rust, mpsc forces you to wrap it in Arc and Mutex just to share it. That is the signal that you are using the wrong type.
mpsc::Receiver does not implement Clone, so we can't hand it to many workers. There is a better solution: async_channel::Receiver. It is cloneable, and every worker gets its own clone.

async fn worker(id: usize, job_rx: async_channel::Receiver<Job>, result_tx: mpsc::Sender<String>) {
    while let Ok(job) = job_rx.recv().await {
        let output = job.process().await;
        let _ = result_tx.send(output).await;
    }
    println!("Worker {} exiting", id);
}

The channels are defined in main like this:

#[tokio::main]
async fn main() -> io::Result<()> {
    const NUM_WORKERS: usize = 2;
    const QUEUE_CAPACITY: usize = 16;

    let (job_tx, job_rx) = async_channel::bounded::<Job>(QUEUE_CAPACITY);
    let (result_tx, result_rx) = mpsc::channel::<String>(QUEUE_CAPACITY);

    let args = Args::parse();
    if args.urls.len() > 1 {
        for i in 0..NUM_WORKERS {
            let job_rx = job_rx.clone(); // every worker has its own clone, so no mutex
            let result_tx = result_tx.clone();
            tokio::spawn(worker(i, job_rx, result_tx));
        }
        // Start collecting results
        let collector = tokio::spawn(collect_results(result_rx));
        // Start the job producer; job_tx is dropped when produce_jobs returns,
        // which closes the job channel and lets the workers exit
        produce_jobs(job_tx, args.urls).await;
        // Drop main's own result_tx so the collector stops once the workers are done
        drop(result_tx);
        collector.await.unwrap();
    }
    Ok(())
}

async_channel is multi-producer and multi-consumer. Its Receiver is cloneable, so every worker has its own clone and no lock is needed. In main the channel is created with bounded, which means it has a fixed capacity: when it is full, send waits until a worker frees a slot. Note that produce_jobs must now take an async_channel::Sender<Job> instead of mpsc::Sender<Job>; its body stays the same.
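What "bounded" means can be shown with a small sketch. It rests on an assumption: it uses std::sync::mpsc::sync_channel as a dependency-free stand-in for a bounded queue, and fill_bounded_queue is a hypothetical name. With no consumer running, only as many items as the capacity are accepted; the non-blocking try_send reports the rest as Full.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `attempts` items into a queue of size `capacity`
/// while nobody is receiving. Returns (accepted, rejected_because_full).
fn fill_bounded_queue(capacity: usize, attempts: usize) -> (usize, usize) {
    // `_rx` is kept alive so the channel stays open for the whole function.
    let (tx, _rx) = sync_channel::<usize>(capacity);

    let mut accepted = 0;
    let mut rejected = 0;
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The queue already holds `capacity` items.
            Err(TrySendError::Full(_)) => rejected += 1,
            // The receiver is gone; nothing more can be sent.
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    (accepted, rejected)
}
```

fill_bounded_queue(2, 5) returns (2, 3): two items fit and three are turned away. The awaited send used in this post does not fail in that situation; it waits for a free slot, so a slow worker pool naturally slows the producer down instead of letting the queue grow without limit.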
So, when should you use one over the other?
Use tokio::sync::mpsc when there is only one consumer. With a single receiver no extra synchronization is needed.
Use async_channel when there are multiple consumers. Its internal synchronization is not free, but it is worth it when several workers read from the same queue. For a single consumer it adds unnecessary overhead, so use mpsc there.