测试实例:map-reduce

Rust 使数据的并行化处理非常简单,在 Rust 中你无需面对并行处理的很多传统难题。

标准库提供了开箱即用的线程类型,把它和 Rust 的所有权概念与别名规则结合起来,可以自动地避免数据竞争(data race)。

当某状态对某线程是可见的,别名规则(即一个可变引用 XOR 一些只读引用。译注:XOR 是异或的意思,即「二者仅居其一」)就自动地避免了别的线程对它的操作。(当需要同步处理时,请使用 MutexChannel 这样的同步类型。)

在本例中,我们将会计算一堆数字中每一位的和。我们将把它们分成几块,放入不同的线程。每个线程会把自己那一块数字的每一位加起来,之后我们再把每个线程提供的结果再加起来。

注意到,虽然我们在线程之间传递了引用,但 Rust 理解我们是在传递只读的引用,因此不会发生数据竞争等不安全的事情。另外,因为我们把数据块 move 到了线程中,Rust 会保证数据存活至线程退出,因此不会产生悬挂指针。

use std::thread;

// 这是 `main` 线程
fn main() {

    // 这是我们要处理的数据。
    // 我们会通过线程实现 map-reduce 算法,从而计算每一位的和
    // 每个用空白符隔开的块都会分配给单独的线程来处理
    //
    // 试一试:插入空格,看看输出会怎样变化!
    let data = "86967897737416471853297327050364959
11861322575564723963297542624962850
70856234701860851907960690014725639
38397966707106094172783238747669219
52380795257888236525459303330302837
58495327135744041048897885734297812
69920216438980873548808413720956532
16278424637452589860345374828574668";

    // 创建一个向量,用于储存将要创建的子线程
    let mut children = vec![];

    /*************************************************************************
     * "Map" 阶段
     *
     * 把数据分段,并进行初始化处理
     ************************************************************************/

    // 把数据分段,每段将会单独计算
    // 每段都是完整数据的一个引用(&str)
    let chunked_data = data.split_whitespace();

    // 对分段的数据进行迭代。
    // .enumerate() 会把当前的迭代计数与被迭代的元素以元组 (index, element)
    // 的形式返回。接着立即使用 “解构赋值” 将该元组解构成两个变量,
    // `i` 和 `data_segment`。
    for (i, data_segment) in chunked_data.enumerate() {
        println!("data segment {} is \"{}\"", i, data_segment);

        // 用单独的线程处理每一段数据
        //
        // spawn() 返回新线程的句柄(handle),我们必须拥有句柄,
        // 才能获取线程的返回值。
        //
        // 'move || -> u32' 语法表示该闭包:
        // * 没有参数('||')
        // * 会获取所捕获变量的所有权('move')
        // * 返回无符号 32 位整数('-> u32')
        //
        // Rust 可以根据闭包的内容推断出 '-> u32',所以我们可以不写它。
        //
        // 试一试:删除 'move',看看会发生什么
        children.push(thread::spawn(move || -> u32 {
            // 计算该段的每一位的和:
            let result = data_segment
                        // 对该段中的字符进行迭代..
                        .chars()
                        // ..把字符转成数字..
                        .map(|c| c.to_digit(10).expect("should be a digit"))
                        // ..对返回的数字类型的迭代器求和
                        .sum();

            // println! 会锁住标准输出,这样各线程打印的内容不会交错在一起
            println!("processed segment {}, result={}", i, result);

            // 不需要 “return”,因为 Rust 是一种 “表达式语言”,每个代码块中
            // 最后求值的表达式就是代码块的值。
            result

        }));
    }


    /*************************************************************************
     * "Reduce" 阶段
     *
     * 收集中间结果,得出最终结果
     ************************************************************************/

    // 把每个线程产生的中间结果收入一个新的向量中
    let mut intermediate_sums = vec![];
    for child in children {
        // 收集每个子线程的返回值
        let intermediate_sum = child.join().unwrap();
        intermediate_sums.push(intermediate_sum);
    }

    // 把所有中间结果加起来,得到最终结果
    //
    // 我们用 “涡轮鱼” 写法 ::<> 来为 sum() 提供类型提示。
    //
    // 试一试:不使用涡轮鱼写法,而是显式地指定 intermediate_sums 的类型
    let final_result = intermediate_sums.iter().sum::<u32>();

    println!("Final sum result: {}", final_result);
}


作业

根据用户输入的数据来决定线程的数量是不明智的。如果用户输入的数据中有一大堆空格怎么办?我们真的想要创建 2000 个线程吗?

请修改程序,使得数据总是被分成有限数目的段,这个数目是由程序开头的静态常量决定的。

参见: