프로세스 간 통신(IPC) 프로그래밍 - 메시지큐
- 프로그래밍/리눅스 프로그래밍
- 2021. 1. 19.
개요
리눅스(및 대다수의 운영체제)에서는 시스템에서 함께 동작하는 다수의 어플리케이션 프로세스 간에 정보를 교환하도록 하기 위한 프로세스간 통신 (Inter Process Communication, IPC) 기법을 제공합니다.
본 글에서는 여러 IPC 기법 중 메시지큐를 이용하여 프로세스 간 정보 교환 기능을 구현하는 예제를 소개합니다.
메시지큐 기반 IPC 기법의 대략적인 구조 및 동작 절차는 다음과 같습니다.
- 임의의 프로세스에서 메시지큐를 생성합니다 - msgget() 시스템 함수 호출
- 그 외 프로세스에서는 해당 메시지큐를 엽니다 - msgget() 시스템 함수 호출
- 수신 프로세스에서는 메시지큐로부터 메시지를 수신합니다 - msgrcv() 시스템 함수 호출
- 송신 프로세스에서는 메시지큐로 메시지를 송신합니다 - msgsnd() 시스템 함수 호출
구현
다음 예제는 하나의 메시지큐를 통해 서로 다른 2가지 종류의 메시지를 교환하는 프로그램의 예제입니다.
수신 프로세스에서는 하나의 메시지큐를 생성한 후, 2 종류의 메시지를 각각 수신하는 2개의 쓰레드를 생성하여 메시지를 수신합니다.
송신 프로세스에서는 생성되어 있는 메시지큐에 접근하여, 2 종류의 메시지를 번갈아가며 송신합니다.
msg_q_defines.h - 두 프로세스에서 공통으로 사용되는 정보를 정의한 헤더 파일
// 시스템 헤더 파일
#include <stdint.h>
/// 메시지큐 키 값
#define MSG_Q_KEY (0x64)
/// 메시지큐를 통해 전달되는 사용자 데이터의 최대 길이
#define USER_DATA_MAX_LEN 255
/**
* 메시지큐에 저장되는 각 메시지를 구분하기 위한 데이터 유형 정의
*/
enum eMsgQueueMsgType
{
kMsgQueueMsgType_Unused = 0, ///< 0번은 사용하지 않는다. (메시지 구분에 상관없이 수신하는데 사용된다)
kMsgQueueMsgType_1 = 1,
kMsgQueueMsgType_2 = 2,
};
/**
* 메시지큐를 통해 전달되는 메시지 형식 1
*/
struct MsgQueueMsg1
{
long type; ///< 메시지큐 내에 저장된 각 메시지의 유형을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
uint16_t user_data1; ///< 필요에 의해 정의한 사용자 데이터
uint32_t user_data2; ///< 필요에 의해 정의한 사용자 데이터
char user_data3[USER_DATA_MAX_LEN+1]; ///< 필요에 의해 정의한 사용자 데이터
};
/**
* 메시지큐를 통해 전달되는 메시지 형식 2
*/
struct MsgQueueMsg2
{
long type; ///< 메시지큐 내에 저장된 각 메시지의 유형/그룹을 식별하기 위한 정보. 반드시 이 위치에 정의되어야 한다.
uint8_t user_data1[USER_DATA_MAX_LEN]; ///< 필요에 의해 정의한 사용자 데이터
uint8_t user_data1_len;
};
msg_q_rx.c - 수신 프로세스를 구현한 파일
// 시스템 헤더 파일
#include <sys/ipc.h>
#include <sys/msg.h>
#include <pthread.h>
#include <stdio.h>
// 어플리케이션 헤더 파일
#include "msg_q_defines.h"
/// 메시지큐 식별자
int g_msg_q_id;
/// 메시지1 수신 쓰레드
pthread_t g_msg_q_msg1_rx_thread;
/// 메시지2 수신 쓰레드
pthread_t g_msg_q_msg2_rx_thread;
/**
* @brief 메시지큐를 통해 메시지 1을 수신하여 처리하는(=화면에 출력하는) 쓰레드
* @param[in] arg 사용되지 않음
* @return NULL
*/
static void * MsgQueueMsg1RxThread(void *arg)
{
int ret;
struct MsgQueueMsg1 msg;
while(1) {
ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_1, 0);
if (ret <= 0) {
perror("msgrcv()");
continue;
}
printf("Message 1 is received - data1: %u, data2: %u, data3: %s\n", msg.user_data1, msg.user_data2, msg.user_data3);
}
return NULL;
}
/**
* @brief 메시지큐를 통해 메시지 2를 수신하여 처리하는(=화면에 출력하는) 쓰레드
* @param[in] arg 사용되지 않음
* @return NULL
*/
static void * MsgQueueMsg2RxThread(void *arg)
{
int ret;
struct MsgQueueMsg2 msg;
while(1) {
ret = msgrcv(g_msg_q_id, &msg, sizeof(msg) - sizeof(long), kMsgQueueMsgType_2, 0);
if (ret <= 0) {
perror("msgrcv()");
continue;
}
printf("Message 2 is received - data1: 0x");
for (uint8_t i = 0; i < msg.user_data1_len; i++) {
printf("%02X", msg.user_data1[i]);
}
printf("\n");
}
}
/**
* @brief 메시지큐 수신 어플리케이션 메인 함수
*/
int main(int argc, char *argv[])
{
// 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
if (g_msg_q_id == -1) {
perror("msgget()");
return -1;
}
// 메시지큐를 통해 메시지 1을 수신할 쓰레드를 생성한다.
if (pthread_create(&g_msg_q_msg1_rx_thread, NULL, MsgQueueMsg1RxThread, NULL) < 0) {
perror("pthread_create()");
return -1;
}
// 메시지큐를 통해 메시지 2를 수신할 쓰레드를 생성한다.
if (pthread_create(&g_msg_q_msg2_rx_thread, NULL, MsgQueueMsg2RxThread, NULL) < 0) {
perror("pthread_create()");
return -1;
}
// 각 쓰레드가 종료될 때까지 대기한다.
pthread_join(g_msg_q_msg1_rx_thread, NULL);
pthread_join(g_msg_q_msg2_rx_thread, NULL);
return 0;
}
msg_q_tx.c - 송신 프로세스를 구현한 파일
// 시스템 헤더 파일
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
// 어플리케이션 헤더 파일
#include "msg_q_defines.h"
/// 메시지큐 식별자
int g_msg_q_id;
/**
* @brief 메시지큐 송신 어플리케이션 메인 함수
*/
int main(int argc, char *argv[])
{
// 메시지큐를 연다. (해당 메시지큐가 시스템에 존재하지 않으면 생성한다)
g_msg_q_id = msgget((key_t)MSG_Q_KEY, IPC_CREAT | 0666);
if (g_msg_q_id == -1) {
perror("msgget()");
return -1;
}
struct MsgQueueMsg1 msg1;
struct MsgQueueMsg2 msg2;
while (1) {
// 메시지큐를 통해 메시지 1을 전송한다.
memset(&msg1, 0, sizeof(msg1));
msg1.type = kMsgQueueMsgType_1;
msg1.user_data1 = 1;
msg1.user_data2 = 2;
sprintf(msg1.user_data3, "Hello");
if (msgsnd(g_msg_q_id, &msg1, sizeof(msg1) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd()");
}
// 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
sleep(1);
// 메시지큐를 통해 메시지 2를 전송한다.
memset(&msg2, 0, sizeof(msg2));
msg2.type = kMsgQueueMsgType_2;
msg2.user_data1_len = 1;
msg2.user_data1[0] = 3;
if (msgsnd(g_msg_q_id, &msg2, sizeof(msg2) - sizeof(long), IPC_NOWAIT) < 0) {
perror("msgsnd()");
}
// 수신 어플리케이션에서 출력되는 로그의 가독성을 위해 메시지 간 지연 삽입
sleep(1);
}
return 0;
}
빌드
본 예제에서는 cmake를 이용한 빌드를 수행하므로, cmake 빌드를 위한 파일을 다음과 같이 작성합니다.
CMakeLists.txt - cmake 빌드를 위한 파일
cmake_minimum_required(VERSION 3.13)
project(message-queue-example)
set(CMAKE_C_STANDARD 99)
set(CMAKE_VERBOSE_MAKEFILE true)
link_libraries(pthread)
add_executable(msg_q_rx msg_q_rx.c)
add_executable(msg_q_tx msg_q_tx.c)
모든 파일들을 동일한 디렉토리 상에 작성한 후 다음 명령으로 빌드합니다.
# cmake .
# make
실행
수신 프로세스를 실행한 후 송신 프로세스를 실행하면, 수신 프로세스에 수신된 메시지가 화면에 출력되는 것을 확인할 수 있습니다. 본 실행 예제에서는 단일 터미널 상에서 두 프로세스를 모두 실행시키기 위해 수신 프로세스를 백그라운드로 실행하였습니다.
# ./msg_q_rx &
[1] 448
# ./msg_q_tx
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
Message 2 is received - data1: 0x03
Message 1 is received - data1: 1, data2: 2, data3: Hello
'프로그래밍 > 리눅스 프로그래밍' 카테고리의 다른 글
nanosleep() - 프로세스/쓰레드의 실행을 멈추는(재우는) 함수 (0) | 2022.04.20 |
---|---|
Kernel PPS 신호 사용하기 (C source code) (0) | 2022.03.26 |
CURL 라이브러리 사용법 - 임베디드 플랫폼용으로 빌드하기(arm-cross-sysroot 활용) (0) | 2021.03.09 |
리눅스 TUN/TAP 디바이스 프로그래밍 (1) | 2021.01.28 |
GPSd를 이용한 GNSS 프로그래밍 (0) | 2020.12.24 |
리눅스 프로그래밍 - 프로세스 종료 신호(SIGINT, SIGTERM) 후킹하기 (0) | 2020.08.21 |
리눅스 타이머 프로그래밍 (0) | 2020.07.31 |
리눅스 네트워크 프로그래밍 - CAN(Controller Area Network) 통신 (2) | 2020.04.27 |