Sealessland logo Sealessland
HPC

OpenMP 编程参考

OpenMP 并行编程完整参考与实例代码

HPCOpenMP

OpenMP 编程参考

OpenMP (Open Multi-Processing) 是用于共享内存并行编程的API标准。通过编译器指令、运行时库函数和环境变量,可以在C/C++/Fortran程序中实现并行化。

编译与配置

基础设置

# GCC/G++ 编译
g++ -fopenmp -O3 program.cpp -o program

# 设置线程数
export OMP_NUM_THREADS=8
// 包含头文件
#include <omp.h>

// 运行时设置线程数
omp_set_num_threads(8);

核心指令

并行区域

#pragma omp parallel
{
    int thread_id = omp_get_thread_num();
    int total_threads = omp_get_num_threads();
    printf("Thread %d of %d\n", thread_id, total_threads);
}
#pragma omp parallel num_threads(4)
{
    // 使用4个线程执行
}

循环并行化

#pragma omp parallel for
for (int i = 0; i < N; i++) {
    a[i] = b[i] + c[i];
}
#pragma omp parallel for collapse(2)
for (int i = 0; i < N; i++) {
    for (int j = 0; j < M; j++) {
        matrix[i][j] = i * M + j;
    }
}

并行段

#pragma omp parallel sections
{
    #pragma omp section
    {
        // 任务1
        compute_part_a();
    }
    
    #pragma omp section
    {
        // 任务2
        compute_part_b();
    }
}

数据作用域

关键概念 正确管理数据作用域是并行编程的核心。使用 default(none) 强制显式声明所有变量的作用域。

作用域类型

子句说明使用场景
shared(list)所有线程共享同一份变量输入数据、输出数组
private(list)每个线程拥有独立副本,初值未定义循环临时变量
firstprivate(list)私有副本,用主线程值初始化需要初值的临时变量
lastprivate(list)私有副本,最后迭代值写回主线程需要保留最终值的变量
int sum = 0;
int N = 1000;
float data[1000];

#pragma omp parallel for default(none) \
    shared(N, data, sum) private(i)
for (int i = 0; i < N; i++) {
    sum += data[i]; // 错误:race condition
}
int sum = 0;
int N = 1000;
float data[1000];

#pragma omp parallel for default(none) \
    shared(N, data) reduction(+:sum)
for (int i = 0; i < N; i++) {
    sum += data[i]; // 正确
}

归约操作

归约操作将多个线程的局部结果合并为单一值。

支持的运算符

reduction(+:var)   // 求和
reduction(*:var)   // 乘积
reduction(-:var)   // 差
reduction(&:var)   // 按位与
reduction(|:var)   // 按位或
reduction(^:var)   // 按位异或
reduction(&&:var)  // 逻辑与
reduction(||:var)  // 逻辑或
reduction(max:var) // 最大值
reduction(min:var) // 最小值

实例:向量归约

float sum = 0.0f;
#pragma omp parallel for reduction(+:sum)
for (int i = 0; i < N; i++) {
    sum += array[i];
}
float max_val = array[0];
#pragma omp parallel for reduction(max:max_val)
for (int i = 1; i < N; i++) {
    if (array[i] > max_val) {
        max_val = array[i];
    }
}
float sum = 0.0f, prod = 1.0f;
#pragma omp parallel for reduction(+:sum) reduction(*:prod)
for (int i = 0; i < N; i++) {
    sum += array[i];
    prod *= array[i];
}

同步机制

屏障同步

#pragma omp parallel
{
    // 阶段1
    compute_phase1();
    
    #pragma omp barrier  // 所有线程在此等待
    
    // 阶段2(需要阶段1完成)
    compute_phase2();
}

临界区

int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
    #pragma omp critical
    {
        counter++;  // 同一时间只有一个线程执行
    }
}
int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
    #pragma omp atomic
    counter++;  // 硬件原子操作,更快
}

单线程执行

#pragma omp parallel
{
    work();
    
    #pragma omp master  // 仅主线程执行
    {
        printf("Master thread reporting\n");
    }
}
#pragma omp parallel
{
    work();
    
    #pragma omp single  // 任一线程执行
    {
        initialize_shared_data();
    }
    // 隐式barrier
}

调度策略

控制循环迭代在线程间的分配方式。

策略类型

// 编译时均匀分配,开销最小
#pragma omp parallel for schedule(static)
for (int i = 0; i < N; i++) {
    // 线程0: [0, N/4), 线程1: [N/4, N/2), ...
}
// 运行时动态分配,适合负载不均衡
#pragma omp parallel for schedule(dynamic, 10)
for (int i = 0; i < N; i++) {
    // 每次分配10个迭代给空闲线程
    do_variable_work(i);
}
// 块大小逐渐减小,兼顾效率和负载均衡
#pragma omp parallel for schedule(guided)
for (int i = 0; i < N; i++) {
    do_work(i);
}

SIMD 向量化

指导编译器使用SIMD指令进行向量化。

#pragma omp simd
for (int i = 0; i < N; i++) {
    a[i] = b[i] + c[i];  // 向量化加法
}
#pragma omp parallel for simd
for (int i = 0; i < N; i++) {
    result[i] = sqrt(data[i]);
}
float *a = (float*)aligned_alloc(64, N * sizeof(float));

#pragma omp simd aligned(a:64)
for (int i = 0; i < N; i++) {
    a[i] = a[i] * 2.0f;
}

任务并行

处理不规则并行模式,如递归、动态任务生成。

基础任务

int fib(int n) {
    if (n < 2) return n;
    
    int x, y;
    #pragma omp task shared(x)
    x = fib(n - 1);
    
    #pragma omp task shared(y)
    y = fib(n - 2);
    
    #pragma omp taskwait  // 等待两个子任务
    return x + y;
}

#pragma omp parallel
{
    #pragma omp single
    result = fib(30);
}
struct Node {
    int value;
    Node* next;
};

#pragma omp parallel
{
    #pragma omp single
    {
        Node* p = head;
        while (p != nullptr) {
            #pragma omp task firstprivate(p)
            {
                process(p->value);
            }
            p = p->next;
        }
    }
}

完整实例:矩阵乘法

void matmul_serial(float *A, float *B, float *C, int N) {
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            float sum = 0.0f;
            for (int k = 0; k < N; k++) {
                sum += A[i*N + k] * B[k*N + j];
            }
            C[i*N + j] = sum;
        }
    }
}
void matmul_parallel(float *A, float *B, float *C, int N) {
    #pragma omp parallel for collapse(2) \
        default(none) shared(A, B, C, N)
    for (int i = 0; i < N; i++) {
        for (int j = 0; j < N; j++) {
            float sum = 0.0f;
            #pragma omp simd reduction(+:sum)
            for (int k = 0; k < N; k++) {
                sum += A[i*N + k] * B[k*N + j];
            }
            C[i*N + j] = sum;
        }
    }
}

完整实例:Softmax 函数

void softmax_serial(float *input, float *output, int N) {
    // 1. 找最大值
    float max_val = input[0];
    for (int i = 1; i < N; i++) {
        if (input[i] > max_val) 
            max_val = input[i];
    }
    
    // 2. 计算exp和sum
    float sum = 0.0f;
    for (int i = 0; i < N; i++) {
        output[i] = expf(input[i] - max_val);
        sum += output[i];
    }
    
    // 3. 归一化
    for (int i = 0; i < N; i++) {
        output[i] /= sum;
    }
}
void softmax_parallel(float *input, float *output, int N) {
    float max_val = input[0];
    float sum = 0.0f;
    
    // 单个并行区域,减少线程创建开销
    #pragma omp parallel default(none) \
        shared(input, output, N) \
        reduction(max:max_val) reduction(+:sum)
    {
        // 1. 并行找最大值
        #pragma omp for
        for (int i = 1; i < N; i++) {
            if (input[i] > max_val)
                max_val = input[i];
        }
        
        // 2. 并行计算exp和sum
        #pragma omp for
        for (int i = 0; i < N; i++) {
            float exp_val = expf(input[i] - max_val);
            output[i] = exp_val;
            sum += exp_val;
        }
        
        // 3. 并行归一化
        #pragma omp for
        for (int i = 0; i < N; i++) {
            output[i] /= sum;
        }
    }
}

运行时库函数

常用函数

// 线程管理
int omp_get_num_threads(void);      // 当前线程数
int omp_get_thread_num(void);       // 当前线程ID
int omp_get_max_threads(void);      // 最大可用线程数
void omp_set_num_threads(int num);  // 设置线程数

// 嵌套并行
void omp_set_nested(int nested);    // 启用嵌套并行
int omp_get_nested(void);           // 查询嵌套并行状态

// 时间测量
double omp_get_wtime(void);         // 高精度时间戳
double omp_get_wtick(void);         // 时钟精度

// 锁操作
void omp_init_lock(omp_lock_t *lock);
void omp_destroy_lock(omp_lock_t *lock);
void omp_set_lock(omp_lock_t *lock);
void omp_unset_lock(omp_lock_t *lock);

性能测量示例

#include <omp.h>
#include <stdio.h>

int main() {
    double start = omp_get_wtime();
    
    #pragma omp parallel for
    for (int i = 0; i < 1000000; i++) {
        // 计算任务
    }
    
    double end = omp_get_wtime();
    printf("Time: %.6f seconds\n", end - start);
    printf("Threads used: %d\n", omp_get_max_threads());
    
    return 0;
}

环境变量

# 设置线程数
export OMP_NUM_THREADS=8

# 设置调度策略
export OMP_SCHEDULE="dynamic,4"

# 线程绑定
export OMP_PROC_BIND=true

# 嵌套并行
export OMP_NESTED=true

# 设置栈大小
export OMP_STACKSIZE=16M

性能优化建议

优化要点

  1. 减少并行区域开销:合并多个小的并行区域为一个大的
  2. 避免负载不均衡:选择合适的调度策略
  3. 减少同步开销:尽量使用 reduction 而非 critical
  4. 使用 default(none):强制显式声明变量作用域
  5. 对齐数据:配合SIMD使用对齐的内存分配

常见陷阱

注意事项

// 错误:未同步的共享变量更新
int counter = 0;
#pragma omp parallel for
for (int i = 0; i < N; i++) {
    counter++;  // Race condition!
}

// 正确:使用atomic或reduction
int counter = 0;
#pragma omp parallel for reduction(+:counter)
for (int i = 0; i < N; i++) {
    counter++;
}

参考资源