개발/Python

[Python] 병렬 api 호출 및 적재

joon95 2022. 9. 19. 18:14
반응형

테스트 케이스를 고민하다가 특정 api 를 호출하고 응답 및 헤더 데이터를 postgresql 에 적재하는 것을 생각해보았다.

스프링으로 구현할까 했는데, 병렬처리를 검색해보니 파이썬에서 RAY 라이브러리를 통해 멀티프로세싱을 쉽게 구현할 수 있어 한번 써볼까 싶어 작성한다.

 

데이터베이스(PostgreSQL)

간단하게 Docker로 올려두었다.

 

Create DB

데이터베이스 이름은 'mydata' 로 하였다.

create database mydata;

데이터베이스가 생성되면 \c <database_name> 명령어를 통해 연결한다.

postgres=# \c mydata
You are now connected to database "mydata" as user "postgres".

Create Table

테이블 이름은 'api_log'로 지정하였으며, 자동으로 시퀀스를 증가시키는(Auto increment) 기능인 SERIAL을 추가하였다.

간단히 요청 헤더를 넣을 값과 api 요청응답값을 저장할 수 있도록 설계하였다.

create table api_log (
	id SERIAL PRIMARY KEY,
	x_api_tran_id varchar(22),
	status_code int,
	token text
);

\d 명령어를 통해 생성된 테이블의 정보를 확인할 수 있다.

               List of relations
 Schema |      Name      |   Type   |  Owner
--------+----------------+----------+----------
 public | api_log        | table    | postgres
 public | api_log_id_seq | sequence | postgres
(2 rows)

간단한 데이터 입력 테스트

insert into api_log values ('1','test_tran_id',100,'Bearer ej');
INSERT 0 1

select * from api_log;
 id | x_api_tran_id | status_code |   token
----+---------------+-------------+-----------
  1 | test_tran_id  |         100 | Bearer ej
(1 row)

 

파이썬(Python)

라이브러리를 먼저 설명하겠다.

- requests : API 요청

- pgsycopg2-binary : PostgreSQL

- ray : 멀티프로세싱

- ray[default] : ray dashboard

 

위 라이브러리를 pip 로 설치하자

pip install <liberary>

PostgreSQL

먼저 데이터베이스에 접속한 뒤 insert query를 한번 날려보자.

import psycopg2

# 변수 할당
HOST = '127.0.0.1'
DBNAME = 'mydata'
USER = 'postgres'
PASS = 'joon1234!@#$'
PORT = '5432'

# postgresql 접속
con = psycopg2.connect(host=HOST, dbname=DBNAME, user=USER, password=PASS, port=PORT)
cur = con.cursor()

# Insert Query
cur.execute("INSERT INTO api_log (x_api_tran_id, status_code, token) VALUES (%s, %s, %s);", ('test',200,'ej~'))
con.commit()

# 할당 해제
cur.close()
con.close()

 

Api 호출

필자의 API는 헤더에 user_key 값을 체크하여 응답을 주는 형식이다. API를 요청해보자.

(x-api-tran-id는 마이데이터 사업 때 생각하며 그냥 난수생성 해보았다.)

import requests
import string
import random
import math

# 샘플토큰(size 채우기용도)
TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPbXhuSXYtRVNKY0o0NXM2MXFPWHBRanNTc1FfcTA1WG1pc2JpMm1CYUNnIn0.eyJleHAiOjE2NDk5OTY3MDksImlhdCI6MTY0OTk5NTQwOCwianRpIjoiMWEwY2M5Y2EtZWJlZi00OGQyLTg3NTQtN2FhMDUxNDNjNzc3IiwiaXNzIjoiaHR0cHM6Ly9rZXljbG9hay1yaHNzby5hcHBzLm9jcDQuY29mZmVlLm10cC9hdXRoL3JlYWxtcy9teWRhdGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoiMTc5NjE5ZmYtZWYzMS00OWZkLThmYjItNmY1MDI0N2VlMjMxIiwidHlwIjoiQmVhcmVyIiwiYXpwIjoiYjJiNGRmYWMiLCJzZXNzaW9uX3N0YXRlIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5IiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsImRlZmF1bHQtcm9sZXMtbXlkYXRhIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJiMmI0ZGZhYyI6eyJyb2xlcyI6WyJ1bWFfcHJvdGVjdGlvbiJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvZmZsaW5lX2FjY2VzcyBjYXJkLmxpc3QgY2FyZC5jYXJkIiwic2lkIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5In0.jLpt6jY-gLA9LNcKrBG56RhNo3QTg7545NK_boKTdUY9Fi79jqdYNItgpdRSVSTUp6rVsZgkjmwhQQE-0tL_066bBtDBZ8ygVivm59kkads7zrwaIBwhIwkFV38Cyn_NzDmQCYIVY1oxvP2IlB3YTOMgiZU3Y-ur32JlCdc4-tf-SD4Jj75pgF-P687KSydIPEJLKVY9HX2-64rtnWcde_79py6SXGsYKzj4gJWjY7RfGEdKuB1P1VEtCJDTqqej4MvaTFd-7fYCjgqVbNppCCbontfj-QadCB1VV-b5yxGGE4YoMy_tk3GuETkoCN6ij7VVaRvaxWvKwfdgrYkIhg'

# 난수 생성
BUR_ID = 'ZVAA010101'
length_of_string = 12
TRAN_ID = BUR_ID+''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length_of_string))

# 헤더 조합
header = {
    'Authorization':'Bearer '+TOKEN,
    'x-api-tran-id':TRAN_ID,
    'user_key':'456c704c049123a58e60b3501aa3147b'
}
# api 요청
response = requests.get("https://backend-api", verify=False, headers = header)
response.status_code

 

Ray

먼저 ray 사용을 위해 라이브러리를 설치해준다

pip install ray
pip install ray[default] # 대시보드를 위함
pip install redis # 클러스터링을 위함

conda Prompt에서 ~/anaconda3/Scripts/ 폴더로 이동한 뒤 ray cluster를 기동한다.

(바로 ray.init() 해서 사용해도 되는데, 대시보드 사용해보고 싶어서 라이브러리를 다운했더니 자꾸 cluster서버를 찾아버려서...이렇게 했다)

대시보드 default port 는 8265

ray start --head --num-cpus 8 --num-gpus 1 --include-dashboard 1 --dashboard-host 0.0.0.0 --dashboard-port 8265 --node-ip-address 127.0.0.1

 

이제 ray cluster에 붙여보자. 주피터 노트북으로 이동한 뒤 위 명령어를 기동!

 

#ray 초기화 1회만
import ray
ray.init(address='auto', _node_ip_address='127.0.0.1')

그럼 이렇게 ray 버전과, 대시보드 url 이 출력되고, 들어가보면 깔끔한(?) 페이지가 나온다

이 상태로 ray 를 적용시키려면 함수를 정의하고 @ray.remote 를 붙여주면 된다.

실행 방법은 함수명.remote()

@ray.remote
def api_access():
	내용...
# 호출
api_access.remote()

반복문을 이용한 호출

[api_access.remote() for i in range(100)]

 

참고로 conda prompt 에서 ray status로 현재 상태를 볼 수 있다.

10000번 호출 눌러놨더니 8코어로 부족한... 팬딩상태의 프로세스들을 볼 수 있다.

테이블에 잘 쌓이고 있는지 카운트 체크..

 

최종 소스

import string
import random
import requests
import psycopg2
import math
import datetime

@ray.remote
def api_access():
    # postgresql 접속
    HOST = '127.0.0.1'
    DBNAME = 'mydata'
    USER = 'postgres'
    PASS = 'joon1234!@#$'
    PORT = '5432'
    con = psycopg2.connect(host=HOST, dbname=DBNAME, user=USER, password=PASS, port=PORT)
    cur = con.cursor()

    # 난수 생성
    BUR_ID = 'ZVAA010101'
    length_of_string = 12
    TRAN_ID = BUR_ID+''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length_of_string))

    # 샘플토큰(size 채우기용도)
    TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPbXhuSXYtRVNKY0o0NXM2MXFPWHBRanNTc1FfcTA1WG1pc2JpMm1CYUNnIn0.eyJleHAiOjE2NDk5OTY3MDksImlhdCI6MTY0OTk5NTQwOCwianRpIjoiMWEwY2M5Y2EtZWJlZi00OGQyLTg3NTQtN2FhMDUxNDNjNzc3IiwiaXNzIjoiaHR0cHM6Ly9rZXljbG9hay1yaHNzby5hcHBzLm9jcDQuY29mZmVlLm10cC9hdXRoL3JlYWxtcy9teWRhdGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoiMTc5NjE5ZmYtZWYzMS00OWZkLThmYjItNmY1MDI0N2VlMjMxIiwidHlwIjoiQmVhcmVyIiwiYXpwIjoiYjJiNGRmYWMiLCJzZXNzaW9uX3N0YXRlIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5IiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsImRlZmF1bHQtcm9sZXMtbXlkYXRhIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJiMmI0ZGZhYyI6eyJyb2xlcyI6WyJ1bWFfcHJvdGVjdGlvbiJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvZmZsaW5lX2FjY2VzcyBjYXJkLmxpc3QgY2FyZC5jYXJkIiwic2lkIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5In0.jLpt6jY-gLA9LNcKrBG56RhNo3QTg7545NK_boKTdUY9Fi79jqdYNItgpdRSVSTUp6rVsZgkjmwhQQE-0tL_066bBtDBZ8ygVivm59kkads7zrwaIBwhIwkFV38Cyn_NzDmQCYIVY1oxvP2IlB3YTOMgiZU3Y-ur32JlCdc4-tf-SD4Jj75pgF-P687KSydIPEJLKVY9HX2-64rtnWcde_79py6SXGsYKzj4gJWjY7RfGEdKuB1P1VEtCJDTqqej4MvaTFd-7fYCjgqVbNppCCbontfj-QadCB1VV-b5yxGGE4YoMy_tk3GuETkoCN6ij7VVaRvaxWvKwfdgrYkIhg'

    # 헤더 조합
    header = {
        'Authorization':'Bearer '+TOKEN,
        'x-api-tran-id':TRAN_ID,
        'user_key':'456c704c049123a58e60b3501aa3147b'
    }

    # api 요청
    response = requests.get("https://backend-api", verify=False, headers = header)

    cur.execute("INSERT INTO api_log (x_api_tran_id, status_code, token) VALUES (%s, %s, %s);",
               (TRAN_ID,response.status_code,TOKEN))
    con.commit()


    # 할당 해제
    cur.close()
    con.close()
    #print(con)

 

중지

conda prompt 에서 클러스터 중지

ray stop

주피터 노트북에서 ray 중지

ray.shutdown()

 

마치며

대학생 2학년때 python을 처음 배웠는데, 벌써 4년이나 지났다.

타 개발언어 대비 간단하게(?) 결과물을 도출할 수 있는 최고의 언어 파이썬!

좋은 시간이 되었다.

반응형