矩阵相乘的并行计算

矩阵相乘的并行计算

BaconToast Lv3

最终提交的程序作业 ID:11195683

一、辅助程序使用

1. 矩阵生成程序

matrixgen.cxx 是一个C++文件,编译命令:

1
g++ -o matrixgen matrixgen.cxx

运行命令:

1
./matrixgen 5 4 matrix.dat

生成一个行数5、列数4的矩阵,存至文件 matrix.dat

2. 矩阵输出程序

print.cxx 也是一个C++程序,编译同上。

运行:

1
./print matrix.dat

输出结果如下:

1
2
3
4
5
6
       ---- matrix.dat: 5*4 Matrix -----
0.0308 0.2374 0.2902 0.1330
0.9789 0.1510 0.5571 0.3774
0.6537 0.6411 0.0840 0.4454
0.6113 0.0884 0.3083 0.3465
0.8908 0.1426 0.1449 0.1984

Vim调整tab宽度::set tabstop=4,感觉对OS实验有用,之前一直是缩进八格,很丑。
还有缩进相关,都可以调,原来都没开 :set smartindent

二、源程序 bug 修复

1. 分配矩阵 A 的每一行处理错误

错误代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
if (!myid) {  
j = (numprocs - 1) < M ? (numprocs - 1) : M;
for (i = 1; i <= j; i++) {
MPI_Send(A[i-1], N, MPI_FLOAT, i, 99, MPI_COMM_WORLD);
}
numsend = j;

for (i = 1; i <= M; i++) {
sender = (i - 1) % (numprocs - 1) + 1;
MPI_Recv(C[i-1], P, MPI_FLOAT, sender, 100, MPI_COMM_WORLD, &status);
if (numsend < M) {
MPI_Send(A[i-1], N, MPI_FLOAT, sender, 99, MPI_COMM_WORLD); // error
numsend++;
} else {
MPI_Send(&j, 0, MPI_INT, sender, 0, MPI_COMM_WORLD);
}
}
}

这一段代码实现了0号进程分发矩阵A的每一行,j = (numprocs - 1) < M ? (numprocs - 1) : M; 判断处理器数和A的行数的关系,如果处理器够多就都分发了,如果不够先发一部分。

numsend = j; 此处记录了已分发的行数量。接下来的循环中判断是否还有未分发的(if (numsend < M)),接下来的send语句存在bug:

1
MPI_Send(A[i-1], N, MPI_FLOAT, sender, 99, MPI_COMM_WORLD);

A[i-1]i 是从1到M,显然不对,应该改成 numsend,和下面的自增也配合对了。但是这仍然是错的:

这种修改方式保证了所有行都能被分发,解决了浅层问题,但又存在新的bug:接收端 MPI_Recv(C[i-1], P, MPI_FLOAT, sender, 100, MPI_COMM_WORLD, &status); 是按照进程 ID 进行填充矩阵 C 行的,而多出来的那些行会被随机进程获取并计算,并没有记录行序号信息

修改思路是使用 tag 传递行号信息。

正确代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
if (!myid) {  
j = (numprocs - 1) < M ? (numprocs - 1) : M;
for (i = 1; i <= j; i++) {
/** fix bug: i as tag **/
MPI_Send(A[i-1], N, MPI_FLOAT, i, i, MPI_COMM_WORLD);
}
numsend = j;

for (i = 1; i <= M; i++) {
float result[P]; // 临时接收结果
MPI_Recv(result, P, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// 获取发送者和行号tag
sender = status.MPI_SOURCE;
int index = status.MPI_TAG - 1;
// 结果存入正确的行
for (k = 0; k < P; k++) {
C[index][k] = result[k];
}

if (numsend < M) {
/** fix bug: numsend as tag **/
MPI_Send(A[numsend], N, MPI_FLOAT, sender, numsend + 1, MPI_COMM_WORLD); // error
numsend++;
} else {
MPI_Send(&j, 0, MPI_INT, sender, 0, MPI_COMM_WORLD);
}
}
}

2. 从属进程数大于矩阵 A 行数时死锁

错误代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
if (!myid) {  
j = (numprocs - 1) < M ? (numprocs - 1) : M;
for (i = 1; i <= j; i++) {
MPI_Send(A[i-1], N, MPI_FLOAT, i, 99, MPI_COMM_WORLD);
}
numsend = j;
// ...
} else {
// ...
while (1) {
MPI_Recv(A_row, N, MPI_FLOAT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == 0) {
break;
}
// ...
}
}

0号进程(主进程)给 j 个从属进程发送数据,此处 j 取从属进程数量和矩阵 A 行数的较小值。因此考虑一种情况:当从属进程数大于 A 矩阵行数时。所有行都会被分发,但会有一些从属进程未收到任务

else 块中,所有从属进程都进行阻塞接收,前 M 个从属进程成功接收,但剩下的从属进程并未被分配数据,也会进行阻塞等待,因此死锁了。

正确代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
if (!myid) {  
j = (numprocs - 1) < M ? (numprocs - 1) : M;
for (i = 1; i <= j; i++) {
MPI_Send(A[i-1], N, MPI_FLOAT, i, 99, MPI_COMM_WORLD);
}
numsend = j;

/** fix bug **/
for (i = j + 1; i < numprocs; i++) {
MPI_Send(&j, 0, MPI_INT, i, 0, MPI_COMM_WORLD);
}
/** fix bug **/

// ...
} else {
// ...
while (1) {
MPI_Recv(A_row, N, MPI_FLOAT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
if (status.MPI_TAG == 0) {
break;
}
// ...
}
}

主进程逻辑中新增给其他未发送数据的从属进程发送 tag 为 0 的信号,解除阻塞等待,并提醒它直接退出循环。

3. 严格的接收顺序导致性能下降

这不是一个bug,但是一个可以优化的地方:

1
2
3
4
5
6
7
8
9
10
if (!myid) {  
// ...

for (i = 1; i <= M; i++) {
sender = (i - 1) % (numprocs - 1) + 1;
MPI_Recv(C[i-1], P, MPI_FLOAT, sender, 100, MPI_COMM_WORLD, &status);

// ...
}
}

主进程按照严格的顺序计算 sender,顺序接收从进程的计算结果。但是如果后面的进程先计算结束,后面进程的 send 就要一直等待前面进程。将 sender 改为 MPI_ANY_SOURCE 可以达到优化的目的。

三、代码注释

这里是修改了 bug 的完整代码和 50% 注释,并且输入接入了随机矩阵生成程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
#include <stdio.h> // 引入标准输入输出库,用于文件操作和打印信息
#include <mpi.h> // 引入MPI(消息传递接口)库,用于多进程并行计算
#include <sys/sysinfo.h> // 引入系统信息库,用于获取CPU核心数
#include <pthread.h> // 引入Pthreads库,用于在单个进程内进行多线程并行
#include <stdlib.h> // 引入标准库,用于内存分配(malloc)、退出(exit)等

// 定义一个结构体,用于向每个线程传递参数
struct threadArg {
int tid; // 线程的ID (0, 1, 2, ...)
float* B; // 指向完整的矩阵B的指针,所有线程共享
int N_dim; // 矩阵A的列数 / 矩阵B的行数
int P_dim; // 矩阵B的列数 / 结果矩阵C的列数
float* A_row; // 指向当前正在处理的矩阵A的某一行的指针
float* C_row; // 指向用于存放计算结果的C的某一行的指针
int numthreads; // 当前工作进程中创建的线程总数
};

// 全局变量,用于存放所有线程的参数结构体数组
struct threadArg* targs;

/**
* @brief 线程工作函数
* 每个线程执行这个函数,计算结果行向量(C_row)的一部分。
* @param arg 传入的参数,是一个指向threadArg结构体的指针
* @return void* NULL
*/
void* worker(void* arg) {
int i, j;
// 将通用的void*参数强制类型转换为我们自定义的结构体指针
struct threadArg* myarg = (struct threadArg*)arg;
int N = myarg->N_dim; // 获取矩阵A的列数N
int P = myarg->P_dim; // 获取矩阵B的列数P

// 任务分配:采用交叉分配(cyclic distribution)的方式
// 每个线程负责计算结果行C_row中的第 tid, tid+numthreads, tid+2*numthreads, ... 列
for (i = myarg->tid; i < P; i += myarg->numthreads) {
myarg->C_row[i] = 0.0; // 初始化结果元素为0
// 计算 C_row[i] = A_row * B_col[i]
// B矩阵是按列优先存储的(实际上代码中是按行优先,但通过 j*P+i 访问实现列访问)
for (j = 0; j < N; j++) {
myarg->C_row[i] += myarg->A_row[j] * myarg->B[j * P + i];
}
}
return NULL; // 线程函数标准返回
}

/**
* @brief 从二进制文件中读取矩阵数据
* @param filename 文件名
* @param rows 用于返回矩阵行数的指针
* @param cols 用于返回矩阵列数的指针
* @param matrix_ptr 用于返回读取到的矩阵数据的指针的指针
* @return int 1表示成功,0表示失败
*/
int read_matrix(const char* filename, int* rows, int* cols, float** matrix_ptr) {
FILE* file = fopen(filename, "rb"); // 以二进制只读模式打开文件
if (!file) {
printf("错误: 无法打开文件 %s\n", filename);
return 0; // 打开失败返回0
}
int m, n; // 用于临时存储行数和列数
int i;

// 从文件头读取行数和列数
fread(&m, sizeof(int), 1, file);
fread(&n, sizeof(int), 1, file);
*rows = m; // 通过指针设置外部变量的值
*cols = n;

// 文件中存储的是double类型,但程序计算使用float类型
// 所以先分配一个临时空间读取double数据
double* temp_matrix = (double*)malloc(m * n * sizeof(double));
if (!temp_matrix) {
printf("错误: 为double矩阵分配临时内存失败。\n");
fclose(file);
return 0;
}

// 为最终的float矩阵分配内存
*matrix_ptr = (float*)malloc(m * n * sizeof(float));
if (!(*matrix_ptr)) {
printf("错误: 为float矩阵分配内存失败。\n");
free(temp_matrix); // 释放已分配的临时内存
fclose(file);
return 0;
}

// 从文件中读取整个矩阵的double数据
fread(temp_matrix, sizeof(double), m * n, file);
fclose(file); // 关闭文件

// 将double数据转换为float数据
for (i = 0; i < m * n; i++) {
(*matrix_ptr)[i] = (float)temp_matrix[i];
}

free(temp_matrix); // 释放临时内存
return 1; // 成功返回1
}


/**
* @brief 主函数
*/
int main(int argc, char *argv[]) {
int myid, numprocs; // myid: 当前进程的ID(rank), numprocs: 总进程数
MPI_Status status; // MPI状态变量,用于接收消息时获取额外信息(如来源、标签)
int i, j, k, numsend, sender; // 循环变量和临时变量
int numthreads; // 线程数

int M, N, P; // 矩阵维度: A(M x N), B(N x P), C(M x P)

float *A = NULL, *B = NULL, *C = NULL; // 指向矩阵A, B, C的指针

pthread_t* tids; // 指向线程ID数组的指针
float *A_row, *C_row; // 工作进程中用于存储A的一行和计算C的一行的缓冲区

// 初始化MPI环境
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &myid); // 获取当前进程的ID
MPI_Comm_size(MPI_COMM_WORLD, &numprocs); // 获取总进程数

// ================= 主进程 (rank 0) 的工作 =================
if (myid == 0) {
const char* filename_A = "matrix_A.dat"; // 矩阵A的文件名
const char* filename_B = "matrix_B.dat"; // 矩阵B的文件名

int N_A, N_B; // 用于存储从文件中读取的矩阵维度以进行检查
// 读取矩阵A
if (!read_matrix(filename_A, &M, &N_A, &A)) {
MPI_Abort(MPI_COMM_WORLD, 1); // 如果失败,中止所有MPI进程
}
// 读取矩阵B
if (!read_matrix(filename_B, &N_B, &P, &B)) {
free(A); // 释放已分配的内存
MPI_Abort(MPI_COMM_WORLD, 1);
}

// 检查矩阵维度是否匹配 (A的列数必须等于B的行数)
if (N_A != N_B) {
printf("错误: 矩阵维度不匹配. A是 %dx%d, B是 %dx%d.\n", M, N_A, N_B, P);
free(A);
free(B);
MPI_Abort(MPI_COMM_WORLD, 1);
}
N = N_A; // 确定维度N
printf("矩阵A: %d x %d, 矩阵B: %d x %d\n", M, N, N, P);

// 为结果矩阵C分配内存
C = (float*)malloc(M * P * sizeof(float));
if (!C) {
printf("错误: 为结果矩阵C分配内存失败。\n");
free(A); free(B);
MPI_Abort(MPI_COMM_WORLD, 1);
}
}

// 将矩阵维度(M, N, P)从主进程广播给所有其他进程
int dims[3]; // 创建一个数组来存放维度信息
if (myid == 0) {
dims[0] = M;
dims[1] = N;
dims[2] = P;
}
MPI_Bcast(dims, 3, MPI_INT, 0, MPI_COMM_WORLD);
// 其他进程接收广播的维度信息
if (myid != 0) {
M = dims[0];
N = dims[1];
P = dims[2];
}

// 对于非主进程,需要为矩阵B分配内存
if (myid != 0) {
B = (float*)malloc(N * P * sizeof(float));
}
// 主进程将完整的矩阵B广播给所有其他进程
MPI_Bcast(B, N * P, MPI_FLOAT, 0, MPI_COMM_WORLD);

// ================= 任务分发和计算 =================
if (!myid) { // 主进程 (myid == 0)
// 初始任务分发:给每个工作进程(或直到任务分发完)发送A的一行
// j是实际需要初始分发任务的进程数,不能超过总进程数-1,也不能超过总行数M
j = (numprocs - 1) < M ? (numprocs - 1) : M;
for (i = 1; i <= j; i++) {
// 发送A的第(i-1)行给进程i
// MPI_Send(数据指针, 数量, 数据类型, 目标进程ID, 消息标签, 通信域)
// 这里的消息标签(tag)很关键,我们用它来标记这是A的哪一行(从1开始计数)
MPI_Send(A + (i-1)*N, N, MPI_FLOAT, i, i, MPI_COMM_WORLD);
}
numsend = j; // 记录已经发送了多少行

// 如果工作进程数比初始任务还多,给它们发送一个“无工作”的初始消息(可选,但这里实现是等待接收后再动态分配)
// 这段代码实际是发送一个无效任务来“启动”多余的进程,但更好的做法是直接进入主循环
for (i = j + 1; i < numprocs; i++) {
// tag=0 表示这是一个结束信号,但在这里用作一个“空”任务的启动信号
MPI_Send(A, N, MPI_FLOAT, i, 0, MPI_COMM_WORLD);
}

// 主循环:接收已完成的结果,并发送新任务
for (i = 1; i <= M; i++) {
float result[P]; // 接收结果的缓冲区
// 接收任何一个工作进程发回的结果
// MPI_ANY_SOURCE 表示可以接收来自任何进程的消息
// MPI_ANY_TAG 表示可以接收任何标签的消息
MPI_Recv(result, P, MPI_FLOAT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
sender = status.MPI_SOURCE; // 从status中获取是哪个进程发来的
int index = status.MPI_TAG - 1; // 从tag中获取这是A的哪一行计算的结果(之前发送时tag=行号+1)

// 将收到的结果存入C矩阵的正确位置
if (index >= 0 && index < M) {
for (k = 0; k < P; k++) {
C[index*P + k] = result[k];
}
}

// 动态任务分配:如果还有未计算的行,就发送给刚刚完成任务的那个进程
if (numsend < M) {
MPI_Send(A + numsend*N, N, MPI_FLOAT, sender, numsend + 1, MPI_COMM_WORLD);
numsend++; // 已发送行数加一
} else {
// 如果所有行都已分发完毕,就给这个空闲的进程发送一个结束信号 (tag=0)
MPI_Send(A, N, MPI_FLOAT, sender, 0, MPI_COMM_WORLD);
}
}
} else { // 工作进程 (myid != 0)
numthreads = get_nprocs(); // 获取当前机器的CPU核心数,作为线程数
tids = (pthread_t*)malloc(numthreads * sizeof(pthread_t)); // 分配线程ID数组

A_row = (float*)malloc(N * sizeof(float)); // 分配接收A一行的缓冲区
C_row = (float*)malloc(P * sizeof(float)); // 分配存放计算结果C一行的缓冲区
targs = (struct threadArg*)malloc(numthreads * sizeof(struct threadArg)); // 分配线程参数数组

// 初始化所有线程的参数
// 除了tid不同,其他很多参数是所有线程共享的
for (i = 0; i < numthreads; i++) {
targs[i].tid = i;
targs[i].B = B;
targs[i].N_dim = N;
targs[i].P_dim = P;
targs[i].A_row = A_row;
targs[i].C_row = C_row;
targs[i].numthreads = numthreads;
}

// 工作进程主循环:不断接收任务直到收到结束信号
while (1) {
// 接收主进程发来的A的一行
MPI_Recv(A_row, N, MPI_FLOAT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// 检查消息标签,如果为0,则表示所有任务已完成,跳出循环
if (status.MPI_TAG == 0) {
break;
}
int index_tag = status.MPI_TAG; // 保存这个标签,完成计算后要用同样的标签发回去

// 创建多个线程来并行计算 C_row = A_row * B
for (i = 0; i < numthreads; i++) {
pthread_create(&tids[i], NULL, worker, &targs[i]);
}
// 等待所有线程完成计算
for (i = 0; i < numthreads; i++) {
pthread_join(tids[i], NULL);
}

// 将计算好的结果行C_row发送回主进程
MPI_Send(C_row, P, MPI_FLOAT, 0, index_tag, MPI_COMM_WORLD);
}

// 释放工作进程分配的内存
free(B); free(tids); free(A_row); free(C_row); free(targs);
}

// 同步点:确保所有进程都完成了自己的工作,主进程才开始打印结果
// 这可以防止主进程在工作进程还未退出时就结束程序
MPI_Barrier(MPI_COMM_WORLD);

// ================= 结果输出和清理 =================
if (!myid) { // 只有主进程打印最终结果
printf("结果矩阵 C:\n");
for (i = 0; i < M; i++) {
for (j = 0; j < P; j++) {
printf("%6.2f ", C[i*P + j]); // 格式化输出
}
printf("\n");
}

// 释放主进程分配的内存
free(A); free(B); free(C);
}

// 结束MPI环境,释放所有与MPI相关的资源
MPI_Finalize();
return 0; // 程序正常退出
}

四、运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
[pl23373179@ln02 hw3]$ g++ matrixgen.cxx -o matgen
[pl23373179@ln02 hw3]$ g++ print.cxx -o print
[pl23373179@ln02 hw3]$ ./matgen 5 4 matrix_A.dat
[pl23373179@ln02 hw3]$ ./matgen 4 3 matrix_B.dat
[pl23373179@ln02 hw3]$ ./print matrix_A.dat
---- matrix_A.dat: 5*4 Matrix -----
0.8542 0.2758 0.9811 0.3089
0.8958 0.0017 0.6990 0.3635
0.2163 0.0449 0.3980 0.8485
0.9474 0.8159 0.3180 0.1536
0.6925 0.6190 0.8004 0.7914
[pl23373179@ln02 hw3]$ ./print matrix_B.dat
---- matrix_B.dat: 4*3 Matrix -----
0.8206 0.9126 0.8887
0.0331 0.8013 0.7295
0.6622 0.3368 0.2291
0.0144 0.6393 0.6054
[pl23373179@ln02 hw3]$ mpicc mulMatrix.c -o mulMatrix
[pl23373179@ln02 hw3]$ sbatch mulMat
Submitted batch job

[pl23373179@ln02 hw3]$ cat hpc.out
Matrix A: 5 x 4, Matrix B: 4 x 3
Result matrix C:
1.36 1.53 1.37
1.20 1.29 1.18
0.45 0.91 0.83
1.02 1.72 1.60
1.13 1.90 1.73

五、程序流程图

hw3_mulMatrix

  • Title: 矩阵相乘的并行计算
  • Author: BaconToast
  • Created at : 2025-11-18 10:43:21
  • Updated at : 2025-11-18 10:55:55
  • Link: https://bacontoast-pro.github.io/2025/11/18/parallel/mulMatrix/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments