프로세스 간 통신(IPC) 프로그래밍 - 메시지큐

반응형

개요

리눅스(및 대다수의 운영체제)에서는 시스템에서 함께 동작하는 다수의 어플리케이션 프로세스 간에 정보를 교환하도록 하기 위한 프로세스간 통신 (Inter Process Communication, IPC) 기법을 제공합니다.

 

본 글에서는 여러 IPC 기법 중 메시지큐를 이용하여 프로세스 간 정보 교환 기능을 구현하는 예제를 소개합니다.

 

메시지큐 기반 IPC 기법의 대략적인 구조 및 동작 절차는 다음과 같습니다.

  • 임의의 프로세스에서 메시지큐를 생성합니다 - msgget() 시스템 함수 호출
  • 그 외 프로세스에서는 해당 메시지큐를 엽니다 - msgget() 시스템 함수 호출
  • 수신 프로세스에서는 메시지큐로부터 메시지를 수신합니다 - msgrcv() 시스템 함수 호출
  • 송신 프로세스에서는 메시지큐로 메시지를 송신합니다 - msgsnd() 시스템 함수 호출

 

구현

다음 예제는 하나의 메시지큐를 통해 서로 다른 2가지 종류의 메시지를 교환하는 프로그램의 예제입니다.

수신 프로세스에서는 하나의 메시지큐를 생성한 후, 2 종류의 메시지를 각각 수신하는 2개의 쓰레드를 생성하여 메시지를 수신합니다.

송신 프로세스에서는 생성되어 있는 메시지큐에 접근하여, 2 종류의 메시지를 번갈아가며 송신합니다.

 

msg_q_defines.h - 두 프로세스에서 공통으로 사용되는 정보를 정의한 헤더 파일

// 시스템 헤더 파일
#include <stdint.h>


/// 메시지큐 키 값
#define MSG_Q_KEY (0x64)
/// 메시지큐를 통해 전달되는 사용자 데이터의 최대 길이
#define USER_DATA_MAX_LEN 255


/**
 * 메시지큐에 저장되는 각 메시지를 구분하기 위한 데이터 유형 정의
 */ 
enum eMsgQueueMsgType
{
  kMsgQueueMsgType_Unused = 0, ///< 0번은 사용하지 않는다. (메시지 구분에 상관없이 수신하는데 사용된다)
  kMsgQueueMsgType_1 = 1,
  kMsgQueueMsgType_2 = 2,
};


/**
 * 메시지큐를 통해 전달되는 메시지 형식 1
 */ 
struct MsgQueueMsg1
{
  long type; ///< 메시지큐 내에 저장된 각 메시지의 유형을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
  uint16_t user_data1; ///< 필요에 의해 정의한 사용자 데이터
  uint32_t user_data2; ///< 필요에 의해 정의한 사용자 데이터
  char user_data3[USER_DATA_MAX_LEN+1]; ///< 필요에 의해 정의한 사용자 데이터
};


/**
 * 메시지큐를 통해 전달되는 메시지 형식 2
 */ 
struct MsgQueueMsg2
{
  long type; ///< 메시지큐 내에 저장된 각 메시지의 유형/그룹을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
  uint8_t user_data1[USER_DATA_MAX_LEN]; ///< 필요에 의해 정의한 사용자 데이터
  uint8_t user_data1_len;
};

 

 

msg_q_rx.c - 수신 프로세스를 구현한 파일

// 시스템 헤더 파일
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <stdio.h>

// 어플리케이션 헤더 파일
#include "msg_q_defines.h"


/// 메시지큐 식별자
int g_msg_q_id;
/// 메시지1 수신 쓰레드
pthread_t g_msg_q_msg1_rx_thread;
/// 메시지2 수신 쓰레드
pthread_t g_msg_q_msg2_rx_thread;


/**
 * @brief 메시지큐를 통해 메시지 1을 수신하여 처리하는(=화면에 출력하는) 쓰레드
 * @param[in] arg 사용되지 않음
 * @return NULL
 */ 
static void * MsgQueueMsg1RxThread(void *arg)
{
  int ret;
  struct MsgQueueMsg1 msg;

  while(1) {
    ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_1, 0);
    if (ret <= 0) {
      perror("msgrcv()");
      continue;
    }
    printf("Message 1 is received - data1: %u, data2: %u, data3: %s\n", msg.user_data1, msg.user_data2, msg.user_data3);
  }
  return NULL;
}


/**
 * @brief 메시지큐를 통해 메시지 2를 수신하여 처리하는(=화면에 출력하는) 쓰레드
 * @param[in] arg 사용되지 않음
 * @return NULL
 */ 
static void * MsgQueueMsg2RxThread(void *arg)
{
  int ret;
  struct MsgQueueMsg2 msg;

  while(1) {
    ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_2, 0);
    if (ret <= 0) {
      perror("msgrcv()");
      continue;
    }

    printf("Message 2 is received - data1: 0x");
    for (uint8_t i = 0; i < msg.user_data1_len; i++) {
      printf("%02X", msg.user_data1[i]);
    }
    printf("\n");
  }
}


/**
 * @brief 메시지큐 수신 어플리케이션 메인 함수
 */ 
int main(int argc, char *argv[])
{
  // 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
  g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
  if (g_msg_q_id == -1) {
    perror("msgget()");
    return -1;
  }

  // 메시지큐를 통해 메시지 1을 수신할 쓰레드를 생성한다.
  if (pthread_create(&g_msg_q_msg1_rx_thread, NULL, MsgQueueMsg1RxThread, NULL) < 0) {
    perror("pthread_create()");
    return -1;
  }
  
  // 메시지큐를 통해 메시지 2를 수신할 쓰레드를 생성한다.
  if (pthread_create(&g_msg_q_msg2_rx_thread, NULL, MsgQueueMsg2RxThread, NULL) < 0) {
    perror("pthread_create()");
    return -1;
  }

  // 각 쓰레드가 종료될 때까지 대기한다.
  pthread_join(g_msg_q_msg1_rx_thread, NULL);
  pthread_join(g_msg_q_msg2_rx_thread, NULL);

  return 0;
}

 

 

msg_q_tx.c - 송신 프로세스를 구현한 파일

// 시스템 헤더 파일
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

// 어플리케이션 헤더 파일
#include "msg_q_defines.h"


/// 메시지큐 식별자
int g_msg_q_id;


/**
 * @brief 메시지큐 송신 어플리케이션 메인 함수
 */ 
int main(int argc, char *argv[])
{
  // 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
  g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
  if (g_msg_q_id == -1) {
    perror("msgget()");
    return -1;
  }

  struct MsgQueueMsg1 msg1;
  struct MsgQueueMsg2 msg2;

  while (1) {
    // 메시지큐를 통해 메시지 1을 전송한다.
    memset(&msg1, 0, sizeof(msg1));
    msg1.type = kMsgQueueMsgType_1;
    msg1.user_data1 = 1;
    msg1.user_data2 = 2;
    sprintf(msg1.user_data3, "Hello");
    if (msgsnd(g_msg_q_id, &msg1, sizeof(msg1) - sizeof(long), IPC_NOWAIT) < 0) {
      perror("msgsnd()");
    }

    // 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
    sleep(1);
    
    // 메시지큐를 통해 메시지 2를 전송한다.
    memset(&msg2, 0, sizeof(msg2));
    msg2.type = kMsgQueueMsgType_2;
    msg2.user_data1_len = 1;
    msg2.user_data1[0] = 3;
    if (msgsnd(g_msg_q_id, &msg2, sizeof(msg2) - sizeof(long), IPC_NOWAIT) < 0) {
      perror("msgsnd()");
    }

    // 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
    sleep(1);
  }

  return 0;
}

 

 

빌드

본 예제에서는 cmake를 이용한 빌드를 수행하므로, cmake 빌드를 위한 파일을 다음과 같이 작성합니다.

 

CMakeLists.txt - cmake 빌드를 위한 파일

cmake_minimum_required(VERSION 3.13)
project(message-queue-example)
set(CMAKE_C_STANDARD 99)
set(CMAKE_VERBOSE_MAKEFILE true)

link_libraries(pthread)
add_executable(msg_q_rx msg_q_rx.c)
add_executable(msg_q_tx msg_q_tx.c)

 

모든 파일들을 동일한 디렉토리 상에 작성한 후 다음 명령으로 빌드합니다.

# cmake .
# make

 

실행

수신 프로세스를 실행한 후 송신 프로세스를 실행하면, 수신 프로세스에 수신된 메시지가 화면에 출력되는 것을 확인할 수 있습니다. 본 실행 예제에서는 단일 터미널 상에서 두 프로세스를 모두 실행시키기 위해 수신 프로세스를 백그라운드로 실행하였습니다.

# ./msg_q_rx &
[1] 448
# ./msg_q_tx
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello

댓글

Designed by JB FACTORY