본문 바로가기

Category/Python

AsyncResult와 Multiprocessing.pool

728x90

이 두 가지는 Python에서 병렬 처리를 할 때 핵심적으로 사용되는 개념입니다. 특히, 여러 CPU 코어를 활용하여 무거운 작업을 동시에 처리하고 싶을 때 유용합니다.

1. Multiprocessing.Pool (멀티프로세싱 풀)

  • 무엇인가? multiprocessing.Pool은 Python에서 여러 개의 독립적인 프로세스(Process)들을 미리 생성해 놓고 작업을 분배하는 방식을 제공하는 클래스입니다. "풀(Pool)"이라는 이름처럼, 작업에 사용할 일꾼(프로세스)들을 미리 대기시켜 놓는 개념입니다.
  • 왜 사용하는가?
    • GIL (Global Interpreter Lock) 회피: Python은 GIL이라는 특성 때문에 기본적으로 한 번에 하나의 스레드만 Python 코드를 실행할 수 있습니다 (I/O 작업 중에는 GIL이 해제되기도 하지만, CPU 연산은 제한됩니다). multiprocessing 모듈은 아예 별도의 OS 프로세스를 생성하여 이 GIL의 제약을 우회합니다. 각 프로세스는 자체 Python 인터프리터와 메모리 공간을 가지므로, 여러 CPU 코어에서 동시에 Python 코드를 실행할 수 있게 됩니다.
    • 작업 분배: 복잡하거나 시간이 오래 걸리는 작업(예: 대규모 데이터 처리, 이미지 처리, 복잡한 계산)을 여러 조각으로 나누어 이 프로세스들에게 분배할 수 있습니다.
    • 자원 관리: 미리 정해진 개수의 프로세스만 생성하여 관리하므로, 무한정 프로세스가 생성되어 시스템 자원을 고갈시키는 것을 방지합니다.
  • 주요 메서드:
    • Pool(processes=None): 풀을 생성합니다. processes는 생성할 프로세스 수이며, 지정하지 않으면 CPU 코어 수만큼 생성합니다.
    • pool.apply(func, args=(), kwds={}): 단일 작업을 풀에 제출하고, 작업이 완료될 때까지 블록(Block) 합니다. 결과를 즉시 반환합니다.
    • pool.apply_async(func, args=(), kwds={}): 단일 작업을 풀에 비동기적으로 제출하고, 즉시 AsyncResult 객체를 반환합니다. 작업이 백그라운드에서 실행되므로, 제출 즉시 다음 코드가 실행됩니다.
    • pool.map(func, iterable, chunksize=None): 여러 작업을 풀에 제출하고, 모든 작업이 완료될 때까지 블록합니다. map 함수처럼 iterable의 각 항목에 func를 적용합니다.
    • pool.map_async(func, iterable, chunksize=None): 여러 작업을 풀에 비동기적으로 제출하고, 즉시 AsyncResult 객체를 반환합니다.
    • pool.close(): 풀에 더 이상 새로운 작업을 제출할 수 없도록 합니다.
    • pool.join(): 풀 내의 모든 작업 프로세스가 완료될 때까지 기다립니다. close() 후에 호출해야 합니다.

2. AsyncResult (비동기 결과 객체)

  • 무엇인가? AsyncResult는 multiprocessing.Pool의 apply_async()나 map_async()와 같이 비동기적으로 제출된 작업의 미래 결과를 나타내는 객체입니다. 작업이 백그라운드에서 실행되는 동안, 이 AsyncResult 객체를 통해 작업의 상태를 확인하고, 최종적으로 결과를 가져올 수 있습니다.
  • 왜 사용하는가?
    • 비블록킹(Non-blocking) 작업 제출: 작업을 제출한 후 즉시 AsyncResult를 받으므로, 메인 프로그램은 작업이 완료될 때까지 기다리지 않고 다른 유용한 작업을 계속 수행할 수 있습니다. 이는 특히 I/O 작업이나 병렬로 여러 독립적인 계산을 수행할 때 효율적입니다.
    • 작업 상태 확인: 작업이 현재 실행 중인지, 완료되었는지, 오류가 발생했는지 등을 확인할 수 있습니다.
    • 결과 회수: 작업이 완료되면 get() 메서드를 통해 최종 결과를 가져올 수 있습니다.
  • 주요 메서드/속성:
    • result.get(timeout=None): 작업의 결과를 반환합니다. 만약 작업이 아직 완료되지 않았다면, timeout이 지정되지 않은 경우 완료될 때까지 블록합니다. timeout이 지정되었는데 시간 내에 완료되지 않으면 TimeoutError를 발생시킵니다.
    • result.ready(): 작업이 완료되었으면 True를, 아직 완료되지 않았으면 False를 반환합니다.
    • result.successful(): 작업이 성공적으로 완료되었으면 True를, 예외 발생 등으로 실패했으면 False를 반환합니다. ready()가 True일 때만 의미가 있습니다.
    • result.wait(timeout=None): 작업이 완료될 때까지 기다립니다. get()과 비슷하지만 결과를 반환하지 않고, 단순히 기다리기만 합니다.

예시 코드:

import multiprocessing
import time

def long_running_task(number):
    """오래 걸리는 계산 작업"""
    print(f"Task {number}: 시작...")
    time.sleep(2) # 2초 동안 작업
    result = number * number
    print(f"Task {number}: 완료. 결과 = {result}")
    return result

if __name__ == "__main__":
    # 1. 프로세스 풀 생성 (여기서는 3개의 프로세스 사용)
    with multiprocessing.Pool(processes=3) as pool:
        print("------------- apply_async() 사용 예시 -------------")
        # 2. 작업을 비동기적으로 제출하고 AsyncResult 객체 받기
        async_result1 = pool.apply_async(long_running_task, (5,))
        async_result2 = pool.apply_async(long_running_task, (10,))
        async_result3 = pool.apply_async(long_running_task, (15,))

        print("메인 프로그램은 다른 작업을 계속 수행합니다...")
        time.sleep(1) # 메인 스레드에서 다른 작업 수행

        # 3. AsyncResult 객체를 통해 작업 상태 확인 및 결과 가져오기
        print(f"Task 1 완료 여부: {async_result1.ready()}") # False가 출력될 수 있음

        # 결과를 기다리고 가져옴 (여기서 블록될 수 있음)
        final_result1 = async_result1.get()
        final_result2 = async_result2.get()
        final_result3 = async_result3.get()

        print(f"모든 apply_async 작업 결과: {final_result1}, {final_result2}, {final_result3}")

        print("\n------------- map_async() 사용 예시 -------------")
        numbers = [1, 2, 3, 4]
        # map_async으로 여러 작업을 동시에 제출
        map_async_result = pool.map_async(long_running_task, numbers)

        print("메인 프로그램은 map_async이 완료될 때까지 기다립니다...")
        # 모든 map_async 작업이 완료될 때까지 기다리고 결과 가져오기
        final_map_results = map_async_result.get()
        print(f"map_async 작업 결과: {final_map_results}")

    print("모든 풀 작업이 종료되었습니다.")

이 예시에서 apply_async()와 map_async()를 통해 작업을 제출할 때 즉시 AsyncResult 객체를 반환하는 것을 볼 수 있습니다. 메인 프로그램은 이 객체들을 가지고 작업의 완료를 기다리거나, 다른 작업을 수행한 후 나중에 결과를 가져올 수 있습니다.

728x90