In the previous post each thread had its own map and we merged them at the end. That works, but what if the threads need to write to the same data structure? Then you need shared state, and that changes everything.
If thread::scope or &'static str are unfamiliar, the previous post covers both.
Shared state means one map, multiple writers. Here is the simplest way to make that work.
use std::collections::HashMap;
use std::fs::File;
use std::io::{self, BufRead, BufReader};
use std::sync::{Arc, Mutex};
use std::thread;

fn parse_line(line: &str) -> Option<&'static str> {
    if line.contains("ERROR") {
        Some("ERROR")
    } else if line.contains("WARN") {
        Some("WARN")
    } else if line.contains("INFO") {
        Some("INFO")
    } else {
        None
    }
}

fn parser_with_arc_beginner(file_name: String) -> io::Result<()> {
    // One map, shared by both threads through Arc and guarded by a Mutex.
    let map = Arc::new(Mutex::new(HashMap::new()));
    let file = File::open(file_name)?;
    let reader = BufReader::new(file);
    let mut vec: Vec<String> = Vec::new();
    for line_result in reader.lines() {
        vec.push(line_result?);
    }
    let half = vec.len() / 2;
    let (vec1, vec2) = vec.split_at(half);
    let map_clone1 = Arc::clone(&map);
    let map_clone2 = Arc::clone(&map);
    thread::scope(|s| {
        s.spawn(move || {
            // Count locally first, without holding the lock.
            let mut local = HashMap::new();
            for line in vec1 {
                if let Some(level) = parse_line(line) {
                    *local.entry(level).or_insert(0) += 1;
                }
            }
            // Lock once and merge into the shared map.
            let mut global = map_clone1.lock().unwrap();
            for (k, v) in local {
                *global.entry(k).or_insert(0) += v;
            }
        });
        s.spawn(move || {
            let mut local = HashMap::new();
            for line in vec2 {
                if let Some(level) = parse_line(line) {
                    *local.entry(level).or_insert(0) += 1;
                }
            }
            let mut global = map_clone2.lock().unwrap();
            for (k, v) in local {
                *global.entry(k).or_insert(0) += v;
            }
        });
    });
    for (level, count) in map.lock().unwrap().iter() {
        println!("{level}: {count}");
    }
    Ok(())
}

We read the log file into a vector, split it in two, and give each half to a thread. Both threads write to the same map. That is the shared state.
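The next example shows how the split behaves when the number of lines is odd. split_at(len / 2) rounds the first half down, so the second half gets the extra line. half_lengths is a hypothetical helper written only for this illustration.

```rust
// Hypothetical helper: lengths of the two halves that split_at(len / 2) produces.
fn half_lengths(lines: &[String]) -> (usize, usize) {
    let half = lines.len() / 2; // integer division rounds down
    let (first, second) = lines.split_at(half);
    (first.len(), second.len())
}

fn main() {
    // Five made-up log lines.
    let lines: Vec<String> = (1..=5).map(|i| format!("INFO line {i}")).collect();
    let (a, b) = half_lengths(&lines);
    println!("first half: {a}, second half: {b}"); // first half: 2, second half: 3
}
```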
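The move keyword transfers ownership of the captured values into each closure: map_clone1 goes to the first thread and map_clone2 to the second. Both are reference-counted pointers (Arc) to the same Mutex-wrapped map, so cloning copies the pointer, not the map. Inside each thread, lock() grants exclusive access until the returned guard goes out of scope.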
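Here is a minimal sketch of the "same map" point. A write made through the clone is visible through the original handle, and Arc::strong_count reports two owners while both handles are alive. The function name shared_map_demo is ours and exists only for illustration.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Returns (number of Arc owners, count seen through the original handle).
fn shared_map_demo() -> (usize, i32) {
    let map: Arc<Mutex<HashMap<&'static str, i32>>> = Arc::new(Mutex::new(HashMap::new()));
    // Arc::clone copies the pointer and bumps the reference count; the map is not copied.
    let clone = Arc::clone(&map);

    // Write through the clone. The guard is a temporary, so the lock is
    // released at the end of this statement.
    *clone.lock().unwrap().entry("ERROR").or_insert(0) += 1;

    // Read through the original handle: it sees the same map.
    let seen = *map.lock().unwrap().get("ERROR").unwrap_or(&0);
    (Arc::strong_count(&map), seen)
}

fn main() {
    let (owners, seen) = shared_map_demo();
    println!("owners: {owners}, ERROR count: {seen}"); // owners: 2, ERROR count: 1
}
```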
Each thread accumulates into a local map first, then takes the lock once to merge. One lock per thread, not one per line.
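The sketch below compares the two strategies on a tiny in-memory input. It uses a hypothetical count_levels function with a per_line flag and an AtomicUsize whose only job is to count lock() calls. parse_line is rewritten more compactly, but it checks ERROR, WARN and INFO in the same order as the original.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

type Counts = HashMap<&'static str, i32>;

// Same checks, in the same order, as parse_line in the post.
fn parse_line(line: &str) -> Option<&'static str> {
    ["ERROR", "WARN", "INFO"]
        .iter()
        .copied()
        .find(|level| line.contains(*level))
}

// One thread per chunk. With per_line = true every matching line takes the
// lock; otherwise each thread counts locally and locks once to merge.
// Returns the totals and how many times lock() was called.
fn count_levels(chunks: &[Vec<String>], per_line: bool) -> (Counts, usize) {
    let map = Arc::new(Mutex::new(Counts::new()));
    let locks = Arc::new(AtomicUsize::new(0));
    thread::scope(|s| {
        for chunk in chunks {
            let map = Arc::clone(&map);
            let locks = Arc::clone(&locks);
            s.spawn(move || {
                let mut local = Counts::new();
                for line in chunk {
                    if let Some(level) = parse_line(line) {
                        if per_line {
                            locks.fetch_add(1, Ordering::Relaxed);
                            *map.lock().unwrap().entry(level).or_insert(0) += 1;
                        } else {
                            *local.entry(level).or_insert(0) += 1;
                        }
                    }
                }
                if !per_line {
                    locks.fetch_add(1, Ordering::Relaxed);
                    let mut global = map.lock().unwrap();
                    for (k, v) in local {
                        *global.entry(k).or_insert(0) += v;
                    }
                }
            });
        }
    });
    let totals = map.lock().unwrap().clone();
    (totals, locks.load(Ordering::Relaxed))
}

fn main() {
    let chunks: Vec<Vec<String>> = vec![
        vec!["ERROR x".into(), "INFO y".into(), "INFO z".into()],
        vec!["WARN a".into(), "debug b".into()],
    ];
    let (a, locks_a) = count_levels(&chunks, true);
    let (b, locks_b) = count_levels(&chunks, false);
    assert_eq!(a, b); // same totals either way
    println!("per-line: {locks_a} locks, local-then-merge: {locks_b} locks");
    // per-line: 4 locks, local-then-merge: 2 locks
}
```

On this input both strategies produce the same totals. Per-line locking takes the lock once per matching line (4 times), while local-then-merge takes it once per thread (2 times). On a real log with millions of lines the gap grows with the input.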
This works, but the clones are created before the scope: with many threads, you end up with one clone per thread before any work starts. Both closures also duplicate the local accumulation logic. In the next section we refactor this.
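As an aside, thread::scope joins every spawned thread before it returns, so scoped threads can borrow the Mutex directly. That means the Arc and the clones are not strictly needed here. Below is a minimal sketch of the two-thread version without Arc, assuming the same parse_line logic. count_two_threads is a hypothetical name.

```rust
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;

// Same checks, in the same order, as parse_line in the post.
fn parse_line(line: &str) -> Option<&'static str> {
    ["ERROR", "WARN", "INFO"]
        .iter()
        .copied()
        .find(|level| line.contains(*level))
}

// Two scoped threads borrow `map` directly: thread::scope joins them all
// before returning, so the borrow cannot outlive the Mutex.
fn count_two_threads(lines: &[String]) -> HashMap<&'static str, i32> {
    let map = Mutex::new(HashMap::new());
    let (first, second) = lines.split_at(lines.len() / 2);
    thread::scope(|s| {
        for half in [first, second] {
            let map = &map; // a shared reference is Copy, so `move` just copies it
            s.spawn(move || {
                let mut local = HashMap::new();
                for line in half {
                    if let Some(level) = parse_line(line) {
                        *local.entry(level).or_insert(0) += 1;
                    }
                }
                let mut global = map.lock().unwrap();
                for (k, v) in local {
                    *global.entry(k).or_insert(0) += v;
                }
            });
        }
    });
    // Every thread has finished, so the map can be taken out of the Mutex.
    map.into_inner().unwrap()
}

fn main() {
    let lines: Vec<String> = ["ERROR a", "INFO b", "WARN c", "INFO d", "debug e"]
        .iter()
        .map(|s| s.to_string())
        .collect();
    let counts = count_two_threads(&lines);
    println!("{:?}", counts.get("INFO")); // Some(2)
}
```

The Arc version used in this post works as well. Arc becomes necessary when a thread can outlive the function that spawned it, as with std::thread::spawn.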
Note: unwrap() panics if the Mutex is poisoned, which happens when a thread panics while holding the lock. For production code, handle PoisonError explicitly.
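Here is one way to handle PoisonError explicitly. PoisonError::into_inner returns the guard anyway. This sketch assumes that partially updated counts are acceptable after a panic, which is a policy choice and not a rule. lock_or_recover and poisoned_map are hypothetical helpers for this example.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex, MutexGuard};
use std::thread;

type Counts = HashMap<&'static str, i32>;

// Returns the guard even if the mutex is poisoned. Assumes partially
// updated counts are acceptable after a panic (a policy choice).
fn lock_or_recover(map: &Mutex<Counts>) -> MutexGuard<'_, Counts> {
    match map.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}

// Builds a poisoned mutex: a thread panics while holding the lock.
fn poisoned_map() -> Arc<Mutex<Counts>> {
    let map = Arc::new(Mutex::new(Counts::new()));
    let m = Arc::clone(&map);
    let result = thread::spawn(move || {
        let mut guard = m.lock().unwrap();
        *guard.entry("ERROR").or_insert(0) += 1;
        panic!("simulated failure while holding the lock");
    })
    .join();
    assert!(result.is_err()); // the panic surfaces as Err from join()
    map
}

fn main() {
    let map = poisoned_map();
    assert!(map.is_poisoned());
    // map.lock().unwrap() would panic here; lock_or_recover does not.
    *lock_or_recover(&map).entry("INFO").or_insert(0) += 1;
    let counts = lock_or_recover(&map);
    println!("ERROR: {:?}, INFO: {:?}", counts.get("ERROR"), counts.get("INFO"));
}
```

Poisoning does not roll anything back. The increment made before the panic is still in the map, and that is why recovering is a deliberate decision.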
Extract the counting logic into a function and call it from both threads.
fn process_lines(lines: &[String], map: Arc<Mutex<HashMap<&'static str, i32>>>) {
    // Count locally, without holding the lock.
    let mut local = HashMap::new();
    for line in lines {
        if let Some(level) = parse_line(line) {
            *local.entry(level).or_insert(0) += 1;
        }
    }
    // Take the lock once and merge the local counts.
    let mut global = map.lock().unwrap();
    for (k, v) in local {
        *global.entry(k).or_insert(0) += v;
    }
}

fn parser_with_arc_advanced(file_name: String) -> io::Result<()> {
    let map = Arc::new(Mutex::new(HashMap::new()));
    let file = File::open(file_name)?;
    let reader = BufReader::new(file);
    let mut vec: Vec<String> = Vec::new();
    for line_result in reader.lines() {
        vec.push(line_result?);
    }
    let half = vec.len() / 2;
    let (vec1, vec2) = vec.split_at(half);
    thread::scope(|s| {
        // Each clone is created right before the thread that uses it.
        let map_clone1 = Arc::clone(&map);
        s.spawn(move || {
            process_lines(vec1, map_clone1);
        });
        let map_clone2 = Arc::clone(&map);
        s.spawn(move || {
            process_lines(vec2, map_clone2);
        });
    });
    for (level, count) in map.lock().unwrap().iter() {
        println!("{level}: {count}");
    }
    Ok(())
}

This removes the duplication and moves each clone next to the thread that uses it. But split_at is hardcoded to two threads. Chunks fix that.
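The next sketch shows what slice::chunks does on its own, using a plain slice. It yields consecutive sub-slices of at most the given size, and only the last one can be shorter. chunk_lengths is a hypothetical helper.

```rust
// Hypothetical helper: lengths of the sub-slices that slice::chunks produces.
fn chunk_lengths(len: usize, size: usize) -> Vec<usize> {
    let data: Vec<usize> = (0..len).collect();
    data.chunks(size).map(|c| c.len()).collect()
}

fn main() {
    println!("{:?}", chunk_lengths(7, 3)); // [3, 3, 1]
    println!("{:?}", chunk_lengths(6, 3)); // [3, 3]
}
```

Note that chunks panics if the size is 0.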
We use available_parallelism() to match the thread count to the machine, falling back to 4 if it cannot be determined. Then we split the lines into chunks of roughly equal size.
fn parser_chunks(file_name: String) -> io::Result<()> {
    let map = Arc::new(Mutex::new(HashMap::<&'static str, i32>::new()));
    let file = File::open(file_name)?;
    let reader = BufReader::new(file);
    let lines: Vec<String> = reader.lines().collect::<Result<_, _>>()?;
    let n_threads = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    // Ceiling division. .max(1) guards against an empty file: the formula
    // would give 0 there, and chunks(0) panics.
    let chunk_size = ((lines.len() + n_threads - 1) / n_threads).max(1);
    thread::scope(|s| {
        for chunk in lines.chunks(chunk_size) {
            let map_clone = Arc::clone(&map);
            s.spawn(move || {
                process_lines(chunk, map_clone);
            });
        }
    });
    for (level, count) in map.lock().unwrap().iter() {
        println!("{level}: {count}");
    }
    Ok(())
}

The expression (lines.len() + n_threads - 1) / n_threads is ceiling division. It rounds the chunk size up, so the lines fit into at most n_threads chunks. When the total does not divide evenly, the last chunk takes whatever remains.
With 10 lines and 4 threads, (10 + 3) / 4 = 3, which gives chunks of 3, 3, 3 and 1. Plain division gives a chunk size of 10 / 4 = 2, which produces 5 chunks instead of 4: one more thread than intended.
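The arithmetic above can be checked directly. This sketch wraps the formula in hypothetical helpers and adds one assumption of ours, a .max(1) clamp: for an empty file the formula yields 0, and slice::chunks panics on a chunk size of 0. The sketch also shows that ceiling division guarantees at most n_threads chunks, not exactly n_threads: 9 lines on 4 threads give 3 chunks.

```rust
// Ceiling division, clamped to 1: for 0 lines the formula yields 0,
// and slice::chunks panics on a chunk size of 0.
// Assumes n_threads >= 1 (available_parallelism never returns 0).
fn chunk_size(n_lines: usize, n_threads: usize) -> usize {
    ((n_lines + n_threads - 1) / n_threads).max(1)
}

// How many chunks (and therefore threads) a given input produces.
fn chunk_count(n_lines: usize, n_threads: usize) -> usize {
    let lines = vec![0u8; n_lines];
    lines.chunks(chunk_size(n_lines, n_threads)).count()
}

fn main() {
    for (n_lines, n_threads) in [(10, 4), (9, 4), (0, 4)] {
        println!(
            "{n_lines} lines, {n_threads} threads: chunk size {}, {} chunks",
            chunk_size(n_lines, n_threads),
            chunk_count(n_lines, n_threads)
        );
    }
    // Plain division for comparison: 10 / 4 = 2, which makes 5 chunks.
    println!("plain division: {} chunks", vec![0u8; 10].chunks(10 / 4).count());
}
```

On Rust 1.73 or later, n_lines.div_ceil(n_threads) computes the same value as the hand-written formula.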
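Every lock is a potential point of contention, and contention serializes the threads: while one holds the mutex, the others wait. Serialization kills parallelism. That is why each thread counts locally and takes the lock only once.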
One caveat: if the workload per thread is small, thread overhead and lock cost can exceed the benefit. Profile before parallelizing.
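If profiling shows that small inputs are not worth splitting, one option is to cap the thread count by the amount of work. This is a sketch, not a recommendation. MIN_LINES_PER_THREAD is a hypothetical placeholder that you would tune from your own measurements, and choose_thread_count is our name.

```rust
use std::thread;

// Hypothetical placeholder, not a measured value: below this many lines per
// thread we assume spawn and lock overhead outweighs the parallel speedup.
const MIN_LINES_PER_THREAD: usize = 10_000;

// At least 1 thread, at most one per available core, and at most one per
// MIN_LINES_PER_THREAD lines of input.
fn choose_thread_count(n_lines: usize) -> usize {
    let available = thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(4);
    let by_work = (n_lines / MIN_LINES_PER_THREAD).max(1);
    available.min(by_work)
}

fn main() {
    for n_lines in [500, 50_000, 10_000_000] {
        println!("{n_lines} lines -> {} threads", choose_thread_count(n_lines));
    }
}
```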