OpenMP 编程参考
OpenMP 并行编程完整参考与实例代码
HPCOpenMP
OpenMP 编程参考
OpenMP (Open Multi-Processing) 是用于共享内存并行编程的API标准。通过编译器指令、运行时库函数和环境变量,可以在C/C++/Fortran程序中实现并行化。
编译与配置
基础设置
# GCC/G++ 编译
g++ -fopenmp -O3 program.cpp -o program
# 设置线程数
export OMP_NUM_THREADS=8
// 包含头文件
#include <omp.h>
// 运行时设置线程数
omp_set_num_threads(8);
核心指令
并行区域
#pragma omp parallel
{
int thread_id = omp_get_thread_num();
int total_threads = omp_get_num_threads();
printf("Thread %d of %d\n", thread_id, total_threads);
}
#pragma omp parallel num_threads(4)
{
// 使用4个线程执行
}
循环并行化
#pragma omp parallel for
for (int i = 0; i < N; i++) {
a[i] = b[i] + c[i];
}
#pragma omp parallel for collapse(2)
for (int i = 0; i < N; i++) {
for (int j = 0; j < M; j++) {
matrix[i][j] = i * M + j;
}
}
并行段
#pragma omp parallel sections
{
#pragma omp section
{
// 任务1
compute_part_a();
}
#pragma omp section
{
// 任务2
compute_part_b();
}
}
数据作用域
关键概念 正确管理数据作用域是并行编程的核心。使用
default(none)强制显式声明所有变量的作用域。
作用域类型
| 子句 | 说明 | 使用场景 |
|---|---|---|
shared(list) | 所有线程共享同一份变量 | 输入数据、输出数组 |
private(list) | 每个线程拥有独立副本,初值未定义 | 循环临时变量 |
firstprivate(list) | 私有副本,用主线程值初始化 | 需要初值的临时变量 |
lastprivate(list) | 私有副本,最后迭代值写回主线程 | 需要保留最终值的变量 |
int sum = 0;
int N = 1000;
float data[1000];
#pragma omp parallel for default(none) \
shared(N, data, sum) private(i)
for (int i = 0; i < N; i++) {
sum += data[i]; // 错误:race condition
}
int sum = 0;
int N = 1000;
float data[1000];
#pragma omp parallel for default(none) \
shared(N, data) reduction(+:sum)
for (int i = 0; i < N; i++) {
sum += data[i]; // 正确
}
归约操作
归约操作将多个线程的局部结果合并为单一值。
支持的运算符
reduction(+:var) // 求和
reduction(*:var) // 乘积
reduction(-:var) // 差
reduction(&:var) // 按位与
reduction(|:var) // 按位或
reduction(^:var) // 按位异或
reduction(&&:var) // 逻辑与
reduction(||:var) // 逻辑或
reduction(max:var) // 最大值
reduction(min:var) // 最小值
实例:向量归约
float sum = 0.0f;
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; i++) {
sum += array[i];
}
float max_val = array[0];
#pragma omp parallel for reduction(max:max_val)
for (int i = 1; i < N; i++) {
if (array[i] > max_val) {
max_val = array[i];
}
}
float sum = 0.0f, prod = 1.0f;
#pragma omp parallel for reduction(+:sum) reduction(*:prod)
for (int i = 0; i < N; i++) {
sum += array[i];
prod *= array[i];
}
同步机制
屏障同步
#pragma omp parallel
{
// 阶段1
compute_phase1();
#pragma omp barrier // 所有线程在此等待
// 阶段2(需要阶段1完成)
compute_phase2();
}
临界区
int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
#pragma omp critical
{
counter++; // 同一时间只有一个线程执行
}
}
int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
#pragma omp atomic
counter++; // 硬件原子操作,更快
}
单线程执行
#pragma omp parallel
{
work();
#pragma omp master // 仅主线程执行
{
printf("Master thread reporting\n");
}
}
#pragma omp parallel
{
work();
#pragma omp single // 任一线程执行
{
initialize_shared_data();
}
// 隐式barrier
}
调度策略
控制循环迭代在线程间的分配方式。
策略类型
// 编译时均匀分配,开销最小
#pragma omp parallel for schedule(static)
for (int i = 0; i < N; i++) {
// 线程0: [0, N/4), 线程1: [N/4, N/2), ...
}
// 运行时动态分配,适合负载不均衡
#pragma omp parallel for schedule(dynamic, 10)
for (int i = 0; i < N; i++) {
// 每次分配10个迭代给空闲线程
do_variable_work(i);
}
// 块大小逐渐减小,兼顾效率和负载均衡
#pragma omp parallel for schedule(guided)
for (int i = 0; i < N; i++) {
do_work(i);
}
SIMD 向量化
指导编译器使用SIMD指令进行向量化。
#pragma omp simd
for (int i = 0; i < N; i++) {
a[i] = b[i] + c[i]; // 向量化加法
}
#pragma omp parallel for simd
for (int i = 0; i < N; i++) {
result[i] = sqrt(data[i]);
}
float *a = (float*)aligned_alloc(64, N * sizeof(float));
#pragma omp simd aligned(a:64)
for (int i = 0; i < N; i++) {
a[i] = a[i] * 2.0f;
}
任务并行
处理不规则并行模式,如递归、动态任务生成。
基础任务
int fib(int n) {
if (n < 2) return n;
int x, y;
#pragma omp task shared(x)
x = fib(n - 1);
#pragma omp task shared(y)
y = fib(n - 2);
#pragma omp taskwait // 等待两个子任务
return x + y;
}
#pragma omp parallel
{
#pragma omp single
result = fib(30);
}
struct Node {
int value;
Node* next;
};
#pragma omp parallel
{
#pragma omp single
{
Node* p = head;
while (p != nullptr) {
#pragma omp task firstprivate(p)
{
process(p->value);
}
p = p->next;
}
}
}
完整实例:矩阵乘法
void matmul_serial(float *A, float *B, float *C, int N) {
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
float sum = 0.0f;
for (int k = 0; k < N; k++) {
sum += A[i*N + k] * B[k*N + j];
}
C[i*N + j] = sum;
}
}
}
void matmul_parallel(float *A, float *B, float *C, int N) {
#pragma omp parallel for collapse(2) \
default(none) shared(A, B, C, N)
for (int i = 0; i < N; i++) {
for (int j = 0; j < N; j++) {
float sum = 0.0f;
#pragma omp simd reduction(+:sum)
for (int k = 0; k < N; k++) {
sum += A[i*N + k] * B[k*N + j];
}
C[i*N + j] = sum;
}
}
}
完整实例:Softmax 函数
void softmax_serial(float *input, float *output, int N) {
// 1. 找最大值
float max_val = input[0];
for (int i = 1; i < N; i++) {
if (input[i] > max_val)
max_val = input[i];
}
// 2. 计算exp和sum
float sum = 0.0f;
for (int i = 0; i < N; i++) {
output[i] = expf(input[i] - max_val);
sum += output[i];
}
// 3. 归一化
for (int i = 0; i < N; i++) {
output[i] /= sum;
}
}
void softmax_parallel(float *input, float *output, int N) {
float max_val = input[0];
float sum = 0.0f;
// 单个并行区域,减少线程创建开销
#pragma omp parallel default(none) \
shared(input, output, N) \
reduction(max:max_val) reduction(+:sum)
{
// 1. 并行找最大值
#pragma omp for
for (int i = 1; i < N; i++) {
if (input[i] > max_val)
max_val = input[i];
}
// 2. 并行计算exp和sum
#pragma omp for
for (int i = 0; i < N; i++) {
float exp_val = expf(input[i] - max_val);
output[i] = exp_val;
sum += exp_val;
}
// 3. 并行归一化
#pragma omp for
for (int i = 0; i < N; i++) {
output[i] /= sum;
}
}
}
运行时库函数
常用函数
// 线程管理
int omp_get_num_threads(void); // 当前线程数
int omp_get_thread_num(void); // 当前线程ID
int omp_get_max_threads(void); // 最大可用线程数
void omp_set_num_threads(int num); // 设置线程数
// 嵌套并行
void omp_set_nested(int nested); // 启用嵌套并行
int omp_get_nested(void); // 查询嵌套并行状态
// 时间测量
double omp_get_wtime(void); // 高精度时间戳
double omp_get_wtick(void); // 时钟精度
// 锁操作
void omp_init_lock(omp_lock_t *lock);
void omp_destroy_lock(omp_lock_t *lock);
void omp_set_lock(omp_lock_t *lock);
void omp_unset_lock(omp_lock_t *lock);
性能测量示例
#include <omp.h>
#include <stdio.h>
int main() {
double start = omp_get_wtime();
#pragma omp parallel for
for (int i = 0; i < 1000000; i++) {
// 计算任务
}
double end = omp_get_wtime();
printf("Time: %.6f seconds\n", end - start);
printf("Threads used: %d\n", omp_get_max_threads());
return 0;
}
环境变量
# 设置线程数
export OMP_NUM_THREADS=8
# 设置调度策略
export OMP_SCHEDULE="dynamic,4"
# 线程绑定
export OMP_PROC_BIND=true
# 嵌套并行
export OMP_NESTED=true
# 设置栈大小
export OMP_STACKSIZE=16M
性能优化建议
优化要点
- 减少并行区域开销:合并多个小的并行区域为一个大的
- 避免负载不均衡:选择合适的调度策略
- 减少同步开销:尽量使用
reduction而非critical- 使用
default(none):强制显式声明变量作用域- 对齐数据:配合SIMD使用对齐的内存分配
常见陷阱
注意事项
// 错误:未同步的共享变量更新 int counter = 0; #pragma omp parallel for for (int i = 0; i < N; i++) { counter++; // Race condition! } // 正确:使用atomic或reduction int counter = 0; #pragma omp parallel for reduction(+:counter) for (int i = 0; i < N; i++) { counter++; }