프로세스 간 통신(IPC) 프로그래밍 - 메시지큐

반응형

    개요

    리눅스(및 대다수의 운영체제)에서는 시스템에서 함께 동작하는 다수의 어플리케이션 프로세스 간에 정보를 교환하도록 하기 위한 프로세스간 통신 (Inter Process Communication, IPC) 기법을 제공합니다.

     

    본 글에서는 여러 IPC 기법 중 메시지큐를 이용하여 프로세스 간 정보 교환 기능을 구현하는 예제를 소개합니다.

     

    메시지큐 기반 IPC 기법의 대략적인 구조 및 동작 절차는 다음과 같습니다.

    • 임의의 프로세스에서 메시지큐를 생성합니다 - msgget() 시스템 함수 호출
    • 그 외 프로세스에서는 해당 메시지큐를 엽니다 - msgget() 시스템 함수 호출
    • 수신 프로세스에서는 메시지큐로부터 메시지를 수신합니다 - msgrcv() 시스템 함수 호출
    • 송신 프로세스에서는 메시지큐로 메시지를 송신합니다 - msgsnd() 시스템 함수 호출

     

    구현

    다음 예제는 하나의 메시지큐를 통해 서로 다른 2가지 종류의 메시지를 교환하는 프로그램의 예제입니다.

    수신 프로세스에서는 하나의 메시지큐를 생성한 후, 2 종류의 메시지를 각각 수신하는 2개의 쓰레드를 생성하여 메시지를 수신합니다.

    송신 프로세스에서는 생성되어 있는 메시지큐에 접근하여, 2 종류의 메시지를 번갈아가며 송신합니다.

     

    msg_q_defines.h - 두 프로세스에서 공통으로 사용되는 정보를 정의한 헤더 파일

    // 시스템 헤더 파일
    #include <stdint.h>
    
    
    /// 메시지큐 키 값
    #define MSG_Q_KEY (0x64)
    /// 메시지큐를 통해 전달되는 사용자 데이터의 최대 길이
    #define USER_DATA_MAX_LEN 255
    
    
    /**
     * 메시지큐에 저장되는 각 메시지를 구분하기 위한 데이터 유형 정의
     */ 
    enum eMsgQueueMsgType
    {
      kMsgQueueMsgType_Unused = 0, ///< 0번은 사용하지 않는다. (메시지 구분에 상관없이 수신하는데 사용된다)
      kMsgQueueMsgType_1 = 1,
      kMsgQueueMsgType_2 = 2,
    };
    
    
    /**
     * 메시지큐를 통해 전달되는 메시지 형식 1
     */ 
    struct MsgQueueMsg1
    {
      long type; ///< 메시지큐 내에 저장된 각 메시지의 유형을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
      uint16_t user_data1; ///< 필요에 의해 정의한 사용자 데이터
      uint32_t user_data2; ///< 필요에 의해 정의한 사용자 데이터
      char user_data3[USER_DATA_MAX_LEN+1]; ///< 필요에 의해 정의한 사용자 데이터
    };
    
    
    /**
     * 메시지큐를 통해 전달되는 메시지 형식 2
     */ 
    struct MsgQueueMsg2
    {
      long type; ///< 메시지큐 내에 저장된 각 메시지의 유형/그룹을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
      uint8_t user_data1[USER_DATA_MAX_LEN]; ///< 필요에 의해 정의한 사용자 데이터
      uint8_t user_data1_len;
    };

     

     

     

    msg_q_rx.c - 수신 프로세스를 구현한 파일

    // 시스템 헤더 파일
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <pthread.h>
    #include <stdio.h>
    
    // 어플리케이션 헤더 파일
    #include "msg_q_defines.h"
    
    
    /// 메시지큐 식별자
    int g_msg_q_id;
    /// 메시지1 수신 쓰레드
    pthread_t g_msg_q_msg1_rx_thread;
    /// 메시지2 수신 쓰레드
    pthread_t g_msg_q_msg2_rx_thread;
    
    
    /**
     * @brief 메시지큐를 통해 메시지 1을 수신하여 처리하는(=화면에 출력하는) 쓰레드
     * @param[in] arg 사용되지 않음
     * @return NULL
     */ 
    static void * MsgQueueMsg1RxThread(void *arg)
    {
      int ret;
      struct MsgQueueMsg1 msg;
    
      while(1) {
        ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_1, 0);
        if (ret <= 0) {
          perror("msgrcv()");
          continue;
        }
        printf("Message 1 is received - data1: %u, data2: %u, data3: %s\n", msg.user_data1, msg.user_data2, msg.user_data3);
      }
      return NULL;
    }
    
    
    /**
     * @brief 메시지큐를 통해 메시지 2를 수신하여 처리하는(=화면에 출력하는) 쓰레드
     * @param[in] arg 사용되지 않음
     * @return NULL
     */ 
    static void * MsgQueueMsg2RxThread(void *arg)
    {
      int ret;
      struct MsgQueueMsg2 msg;
    
      while(1) {
        ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_2, 0);
        if (ret <= 0) {
          perror("msgrcv()");
          continue;
        }
    
        printf("Message 2 is received - data1: 0x");
        for (uint8_t i = 0; i < msg.user_data1_len; i++) {
          printf("%02X", msg.user_data1[i]);
        }
        printf("\n");
      }
    }
    
    
    /**
     * @brief 메시지큐 수신 어플리케이션 메인 함수
     */ 
    int main(int argc, char *argv[])
    {
      // 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
      g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
      if (g_msg_q_id == -1) {
        perror("msgget()");
        return -1;
      }
    
      // 메시지큐를 통해 메시지 1을 수신할 쓰레드를 생성한다.
      if (pthread_create(&g_msg_q_msg1_rx_thread, NULL, MsgQueueMsg1RxThread, NULL) < 0) {
        perror("pthread_create()");
        return -1;
      }
      
      // 메시지큐를 통해 메시지 2를 수신할 쓰레드를 생성한다.
      if (pthread_create(&g_msg_q_msg2_rx_thread, NULL, MsgQueueMsg2RxThread, NULL) < 0) {
        perror("pthread_create()");
        return -1;
      }
    
      // 각 쓰레드가 종료될 때까지 대기한다.
      pthread_join(g_msg_q_msg1_rx_thread, NULL);
      pthread_join(g_msg_q_msg2_rx_thread, NULL);
    
      return 0;
    }

     

     

     

    msg_q_tx.c - 송신 프로세스를 구현한 파일

    // 시스템 헤더 파일
    #include <sys/ipc.h>
    #include <sys/msg.h>
    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    
    // 어플리케이션 헤더 파일
    #include "msg_q_defines.h"
    
    
    /// 메시지큐 식별자
    int g_msg_q_id;
    
    
    /**
     * @brief 메시지큐 송신 어플리케이션 메인 함수
     */ 
    int main(int argc, char *argv[])
    {
      // 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
      g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
      if (g_msg_q_id == -1) {
        perror("msgget()");
        return -1;
      }
    
      struct MsgQueueMsg1 msg1;
      struct MsgQueueMsg2 msg2;
    
      while (1) {
        // 메시지큐를 통해 메시지 1을 전송한다.
        memset(&msg1, 0, sizeof(msg1));
        msg1.type = kMsgQueueMsgType_1;
        msg1.user_data1 = 1;
        msg1.user_data2 = 2;
        sprintf(msg1.user_data3, "Hello");
        if (msgsnd(g_msg_q_id, &msg1, sizeof(msg1) - sizeof(long), IPC_NOWAIT) < 0) {
          perror("msgsnd()");
        }
    
        // 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
        sleep(1);
        
        // 메시지큐를 통해 메시지 2를 전송한다.
        memset(&msg2, 0, sizeof(msg2));
        msg2.type = kMsgQueueMsgType_2;
        msg2.user_data1_len = 1;
        msg2.user_data1[0] = 3;
        if (msgsnd(g_msg_q_id, &msg2, sizeof(msg2) - sizeof(long), IPC_NOWAIT) < 0) {
          perror("msgsnd()");
        }
    
        // 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
        sleep(1);
      }
    
      return 0;
    }

     

     

     

    빌드

    본 예제에서는 cmake를 이용한 빌드를 수행하므로, cmake 빌드를 위한 파일을 다음과 같이 작성합니다.

     

    CMakeLists.txt - cmake 빌드를 위한 파일

    cmake_minimum_required(VERSION 3.13)
    project(message-queue-example)
    set(CMAKE_C_STANDARD 99)
    set(CMAKE_VERBOSE_MAKEFILE true)
    
    link_libraries(pthread)
    add_executable(msg_q_rx msg_q_rx.c)
    add_executable(msg_q_tx msg_q_tx.c)

     

    모든 파일들을 동일한 디렉토리 상에 작성한 후 다음 명령으로 빌드합니다.

    # cmake .
    # make

     

    실행

    수신 프로세스를 실행한 후 송신 프로세스를 실행하면, 수신 프로세스에 수신된 메시지가 화면에 출력되는 것을 확인할 수 있습니다. 본 실행 예제에서는 단일 터미널 상에서 두 프로세스를 모두 실행시키기 위해 수신 프로세스를 백그라운드로 실행하였습니다.

    # ./msg_q_rx &
    [1] 448
    # ./msg_q_tx
    Message 1 is received - data1: 1, data2: 2, data3: Hello
    Message 2 is received - data1: 0x03
    Message 1 is received - data1: 1, data2: 2, data3: Hello
    Message 2 is received - data1: 0x03
    Message 1 is received - data1: 1, data2: 2, data3: Hello
    Message 2 is received - data1: 0x03
    Message 1 is received - data1: 1, data2: 2, data3: Hello
    Message 2 is received - data1: 0x03
    Message 1 is received - data1: 1, data2: 2, data3: Hello

    댓글

    Designed by JB FACTORY