tokio

앞에서 설명했듯이, 비동기 함수를 설명하려면 프로그램 내부에서 비동기 함수들의 실행을 관리하는 비동기 런타임이 필요합니다. 러스트는 빌트인 async 런타임이 존재하지 않아서 tokio 를 사용해야 합니다. Tokio는 다음 세 가지의 주요 구성 요소를 제공합니다.

  • 비동기 코드 실행을 위한 멀티스레드 런타임.
  • 표준 라이브러리의 비동기 버전.
  • 대규모 라이브러리 에코시스템.

이제 프로젝트에 tokio 를 설치하기 위해 Cargo.toml파일에 tokio를 추가합니다. 이때 features = ["full"] 을 넣어야 전체 기능을 다 사용할 수 있습니다.

[dependencies]
tokio = { version = "1.25.0", features = ["full"] }

비동기 함수 만들어보기

파이썬에는 비동기 함수를 만들기 위해서 asyncio 라는 내장 라이브러리가 있습니다.

비동기 함수를 선언하려면 일반 함수 정의 앞에 async 키워드를 붙여주면 됩니다.

async def func():
    print("async")

비동기 함수는 그냥 호출하게 되면 결과 대신 coroutine 객체가 리턴되고, 경고가 발생합니다.

>>> print(func())

<coroutine object asynch at 0x10fe8e3b0>
/Users/temp/python/main.py:9: RuntimeWarning: coroutine 'func' was never awaited
  print(func())
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

비동기 함수를 호출하려면, asyncio.runawait 키워드를 같이 사용해야 합니다.

import asyncio

async def func():
    print("async")

async def main():
    await func()

asyncio.run(main())

러스트에서도 파이썬과 동일하게 비동기 함수를 만드려면 asyncawait을 모두 사용합니다. 위의 파이썬 코드와 동일한 러스트 코드를 만들어보면 다음과 같습니다. 파이썬에서는 함수의 앞에 await 을 사용하는 반면, 러스트에서는 함수 호출 뒤에 .await 을 붙여줍니다. 또한, 비동기 함수는 비동기 함수에서만 호출이 가능하므로 비동기 함수를 실행하기 위해서 mainasync 로 바꾸고, #[tokio::main]main 함수에 붙여서 비동기 메인 함수를 만들 수 있습니다. 이제 메인 함수가 비동기 함수이기 때문에 내부에서 asyncawait 키워드를 사용 가능합니다.

use tokio;

async fn func() {
    println!("async");
}

#[tokio::main]
async fn main() {
    func().await
}

그렇다면 비동기 함수를 기다리지 않고 프린트해보면 어떨까요? 일단 아래 코드는 컴파일되지 않습니다.

use tokio;

async fn func() {
    println!("async");
}

#[tokio::main]
async fn main() {
    println!("{:?}", func()) // 🤯
}

실행 결과

error[E0277]: `impl Future<Output = i32>` doesn't implement `std::fmt::Display`
 --> src/main.rs:9:22
  |
9 |     println!("{:#}", func())
  |                      ^^^^^^ `impl Future<Output = i32>` cannot be formatted with the default formatter

컴파일 에러를 들여다보면, 함수 func의 리턴값이impl Future<Output = i32>라는 것을 알 수 있습니다. 즉 비동기 함수를 기다리지 않으면, 파이썬에서는 coroutine이, 러스트에서는 Future 가 리턴됩니다. 둘 다 await 을 사용해 함수 호출이 끝나기를 기다려줘야 실제 함수의 결과를 얻을 수 있습니다.

자바스크립트나 C#과 같은 언어의 비동기 함수들은 프라미스(Promise) 기반으로 만들어져 있어서 비동기 함수를 호출하는 즉시 실행되지만, 파이썬과 러스트는 실행할 수 있을 때 실행하는 방식(lazy execution)을 사용하고 있습니다.

하지만 단순히 await를 사용하기만 해서는 기존의 동기 함수와 비슷한 결과가 나옵니다.

async fn hello() -> i32 {
    println!("Hello World!");
    tokio::time::sleep(std::time::Duration::from_secs(3)).await;
    println!("waited");
    1
}

async fn bye() -> i32 {
    println!("Goodbye!");
    2
}

#[tokio::main]
async fn main() {
    hello().await; // blocking
    bye().await;
}

실행 결과

Hello World!
waited
Goodbye!

리턴값이 있는 비동기 함수

만일 비동기 함수에서 값을 리턴한다면 그 값을 어떻게 받을 수 있을까요? 파이썬과 러스트 모두, 단순히 기존 동기 함수처럼 코드를 작성하고, 함수 정의에 async 키워드만 붙여주면 됩니다. 먼저 파이썬의 경우는 다음과 같습니다. await 키워드를 붙여서 호출하고 나면 함수의 리턴값을 얻을 수 있습니다.

import asyncio


async def func():
    return "async"


async def main():
    result = await func()
    print(result)


asyncio.run(main())

실행 결과

async

러스트에서도 함수의 리턴값은 await 을 사용하고 나면 얻을 수 있습니다.

use tokio;

async fn func<'a>() -> &'a str {
    "async"
}

#[tokio::main]
async fn main() {
    println!("{:?}", func().await) // 🤯
}

실행 결과

"async"

여러 작업 실행하기

asyncio.gather 를 사용하면 여러 개의 비동기 함수를 한꺼번에 실행할 수도 있습니다.

import asyncio

async def give_order(order):
    print(f"Processing {order}...")
    await asyncio.sleep(3 - order)
    print(f"Finished {order}")

async def main():
    await asyncio.gather(give_order(1), give_order(2), give_order(3))

asyncio.run(main())

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1

만일 각 함수에 리턴값이 있다면 값이 모아져서 리턴됩니다.

import asyncio

async def give_order(order):
    print(f"Processing {order}...")
    await asyncio.sleep(3 - order)
    print(f"Finished {order}")
    return order

async def main():
    results = await asyncio.gather(give_order(1), give_order(2), give_order(3))
    print(results)

asyncio.run(main())

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1
[1, 2, 3]

실제 함수 실행 속도가 다르더라도 결과는 gather 에 넣은 순서대로 나오게 됩니다.

러스트에서는 tokio::join!에 기다리고자 하는 함수들을 넣어주면 됩니다. 이게 가능한 이유는, 위에서 만든 hellobye 함수 모두 실제로는 Future라고 하는 비동기 객체를 리턴하게 되고, tokioFuture를 사용해 내부적으로 비동기 관련 처리를 해주기 때문입니다.

async fn give_order(order: u64) -> u64 {
    println!("Processing {order}...");
    tokio::time::sleep(std::time::Duration::from_secs(3 - order)).await;
    println!("Finished {order}");
    order
}

#[tokio::main]
async fn main() {
    let result = tokio::join!(give_order(1), give_order(2), give_order(3));

    println!("{:?}", result);
}

실행 결과

Processing 1...
Processing 2...
Processing 3...
Finished 3
Finished 2
Finished 1
(1, 2, 3)

예제: 빠르게 HTTP 요청 보내보기

파이썬 동기

import requests
from random import randint


# The highest Pokemon id
MAX_POKEMON = 898


def fetch(total):
    urls = [
        f"https://pokeapi.co/api/v2/pokemon/{randint(1, MAX_POKEMON)}"
        for _ in range(total)
    ]
    with requests.Session() as session:
        for url in urls:
            response = session.get(url).json()
            yield response["name"]


def main():
    for name in fetch(10):
        print(name)

    print([name for name in fetch(10)])


main()

Cargo.toml 에 추가

- rand: 이 크레이트는 난수 생성을 위한 유용한 유틸리티를 제공합니다. 여기에는 다양한 유형의 난수값을 생성하고, 이를 유용한 확률 분포로 변환하고, 무작위성 관련 알고리즘을 구현하는 기능이 포함되어 있습니다. 이 크레이트는 다양한 난수 생성기에 대해 구현된 Rng 특성을 통해 사용하기 쉬운 API를 제공합니다. - reqwest: 이 크레이트는 요청을 보내고 응답을 처리하기 위한 사용하기 쉬운 사용자 친화적인 HTTP 클라이언트를 제공합니다. 동기 및 비동기 요청을 모두 지원하며 상위 수준 API부터 고급 사용 사례를 위한 하위 수준 API까지 일반적인 사용 사례를 지원합니다. - serde_json: 이 크레이트는 Rust에서 데이터를 직렬화(serialize) 및 역직렬화(deserialize)하기 위한 프레임워크를 제공하는 serde 프로젝트의 일부입니다. serde_json 크레이트는 특히 데이터를 JSON으로 직렬화하고 JSON에서 데이터를 역직렬화하는 기능을 제공합니다.

rand = "0.8.5"
reqwest = { version="0.11.16", features = ["blocking", "json"] }
serde_json = "1.0.95"

rand::thread_rng() 로 1부터 898까지의 무작위 난수 생성

reqwest::blocking::Client::new() 동기 방식의 HTTP 클라이언트 생성

.json::<serde_json::Value>() 응답 json의 형식을 미리 알 수 없는 경우 사용, 일반적으로는 응답 형식을 알고 있어서 구조체를 사용해 타입을 명시

use rand::Rng;
use reqwest;
use serde_json;

const MAX_POKEMON: u32 = 898;

fn fetch(total: u32) -> Vec<String> {
    let mut urls = Vec::new();
    for _ in 0..total {
        let url = format!(
            "https://pokeapi.co/api/v2/pokemon/{}",
            rand::thread_rng().gen_range(1..=MAX_POKEMON)
        );
        urls.push(url);
    }
    let client = reqwest::blocking::Client::new();
    let mut names = Vec::new();
    for url in urls {
        let response = client
            .get(&url)
            .send()
            .unwrap()
            .json::<serde_json::Value>()
            .unwrap();
        names.push(response["name"].as_str().unwrap().to_string());
    }
    names
}

fn main() {
    for name in fetch(10) {
        println!("{}", name);
    }

    println!("{:?}", fetch(10));
}

실행 결과

grumpig
dartrix
celesteela
piloswine
tangrowth
virizion
glastrier
dewpider
hattrem
glameow
['boltund', 'gourgeist-average', 'shellos', 'shiinotic', 'eevee', 'cranidos', 'celesteela', 'solosis', 'houndour', 'landorus-incarnate']

파이썬 비동기

import asyncio
import aiohttp
from random import randint


# The highest Pokemon id
MAX_POKEMON = 898


async def _fetch(session, url):
    async with session.get(url) as response:
        return await response.json()


async def fetch(total):
    urls = [
        f"https://pokeapi.co/api/v2/pokemon/{randint(1, MAX_POKEMON)}"
        for _ in range(total)
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [_fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
        for response in responses:
            yield response["name"]


async def main():
    async for name in fetch(10):
        print(name)

    print([name async for name in fetch(10)])


asyncio.run(main())

러스트 비동기

use rand::Rng;
use reqwest;

const MAX_POKEMON: u32 = 898;

async fn fetch(total: u32) -> Vec<String> {
    let mut urls = Vec::new();
    for _ in 0..total {
        let url = format!(
            "https://pokeapi.co/api/v2/pokemon/{}",
            rand::thread_rng().gen_range(1..=MAX_POKEMON)
        );
        urls.push(url);
    }
    let client = reqwest::Client::new();
    let mut names = Vec::new();
    for url in urls {
        let response = client
            .get(&url)
            .send()
            .await
            .unwrap()
            .json::<serde_json::Value>()
            .await
            .unwrap();
        names.push(response["name"].as_str().unwrap().to_string());
    }
    names
}

#[tokio::main]
async fn main() {
    for name in fetch(10).await {
        println!("{}", name);
    }

    println!("{:?}", fetch(10).await);
}