메시지 전달
스레드 간에 데이터를 공유하는 방법 중 하나로 널리 쓰이는 것 중 하나가 바로 MPSC입니다. 다중 생성자-단일 소비자(Multiple Producer Single Consumer)란 뜻으로, 여러 개의 스레드에서 하나의 스레드로 데이터를 보내는 방식입니다.
파이썬에서는 공식적으로 MPSC를 만드는 방법이 없기 때문에, 스레드 안정성이 보장되는 큐 자료형인 Queue
를 사용해 이를 구현해 보겠습니다.
import threading
import time
from queue import Queue
channel = Queue(maxsize=3)
def producer():
for msg in ["hello", "from", "the", "other", "side"]:
print(f"Producing {msg}...")
channel.put(msg)
def consumer():
while not channel.empty():
item = channel.get()
print(f"Consuming {item}...")
channel.task_done()
time.sleep(0.01)
producer_thread = threading.Thread(target=producer)
producer_thread.start()
consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
mpsc(multiple producer, single consumer)는 여러 개의 송신자와 하나의 수신자를 가진 채널을 만듭니다. 이 채널은 송신자가 메시지를 전송하면 수신자가 메시지를 수신할 때까지 기다립니다. 이 채널은 Sender
와 Receiver
로 구성됩니다.
Receiver
에는 두 가지 유용한 메서드, 즉 recv
와 try_recv
가 있습니다. 여기서는 메인 스레드의 실행을 차단하고 값이 채널로 전송될 때까지 기다리는 _receive_의 줄임말인 recv
를 사용합니다. 값이 전송되면 recv
는 Result<T, E>
에 값을 반환합니다. 송신기가 닫히면 recv
는 에러를 반환하여 더 이상 값이 오지 않을 것임을 알립니다.
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { for msg in vec!["hello", "from", "the", "other", "side"] { let val = String::from(msg); println!("Producing {}...", val); tx.send(val).unwrap(); thread::sleep(Duration::from_millis(10)); } }); for re in rx { println!("Consuming {}...", re); } }
try_recv
를 사용하면 다음과 같이 할 수 있습니다.
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); thread::spawn(move || { let val = String::from("hello"); thread::sleep(Duration::from_millis(1000)); tx.send(val).unwrap(); }); loop { println!("Waiting for the signal..."); if let Ok(received) = rx.try_recv() { println!("Message: {}", received); break; } thread::sleep(Duration::from_millis(300)); } }
rx가 어떻게 앞의 메시지를 다 기다릴 수 있는지? Receiver, 즉 rx
는 송신자 tx
가 메시지를 전송할 때까지 기다리려고 시도하며, 해당 채널이 끊어지면 오류를 반환합니다.
이 함수는 사용 가능한 데이터가 없고 더 많은 데이터를 전송할 수 있는 경우(적어도 한 명의 발신자가 여전히 존재할 경우) 항상 현재 스레드를 블럭합니다. 해당 발신자(또는 동기화 발신자)에게 메시지가 전송되면 이 수신자가 깨어나서 해당 메시지를 반환합니다. 즉 수신자가 메시지를 아직 보내지 않았다면 계속해서 메시지를 기다립니다.
해당 발신자가 연결을 끊었거나 이 스레드가 차단되는 동안 연결이 끊어지면, 이 채널에서 더 이상 메시지를 수신할 수 없음을 나타내는 Err을 반환합니다. 그러나 채널이 버퍼링되므로 연결이 끊어지기 전에 보낸 메시지는 계속 정상적으로 수신됩니다.
use std::sync::mpsc; use std::thread; use std::time::Duration; fn main() { let (tx, rx) = mpsc::channel(); let tx1 = tx.clone(); thread::spawn(move || { for msg in vec!["hello", "from", "the", "other", "side"] { let val = String::from(msg); thread::sleep(Duration::from_millis(100)); tx1.send(val).unwrap(); } }); thread::spawn(move || { let val = String::from("bye"); thread::sleep(Duration::from_millis(1000)); tx.send(val).unwrap(); }); for re in rx { println!("{}", re); } }