MPI를 활용한 병렬 행렬 곱셈 구현

MPI(Message Passing Interface)를 이용하여 대규모 행렬 곱셈을 병렬 처리하는 방법을 설명한다. 두 가지 접근 방식을 통해 성능 차이를 비교하고자 한다.

직접 메시지 전달 방식

첫 번째 방법은 MPI_Send와 MPI_Recv를 사용하여 마스터 프로세스가 각 워커 프로세스에 데이터를 직접 분배하고 결과를 수집하는 방식이다.

#include <mpi.h>

#include <cstdio>
#include <iostream>
#include <vector>

// Computes `row_count` consecutive rows of C = A * B for square `dim` x `dim`
// row-major matrices.
//   a_rows : pointer to the first of the requested rows of A
//   b      : the whole of B
//   out    : receives the corresponding `row_count` rows of C
// Accumulation is done in 64-bit: with this program's input data
// (A[i][j] = i*j, B[i][j] = i+j, dim = 1000) a single dot product reaches
// roughly 8.3e11, which overflows a 32-bit int (undefined behavior).
static void multiply_row_range(const int* a_rows, const int* b, long long* out,
                               int row_count, int dim) {
    for (int i = 0; i < row_count; ++i) {
        for (int j = 0; j < dim; ++j) {
            long long sum = 0;
            for (int k = 0; k < dim; ++k) {
                sum += static_cast<long long>(a_rows[i * dim + k]) * b[k * dim + j];
            }
            out[i * dim + j] = sum;
        }
    }
}

// Parallel matrix multiplication using point-to-point messages.
// Rank 0 builds A and B, sends all of B plus a block of
// `rows_per_process` rows of A to each worker (rank p gets block p-1),
// computes the remaining rows itself (including the remainder when
// matrix_dim is not divisible by the process count), collects the workers'
// results, and writes the product to "result_matrix.txt".
int main(int argc, char** argv) {
    MPI_Init(&argc, &argv);

    int rank = 0;
    int total_processes = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &total_processes);

    const int matrix_dim = 1000;
    const int rows_per_process = matrix_dim / total_processes;
    const int block_elems = rows_per_process * matrix_dim;

    std::vector<int> matrix_b(matrix_dim * matrix_dim);

    double start_time = MPI_Wtime();

    if (rank == 0) {
        std::vector<int> matrix_a(matrix_dim * matrix_dim);
        std::vector<long long> final_matrix(matrix_dim * matrix_dim);

        // Generate the input data.
        for (int i = 0; i < matrix_dim; ++i) {
            for (int j = 0; j < matrix_dim; ++j) {
                matrix_a[i * matrix_dim + j] = i * j;
                matrix_b[i * matrix_dim + j] = i + j;
            }
        }

        // Send all of B to every worker (tag 0).
        for (int p = 1; p < total_processes; ++p) {
            MPI_Send(matrix_b.data(), matrix_dim * matrix_dim, MPI_INT, p, 0, MPI_COMM_WORLD);
        }

        // Send row block p-1 of A to worker p (tag 1).
        for (int p = 1; p < total_processes; ++p) {
            MPI_Send(matrix_a.data() + (p - 1) * block_elems, block_elems, MPI_INT, p, 1,
                     MPI_COMM_WORLD);
        }

        // The master handles every row after the workers' blocks.
        const int master_first_row = (total_processes - 1) * rows_per_process;
        multiply_row_range(matrix_a.data() + master_first_row * matrix_dim, matrix_b.data(),
                           final_matrix.data() + master_first_row * matrix_dim,
                           matrix_dim - master_first_row, matrix_dim);

        // Receive each worker's rows (tag 3) directly into their final position.
        for (int p = 1; p < total_processes; ++p) {
            MPI_Recv(final_matrix.data() + (p - 1) * block_elems, block_elems, MPI_LONG_LONG, p,
                     3, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }

        // Write the result matrix to a file.
        std::FILE* output = std::fopen("result_matrix.txt", "w");
        if (output == nullptr) {
            std::perror("result_matrix.txt");
        } else {
            for (int i = 0; i < matrix_dim; ++i) {
                for (int j = 0; j < matrix_dim; ++j) {
                    std::fprintf(output, "%lld\t", final_matrix[i * matrix_dim + j]);
                }
                std::fputc('\n', output);
            }
            std::fclose(output);
        }
    } else {
        std::vector<int> local_block(block_elems);
        std::vector<long long> partial_result(block_elems);

        MPI_Recv(matrix_b.data(), matrix_dim * matrix_dim, MPI_INT, 0, 0, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);
        MPI_Recv(local_block.data(), block_elems, MPI_INT, 0, 1, MPI_COMM_WORLD,
                 MPI_STATUS_IGNORE);

        // Local computation on this worker's block of rows.
        multiply_row_range(local_block.data(), matrix_b.data(), partial_result.data(),
                           rows_per_process, matrix_dim);

        MPI_Send(partial_result.data(), block_elems, MPI_LONG_LONG, 0, 3, MPI_COMM_WORLD);
    }

    double end_time = MPI_Wtime();
    if (rank == 0) {
        std::cout << "Execution Time: " << end_time - start_time << " seconds" << std::endl;
    }

    MPI_Finalize();
    return 0;
}

콜렉티브 커뮤니케이션 활용 방식

두 번째 방법은 MPI_Scatter, MPI_Bcast, MPI_Gather와 같은 콜렉티브 통신 함수를 사용하여 코드를 간결하게 만든다.

#include <mpi.h>

#include <cstdio>
#include <iostream>
#include <vector>

// Computes `row_count` consecutive rows of C = A * B for square `dim` x `dim`
// row-major matrices.
//   a_rows : pointer to the first of the requested rows of A
//   b      : the whole of B
//   out    : receives the corresponding `row_count` rows of C
// Accumulation is done in 64-bit: with this program's input data
// (A[i][j] = i*j, B[i][j] = i+j, dim = 1000) a single dot product reaches
// roughly 8.3e11, which overflows a 32-bit int (undefined behavior).
static void compute_rows(const int* a_rows, const int* b, long long* out,
                         int row_count, int dim) {
    for (int i = 0; i < row_count; ++i) {
        for (int j = 0; j < dim; ++j) {
            long long accumulator = 0;
            for (int k = 0; k < dim; ++k) {
                accumulator += static_cast<long long>(a_rows[i * dim + k]) * b[k * dim + j];
            }
            out[i * dim + j] = accumulator;
        }
    }
}

// Parallel matrix multiplication using collective operations.
// Rank 0 builds A and B. MPI_Scatter hands each rank (including rank 0)
// `rows_chunk` rows of A, and MPI_Bcast gives every rank all of B. Each
// rank multiplies its chunk, and MPI_Gather collects the chunks on rank 0.
// Rank 0 then computes the leftover rows (matrix dimension not divisible by
// the process count) and writes the product to "gathered_result.txt".
int main() {
    MPI_Init(nullptr, nullptr);

    int process_id = 0;
    int num_processes = 0;
    MPI_Comm_rank(MPI_COMM_WORLD, &process_id);
    MPI_Comm_size(MPI_COMM_WORLD, &num_processes);

    const bool is_root = (process_id == 0);
    const int dimension = 1000;
    const int rows_chunk = dimension / num_processes;
    const int chunk_elems = rows_chunk * dimension;

    std::vector<int> local_data(chunk_elems);
    std::vector<int> full_matrix_b(dimension * dimension);
    std::vector<long long> computed_chunk(chunk_elems);

    // Only rank 0 reports its elapsed time, measured from before data setup.
    const double begin_time = MPI_Wtime();

    // A and the full result exist only on the root.
    std::vector<int> matrix_a;
    std::vector<long long> result_matrix;
    if (is_root) {
        matrix_a.resize(dimension * dimension);
        result_matrix.resize(dimension * dimension);

        // Initialize the input data.
        for (int i = 0; i < dimension; ++i) {
            for (int j = 0; j < dimension; ++j) {
                matrix_a[i * dimension + j] = i * j;
                full_matrix_b[i * dimension + j] = i + j;
            }
        }
    }

    // Every rank takes part in the same collective sequence; the send/receive
    // buffers that are only significant at the root are null elsewhere.
    MPI_Scatter(is_root ? matrix_a.data() : nullptr, chunk_elems, MPI_INT, local_data.data(),
                chunk_elems, MPI_INT, 0, MPI_COMM_WORLD);
    MPI_Bcast(full_matrix_b.data(), dimension * dimension, MPI_INT, 0, MPI_COMM_WORLD);

    compute_rows(local_data.data(), full_matrix_b.data(), computed_chunk.data(), rows_chunk,
                 dimension);

    MPI_Gather(computed_chunk.data(), chunk_elems, MPI_LONG_LONG,
               is_root ? result_matrix.data() : nullptr, chunk_elems, MPI_LONG_LONG, 0,
               MPI_COMM_WORLD);

    if (is_root) {
        // Rows not covered by the scatter (dimension % num_processes of them).
        const int first_leftover = num_processes * rows_chunk;
        compute_rows(matrix_a.data() + first_leftover * dimension, full_matrix_b.data(),
                     result_matrix.data() + first_leftover * dimension,
                     dimension - first_leftover, dimension);

        std::FILE* file_output = std::fopen("gathered_result.txt", "w");
        if (file_output == nullptr) {
            std::perror("gathered_result.txt");
        } else {
            for (int i = 0; i < dimension; ++i) {
                for (int j = 0; j < dimension; ++j) {
                    std::fprintf(file_output, "%lld\t", result_matrix[i * dimension + j]);
                }
                std::fputc('\n', file_output);
            }
            std::fclose(file_output);
        }

        const double finish_time = MPI_Wtime();
        std::cout << "Total Execution Time: " << finish_time - begin_time << " seconds"
                  << std::endl;
    }

    MPI_Finalize();
    return 0;
}

두 구현 모두 동일한 작업을 수행하지만, 콜렉티브 통신을 사용한 버전이 코드 구조상 더 명확하며 유지보수성이 우수하다. 그러나 실제 성능은 프로세스 수와 시스템 환경에 따라 달라질 수 있다.

태그: mpi parallel computing matrix multiplication C++

6월 9일 17:16에 게시됨