메시지 전달

스레드 간에 데이터를 공유하는 방법 중 하나로 널리 쓰이는 것 중 하나가 바로 MPSC입니다. 다중 생성자-단일 소비자(Multiple Producer Single Consumer)란 뜻으로, 여러 개의 스레드에서 하나의 스레드로 데이터를 보내는 방식입니다.

파이썬에서는 공식적으로 MPSC를 만드는 방법이 없기 때문에, 스레드 안정성이 보장되는 큐 자료형인 Queue 를 사용해 이를 구현해 보겠습니다.

import threading
import time
from queue import Queue

channel = Queue(maxsize=3)


def producer():
    for msg in ["hello", "from", "the", "other", "side"]:
        print(f"Producing {msg}...")
        channel.put(msg)


def consumer():
    while not channel.empty():
        item = channel.get()
        print(f"Consuming {item}...")
        channel.task_done()
        time.sleep(0.01)


producer_thread = threading.Thread(target=producer)
producer_thread.start()

consumer_thread = threading.Thread(target=consumer)
consumer_thread.start()

producer_thread.join()
consumer_thread.join()

mpsc(multiple producer, single consumer)는 여러 개의 송신자와 하나의 수신자를 가진 채널을 만듭니다. 이 채널은 송신자가 메시지를 전송하면 수신자가 메시지를 수신할 때까지 기다립니다. 이 채널은 SenderReceiver로 구성됩니다.

Receiver에는 두 가지 유용한 메서드, 즉 recvtry_recv가 있습니다. 여기서는 메인 스레드의 실행을 차단하고 값이 채널로 전송될 때까지 기다리는 _receive_의 줄임말인 recv를 사용합니다. 값이 전송되면 recvResult<T, E>에 값을 반환합니다. 송신기가 닫히면 recv는 에러를 반환하여 더 이상 값이 오지 않을 것임을 알립니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        for msg in vec!["hello", "from", "the", "other", "side"] {
            let val = String::from(msg);
            println!("Producing {}...", val);
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });

    for re in rx {
        println!("Consuming {}...", re);
    }
}

try_recv 를 사용하면 다음과 같이 할 수 있습니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    thread::spawn(move || {
        let val = String::from("hello");
        thread::sleep(Duration::from_millis(1000));
        tx.send(val).unwrap();
    });

    loop {
        println!("Waiting for the signal...");
        if let Ok(received) = rx.try_recv() {
            println!("Message: {}", received);
            break;
        }

        thread::sleep(Duration::from_millis(300));
    }
}

rx가 어떻게 앞의 메시지를 다 기다릴 수 있는지? Receiver, 즉 rx는 송신자 tx 가 메시지를 전송할 때까지 기다리려고 시도하며, 해당 채널이 끊어지면 오류를 반환합니다.

이 함수는 사용 가능한 데이터가 없고 더 많은 데이터를 전송할 수 있는 경우(적어도 한 명의 발신자가 여전히 존재할 경우) 항상 현재 스레드를 블럭합니다. 해당 발신자(또는 동기화 발신자)에게 메시지가 전송되면 이 수신자가 깨어나서 해당 메시지를 반환합니다. 즉 수신자가 메시지를 아직 보내지 않았다면 계속해서 메시지를 기다립니다.

해당 발신자가 연결을 끊었거나 이 스레드가 차단되는 동안 연결이 끊어지면, 이 채널에서 더 이상 메시지를 수신할 수 없음을 나타내는 Err을 반환합니다. 그러나 채널이 버퍼링되므로 연결이 끊어지기 전에 보낸 메시지는 계속 정상적으로 수신됩니다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    let tx1 = tx.clone();
    thread::spawn(move || {
        for msg in vec!["hello", "from", "the", "other", "side"] {
            let val = String::from(msg);
            thread::sleep(Duration::from_millis(100));
            tx1.send(val).unwrap();
        }
    });

    thread::spawn(move || {
        let val = String::from("bye");
        thread::sleep(Duration::from_millis(1000));
        tx.send(val).unwrap();
    });

    for re in rx {
        println!("{}", re);
    }
}