메모리 공유

스레드 소유권과 레퍼런스 카운팅

지금까지 객체의 소유권을 move 키워드를 사용해 클로저로 넘기거나, 범위 제한 스레드를 사용하는 방법을 살펴보았습니다. 두 스레드가 데이터를 공유하는 상황에서, 두 스레드 모두가 나머지 하나보다 더 오래 존재한다는 사실이 보장되지 않는다면, 어떤 스레드도 데이터의 소유권을 가질 수 없습니다. 공유되는 어떤 데이터도 두 스레드보다 더 오래 존재해야만 합니다.

스태틱(static)

러스트에는 어떠한 스레드에도 소속되지 않는 변수를 만드는 방법이 있는데, 바로 static입니다. static 변수는 프로그램 자체가 소유권을 가지기 때문에 반드시 어떤 스레드보다도 오래 존재할 수 있습니다. 다음 예제에서는 두 스레드 모두 X에 접근할 수 있지만, 두 스레드 모두 X를 소유할 수는 없습니다.

#![allow(unused)]
fn main() {
static X: [i32; 3] = [1, 2, 3];
thread::spawn(|| dbg!(&X));
thread::spawn(|| dbg!(&X));
}

static으로 선언된 변수는 상수 값을 가지며, 프로그램이 시작되기 전에 생성됩니다. 따라서 어떤 스레드도 static 변수로부터 값을 빌려올 수 있게 됩니다.

유출(Leaking)

또 다른 데이터 공유 방법은 값의 할당을 유출시키는 방법입니다. Box::leak 함수를 사용하면 Box의 소유권을 해제하고 절대 이 값이 삭제되지 않게 할 수 있습니다. 이때부터 Box는 프로그램이 종료될 때까지 존재하게 되고 어느 스레드에서도 값을 빌려 갈 수 있게 됩니다.

#![allow(unused)]
fn main() {
let x: &'static [i32; 3] = Box::leak(Box::new([1, 2, 3]));

thread::spawn(move || dbg!(x));
thread::spawn(move || dbg!(x));
}

여기서 move 클로저가 값의 소유권을 가져가는 것처럼 보이지만, 자세히 살펴보면 x는 단순히 원래 Box의 레퍼런스라는 사실을 알 수 있습니다.

레퍼런스란 정수형이나 불리언 타입처럼 원본 데이터는 그대로 두고 값만 복사해가는 것입니다.

여기서 주의해야 할 점은 'static 으로 선언되었다고 해서 이 값이 프로그램 시작 전에 만들어진다는 것은 아니라는 것입니다. 중요한 점은 이 값이 프로그램이 종료될 때까지 유지된다는 사실입니다.

이렇게 Box를 유출시키게 되면 메모리가 유출되는 단점이 있습니다. 메모리에 어떤 객체를 할당했지만, 객체를 삭제하고 메모리에서 할당 해제하지 않는 것입니다. 전체 프로그램에서 이러한 패턴이 몇 번 존재하지 않는다면 큰 상관이 없지만, 이러한 패턴이 반복되면 프로그램의 메모리가 점차 부족해질 것입니다.

레퍼런스 카운팅

스레드 사이에서 공유된 데이터가 확실히 삭제되고 메모리에서 할당 해제되게 하려면, 해당 데이터의 소유권을 포기해서는 안됩니다. 대신, 소유권을 공유하면 가능합니다. 해당 데이터의 소유자들을 지속적으로 관리함으로써 더 이상 해당 데이터의 소유자가 없을 때 객체를 삭제할 수 있습니다.

이러한 방법을 레퍼런스 카운팅(Reference counting)이라고 하고, 러스트에서는 std::rc::Rc 타입을 사용해 구현이 가능합니다. Box와 비슷하지만, 데이터를 복사하게 되면 새로운 데이터가 메모리에 할당되는 것이 아니고 레퍼런스 카운터의 값이 증가합니다. 결론적으로 원본 데이터와 복사된 데이터 모두 같은 메모리에 할당된 값을 참조합니다. 이러한 원리 때문에 소유권을 공유한다고 말하는 것입니다.

#![allow(unused)]
fn main() {
use std::rc::Rc;

let a = Rc::new([1, 2, 3]);
let b = a.clone();

assert_eq!(a.as_ptr(), b.as_ptr()); // Same allocation!
}

위 예제에서는 Rc타입의 변수 aclone() 메소드로 복사해서 b를 만들었습니다. 그리고 두 변수의 메모리 주소를 확인해보면 동일하다는 것을 알 수 있습니다.

Rc가 삭제되면 카운터가 감소됩니다. 가장 마지막으로 존재하는 Rc가 삭제되면, 카운터가 0이 되고 따라서 메모리에서 값이 할당 해제됩니다.

하지만 Rc를 다른 스레드로 보내려고 하면 에러가 발생합니다.

    error[E0277]: `Rc` cannot be sent between threads safely
        |
    8   |     thread::spawn(move || dbg!(b));
        |                   ^^^^^^^^^^^^^^^

결론적으로 Rc는 스레드 안전성이 보장되지 않는 타입입니다. 만일 여러 개의 스레드가 특정 값에 대해 Rc를 사용한다면, 각 스레드에서 레퍼런스 카운터를 동시에 변경할 가능성이 있고 이것은 예측하지 못한 결과를 발생시킵니다.

Arc(Atomic refernce counting)

대신 아토믹(atomically)한 레퍼런스 카운팅을 사용하는 std::sync::Arc을 사용할 수 있습니다. Rc와 동일한 기능을 제공하지만, Arc는 여러 스레드에서 레퍼런스 카운터를 변경하는 것이 허용된다는 점이 다릅니다. 레퍼런스 카운터가 변경되는 작업이 아토믹하게 이루어지기 때문에, 여러 개의 스레드에서 동시에 카운터를 변경하더라도 스레드 안전성이 보장됩니다.

#![allow(unused)]
fn main() {
use std::sync::Arc;

let a = Arc::new([1, 2, 3]);
let b = a.clone();

thread::spawn(move || dbg!(a));
thread::spawn(move || dbg!(b));
}
  • (1)에서 배열을 Arc를 사용해 메모리에 할당합니다. 이때 레퍼런스 카운터는 1이 됩니다.
  • Arc를 클론하면 레퍼런스 카운트는 2가 되고, ab 모두 같은 메모리 주소를 사용합니다.
  • 각 스레드마다 고유한 Arc를 전달받았습니다. 즉 배열이 스레드 사이에 공유되었습니다. 각 스레드에서 Arc가 삭제될 때마다 레퍼런스 카운터가 감소하고, 카운터가 0이 되면 배열은 메모리에서 할당 해제됩니다.

뮤텍스(mutex)

뮤텍스는 Mutual exclusion(상호 배제)의 약자로, 뮤텍스는 주어진 시간에 하나의 스레드만 데이터에 액세스할 수 있도록 허용합니다. 뮤텍스의 데이터에 액세스하려면 먼저 스레드가 뮤텍스의 락(lock)을 획득하도록 요청하여 액세스를 원한다는 신호를 보내야 합니다. 락은 뮤텍스의 일부인 데이터 구조로, 어떤 스레드에서 뮤텍스를 참조하고 있는지가 저장되어 있습니다.

뮤텍스를 사용하기 위해서는 두 가지 규칙을 지켜야 합니다.

  • 데이터를 사용하기 전에 반드시 잠금을 해제해야 합니다.
  • 뮤텍스가 보호하는 데이터를 사용한 후에는 다른 스레드가 잠금을 획득할 수 있도록 데이터의 잠금을 해제해야 합니다.

뮤텍스의 API

뮤텍스 사용 방법을 살펴보기 위해 단일 스레드에서 뮤텍스를 사용하는 것부터 시작해 보겠습니다.

use std::sync::Mutex;

fn main() {
    let m = Mutex::new(5);

    {
        let mut num = m.lock().unwrap();
        *num = 6;
    }

    println!("m = {:?}", m);
}

실행 결과

m = Mutex { data: 6, poisoned: false, .. }

많은 타입과 마찬가지로, 연관 함수 new를 사용해 Mutex를 생성합니다. 뮤텍스 내부의 데이터에 접근하기 위해 lock 메서드를 사용해 락을 획득합니다. 이 호출은 현재 스레드를 차단하여 다른 스레드에서 락을 가질 차례가 될 때까지 어떤 작업도 할 수 없도록 합니다.

만약 락을 보유한 다른 스레드가 패닉에 빠지면 lock 호출이 실패합니다. 이 경우 아무도 락을 얻을 수 없게 되기 때문에 스레드에서 패닉을 발생시킵니다.

락을 획득한 후에는 반환 값(이 경우 num)을 뮤텍스 내부의 값에 대한 가변 레퍼런스로 취급할 수 있습니다. 락을 해제하고 나서 뮤텍스의 값을 출력하면 내부에 보관하고 있던 정수형 값이 5에서 6으로 변경된 것을 확인할 수 있습니다.

balance가 인출하고자 하는 금액보다 크다고 판단해 잔고를 인출합니다.

import threading
import time

balance = 100

def withdraw(amount):
    global balance
    if balance >= amount:
        time.sleep(0.01)
        balance -= amount
        print(f"Withdrawal successful. Balance: {balance}")
    else:
        print("Insufficient balance.")

def main():
    t1 = threading.Thread(target=withdraw, args=(50,))
    t2 = threading.Thread(target=withdraw, args=(75,))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

if __name__ == '__main__':
    main()

실행 결과

Withdrawal successful. Balance: 50
Withdrawal successful. Balance: -25

스레드가 lock을 획득했을 때만 데이터에 접근 가능하도록 하면 이러한 문제를 막을 수 있습니다.

import time
import threading

balance = 100
lock = threading.Lock()

def withdraw(amount):
    global balance
    thread_id = threading.get_ident()
    with lock:
        print(f"Thread {thread_id}: Checking balance...")
        if balance >= amount:
            time.sleep(1)
            balance -= amount
            print(f"Thread {thread_id}: Withdrawal successful. Balance: {balance}")
        else:
            print(f"Thread {thread_id}: Insufficient balance.")

def check_lock():
    while lock.locked():
        print("Lock is locked.")
        time.sleep(0.1)

def main():
    t1 = threading.Thread(target=withdraw, args=(50,))
    t2 = threading.Thread(target=withdraw, args=(75,))
    t3 = threading.Thread(target=check_lock)
    t1.start()
    t2.start()
    t3.start()
    t1.join()
    t2.join()
    t3.join()

if __name__ == '__main__':
    main()

실행 결과

Thread 123145503645696: Checking balance...
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Thread 123145503645696: Withdrawal successful. Balance: 50
Thread 123145520435200: Checking balance...
Thread 123145520435200: Insufficient balance.

다음 코드는 10개의 스레드가 100번씩 1씩 증가하는 값을 더하는 코드입니다.

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn withdraw(balance: &mut i32, amount: i32) {
    if *balance >= amount {
        thread::sleep(Duration::from_millis(10));
        *balance -= amount;
        println!("Withdrawal successful. Balance: {balance}");
    } else {
        println!("Insufficient balance.");
    }
}

fn main() {
    let mut balance = Arc::new(100);

    let t1 = thread::spawn(move || {
        withdraw(&mut balance, 50); // 🤯
    });

    let t2 = thread::spawn(move || {
        withdraw(&mut balance, 75);
    });

    t1.join().unwrap();
    t2.join().unwrap();
}

가변 레퍼런스로 각 스레드에 값을 전달할 수 없습니다.

뮤텍스를 사용해 데이터를 보호하고 있습니다. 뮤텍스를 사용하면 여러 스레드가 동시에 데이터에 액세스할 수 없습니다. 따라서 데이터가 더해지는 동안 다른 스레드는 데이터에 액세스할 수 없습니다.

use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

fn withdraw(balance: Arc<Mutex<i32>>, amount: i32) {
    let mut balance = balance.lock().unwrap();
    println!("{:?}: Checking balance.", thread::current().id());
    if *balance >= amount {
        thread::sleep(Duration::from_millis(100));
        *balance -= amount;
        println!("Withdrawal successful. Balance: {balance}");
    } else {
        println!("Insufficient balance.");
    }
}

fn check_lock(balance: Arc<Mutex<i32>>) {
    while let Err(_) = balance.try_lock() {
        println!("Lock is locked.");
        thread::sleep(Duration::from_millis(10));
    }
}

fn main() {
    let balance = Arc::new(Mutex::new(100));

    let balance1 = Arc::clone(&balance);
    let t1 = thread::spawn(move || {
        withdraw(Arc::clone(&balance1), 50);
    });
    let balance2 = Arc::clone(&balance);
    let t2 = thread::spawn(move || {
        withdraw(Arc::clone(&balance2), 75);
    });
    let balance3 = Arc::clone(&balance);
    let t3 = thread::spawn(move || {
        check_lock(Arc::clone(&balance3));
    });

    t1.join().unwrap();
    t2.join().unwrap();
    t3.join().unwrap();
}

실행 결과

ThreadId(2): Checking balance.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Lock is locked.
Withdrawal successful. Balance: 50
ThreadId(3): Checking balance.
Insufficient balance.