본문 바로가기

Programming

[Library] Dask 추가 정보 및 테스트

728x90
반응형

지난 포스트에 이어 Dask의 추가적인 특징 및 pandas aggregation과 간단히 비교한 내용을 올리고자 한다. 

 

[Library] Dask? Spark?

캐글을 진행하며 중용량(?) 데이터(약 16GB ~ 40GB)를 처리하게 되었다. 파일 포맷과 데이터 타입, 값 변환 등의 처리만으로는 부족하다 싶어 대용량 데이터 처리를 위한 도구를 모색하게 되었다. 대

sha-sha-sha.tistory.com


특징

1. 가상 데이터 프레임

  • pandas dataframe과 비슷한 기능을 제공하지만 모든 데이터가 메모리에 있는 것은 아님 → 하나 이상의 파일 또는 데이터베이스에 존재하는 채로 처리할 수 있는 기능
  • 메모리 크기와 관계 없이 큰 csv 파일을 가상 데이터 프레임으로 로드 또는 비슷한 형식의 여러개의 csv 파일을 하나의 데이터 프레임에 로드 가능
  • Apache Spark의 Transformation, Action 개념과 유사

2. 병렬 처리용 작업 스케쥴러

  • 대량 데이터의 분석 작업을 돕기 위한 작업 스케줄러(task scheduler) 제공 ⇒ 하나의 작업을 여러개의 쓰레드, 프로세스, 노드 등이 나누어 분담
  • 종류
    • dask.get : 단일 쓰레드
    • dask.threaded.get : 멀티 쓰레드 풀(pool)
    • dask.multiprocessing.get : 멀티 프로세스 풀
    • distributed.Client.get : 여러대의 컴퓨터에서 분산처리
< 프로세스>
- 사전적 의미 : 컴퓨터에서 연속적으로 실행되고 있는 컴퓨터 프로그램 / 메모리에 올라와 실행되고 있는 프로그램의 인스턴스(독립적인 개체) / 실행된 프로그램
- 할당받는 시스템 자원의 예 cpu 시간 / 운영에 필요한 주소 공간 / Code, Data, Stack, Heap구조로 되어 있는 독립된 메모리 영역
- 특징
    - 기본적으로 프로세스당 최소 1개의 스레드(메인스레드)가짐
    - 한 프로세스는 다른 프로세스의 변수나 자료구조에 접근 X (프로세스간의 통신으로 다른 프로세스 자원에 접근)
<쓰레드>
- 사전적 의미 : 프로세스 내에서 실행되는 여러 흐름의 단위 / 프로세스의 특정한 수행 경로 / 프로세스가 할당받은 자원을 이용하는 실행 단위
- 특징
    - 쓰레드는 프로세스 내에서 Stack만 따로 할당 / Code, Data, Heap영역은 공유 - 한 프로세스 내에서 동작되는 여러 실행의 흐름. 같은 프로세스 안에서 여러 쓰레드들은 같은 힙 공간 공유
    - 한 쓰레드가 프로세스 자원을 변경하면 다른 이웃 쓰레드(sibling thread)도 해당 변경 결과를 즉히 볼 수 있다.
< 멀티 프로세스>
- 의미 : 하나의 응용프로그램을 여러개의 프로세스로 구성, 각 프로세스가 하나의 작업(태스크)를 처리하도록 하는 것
- 장점 : 여러개의 자식 프로세스 중 하나에 문제 발생 → 해당 자식 프로세스만 죽고 다른 영향이 확산 X
- 단점
    - Context Switching (CPU에서 여러 프로세스 돌아가면서 작업을 처리하는 것) 과정에서 메모리 초기화등의 무거운 작업, 많은 시간등의 오버 헤드 발생
    - 프로세스 사이의 복잡한 통신 기법
<멀티 쓰레드>
- 의미 : 하나의 응용 프로그램을 여러 개의 쓰레드로 구성. 각 쓰레드에서 하나의 작업 처리 / 많은 운영체제들이 멀티 쓰레딩을 기본으로
- 장점
    - 시스템 자원 소모 감소
    - 시스템 처리량 증가 (처리 비용 감소)
    - 간단한 통신 방법으로 프로그램 응답 시간 단축
- 단점
    -설계 중요
    - 디버깅이 까다로움
    - 단일 프로세스 시스템에서는 효과 기대 어려움
    - 자원 공유 문제 발생
    - 하나의 쓰레드에 문제 발생 시 전체 프로세스 영향 받음
    - 자원 공유는 전역 변수 (데이터 세그먼트)를 이용하므로 함께 상용 시 충돌 발생 가능성 있음

 

설치

통합설치

pip install "dask[complete]"

 

구조

DataFrame

 

Groupby 비교

1. 96개 열의 단일 aggregation

Dask
수행

start = timeit.default_timer()
ds_test = ds_data[['customer_ID']+cols_D].groupby('customer_ID').mean().compute()
print('Dask Elapse: ', timeit.default_timer() - start)

결과

Dask Elapse:  32.0115955
Dask Elapse:  19.39781479999988

D_39  ...     D_145
customer_ID                                                   ...          
0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fb...  0.010705  ...  0.005812
00000fd6641609c6ece5454664794f0340ad84dddce9a26...  0.215144  ...  0.004902
00001b22f846c82c51f6e3958ccd81970162bae8b007e80...  0.004181  ...  0.004500
000041bdba6ecadd89a52d11886e8eaaec9325906c97233...  0.048866  ...  0.005235
00007889e4fcd2614b6cbe7f8f3d2e5c728eca32d9eb8ad...  0.004643  ...  0.004218
                                                      ...  ...       ...
ffff41c8a52833b56430603969b9ca48d208e7c192c6a40...  0.119216  ...  0.005315
ffff518bb2075e4816ee3fe9f3b152c57fc0e6f01bf7fdd...  0.066406  ...  0.005620
ffff9984b999fccb2b6127635ed0736dda94e544e67e026...  0.221605  ...  0.003916
ffffa5c46bc8de74f5a4554e74e239c8dee6b9baf388145...  0.030874  ...  0.186599
fffff1d38b785cef84adeace64f8f83db3a0c31e8d92eab...  0.290415  ...  0.005934

Pandas

수행

start = timeit.default_timer()
pd_test = data.loc[:, ['customer_ID']+cols_D].groupby('customer_ID').mean()
print('Pandas Elapse: ', timeit.default_timer() - start)

결과

Pandas Elapse:  13.098526000000106

D_39  ...     D_145
customer_ID                                                   ...          
0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fb...  0.010704  ...  0.005814
00000fd6641609c6ece5454664794f0340ad84dddce9a26...  0.215210  ...  0.004902
00001b22f846c82c51f6e3958ccd81970162bae8b007e80...  0.004181  ...  0.004498
000041bdba6ecadd89a52d11886e8eaaec9325906c97233...  0.048859  ...  0.005234
00007889e4fcd2614b6cbe7f8f3d2e5c728eca32d9eb8ad...  0.004642  ...  0.004219
                                                      ...  ...       ...
ffff41c8a52833b56430603969b9ca48d208e7c192c6a40...  0.119202  ...  0.005314
ffff518bb2075e4816ee3fe9f3b152c57fc0e6f01bf7fdd...  0.066406  ...  0.005623
ffff9984b999fccb2b6127635ed0736dda94e544e67e026...  0.221558  ...  0.003918
ffffa5c46bc8de74f5a4554e74e239c8dee6b9baf388145...  0.030884  ...  0.186646
fffff1d38b785cef84adeace64f8f83db3a0c31e8d92eab...  0.290527  ...  0.005936

 

2. 여러 Task 수행

Dask

수행

start = timeit.default_timer()
test2_data = ds_data[['customer_ID']+cols_D]
groupby_col_name = 'customer_ID'

feats = ['avg', 'min', 'max']

for feat in feats:
    if feat == 'avg':
        feat_df1 = test2_data.groupby(groupby_col_name).mean().add_suffix('_' + feat)
    elif feat == 'min':
        feat_df2 = test2_data.groupby(groupby_col_name).min().add_suffix('_' + feat)
    elif feat == 'max':
        feat_df3 = test2_data.groupby(groupby_col_name).max().add_suffix('_' + feat)
    else:
        pass

result = dd.concat([feat_df1, feat_df2, feat_df3], axis=1)
result_temp = result.compute()
print('Dask Elapse: ', timeit.default_timer() - start)

결과

Dask Elapse:  36.87883160000001

result_temp.head()
Out[36]: 
                                                    D_39_avg  ...  D_145_max
customer_ID                                                   ...           
0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fb...  0.010705  ...   0.009827
00000fd6641609c6ece5454664794f0340ad84dddce9a26...  0.215144  ...   0.009392
00001b22f846c82c51f6e3958ccd81970162bae8b007e80...  0.004181  ...   0.006931
000041bdba6ecadd89a52d11886e8eaaec9325906c97233...  0.048866  ...   0.009834
00007889e4fcd2614b6cbe7f8f3d2e5c728eca32d9eb8ad...  0.004643  ...   0.009666*

 

Pandas

수행

def make_basic_summ_features(df: pd.DataFrame, groupby_col_name:str) -> pd.DataFrame:
    """
    :param df: groupby 컬럼을 포함한 dataframe
    :param groupby_col_name: groupby 적용할 컬럼명 (ex:id)
    :return:
    """
    feats = ['avg', 'min', 'max']
    result = pd.DataFrame()
    for feat in feats:
        if feat == 'avg':
            feat_df = df.groupby(groupby_col_name).mean().add_suffix('_'+feat)
            result = pd.concat([result, feat_df], axis=1)

        elif feat == 'min':
            feat_df = df.groupby(groupby_col_name).min().add_suffix('_'+feat)
            result = pd.concat([result, feat_df], axis=1)
        elif feat == 'max':
            feat_df = df.groupby(groupby_col_name).max().add_suffix('_'+feat)
            result = pd.concat([result, feat_df], axis=1)
        else:
            pass
    return result


start = timeit.default_timer()
pd_result = make_basic_summ_features(data.loc[:, ['customer_ID']+cols_D], groupby_col_name)
print('Pandas Elapse: ', timeit.default_timer() - start)

결과

Pandas Elapse:  45.59000760000026

pd_result
Out[38]: 
                                                    D_39_avg  ...  D_145_max
customer_ID                                                   ...           
0000099d6bd597052cdcda90ffabf56573fe9d7c79be5fb...  0.010704  ...   0.009827
00000fd6641609c6ece5454664794f0340ad84dddce9a26...  0.215210  ...   0.009392
00001b22f846c82c51f6e3958ccd81970162bae8b007e80...  0.004181  ...   0.006931
000041bdba6ecadd89a52d11886e8eaaec9325906c97233...  0.048859  ...   0.009834
00007889e4fcd2614b6cbe7f8f3d2e5c728eca32d9eb8ad...  0.004642  ...   0.009666
                                                      ...  ...        ...
ffff41c8a52833b56430603969b9ca48d208e7c192c6a40...  0.119202  ...   0.009865
ffff518bb2075e4816ee3fe9f3b152c57fc0e6f01bf7fdd...  0.066406  ...   0.007778
ffff9984b999fccb2b6127635ed0736dda94e544e67e026...  0.221558  ...   0.008537
ffffa5c46bc8de74f5a4554e74e239c8dee6b9baf388145...  0.030884  ...   0.191406
fffff1d38b785cef84adeace64f8f83db3a0c31e8d92eab...  0.290527  ...   0.009293

 

결론

최적화 없이 단일 작업을 실행하면 기존 pandas와 다른점은 없어 보이지만 작업이 추가되면서 필요시점에 수행 및 분산 작업을 잘 적용한다면 유용한 도구로 느껴짐

728x90
반응형

'Programming' 카테고리의 다른 글

파이썬 로그 핸들링  (0) 2022.08.01
파이썬 예외처리  (0) 2022.08.01
Black을 이용한 코드 스타일 맞추기  (0) 2022.07.14
[Library] Dask? Spark?  (0) 2022.07.05
정규표현식 / 파이썬 예제 (re)  (0) 2022.06.27