/ PROGRAMMING, RUST, OS

Rust - Inter-Process Communication

IPC는 여러 목적 (Modularize, Batch Processing, Information Sharing) 달성을 위해 사용됩니다. Message Passing, Pipe, Shared Memory, Remote Procedure Call 등 다양한 방법이 존재합니다. Rust에서는 어떻게 IPC를 수행하는지 알아봅시다.

Rust 맛보기

Message Passing

nix crate를 활용하여 message passing model 구현이 가능합니다. POSIX message queue를 지원하는 Linux 환경을 가정합니다.

API functions

pub fn mq_open<P> (
    name: P,
    oflag: OFlag,
    mode: Mode,
    attr: Option<&MqAttr> 
) -> Result<MqdT>
where P: ?Sized + NixPath;

Queue를 생성하는 함수입니다.

pub fn mq_receive (
    mqdes: &MqdT,
    message: &mut [u8],
    msg_prio: &mut u32
) -> Result<usize>

메시지를 받는 함수입니다.

pub fn mq_send (mqdes: &MqdT, message: &[u8], msg_prio: u32) -> Result<()>

메시지를 보내는 함수입니다.

주의할 점은, MqdTDrop trait을 구현하고 있지 않기 때문에, 수동으로 queue를 닫아 줄 필요가 있다는 것입니다.

mq_close() 에 argument로 MqdT 타입 queue를 전달함으로써 queue를 닫을 수 있습니다.

Producer-Consumer 예제를 살펴보겠습니다. 먼저 Producer 코드입니다.

use nix::mqueue::*;
use nix::sys::stat::Mode;
use std::io;

fn main() {

    let mq_attr = MqAttr {
        mq_flags: 0,
        mq_maxmsg: 10,
        mq_msgsize: 1024,
        mq_curmsgs: 0,
    };

    let mq = mq_open("/my_mq", OFlag::O_CREAT | OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR, &mq_attr).unwrap();
    // message queue의 이름은 반드시 '/'로 시작해야 합니다.

    let mut message = String::new();
    io::stdin().read_line(&mut message).unwrap().trim();

    mq_send(&mq, message.as_bytes(), 0).unwrap();
    // 3번째 argument인 정수는 message priority

    mq_close(&mq).unwrap();
}

이제 Consumer 코드를 살펴보겠습니다.

use nix::mqueue::*;
use nix::sys::stat::Mode;
use std::io;

fn main() {
    let mq = mq_open("/my_mq", OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR, None).unwrap();
    // queue의 이름 producer와 같게

    let mut buffer = [0; 1024];
    let mut prio = 0;

    mq_receive(&mq, &mut buffer, &mut prio).unwrap();
    println!("Received: {}", String::from_utf8_lossy(&buffer));

    mq_close(&mq).unwrap();
}

Linux는 message의 최대 크기와 queue의 최대 길이를 미리 정의하고 있습니다. 이를 고려하여 queue attribute들을 설정할 필요가 있습니다.

다음과 같은 sysfs 경로에서 확인 가능합니다.

cat /proc/sys/fs/mqueue/msgsize_max # 8192
cat /proc/sys/fs/mqueue/msg_max # 10

코드에서 생성한 이름의 message queue는 /dev/mqueue/ 디렉토리에 생성됩니다. 즉, 위 예제 소스 코드에서 생성한 /my_mq queue는 /dev/mqueue/my_mq 경로에 생성됩니다.

Shared Memory

공유 메모리 영역을 설정해서 데이터를 공유할 수도 있는데, 일반적으로 Message Passing보다 빠릅니다. 메모리 영역을 만들 때 말고는 별도의 system call이 필요 없기 때문입니다. 한 번 영역을 만들고 나면, 공유 메모리를 통한 통신은 OS가 아닌 유저 프로세스의 관할이 됩니다. Synchronization을 달성하기 위한 메커니즘의 제공이 주요한 고려 사항입니다.

Rust에서는 공유 메모리 영역을 만들 수 있는 nix crate와, 유저 메모리 공간에 공유 메모리 오브젝트를 맵핑할 수 있게 해주는 memmap2 crate를 이용할 수 있습니다.

주요 API 함수는 다음과 같습니다.

pub fn shm_open<P> (
    name: P,
    oflag: OFlag,
    mode: Mode
) -> Result<RawFd>
where P: ?Sized + NixPath;

pub unsafe fn map<T: MmapAsRawDesc>(file: T) -> Result<Mmap> // immutable region
pub unsafe fn map_mut<T: MmapAsRawDesc>(file: T) -> Result<MmapMut> // mutable region

pub fn shm_unlink<P> (name: &P) -> Result<()> // 공유 메모리 영역 제거
where P: ?Sized + NixPath;

먼저 shm_open()으로 공유 메모리 세그먼트를 만들고, 그 후에 map() 또는 map_mut()으로 메모리를 각 프로세스의 가상 메모리 공간에 맵핑합니다. 이 함수들은 내부적으로 mmap() 시스템 콜을 호출합니다. 이후에는 해당 메모리 영역을 읽거나 쓸 수 있습니다.

이제 예제 소스 코드로 살펴보겠습니다. 먼저 Producer 입니다.

use std::fs::File;
use memmap2::MmapMut;
use nix::sys::mman::{shm_open, shm_unlink, MAP_SHARED, PROT_READ, PROT_WRITE};
use nix::sys::stat::Mode;
use nix::fnctl::OFlag;

fn main() {
    const SIZE: usize = 4096;

    let shm = shm_open("/my_shm", OFlag::O_CREAT | OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR).unwrap();
    // 공유 메모리 오브젝트 생성

    let file = File::from(shm);
    file.set_len(SIZE as u64).unwrap(); // 공유 메모리 오브젝트의 크기 설정

    let mut mmap = unsafe {
        MmapMut::map_mut(&file).unwrap()
    }; // 공유 메모리 오브젝트를 프로세스의 가상 메모리 공간에 맵핑

    let message = "Hello from the Producer!";
    mmap[..message.len()].copy_from_slice(message);

    println!("Producer wrote to shared memory: {}", message);
}

File::from() 함수는 파일 디스크립터를 File 객체로 변환합니다.

이제 Consumer 봅시다

use std::fs::File;
use memmap2::Mmap;
use nix::sys::mman::{shm_open, shm_unlink, MAP_SHARED, PROT_READ, PROT_WRITE};
use nix::sys::stat::Mode;
use nix::fnctl::OFlag;

fn main() {
    const SIZE: usize = 4096;

    let shm = shm_open("/my_shm", OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR).unwrap();
    // 공유 메모리 오브젝트 생성

    let file = File::from(shm);
    file.set_len(SIZE as u64).unwrap(); // 공유 메모리 오브젝트의 크기 설정

    let mmap = unsafe {
        Mmap::map(&file).unwrap()
    }; // 공유 메모리 오브젝트를 프로세스의 가상 메모리 공간에 맵핑

    let message = String::from_utf8_lossy(&mmap[..SIZE]); // 바로 접근 가능!
    println!("Consumer read from shared memory: {}", message);
}

Signals

특정 이벤트의 발생을 프로세스에게 알리는 수단입니다. software interrupt로 해석할 수도 있습니다. Interrupt와 비슷하게 Signal handler가 받은 signal을 적절히 처리합니다.

Rust에서는 nix crate가 signal을 지원합니다.

use nix::sys::signal::{kill, Signal};

Signal handler는 extern "C" 키워드와 함께 작성해야 합니다. OS가 signal handler를 C의 calling convention에 따라 호출할 것이기 때문에 이 키워드가 필요합니다.

extern "C" fn signal_handler(signal: libc::c_int) {
    println!("Received signal: {}", signal);
}

이제 OS에 이 signal handler를 넘겨줍시다.

let sa = SigAction::new(SigHandler::Handler(signal_handler), SaFlags::SA_RESTART, SigSet::empty());
unsafe { 
    sigaction(Signal::SIGALRM, &sa).unwrap(); // 주어진 signal에 대한 이전 action을 반환
    // SIALRM이라는 signal 에 대한 signal handler를 등록
}

이제 signal을 보내봅시다.

kill(Pid::from_raw(0), Signal::SIGALRM).unwrap();

위와 같이, pid와 signal을 명시하여 특정 프로세스에 시그널을 보낼 수 있고, 시그널을 받은 프로세스에서는 signal handler가 호출됩니다.

Pipes

read()write() 시스템 콜을 통해서 접근 가능한 Unix 파일 시스템의 일종입니다. 이전에 언급했듯이, 대표적으로 input/output redirection을 위해 사용할 수 있습니다.

두 종류가 있는데 Ordinary Pipe(Anonymous Pipe)와 Named Pipe(FIFO)입니다.

Ordinary Pipes

오직 단방향 통신만 지원하며, 통신하는 프로세스간 parent-child relationship을 강제합니다. Producer가 write-end에 데이터를 쓰고, Consumer는 read-end에서 데이터를 읽습니다. Rust에서는 nix crate가 pipe() 함수를 제공합니다.

과제에서 봤듯이, 이 함수의 리턴값은 Result 이고 성공적으로 생성되면 Ok 값으로 각각 read-end와 write-end에 해당하는 두 개의 file descripter(OwnedFd)를 담은 튜플이 반환됩니다.

write-end에 데이터를 쓰면 동시에 그 데이터는 read-end에서 읽을 수가 있게 됩니다.

use nix::unistd::{fork, pipe, ForkResult};
use std::fs::File;
use std::process::exit;
use std::io::{Read, Write};
use nix::sys::wait::wait;

fn main() {
    // create a pipe
    let (read_fd, write_fd) = pipe().unwrap();
    match unsafe {fork()} { 
        Ok (ForkResult::Parent { child }) => {
            // parent process
            let mut write_file = File::from(write_fd);
            write_file.write_all(b"Hello from the Parent!").unwrap();
            println!("Parent wrote to pipe!");

            wait().unwrap();
        }
        Ok (ForkResult::Child) => {
            // child process
            let mut read_file = File::from(read_fd);
            let mut buffer = [0; 512];
            read_file.read(&mut buffer).unwrap();
            println!("Child read from pipe: {}", String::from_utf8_lossy(&buffer));
            exit(0);
        }
        Err(_) => {
            eprintln!("Fork failed!");
        }
    }
}

Named Pipe (FIFO)

기본적으로 단방향 통신이지만, 경우에 따라 양방향 통신일 떄도 있습니다. Parent-Child 관계가 강제되는 Ordinary Pipe와 다르게 Named Pipe는 독립적인 프로세스 간 통신이 가능합니다.

N(producer) : 1(consumer) 구조를 주로 채택합니다.

프로세스가 종료되더라도 Named Pipe는 파일 시스템에 남아있습니다.

Rust에서는 nix crate가 mkfifo() 함수를 제공합니다.

use nix::unistd::mkfifo;
use nix::sys::stat::Mode;
use std::fs::File;
use std::io::Write;
use std::path::Path;
const PIPE_PATH: &str = "/tmp/my_named_pipe";
fn main() {
// Create the pipe (skip if it already exists)
if !Path::new(PIPE_PATH).exists() {
mkfifo(PIPE_PATH, Mode::S_IRUSR | Mode::S_IWUSR).expect("Failed to create named pipe");
}
// Write data to the pipe
let mut pipe_writer = File::create(PIPE_PATH).expect("Failed to open pipe");
let message = "Hello from the Sender!";
pipe_writer.write_all(message.as_bytes()).expect("Failed to write to pipe");
println!("Sender wrote to pipe: {}", message);
}

Remote IPC

네트워크를 통해 프로세스 간 통신을 하는 방법입니다.

Linux 커널의 네트워킹은 3가지 요소로 구현됩니다.

  • Sockets

  • Protocol stack

  • Network-device drivers

Socket Programming

네트워크를통한 IPC를 지원하기 위해 Socket system call이 이용 가능합니다.

Berkeley Socket API는 Unix 계열 OS에서 사용되는 표준 API입니다.

Rust는 std::net 모듈을 통해 OS가 제공하는 Berkeley 소켓 인터페이스을 지원합니다.

TCP Server-Client

서버는 먼저 소켓을 만들고, IP 주소와 포트 번호를 바인딩하고, 클라이언트의 연결을 기다립니다(listening). 이 일련의 과정은 Rust에서 TCPListener::bind()TCPListener::incoming(), TCPListner::accept() 메소드를 통해 수행할 수 있습니다. TCPListner::incoming()은 여러 개의 connection이 들어올 때 내부적으로 TCPListner::accept()를 호출하고, 이후 iterator를 반환합니다.

클라이언트는 소켓을 만든 후 서버에 연결을 시도합니다.

이후 서버와 클라이언트는 데이터를 주고(send) 받을(recv) 수 있습니다. 각각 TcpStream::write()TcpStream::read() 메소드를 이용합니다.

통신이 끝나면 연결을 close 합니다. Drop 트레이트를 구현한 TcpStream 객체는 스코프를 벗어나면 자동으로 close됩니다.

우선 서버 측 코드를 살펴보겠습니다.

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};

fn main () -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8484").unwrap()?;
    println!("Server listening on port 8484");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => { // Result<TcpStream, Error>
                println!("Connection established!");
                handle_client(stream)?;
            }
            Err(e) => {
                eprintln!("Error: {}", e);
            }
        }

        let mut buffer = [0; 512];
        match stream.read(&mut buffer) {
            Ok(n) => {
                println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
            }
            Err(e) => {
                eprintln!("Failed to receive data: {}", e);
            }
        }

        match stream.write_all(b"Hello from the Server!") {
            Ok(_) => {
                println!("Sent response!");
            }
            Err(e) => {
                eprintln!("Failed to send response: {}", e);
            }
        }
    }

    Ok(())
}

이제 클라이언트 측 코드를 살펴보겠습니다.

use std::net::TcpStream;
use std::io::{Read, Write};

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:8484")?;
    println!("Connected to the server!");

    let mut input = String::new();
    std::io::stdin().read_line(&mut input)?;
    stream.write_all(input.as_bytes())?;

    let mut buffer = [0; 512];
    stream.read(&mut buffer)?;
    println!("Received: {}", String::from_utf8_lossy(&buffer));

    Ok(())
}

Multi-Threaded TCP Server

서버에 연결하려는 클라이언트가 다수 존재하는 경우 서버는 각 클라이언트에 대해 별도의 스레드를 생성하여 처리할 수 있습니다.

use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};

fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
    let mut buffer = [0; 512];
    stream.read(&mut buffer)?;
    println!("Received: {}", String::from_utf8_lossy(&buffer));

    stream.write_all(b"Hello from the Server!")?;
    println!("Sent response!");

    Ok(())
}

fn main() -> std::io::Result<()> {
    let listener = TcpListener::bind("127.0.0.1:8484")?;
    println!("Server listening on port 8484");

    for stream in listener.incoming() {
        match stream {
            Ok(stream) => {
                println!("Connection established!");
                std::thread::spawn(|| {
                    handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
            }
            Err(e) => {
                eprintln!("Error: {}", e);
            }
        }
    }

    Ok(())
}

UDP Server-Client

connection 과정이 필요 없는 UDP는 연결을 기다릴 필요가 없고, 필요에 따라 소켓에 데이터를 전송하거나, 데이터를 수신할 수 있습니다.

Rust에서는 UdpSocket을 이용하여 UDP 통신을 수행할 수 있습니다.

우선 서버 측 코드를 살펴보겠습니다.

use std::net::{UdpSocket, SocketAddr};

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:8484")?;
    println!("Server listening on port 8484");

    let mut buffer = [0; 512];
    loop {
        let (n, src) = socket.recv_from(&mut buffer)?;
        println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));

        socket.send_to(b"Hello from the Server!", &src)?;
        println!("Sent response!");
    }

이제 클라이언트 측 코드를 살펴보겠습니다.

use std::net::UdpSocket;

fn main() -> std::io::Result<()> {
    let socket = UdpSocket::bind("127.0.0.1:0")?;
    socket.connect("127.0.0.1:8484")?;
    println!("Connected to the server!");

    let mut input = String::new();
    std::io::stdin().read_line(&mut input)?;
    socket.send(input.as_bytes())?;

    let mut buffer = [0; 512];

    let n = socket.recv(&mut buffer)?;

    println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));

    Ok(())
}

connect() 메소드를 사용하지 않을 경우 send_to() 또는 recv_from()으로 항상 목적지 주소를 지정해야 합니다.

Remote Procedure Call (RPC)

소켓 프로그래밍은 데이터의 구조화를 지원하지 않기 때문에 low-level communication으로 취급됩니다. 반면 RPC는 데이터의 구조화를 지원하며, abstracted된 함수 호출을 통해 데이터를 주고 받을 수 있는 high-level communication입니다. Client-server 컴퓨팅뿐 아니라 Android에서도 같은 시스템 내 IPC를 RPC로 구현하고 있습니다.

Client는 server의 함수를 각 노드의 stub을 거쳐 로컬 함수처럼 호출할 수 있습니다.

Stub이란 서버 측의 실제 procedure를 abstract하는 proxy object입니다. 덕분에 socket interface의 implementation detail을 몰라도 쉽게 remote procedure를 호출할 수 있습니다. Client 측 stub은 서버를 특정하고 parameter들을 marshalling(data serialization)하여 서버로 전송합니다. Server 측 stub은 parameter들을 unmarshalling(unserialization)하고 실제 procedure를 호출합니다.

Rust에서는 gRPC 또는 JSON-RPC crate를 이용하여 RPC를 구현할 수 있습니다. gRPC는 tonic crate를 통해 이용 가능하고, JSON-RPC는 jsonrpsee crate를 통해 이용 가능합니다.


참고 문헌

Rust 맛보기