Rust - Inter-Process Communication
IPC는 여러 목적 (Modularize, Batch Processing, Information Sharing) 달성을 위해 사용됩니다. Message Passing, Pipe, Shared Memory, Remote Procedure Call 등 다양한 방법이 존재합니다. Rust에서는 어떻게 IPC를 수행하는지 알아봅시다.
Rust 맛보기
- 1. Rust - 입문하기
- 2. Rust - 타입 모음
- 3. Rust - Memory Ownership
- 4. Rust - Control Flow
- 5. Rust - Structured Data Types
- 6. Rust - Project Organization
- 7. Rust - Error Handling
- 8. Rust - Collections
- 9. Rust - Generics
- 10. Rust - Test Automation
- 11. Rust - Functional Programming
- 12. Rust - Memory Management
- 13. Rust - I/O Management
- 14. Rust - Process & Thread Management
- 15. Rust - Inter-Process Communication
Message Passing
nix
crate를 활용하여 message passing model 구현이 가능합니다. POSIX message queue를 지원하는 Linux 환경을 가정합니다.
API functions
pub fn mq_open<P> (
name: P,
oflag: OFlag,
mode: Mode,
attr: Option<&MqAttr>
) -> Result<MqdT>
where P: ?Sized + NixPath;
Queue를 생성하는 함수입니다.
pub fn mq_receive (
mqdes: &MqdT,
message: &mut [u8],
msg_prio: &mut u32
) -> Result<usize>
메시지를 받는 함수입니다.
pub fn mq_send (mqdes: &MqdT, message: &[u8], msg_prio: u32) -> Result<()>
메시지를 보내는 함수입니다.
주의할 점은, MqdT
가 Drop
trait을 구현하고 있지 않기 때문에, 수동으로 queue를 닫아 줄 필요가 있다는 것입니다.
mq_close()
에 argument로 MqdT
타입 queue를 전달함으로써 queue를 닫을 수 있습니다.
Producer-Consumer 예제를 살펴보겠습니다. 먼저 Producer 코드입니다.
use nix::mqueue::*;
use nix::sys::stat::Mode;
use std::io;
fn main() {
let mq_attr = MqAttr {
mq_flags: 0,
mq_maxmsg: 10,
mq_msgsize: 1024,
mq_curmsgs: 0,
};
let mq = mq_open("/my_mq", OFlag::O_CREAT | OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR, &mq_attr).unwrap();
// message queue의 이름은 반드시 '/'로 시작해야 합니다.
let mut message = String::new();
io::stdin().read_line(&mut message).unwrap().trim();
mq_send(&mq, message.as_bytes(), 0).unwrap();
// 3번째 argument인 정수는 message priority
mq_close(&mq).unwrap();
}
이제 Consumer 코드를 살펴보겠습니다.
use nix::mqueue::*;
use nix::sys::stat::Mode;
use std::io;
fn main() {
let mq = mq_open("/my_mq", OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR, None).unwrap();
// queue의 이름 producer와 같게
let mut buffer = [0; 1024];
let mut prio = 0;
mq_receive(&mq, &mut buffer, &mut prio).unwrap();
println!("Received: {}", String::from_utf8_lossy(&buffer));
mq_close(&mq).unwrap();
}
Linux는 message의 최대 크기와 queue의 최대 길이를 미리 정의하고 있습니다. 이를 고려하여 queue attribute들을 설정할 필요가 있습니다.
다음과 같은 sysfs
경로에서 확인 가능합니다.
cat /proc/sys/fs/mqueue/msgsize_max # 8192
cat /proc/sys/fs/mqueue/msg_max # 10
코드에서 생성한 이름의 message queue는 /dev/mqueue/
디렉토리에 생성됩니다. 즉, 위 예제 소스 코드에서 생성한 /my_mq
queue는 /dev/mqueue/my_mq
경로에 생성됩니다.
Shared Memory
공유 메모리 영역을 설정해서 데이터를 공유할 수도 있는데, 일반적으로 Message Passing보다 빠릅니다. 메모리 영역을 만들 때 말고는 별도의 system call이 필요 없기 때문입니다. 한 번 영역을 만들고 나면, 공유 메모리를 통한 통신은 OS가 아닌 유저 프로세스의 관할이 됩니다. Synchronization을 달성하기 위한 메커니즘의 제공이 주요한 고려 사항입니다.
Rust에서는 공유 메모리 영역을 만들 수 있는 nix
crate와, 유저 메모리 공간에 공유 메모리 오브젝트를 맵핑할 수 있게 해주는 memmap2
crate를 이용할 수 있습니다.
주요 API 함수는 다음과 같습니다.
pub fn shm_open<P> (
name: P,
oflag: OFlag,
mode: Mode
) -> Result<RawFd>
where P: ?Sized + NixPath;
pub unsafe fn map<T: MmapAsRawDesc>(file: T) -> Result<Mmap> // immutable region
pub unsafe fn map_mut<T: MmapAsRawDesc>(file: T) -> Result<MmapMut> // mutable region
pub fn shm_unlink<P> (name: &P) -> Result<()> // 공유 메모리 영역 제거
where P: ?Sized + NixPath;
먼저 shm_open()
으로 공유 메모리 세그먼트를 만들고, 그 후에 map()
또는 map_mut()
으로 메모리를 각 프로세스의 가상 메모리 공간에 맵핑합니다. 이 함수들은 내부적으로 mmap()
시스템 콜을 호출합니다. 이후에는 해당 메모리 영역을 읽거나 쓸 수 있습니다.
이제 예제 소스 코드로 살펴보겠습니다. 먼저 Producer 입니다.
use std::fs::File;
use memmap2::MmapMut;
use nix::sys::mman::{shm_open, shm_unlink, MAP_SHARED, PROT_READ, PROT_WRITE};
use nix::sys::stat::Mode;
use nix::fnctl::OFlag;
fn main() {
const SIZE: usize = 4096;
let shm = shm_open("/my_shm", OFlag::O_CREAT | OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR).unwrap();
// 공유 메모리 오브젝트 생성
let file = File::from(shm);
file.set_len(SIZE as u64).unwrap(); // 공유 메모리 오브젝트의 크기 설정
let mut mmap = unsafe {
MmapMut::map_mut(&file).unwrap()
}; // 공유 메모리 오브젝트를 프로세스의 가상 메모리 공간에 맵핑
let message = "Hello from the Producer!";
mmap[..message.len()].copy_from_slice(message);
println!("Producer wrote to shared memory: {}", message);
}
File::from()
함수는 파일 디스크립터를 File
객체로 변환합니다.
이제 Consumer 봅시다
use std::fs::File;
use memmap2::Mmap;
use nix::sys::mman::{shm_open, shm_unlink, MAP_SHARED, PROT_READ, PROT_WRITE};
use nix::sys::stat::Mode;
use nix::fnctl::OFlag;
fn main() {
const SIZE: usize = 4096;
let shm = shm_open("/my_shm", OFlag::O_RDWR, Mode::S_IRUSR | Mode::S_IWUSR).unwrap();
// 공유 메모리 오브젝트 생성
let file = File::from(shm);
file.set_len(SIZE as u64).unwrap(); // 공유 메모리 오브젝트의 크기 설정
let mmap = unsafe {
Mmap::map(&file).unwrap()
}; // 공유 메모리 오브젝트를 프로세스의 가상 메모리 공간에 맵핑
let message = String::from_utf8_lossy(&mmap[..SIZE]); // 바로 접근 가능!
println!("Consumer read from shared memory: {}", message);
}
Signals
특정 이벤트의 발생을 프로세스에게 알리는 수단입니다. software interrupt로 해석할 수도 있습니다. Interrupt와 비슷하게 Signal handler가 받은 signal을 적절히 처리합니다.
Rust에서는 nix
crate가 signal을 지원합니다.
use nix::sys::signal::{kill, Signal};
Signal handler는 extern "C"
키워드와 함께 작성해야 합니다. OS가 signal handler를 C의 calling convention에 따라 호출할 것이기 때문에 이 키워드가 필요합니다.
extern "C" fn signal_handler(signal: libc::c_int) {
println!("Received signal: {}", signal);
}
이제 OS에 이 signal handler를 넘겨줍시다.
let sa = SigAction::new(SigHandler::Handler(signal_handler), SaFlags::SA_RESTART, SigSet::empty());
unsafe {
sigaction(Signal::SIGALRM, &sa).unwrap(); // 주어진 signal에 대한 이전 action을 반환
// SIALRM이라는 signal 에 대한 signal handler를 등록
}
이제 signal을 보내봅시다.
kill(Pid::from_raw(0), Signal::SIGALRM).unwrap();
위와 같이, pid
와 signal을 명시하여 특정 프로세스에 시그널을 보낼 수 있고, 시그널을 받은 프로세스에서는 signal handler가 호출됩니다.
Pipes
read()
와 write()
시스템 콜을 통해서 접근 가능한 Unix 파일 시스템의 일종입니다. 이전에 언급했듯이, 대표적으로 input/output redirection을 위해 사용할 수 있습니다.
두 종류가 있는데 Ordinary Pipe(Anonymous Pipe)와 Named Pipe(FIFO)입니다.
Ordinary Pipes
오직 단방향 통신만 지원하며, 통신하는 프로세스간 parent-child relationship을 강제합니다. Producer가 write-end에 데이터를 쓰고, Consumer는 read-end에서 데이터를 읽습니다. Rust에서는 nix
crate가 pipe()
함수를 제공합니다.
과제에서 봤듯이, 이 함수의 리턴값은 Result
이고 성공적으로 생성되면 Ok
값으로 각각 read-end와 write-end에 해당하는 두 개의 file descripter(OwnedFd
)를 담은 튜플이 반환됩니다.
write-end에 데이터를 쓰면 동시에 그 데이터는 read-end에서 읽을 수가 있게 됩니다.
use nix::unistd::{fork, pipe, ForkResult};
use std::fs::File;
use std::process::exit;
use std::io::{Read, Write};
use nix::sys::wait::wait;
fn main() {
// create a pipe
let (read_fd, write_fd) = pipe().unwrap();
match unsafe {fork()} {
Ok (ForkResult::Parent { child }) => {
// parent process
let mut write_file = File::from(write_fd);
write_file.write_all(b"Hello from the Parent!").unwrap();
println!("Parent wrote to pipe!");
wait().unwrap();
}
Ok (ForkResult::Child) => {
// child process
let mut read_file = File::from(read_fd);
let mut buffer = [0; 512];
read_file.read(&mut buffer).unwrap();
println!("Child read from pipe: {}", String::from_utf8_lossy(&buffer));
exit(0);
}
Err(_) => {
eprintln!("Fork failed!");
}
}
}
Named Pipe (FIFO)
기본적으로 단방향 통신이지만, 경우에 따라 양방향 통신일 떄도 있습니다. Parent-Child 관계가 강제되는 Ordinary Pipe와 다르게 Named Pipe는 독립적인 프로세스 간 통신이 가능합니다.
N(producer) : 1(consumer) 구조를 주로 채택합니다.
프로세스가 종료되더라도 Named Pipe는 파일 시스템에 남아있습니다.
Rust에서는 nix
crate가 mkfifo()
함수를 제공합니다.
use nix::unistd::mkfifo;
use nix::sys::stat::Mode;
use std::fs::File;
use std::io::Write;
use std::path::Path;
const PIPE_PATH: &str = "/tmp/my_named_pipe";
fn main() {
// Create the pipe (skip if it already exists)
if !Path::new(PIPE_PATH).exists() {
mkfifo(PIPE_PATH, Mode::S_IRUSR | Mode::S_IWUSR).expect("Failed to create named pipe");
}
// Write data to the pipe
let mut pipe_writer = File::create(PIPE_PATH).expect("Failed to open pipe");
let message = "Hello from the Sender!";
pipe_writer.write_all(message.as_bytes()).expect("Failed to write to pipe");
println!("Sender wrote to pipe: {}", message);
}
Remote IPC
네트워크를 통해 프로세스 간 통신을 하는 방법입니다.
Linux 커널의 네트워킹은 3가지 요소로 구현됩니다.
-
Sockets
-
Protocol stack
-
Network-device drivers
Socket Programming
네트워크를통한 IPC를 지원하기 위해 Socket system call이 이용 가능합니다.
Berkeley Socket API는 Unix 계열 OS에서 사용되는 표준 API입니다.
Rust는 std::net
모듈을 통해 OS가 제공하는 Berkeley 소켓 인터페이스을 지원합니다.
TCP Server-Client
서버는 먼저 소켓을 만들고, IP 주소와 포트 번호를 바인딩하고, 클라이언트의 연결을 기다립니다(listening). 이 일련의 과정은 Rust에서 TCPListener::bind()
와 TCPListener::incoming()
, TCPListner::accept()
메소드를 통해 수행할 수 있습니다. TCPListner::incoming()
은 여러 개의 connection이 들어올 때 내부적으로 TCPListner::accept()
를 호출하고, 이후 iterator를 반환합니다.
클라이언트는 소켓을 만든 후 서버에 연결을 시도합니다.
이후 서버와 클라이언트는 데이터를 주고(send) 받을(recv) 수 있습니다. 각각 TcpStream::write()
와 TcpStream::read()
메소드를 이용합니다.
통신이 끝나면 연결을 close 합니다. Drop
트레이트를 구현한 TcpStream
객체는 스코프를 벗어나면 자동으로 close됩니다.
우선 서버 측 코드를 살펴보겠습니다.
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
fn main () -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8484").unwrap()?;
println!("Server listening on port 8484");
for stream in listener.incoming() {
match stream {
Ok(stream) => { // Result<TcpStream, Error>
println!("Connection established!");
handle_client(stream)?;
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
let mut buffer = [0; 512];
match stream.read(&mut buffer) {
Ok(n) => {
println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
}
Err(e) => {
eprintln!("Failed to receive data: {}", e);
}
}
match stream.write_all(b"Hello from the Server!") {
Ok(_) => {
println!("Sent response!");
}
Err(e) => {
eprintln!("Failed to send response: {}", e);
}
}
}
Ok(())
}
이제 클라이언트 측 코드를 살펴보겠습니다.
use std::net::TcpStream;
use std::io::{Read, Write};
fn main() -> std::io::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:8484")?;
println!("Connected to the server!");
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
stream.write_all(input.as_bytes())?;
let mut buffer = [0; 512];
stream.read(&mut buffer)?;
println!("Received: {}", String::from_utf8_lossy(&buffer));
Ok(())
}
Multi-Threaded TCP Server
서버에 연결하려는 클라이언트가 다수 존재하는 경우 서버는 각 클라이언트에 대해 별도의 스레드를 생성하여 처리할 수 있습니다.
use std::net::{TcpListener, TcpStream};
use std::io::{Read, Write};
fn handle_client(mut stream: TcpStream) -> std::io::Result<()> {
let mut buffer = [0; 512];
stream.read(&mut buffer)?;
println!("Received: {}", String::from_utf8_lossy(&buffer));
stream.write_all(b"Hello from the Server!")?;
println!("Sent response!");
Ok(())
}
fn main() -> std::io::Result<()> {
let listener = TcpListener::bind("127.0.0.1:8484")?;
println!("Server listening on port 8484");
for stream in listener.incoming() {
match stream {
Ok(stream) => {
println!("Connection established!");
std::thread::spawn(|| {
handle_client(stream).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
Err(e) => {
eprintln!("Error: {}", e);
}
}
}
Ok(())
}
UDP Server-Client
connection 과정이 필요 없는 UDP는 연결을 기다릴 필요가 없고, 필요에 따라 소켓에 데이터를 전송하거나, 데이터를 수신할 수 있습니다.
Rust에서는 UdpSocket
을 이용하여 UDP 통신을 수행할 수 있습니다.
우선 서버 측 코드를 살펴보겠습니다.
use std::net::{UdpSocket, SocketAddr};
fn main() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:8484")?;
println!("Server listening on port 8484");
let mut buffer = [0; 512];
loop {
let (n, src) = socket.recv_from(&mut buffer)?;
println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
socket.send_to(b"Hello from the Server!", &src)?;
println!("Sent response!");
}
이제 클라이언트 측 코드를 살펴보겠습니다.
use std::net::UdpSocket;
fn main() -> std::io::Result<()> {
let socket = UdpSocket::bind("127.0.0.1:0")?;
socket.connect("127.0.0.1:8484")?;
println!("Connected to the server!");
let mut input = String::new();
std::io::stdin().read_line(&mut input)?;
socket.send(input.as_bytes())?;
let mut buffer = [0; 512];
let n = socket.recv(&mut buffer)?;
println!("Received: {}", String::from_utf8_lossy(&buffer[..n]));
Ok(())
}
connect()
메소드를 사용하지 않을 경우 send_to()
또는 recv_from()
으로 항상 목적지 주소를 지정해야 합니다.
Remote Procedure Call (RPC)
소켓 프로그래밍은 데이터의 구조화를 지원하지 않기 때문에 low-level communication으로 취급됩니다. 반면 RPC는 데이터의 구조화를 지원하며, abstracted된 함수 호출을 통해 데이터를 주고 받을 수 있는 high-level communication입니다. Client-server 컴퓨팅뿐 아니라 Android에서도 같은 시스템 내 IPC를 RPC로 구현하고 있습니다.
Client는 server의 함수를 각 노드의 stub을 거쳐 로컬 함수처럼 호출할 수 있습니다.
Stub이란 서버 측의 실제 procedure를 abstract하는 proxy object입니다. 덕분에 socket interface의 implementation detail을 몰라도 쉽게 remote procedure를 호출할 수 있습니다. Client 측 stub은 서버를 특정하고 parameter들을 marshalling(data serialization)하여 서버로 전송합니다. Server 측 stub은 parameter들을 unmarshalling(unserialization)하고 실제 procedure를 호출합니다.
Rust에서는 gRPC 또는 JSON-RPC crate를 이용하여 RPC를 구현할 수 있습니다.
gRPC는 tonic
crate를 통해 이용 가능하고, JSON-RPC는 jsonrpsee
crate를 통해 이용 가능합니다.
참고 문헌
-
고려대학교 컴퓨터학과 오상은 교수님의 시스템 프로그래밍(COSE322) 과목 강의자료
Rust 맛보기
- 1. Rust - 입문하기
- 2. Rust - 타입 모음
- 3. Rust - Memory Ownership
- 4. Rust - Control Flow
- 5. Rust - Structured Data Types
- 6. Rust - Project Organization
- 7. Rust - Error Handling
- 8. Rust - Collections
- 9. Rust - Generics
- 10. Rust - Test Automation
- 11. Rust - Functional Programming
- 12. Rust - Memory Management
- 13. Rust - I/O Management
- 14. Rust - Process & Thread Management
- 15. Rust - Inter-Process Communication