MPI并行计算01: 几个最常用的函数

YiQi 管理员

第一部分概述参考自 JacksonKim ,其他部分整理自MPI Tutorials ,只留下了干货。初学者建议仔细阅读MPI Tutorials

编程环境:

  • i7-9700F 8-cores
  • mpicxx for MPICH version 3.3.2
  • Ubuntu 20.04 LTS

概述

什么是MPI

Massage Passing Interface:是消息传递函数库的标准规范,由MPI论坛开发。

  • 一种新的库描述,不是一种语言。共有上百个函数调用接口,提供与C和Fortran语言的绑定
  • MPI是一种标准或规范的代表,而不是特指某一个对它的具体实现
  • MPI是一种消息传递编程模型,并成为这种编程模型的代表和事实上的标准

MPI的特点

MPI有以下的特点:

  • 消息传递式并行程序设计,指用户必须通过显式地发送和接收消息来实现处理机间的数据交换。在这种并行编程中,每个并行进程均有自己独立的地址空间,相互之间访问不能直接进行,必须通过显式的消息传递来实现。这种编程方式是大规模并行处理机(MPP)和机群(Cluster)采用的主要编程方式。
  • 并行计算粒度大,特别适合于大规模可扩展并行算法。用户决定问题分解策略、进程间的数据交换策略,在挖掘潜在并行性方面更主动,并行计算粒度大,特别适合于大规模可扩展并行算法
  • 消息传递是当前并行计算领域的一个非常重要的并行程序设计方式

Hello world程序

完整的代码如下(见 hello.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include "mpi.h"
#include <iostream>

int main(int argc,char* argv[])
{

MPI_Init(&argc,&argv);

int rank, size, name_len;
char processor_name[MPI_MAX_PROCESSOR_NAME];

MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Get_processor_name(processor_name, &name_len);

std::cout << "Hello world from processor " << processor_name << ", rank " << rank << " out of " << size << " processors" << std::endl;

MPI_Finalize();
}
  • MPI环境必须以以下代码来初始化:

    1
    MPI_Init(int* argc, char*** argv);

    MPI_Init的过程中,所有MPI的全局变量或者内部变量都会被创建。比如rank

  • MPI_Init之后,有两个主要的函数被调用到了。这两个函数是几乎所有 MPI 程序都会用到的。

    1
    2
    MPI_Comm_size(MPI_Comm communicator, int* size);
    MPI_Comm_rank(MPI_Comm communicator, int* rank);

    MPI_Comm_size会返回communicator的大小,也就是communicator中可用的进程数量。MPI_COMM_WORLD这个变量包含了当前MPI任务中所有的进程,因此在我们的代码里的这个调用会返回所有的可用的进程数目。

    MPI_Comm_rank这个函数会返回communicator中当前进程的rank

  • MPI_Get_processor_name(char* name, int* name_length)会得到当前进程实际跑的时候所在的处理器名字。

  • 代码中最后一个调用MPI_Finalize()是用来清理MPI环境的。

我们用4个核心调用这个程序,注意Makefile相应的编译和执行规则:

1
2
3
hello: 
mpicxx 01_basic/hello.cpp -o build/hello
mpiexec -n 4 ./build/hello

得到结果:

1
2
3
4
Hello world from processor yiqi-OMEN-by-HP, rank 0 out of 4 processors
Hello world from processor yiqi-OMEN-by-HP, rank 1 out of 4 processors
Hello world from processor yiqi-OMEN-by-HP, rank 2 out of 4 processors
Hello world from processor yiqi-OMEN-by-HP, rank 3 out of 4 processors

MPI的发送和接收简介

MPI的发送和接收方法是按以下方式进行的:开始的时候,A 进程决定要发送一些消息给 B 进程。A 进程就会把需要发送给 B 进程的所有数据打包好,放到一个缓存里面。因为所有数据会被打包到一个大的信息里面,因此缓存常常会被比作 信封 。数据打包进缓存之后,通信设备(通常是网络)就需要负责把信息传递到正确的地方。这个正确的地方也就是根据特定rank确定的那个进程。

尽管数据已经被送达到 B 了,但是进程 B 依然需要确认它想要接收 A 的数据。一旦它确定了这点,数据就被传输成功了。进程 A 会接收到数据传递成功的信息,然后去干其他事情。

有时候 A 需要传递很多不同的消息给 B。为了让 B 能比较方便地区分不同的消息,MPI运行发送者和接受者额外地指定一些信息 ID (正式名称是 标签 , tags )。当 B 只要求接收某种特定标签的信息的时候,其他的不是这个标签的信息会先被缓存起来,等到 B 需要的时候才会给 B

MPI发送和接收方法的定义:

1
2
3
4
5
6
7
8
MPI_Send(
void* data,
int count,
MPI_Datatype datatype,
int destination,
int tag,
MPI_Comm communicator
)
1
2
3
4
5
6
7
8
9
MPI_Recv(
void* data,
int count,
MPI_Datatype datatype,
int source,
int tag,
MPI_Comm communicator,
MPI_Status* status
)

data是数据起始地址。countdatatype分别描述了数据的数量和类型。MPI_send会精确地发送count指定的数量个元素,MPI_Recv最多接受count个元素。destinationtag指定了发送方/接受方进程的rank以及信息的标签。MPI_Recv 方法特有的最后一个参数提供了接受到的信息的状态。

基础MPI_Datatype

MPI datatype C equivalent
MPI_SHORT short int
MPI_INT int
MPI_LONG long int
MPI_LONG_LONG long long int
MPI_UNSIGNED_CHAR unsigned char
MPI_UNSIGNED_SHORT unsigned short int
MPI_UNSIGNED unsigned int
MPI_UNSIGNED_LONG unsigned long int
MPI_UNSIGNED_LONG_LONG unsigned long long int
MPI_FLOAT float
MPI_DOUBLE double
MPI_LONG_DOUBLE long double
MPI_BYTE char

MPI发送/接收程序

send_recv.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {
MPI_Init(NULL, NULL);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);

int number;
if (rank == 0) {
number = -1;
MPI_Send(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD);
}
if (rank == 1) {
MPI_Recv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Processor 1 received number " << number << " from processor 0" << std::endl;
}
MPI_Finalize();
}

输出结果:

1
Processor 1 received number -1 from processor 0

还有另外两个示例程序:MPI乒乓球程序和MPI环程序,见附录I和附录II

动态消息

MPI_Status结构体

尽管可以将消息的长度作为单独的发送/接收操作发送,但是MPI本身仅通过几个额外的函数调用即可支持动态消息。

MPI_RecvMPI_Status结构体的地址作为参数(可以使用MPI_STATUS_IGNORE忽略)。如果我们将MPI_Status结构体传递给MPI_Recv函数,则操作完成后将在该结构体中填充有关接收操作的其他信息。三个主要的信息包括:

  1. 发送端秩: 发送端的秩存储在结构体的MPI_SOURCE元素中。也就是说,如果我们声明一个MPI_Status stat变量,则可以通过stat.MPI_SOURCE访问秩。
  2. 消息的标签: 消息的标签可以通过结构体的MPI_TAG元素访问(类似于MPI_SOURCE)。
  3. 消息的长度: 消息的长度在结构体中没有预定义的元素。相反,我们必须使用MPI_Get_count找出消息的长度。
1
MPI_Get_count(MPI_Status* status, MPI_Datatype datatype, int* count)

MPI_Get_count函数中,使用者需要传递MPI_Status结构体,消息的datatype,并返回count。变量count是已接收的datatype元素的数目。

为什么需要这些信息?

  • MPI_Recv可以将MPI_ANY_SOURCE用作发送端的秩,将MPI_ANY_TAG用作消息的标签。在这种情况下,MPI_Status结构体是找出消息的实际发送端和标签的唯一方法。
  • 此外,并不能保证MPI_Recv能够接收函数调用参数的全部元素。相反,它只接收已发送给它的元素数量(如果发送的元素多于所需的接收数量,则返回错误)。MPI_Get_count函数用于确定实际的接收量。

MPI_Status结构体查询示例。代码的主要部分如下所示,完整代码见 check_status.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
const int MAX_NUMBERS = 100;
int numbers[MAX_NUMBERS];
int number_amount;
if (world_rank == 0) {
srand(time(NULL));
number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;
MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
printf("0 sent %d numbers to 1\n", number_amount);
} else if (world_rank == 1) {
MPI_Status status;
MPI_Recv(numbers, MAX_NUMBERS, MPI_INT, 0, 0, MPI_COMM_WORLD,
&status);
// After receiving the message, check the status to determine
// how many numbers were actually received
MPI_Get_count(&status, MPI_INT, &number_amount);
printf("1 received %d numbers from 0. Message source = %d, tag = %d\n",
number_amount, status.MPI_SOURCE, status.MPI_TAG);
}
  • 进程0发送随机数量的数据给进程1
  • 尽管进程1MAX_NUMBERS作为MPI_Recv函数参数,但进程1将最多接收到此数量的数字。

输出结果为:

1
2
0 sent 11 numbers to 1
1 received 11 numbers from 0. Message source = 0, tag = 0

MPI_Probe确定消息大小

除了传递接收消息并简易地配备一个很大的缓冲区来为所有可能的大小的消息提供处理,还可以使用MPI_Probe在实际接收消息之前查询消息大小。

1
MPI_Probe(int source, int tag, MPI_Comm comm, MPI_Status* status)

MPI_Probe看起来与MPI_Recv非常相似。实际上,可以将MPI_Probe视为MPI_Recv,除了不接收消息外,它们执行相同的功能。与 MPI_Recv类似,MPI_Probe将阻塞具有匹配标签和发送端的消息。当消息可用时,它将填充status结构体。然后,用户可以使用MPI_Recv接收实际的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int number_amount;
if (world_rank == 0) {
const int MAX_NUMBERS = 100;
int numbers[MAX_NUMBERS];

srand(time(NULL));
number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;

MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
printf("0 sent %d numbers to 1\n", number_amount);
} else if (world_rank == 1) {
MPI_Status status;
// Probe for an incoming message from process zero
MPI_Probe(0, 0, MPI_COMM_WORLD, &status);
// When probe returns, the status object has the size and other
// attributes of the incoming message. Get the message size
MPI_Get_count(&status, MPI_INT, &number_amount);

int* number_buf = (int*)malloc(sizeof(int) * number_amount);
MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
printf("1 dynamically received %d numbers from 0.\n", number_amount);
free(number_buf);
}

输出结果

1
2
0 sent 85 numbers to 1
1 dynamically received 85 numbers from 0

尽管这个例子很简单,但是MPI_Probe构成了许多动态MPI应用程序的基础。例如,控制端/执行子程序在交换变量大小的消息时通常会大量使用MPI_Probe

MPI广播以及集体通信

集体通信以及同步点

关于集体通信需要记住的一点是它在进程间引入了同步点的概念。这意味着所有的进程在执行代码的时候必须首先到达一个同步点才能继续执行后面的代码。

在看具体的集体通信方法之前,让我们更仔细地看一下同步这个概念。事实上,MPI有一个特殊的函数来做同步进程的这个操作。

1
MPI_Barrier(MPI_Comm communicator)

关于同步一个要注意的地方是:始终记得每一个你调用的集体通信方法都是同步的。也就是说,如果你没法让所有进程都完成MPI_Barrier,那么你也没法完成任何集体调用。如果你在没有确保所有进程都调用 MPI_Barrier 的情况下调用了它,那么程序会空闲下来。这对初学者来说会很迷惑,所以小心这类问题。

MPI_Bcast广播

广播 (broadcast) 是标准的集体通信技术之一。一个广播发生的时候,一个进程会把同样一份数据传递给一个communicator里的所有其他进程。

在MPI里面,广播可以使用MPI_Bcast来做到。函数签名看起来像这样:

1
2
3
4
5
6
7
MPI_Bcast(
void* data,
int count,
MPI_Datatype datatype,
int root,
MPI_Comm communicator
)

尽管根节点和接收节点做不同的事情,它们都是调用同样的这个MPI_Bcast函数来实现广播。当根节点调用MPI_Bcast函数的时候,data变量里的值会被发送到其他的节点上。当其他的节点调用MPI_Bcast的时候,data变量会被赋值成从根节点接收到的数据。

MPI_Bcast的实现使用了一个类似的树形广播算法来获得比较好的网络利用率。我们可以实现一个基于MPI_SendMPI_Recvmy_bcast()函数(见 my_bcast.cpp ),来与MPI_Bcast比较效率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (i = 0; i < num_trials; i++) {
// Time my_bcast
// Synchronize before starting timing
MPI_Barrier(MPI_COMM_WORLD);
total_my_bcast_time -= MPI_Wtime();
my_bcast(data, num_elements, MPI_INT, 0, MPI_COMM_WORLD);
// Synchronize again before obtaining final time
MPI_Barrier(MPI_COMM_WORLD);
total_my_bcast_time += MPI_Wtime();

// Time MPI_Bcast
MPI_Barrier(MPI_COMM_WORLD);
total_mpi_bcast_time -= MPI_Wtime();
MPI_Bcast(data, num_elements, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Barrier(MPI_COMM_WORLD);
total_mpi_bcast_time += MPI_Wtime();
}

使用8个进程运行,输出结果为

1
2
3
Data size = 400000, Trials = 10
Avg my_bcast time = 0.000292659
Avg MPI_Bcast time = 0.000147843

并行的进程越多差距越大

MPI_Scatter分发

MPI_Scatter是一个跟MPI_Bcast类似的集体通信机制。MPI_Scatter的操作会设计一个指定的根进程,根进程会将数据发送到communicator里面的所有进程。MPI_BcastMPI_Scatter的主要区别很小但是很重要。MPI_Bcast给每个进程发送的是同样的数据,然而MPI_Scatter给每个进程发送的是一个数组的一部分数据。下图进一步展示了这个区别。

下面的 MPI_Scatter 函数的原型。

1
2
3
4
5
6
7
8
9
10
MPI_Scatter(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator
)

第一个参数,send_data,是在根进程上的一个数据数组。第二个和第三个参数,send_countsend_datatype分别描述了发送给每个进程的数据数量和数据类型。比如:

  • 如果send_count是1,send_datatypeMPI_INT的话,进程0会得到数据里的第一个整数,以此类推,
  • 如果send_count是2,进程0会得到前两个整数,进程1会得到第三个和第四个整数,以此类推。
    在实践中,一般来说send_count会等于数组的长度除以进程的数量。

函数定义里面接收数据的参数跟发送的参数几乎相同。recv_data参数是一个缓存,它里面存了recv_countrecv_datatype数据类型的元素。最后两个参数,rootcommunicator分别指定开始分发数组的根进程以及对应的communicator。

MPI_Gather收集

MPI_GatherMPI_Scatter是相反的。MPI_Gather从好多进程里面收集数据到一个进程上面而不是从一个进程分发数据到多个进程。

MPI_Scatter类似,MPI_Gather从其他进程收集元素到根进程上面。元素是根据接收到的进程的秩排序的。MPI_Gather的函数原型跟MPI_Scatter长的一样。

1
2
3
4
5
6
7
8
9
10
MPI_Gather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
int root,
MPI_Comm communicator
)

MPI_Gather中,只有根进程需要一个有效的接收缓存。所有其他的调用进程可以传递NULLrecv_data。另外,别忘记recv_count参数是从每个进程接收到的数据数量,而不是所有进程的数据总量之和。

使用MPI_Scatter和MPI_Gather来计算平均数

完整代码见 avg.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (world_rank == 0) {
rand_nums = create_rand_nums(elements_per_proc * world_size);
}

float *sub_rand_nums = malloc(sizeof(float) * elements_per_proc);
MPI_Scatter(rand_nums, elements_per_proc, MPI_FLOAT, sub_rand_nums, elements_per_proc, MPI_FLOAT, 0, MPI_COMM_WORLD);

float sub_avg = compute_avg(sub_rand_nums, elements_per_proc);

float *sub_avgs = NULL;
if (world_rank == 0) {
sub_avgs = malloc(sizeof(float) * world_size);
}
MPI_Gather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, 0, MPI_COMM_WORLD);

if (world_rank == 0) {
float avg = compute_avg(sub_avgs, world_size);
}

输出结果如下:

1
2
Avg of all elements is 0.529195
Avg computed across original data is 0.529195

MPI_Allgather以及修改后的平均程序

对于分发在所有进程上的一组数据来说,MPI_Allgather会收集所有数据到所有进程上。从最基础的角度来看,MPI_Allgather相当于一个MPI_Gather操作之后跟着一个MPI_Bcast操作。下面的示意图显示了MPI_Allgather调用之后数据是如何分布的。

1
2
3
4
5
6
7
8
9
MPI_Allgather(
void* send_data,
int send_count,
MPI_Datatype send_datatype,
void* recv_data,
int recv_count,
MPI_Datatype recv_datatype,
MPI_Comm communicator
)

把计算平均数的代码修改成了使用MPI_Allgather来计算。

1
2
3
4
float *sub_avgs = (float *)malloc(sizeof(float) * world_size);
MPI_Allgather(&sub_avg, 1, MPI_FLOAT, sub_avgs, 1, MPI_FLOAT, MPI_COMM_WORLD);

float avg = compute_avg(sub_avgs, world_size);

完整代码见 all_avg.cpp 。输出结果为

1
2
3
4
Avg of all elements from 0 is 0.483204
Avg of all elements from 1 is 0.483204
Avg of all elements from 2 is 0.483204
Avg of all elements from 3 is 0.483204

归约简介

数据归约包括通过函数将一组数字归约为较小的一组数字。例如,假设我们有一个数字列表[1,2,3,4,5]

  • sum函数归约此数字列表将产生sum([1、2、3、4、5]) = 15
  • 用乘法归约将产生multiply([1、2、3、4、5]) = 120

MPI_Reduce

MPI_Gather类似,MPI_Reduce在每个进程上获取一个输入元素数组,并将输出元素数组返回给根进程。
MPI_Reduce 的原型如下所示:

1
2
3
4
5
6
7
8
9
MPI_Reduce(
void* send_data,
void* recv_data,
int count,
MPI_Datatype datatype,
MPI_Op op,
int root,
MPI_Comm communicator
)

send_data参数是每个进程都希望归约的datatype类型元素的数组。
recv_data仅与具有root秩的进程相关。recv_data数组包含归约的结果,大小为sizeof(datatype)* count
op 参数是您希望应用于数据的操作。MPI 定义的归约操作包括:

  • MPI_MAX - 返回最大元素。
  • MPI_MIN - 返回最小元素。
  • MPI_SUM - 对元素求和。
  • MPI_PROD - 将所有元素相乘。
  • MPI_LAND - 对元素执行逻辑运算。
  • MPI_LOR - 对元素执行逻辑运算。
  • MPI_BAND - 对元素的各个位按位执行。
  • MPI_BOR - 对元素的位执行按位运算。
  • MPI_MAXLOC - 返回最大值和所在的进程的秩。
  • MPI_MINLOC - 返回最小值和所在的进程的秩。

下面是 MPI_Reduce 通信模式的说明。

在上图中,每个进程包含一个整数。根进程调用MPI_Reduce,并使用MPI_SUM作为归约运算。这四个数字相加后将结果存储在根进程中。

下图显示了每个进程归约多个数字的情况。

上图中的每个进程都有两个元素。结果求和基于每个元素进行。换句话说,不是将所有数组中的所有元素累加到一个元素中,而是将每个数组中的第i个元素累加到进程0结果数组中的第i个元素中。

示例程序:使用 MPI_Reduce 计算均值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
float *rand_nums = NULL;
rand_nums = create_rand_nums(num_elements_per_proc);

// Sum the numbers locally
float local_sum = 0;
int i;
for (i = 0; i < num_elements_per_proc; i++) {
local_sum += rand_nums[i];
}

// Print the random numbers on each process
printf("Local sum for process %d - %f, avg = %f\n", world_rank, local_sum, local_sum / num_elements_per_proc);

// Reduce all of the local sums into the global sum
float global_sum;
MPI_Reduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);

// Print the result
if (world_rank == 0) {
printf("Total sum = %f, avg = %f\n", global_sum, global_sum / (world_size * num_elements_per_proc));
}

只有根进城的global_sum有效

输出结果如下:

1
2
3
4
5
Local sum for process 0 - 133.853, avg = 0.522864
Local sum for process 1 - 131.135, avg = 0.512248
Local sum for process 3 - 125.31, avg = 0.489492
Local sum for process 2 - 127.639, avg = 0.49859
Total sum = 517.938, avg = 0.505798

MPI_Allreduce

许多并行程序中,需要在所有进程而不是仅仅在根进程中访问归约的结果。

1
2
3
4
5
6
7
8
MPI_Allreduce(
void* send_data,
void* recv_data,
int count,
MPI_Datatype datatype,
MPI_Op op,
MPI_Comm communicator
)

您可能已经注意到,MPI_AllreduceMPI_Reduce相同,不同之处在于它不需要根进程 ID(因为结果分配给所有进程)。

示例程序:使用 MPI_Allreduce 计算标准差

许多计算问题需要进行多次归约来解决。一个这样的问题是找到一组分布式数字的标准差。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
rand_nums = create_rand_nums(num_elements_per_proc);

// Sum the numbers locally
float local_sum = 0;
int i;
for (i = 0; i < num_elements_per_proc; i++) {
local_sum += rand_nums[i];
}

// Reduce all of the local sums into the global sum in order to calculate the mean
float global_sum;
MPI_Allreduce(&local_sum, &global_sum, 1, MPI_FLOAT, MPI_SUM, MPI_COMM_WORLD);
float mean = global_sum / (num_elements_per_proc * world_size);

// Compute the local sum of the squared differences from the mean
float local_sq_diff = 0;
for (i = 0; i < num_elements_per_proc; i++) {
local_sq_diff += (rand_nums[i] - mean) * (rand_nums[i] - mean);
}

// Reduce the global sum of the squared differences to the root process and print off the answer
float global_sq_diff;
MPI_Reduce(&local_sq_diff, &global_sq_diff, 1, MPI_FLOAT, MPI_SUM, 0, MPI_COMM_WORLD);

// The standard deviation is the square root of the mean of the squared differences.
if (world_rank == 0) {
float stddev = sqrt(global_sq_diff / (num_elements_per_proc * world_size));
printf("Mean - %f, Standard deviation = %f\n", mean, stddev);
}

输出如下

1
Mean - 0.519573, standard deviation = 0.286734

通讯器概述

在以前的教程中,使用了通讯器MPI_COMM_WORLD。对于简单的程序,这已经足够了,因为我们的进程数量相对较少,并且通常要么一次要与其中之一对话,要么一次要与所有对话。当程序规模开始变大时,这变得不那么实用了,我们可能只想一次与几个进程进行对话。

MPI_Comm_split

第一个也是最常见的用于创建新的通讯器的函数:

1
2
3
4
5
6
MPI_Comm_split(
MPI_Comm comm,
int color,
int key,
MPI_Comm* newcomm
)

MPI_Comm_split通过基于输入值colorkey将通讯器“拆分”为一组子通讯器来创建新的通讯器。

  • 第一个参数comm是通讯器,它将用作新通讯器的基础。这可能是MPI_COMM_WORLD,但也可能是其他任何通讯器。
  • 第二个参数color确定每个进程将属于哪个新的通讯器。为color传递相同值的所有进程都分配给同一通讯器。如果colorMPI_UNDEFINED,则该进程将不包含在任何新的通讯器中。
  • 第三个参数key确定每个新通讯器中的顺序(秩)。传递key最小值的进程将为0,下一个最小值将为1,依此类推。如果存在平局,则在原始通讯器中秩较低的进程将是第一位。
  • 最后一个参数newcomm是 MPI 如何将新的通讯器返回给用户。

使用多个通讯器的示例

现在,让我们看一个简单的示例,在该示例中,我们尝试将单个全局通讯器拆分为一组较小的通讯器。在此示例中,我们将想象我们已经在逻辑上将原始通讯器布局为共 16 个进程的 4x4 网格,并且希望按行划分网格。为此,每一行将获得自己的颜色(参数 color)。在下图中,您可以看到左图具有相同颜色的每组进程如何最终变成右图的自己的通讯器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 获取原始通讯器的秩和大小
int world_rank, world_size;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
MPI_Comm_size(MPI_COMM_WORLD, &world_size);

int color = world_rank / 4; // 根据行确定颜色
// 根据颜色拆分通讯器,然后调用
// 利用原始秩
MPI_Comm row_comm;
MPI_Comm_split(MPI_COMM_WORLD, color, world_rank, &row_comm);

int row_rank, row_size;
MPI_Comm_rank(row_comm, &row_rank);
MPI_Comm_size(row_comm, &row_size);

printf("WORLD RANK/SIZE: %d/%d \t ROW RANK/SIZE: %d/%d\n", world_rank, world_size, row_rank, row_size);

MPI_Comm_free(&row_comm);

请记住,color决定了拆分后该进程所属的通讯器。这里使用原始秩world_rank作为拆分操作的key。最后,使用MPI_Comm_free释放通讯器。输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
WORLD RANK/SIZE: 0/16    ROW RANK/SIZE: 0/4
WORLD RANK/SIZE: 1/16 ROW RANK/SIZE: 1/4
WORLD RANK/SIZE: 4/16 ROW RANK/SIZE: 0/4
WORLD RANK/SIZE: 5/16 ROW RANK/SIZE: 1/4
WORLD RANK/SIZE: 2/16 ROW RANK/SIZE: 2/4
WORLD RANK/SIZE: 8/16 ROW RANK/SIZE: 0/4
WORLD RANK/SIZE: 12/16 ROW RANK/SIZE: 0/4
WORLD RANK/SIZE: 14/16 ROW RANK/SIZE: 2/4
WORLD RANK/SIZE: 15/16 ROW RANK/SIZE: 3/4
WORLD RANK/SIZE: 3/16 ROW RANK/SIZE: 3/4
WORLD RANK/SIZE: 9/16 ROW RANK/SIZE: 1/4
WORLD RANK/SIZE: 13/16 ROW RANK/SIZE: 1/4
WORLD RANK/SIZE: 10/16 ROW RANK/SIZE: 2/4
WORLD RANK/SIZE: 6/16 ROW RANK/SIZE: 2/4
WORLD RANK/SIZE: 7/16 ROW RANK/SIZE: 3/4
WORLD RANK/SIZE: 11/16 ROW RANK/SIZE: 3/4

每次运行,打印结果的顺序并不一致,这很正常

附录I

MPI乒乓程序

两个进程会一直使用MPI_SendMPI_Recv方法来“推挡”消息,直到他们决定不玩了。代码如下(见 pingpang.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {
const int PING_PONG_LIMIT = 10;

MPI_Init(NULL, NULL);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);

int ping_pong_count = 0;
int partner_rank = (rank + 1) % 2;

while (ping_pong_count < PING_PONG_LIMIT) {
if (rank == ping_pong_count % 2) {
ping_pong_count++;
MPI_Send(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD);
std::cout << rank << " sent and incremented ping_pong_count " << ping_pong_count << " to " << partner_rank << std::endl;
}
else {
MPI_Recv(&ping_pong_count, 1, MPI_INT, partner_rank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << rank << "received ping_pong_count " << ping_pong_count << " from " << partner_rank << std::endl;
}
}
MPI_Finalize();
return 0;
}

输出结果如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
0 sent and incremented ping_pong_count 1 to 1
0 received ping_pong_count 2 from 1
0 sent and incremented ping_pong_count 3 to 1
0 received ping_pong_count 4 from 1
0 sent and incremented ping_pong_count 5 to 1
0 received ping_pong_count 6 from 1
0 sent and incremented ping_pong_count 7 to 1
0 received ping_pong_count 8 from 1
0 sent and incremented ping_pong_count 9 to 1
0 received ping_pong_count 10 from 1
1 received ping_pong_count 1 from 0
1 sent and incremented ping_pong_count 2 to 0
1 received ping_pong_count 3 from 0
1 sent and incremented ping_pong_count 4 to 0
1 received ping_pong_count 5 from 0
1 sent and incremented ping_pong_count 6 to 0
1 received ping_pong_count 7 from 0
1 sent and incremented ping_pong_count 8 to 0
1 received ping_pong_count 9 from 0
1 sent and incremented ping_pong_count 10 to 0

附录II

环程序。

代码见 ring.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <mpi.h>
#include <iostream>

int main(int argc, char** argv) {

MPI_Init(NULL, NULL);

int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);

int token;
if (rank != 0) {
MPI_Recv(&token, 1, MPI_INT, rank - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Process " << rank << " received token " << token << " from process" << rank - 1 << std::endl;
} else {
token = -1;
}

MPI_Send(&token, 1, MPI_INT, (rank + 1) % size, 0, MPI_COMM_WORLD);
if (rank == 0) {
MPI_Recv(&token, 1, MPI_INT, size - 1, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::cout << "Process " << rank << " received token " << token << " from process" << size - 1 << std::endl;
}
MPI_Finalize();
return 0;
}

输出结果如下:

1
2
3
4
Process 1 received token -1 from process0
Process 2 received token -1 from process1
Process 3 received token -1 from process2
Process 0 received token -1 from process3