스레드 스폰

프로세스에서 스레드를 만드는 것을 스레드를 스폰(spawn)한다고 말합니다.

싱글 스레드 스폰하기

모든 프로그램은 메인 스레드로부터 시작합니다. 메인 스레드가 main 함수를 실행하고, 다른 스레드들을 실행시킬 수도 있습니다. 스레드 한 개를 스폰하는 방법은 다음과 같습니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1)
thread.start()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

실행 결과

Hello, can you hear me?: 0
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9

메인 스레드와 생성된 스레드가 번갈아 가면서 작업을 수행하는 것을 알 수 있습니다.

러스트에서는 새로운 스레드를 만들 때 표준 라이브러리의 std::thread::spawn 함수를 사용합니다. spawn 함수는 스레드에서 실행시킬 함수를 인자로 받습니다. 만일 전달받은 함수가 종료된다면, 스레드도 종료됩니다. 다음 코드에서는 thread::spawn 을 사용해 스레드를 생성하고, thread::sleep 을 사용해 1ms 만큼 쉬었다가 다음 루프를 실행합니다. 파이썬과 마찬가지로, 메인 스레드와 생성된 스레드가 번갈아가며 실행되는 것을 확인할 수 있습니다.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }

    handle.join().unwrap();
}

실행 결과

Hello from the main thread: 0
Hello, can you hear me?: 0
Hello from the main thread: 1
Hello, can you hear me?: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello from the main thread: 3
Hello, can you hear me?: 4
Hello from the main thread: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9

대몬(daemon) 스레드 만들기

스폰할 스레드를 대몬 스레드로 만들 수도 있습니다. 대몬 스레드는 백그라운드에서 실행되며, 메인 스레드가 종료되면 함께 종료되는 스레드입니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1, daemon=True)
thread.start()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

Hello, can you hear me?: 0
Hello from the main thread: 0
Hello, can you hear me?: 1
Hello from the main thread: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello from the main thread: 3
Hello from the main thread: 4
Hello, can you hear me?: 4

thread::spawnJoinHandle 을 반환합니다. JoinHandle 은 스레드가 종료될 때까지 기다리는 join 메서드를 가지고 있습니다. join 을 호출하면 메인 스레드가 종료되지 않고, 생성된 스레드가 종료될 때까지 기다립니다. 만일 메인 함수에서 join 을 호출하지 않으면, 스폰된 스레드가 실행 중이더라도 메인 스레드가 종료되어 프로그램이 종료됩니다.

use std::thread;
use std::time::Duration;

fn main() {
    thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

실행 결과

Hello from the main thread: 0
Hello, can you hear me?: 0
Hello from the main thread: 1
Hello, can you hear me?: 1
Hello from the main thread: 2
Hello, can you hear me?: 2
Hello from the main thread: 3
Hello, can you hear me?: 3
Hello from the main thread: 4
Hello, can you hear me?: 4
Hello, can you hear me?: 5

실행 결과에서 알 수 있듯이 스폰된 스레드가 10번 출력이 되어야 하는데 6번만 출력되고 프로그램이 종료되었습니다. 이를 해결하기 위해서는 join 을 호출해야 합니다.

join 을 사용해 스레드 기다리기

일반적으로 스레드는 시작된 다음 join 함수를 사용해 스레드 작업이 끝날 때까지 기다려집니다.

from threading import Thread

from time import sleep


def func1():
    for i in range(0, 10):
        print(f"Hello, can you hear me?: {i}")
        sleep(0.001)


thread = Thread(target=func1)
thread.start()
thread.join()

for i in range(0, 5):
    print(f"Hello from the main thread: {i}")
    sleep(0.001)

실행 결과

Hello, can you hear me?: 0
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4

이번에는 join()을 사용해 스폰된 스레드가 끝까지 실행되기를 기다렸다가 메인 스레드를 실행합니다.

use std::thread;
use std::time::Duration;

fn main() {
    let handle = thread::spawn(|| {
        for i in 0..10 {
            println!("Hello, can you hear me?: {}", i);
            thread::sleep(Duration::from_millis(1));
        }
    });
    handle.join().unwrap();

    for i in 0..5 {
        println!("Hello from the main thread: {}", i);
        thread::sleep(Duration::from_millis(1));
    }
}

실행 결과

Hello, can you hear me?: 0
Hello, can you hear me?: 1
Hello, can you hear me?: 2
Hello, can you hear me?: 3
Hello, can you hear me?: 4
Hello, can you hear me?: 5
Hello, can you hear me?: 6
Hello, can you hear me?: 7
Hello, can you hear me?: 8
Hello, can you hear me?: 9
Hello from the main thread: 0
Hello from the main thread: 1
Hello from the main thread: 2
Hello from the main thread: 3
Hello from the main thread: 4

스레드와 소유권

std::thread::spawn에는 함수 이름을 전달할 수도 있지만, 앞에서처럼 클로저(closure)를 전달하는 경우가 더 많습니다. 클로저를 사용하면 특정 값을 스레드 안으로 이동시키는 것이 가능합니다.

#![allow(unused)]
fn main() {
let numbers = vec![1, 2, 3];
thread::spawn(move || {
    for n in numbers {
        println!("{n}");
    }
})
.join()
.unwrap();
}

변수 numbers의 소유권이 새로 만들어진 스레드로 이동됩니다. 바로 move 키워드가 클로저에 사용되었기 때문입니다. 만일 move를 사용하지 않았다면, 클로저는 numbers를 레퍼런스로 사용하게 되고, 이는 컴파일 에러의 원인이 됩니다. 왜냐하면 소유권을 빌린 변수 numbers보다 스레드의 지속 범위(scope)가 더 넓을 수 있기 때문입니다.

스레드는 프로그램이 종료될 때까지도 계속 실행될 수 있기 때문에, spawn 함수는 인자로 'static 타입을 입력받습니다. 영원히, 즉 프로그램이 종료될 때까지 존재할 수 있는 함수만을 입력받는 것입니다. 만일 클로저가 지역 변수의 레퍼런스를 입력받는 경우, 지역 변수가 범위를 벗어나 메모리에서 지워져 버리는 문제가 발생합니다.

클로저의 리턴값은 스레드로 전달됩니다. 이 값은 join 메소드가 호출될 때 Result로 감싸져서 리턴됩니다.

#![allow(unused)]
fn main() {
let numbers = Vec::from_iter(0..=1000);

let t = thread::spawn(move || {
    let len = numbers.len();
    let sum = numbers.into_iter().sum::<usize>();
    sum / len // 1
});

let average = t.join().unwrap(); // 2
println!("average: {average}");
}

위의 예제에서, 스레드의 클로저(1)에서 리턴된 값은 join 메소드(2)를 통해 메인 스레드로 전달됩니다.

만일 변수 numbers의 길이가 0이라면 스레드는 (1)에서 값을 0으로 나누려고 하다가 패닉을 발생시키게 됩니다. 그러면 join은 패닉 메시지를 리턴하게 되고, unwrap(2)에 의해 메인 스레드 역시 패닉이 발생합니다.

범위 제한(scoped) 스레드

만일 어떤 스레드가 반드시 특정 범위에서만 존재하는 것이 확실할 경우, 이 스레드는 지역 변수의 소유권을 빌려올 수 있습니다. 왜냐하면 스레드 역시 지역변수와 같이 특정 범위에서만 존재하기 때문입니다.

이러한 스레드를 범위 제한 스레드라고 부릅니다. 범위 제한 스레드를 만들기 위해서는 std::thread::scope를 사용하면 됩니다. 이제 지역 변수를 입력받는 클로저와 범위가 동일한 스레드를 만들 수 있습니다.

use std::thread;

fn main() {
    let numbers = vec![1, 2, 3];

    thread::scope(|s| {
        s.spawn(|| {
            println!("length: {}", numbers.len());
        });
        s.spawn(|| {
            for n in &numbers {
                println!("{n}");
            }
        });
    });
}
  1. 먼저 스레드의 범위를 만들어주기 위해 std::thread::scope 함수에 클로저를 전달합니다. 해당 클로저는 즉시 실행되고, 현재 범위를 나타내는 인자 s를 입력으로 받습니다.
  2. 그다음 s를 사용해 스레드를 생성합니다. 스레드에 전달되는 클로저는 지역 변수 numbers를 사용할 수 있습니다.
  3. 범위가 끝날 때, 실행 중인 스레드들이 종료될 때까지 기다립니다.

이러한 패턴을 사용하면 범위 안의 스레드들이 절대 범위 밖으로 나가지 못하는 것이 보장됩니다. 따라서 spawn 함수에 ``static타입이 아닌 인자를 입력받을 수 있게 됩니다. 예를 들어numbers변수의 경우는 해당 스코프s보다 오래 존재하기 때문에 범위 안의 스레드에서 numbers`를 참조할 수 있습니다.

위의 예제에서 두 스레드 모두 numbers 변수에 동시적으로 접근할 수 있습니다. 메인 스레드를 포함해 어느 스레드도 numbers 의 값을 바꾸고 있지 않기 때문에 동시에 접근하는 것 자체는 괜찮습니다. 하지만 코드를 아래와 같이 바꿔서 numbers에 새로운 값을 넣으려고 하면 컴파일 오류가 발생합니다.

use std::thread;

fn main() {
    let mut numbers = vec![1, 2, 3];
    thread::scope(|s| {
        s.spawn(|| {
            numbers.push(1);
        });
        s.spawn(|| {
            numbers.push(2); // Error!
        });
    });
}

컴파일 오류는 다음과 같습니다. 참고로 러스트 컴파일러 버전에 따라서 에러 메시지는 조금씩 다를 수 있습니다.

error[E0499]: cannot borrow `numbers` as mutable more than once at a time
  --> src/main.rs:9:17
   |
5  |       thread::scope(|s| {
   |                      - has type `&'1 Scope<'1, '_>`
6  |           s.spawn(|| {
   |           -       -- first mutable borrow occurs here
   |  _________|
   | |
7  | |             numbers.push(1);
   | |             ------- first borrow occurs due to use of `numbers` in closure
8  | |         });
   | |__________- argument requires that `numbers` is borrowed for `'1`
9  |           s.spawn(|| {
   |                   ^^ second mutable borrow occurs here
10 |               numbers.push(2); // Error!
   |               ------- second borrow occurs due to use of `numbers` in closure

GIL(Global interpreter lock)

GIL은 한 번에 하나의 스레드만 파이썬 바이트코드를 실행하도록 하기 위해 인터프리터에서 사용되는 메커니즘입니다. GIL은 인터프리터가 코드를 실행하기 전에 획득하는 락(lock)입니다. 스레드가 GIL을 획득하면 파이썬 바이트코드를 실행할 수 있지만 다른 모든 스레드는 GIL이 해제될 때까지 파이썬 코드를 실행할 수 없도록 차단됩니다.

GIL의 목적은 여러 스레드의 동시 액세스로부터 인터프리터의 내부 데이터 구조를 보호하는 것입니다. GIL이 없으면 두 개 이상의 스레드가 동일한 데이터 구조를 동시에 수정할 수 있으며, 이로 인해 레이스 컨디션 등의 문제가 발생할 수 있습니다.

GIL의 단점

그러나 GIL에는 몇 가지 단점도 있습니다. 한 번에 하나의 스레드만 바이트코드를 실행할 수 있기 때문에 GIL은 멀티스레드 프로그램, 특히 CPU에 바인딩된 작업을 포함하는 프로그램의 성능에 치명적인 영향을 줍니다. 한 스레드가 CPU에 바인딩된 작업을 실행하는 경우, 다른 스레드가 I/O 또는 기타 CPU에 바인딩되지 않은 작업을 대기 중이더라도 Python 코드를 실행하지 못하고 작업이 끝날 때까지 기다려야 합니다.

GIL의 한계를 극복하기 위해 파이썬은 몇 가지 기능을 지원합니다.

  • 비동기 프로그래밍
  • 멀티스레딩
  • 멀티프로세싱
  • 제한적 GIL 해제

이 중에서 실제 개발자 입장에서 가장 널리 쓰이는 방식은 비동기 프로그래밍과 멀티스레딩입니다. numpy나 pandas와 같이 C를 사용해 GIL을 제한적으로 해제하거나, 15장에서 다룰 방법인 러스트를 사용해 성능을 비약적으로 향상시키는 방식도 가능합니다. 다만 이 방법은 서드파티 패키지를 사용하거나, 별도 패키지를 자체적으로 빌드해서 사용해야하는 만큼 사용 방법이 다소 제한적입니다.

GIL 경합(contention) 문제

아래 코드를 실행해 보면 count 함수를 연속으로 두 번 호출하는 것과, 스레드를 사용하는 것과의 실제 실행 속도 차이가 그리 크지 않습니다. 그 이유는 파이썬은 한 번에 단 하나의 코드만 실행할 수 있기 때문에, 스레드를 여러 개를 만들더라도 빠르게 계산하지 못하기 때문입니다.

import time
import threading

N = 10000000

def count(n):
    for i in range(n):
        pass

start = time.time()
count(N)
count(N)
print(f"Elapsed time(sequential): {(time.time() - start) * 1000:.3f}ms")

start = time.time()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))

t1.start()
t2.start()

t1.join()
t2.join()

print(f"Elapsed time(threaded): {(time.time() - start) * 1000:.3f}ms")

실행 결과

Elapsed time(sequential): 0.4786410331726074
Elapsed time(threaded): 0.4163088798522949