[Python] 병렬 api 호출 및 적재
테스트 케이스를 고민하다가 특정 api 를 호출하고 응답 및 헤더 데이터를 postgresql 에 적재하는 것을 생각해보았다.
스프링으로 구현할까 했는데, 병렬처리를 검색해보니 파이썬에서 RAY 라이브러리를 통해 멀티프로세싱을 쉽게 구현할 수 있어 한번 써볼까 싶어 작성한다.
데이터베이스(PostgreSQL)
간단하게 Docker로 올려두었다.
Create DB
데이터베이스 이름은 'mydata' 로 하였다.
create database mydata;
데이터베이스가 생성되면 \c <database_name> 명령어를 통해 연결한다.
postgres=# \c mydata
You are now connected to database "mydata" as user "postgres".
Create Table
테이블 이름은 'api_log'로 지정하였으며, 자동으로 시퀀스를 증가시키는(Auto increment) 기능인 SERIAL을 추가하였다.
간단히 요청 헤더를 넣을 값과 api 요청응답값을 저장할 수 있도록 설계하였다.
create table api_log (
id SERIAL PRIMARY KEY,
x_api_tran_id varchar(22),
status_code int,
token text
);
\d 명령어를 통해 생성된 테이블의 정보를 확인할 수 있다.
List of relations
Schema | Name | Type | Owner
--------+----------------+----------+----------
public | api_log | table | postgres
public | api_log_id_seq | sequence | postgres
(2 rows)
간단한 데이터 입력 테스트
insert into api_log values ('1','test_tran_id',100,'Bearer ej');
INSERT 0 1
select * from api_log;
id | x_api_tran_id | status_code | token
----+---------------+-------------+-----------
1 | test_tran_id | 100 | Bearer ej
(1 row)
파이썬(Python)
라이브러리를 먼저 설명하겠다.
- requests : API 요청
- pgsycopg2-binary : PostgreSQL
- ray : 멀티프로세싱
- ray[default] : ray dashboard
위 라이브러리를 pip 로 설치하자
pip install <liberary>
PostgreSQL
먼저 데이터베이스에 접속한 뒤 insert query를 한번 날려보자.
import psycopg2
# 변수 할당
HOST = '127.0.0.1'
DBNAME = 'mydata'
USER = 'postgres'
PASS = 'joon1234!@#$'
PORT = '5432'
# postgresql 접속
con = psycopg2.connect(host=HOST, dbname=DBNAME, user=USER, password=PASS, port=PORT)
cur = con.cursor()
# Insert Query
cur.execute("INSERT INTO api_log (x_api_tran_id, status_code, token) VALUES (%s, %s, %s);", ('test',200,'ej~'))
con.commit()
# 할당 해제
cur.close()
con.close()
Api 호출
필자의 API는 헤더에 user_key 값을 체크하여 응답을 주는 형식이다. API를 요청해보자.
(x-api-tran-id는 마이데이터 사업 때 생각하며 그냥 난수생성 해보았다.)
import requests
import string
import random
import math
# 샘플토큰(size 채우기용도)
TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPbXhuSXYtRVNKY0o0NXM2MXFPWHBRanNTc1FfcTA1WG1pc2JpMm1CYUNnIn0.eyJleHAiOjE2NDk5OTY3MDksImlhdCI6MTY0OTk5NTQwOCwianRpIjoiMWEwY2M5Y2EtZWJlZi00OGQyLTg3NTQtN2FhMDUxNDNjNzc3IiwiaXNzIjoiaHR0cHM6Ly9rZXljbG9hay1yaHNzby5hcHBzLm9jcDQuY29mZmVlLm10cC9hdXRoL3JlYWxtcy9teWRhdGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoiMTc5NjE5ZmYtZWYzMS00OWZkLThmYjItNmY1MDI0N2VlMjMxIiwidHlwIjoiQmVhcmVyIiwiYXpwIjoiYjJiNGRmYWMiLCJzZXNzaW9uX3N0YXRlIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5IiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsImRlZmF1bHQtcm9sZXMtbXlkYXRhIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJiMmI0ZGZhYyI6eyJyb2xlcyI6WyJ1bWFfcHJvdGVjdGlvbiJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvZmZsaW5lX2FjY2VzcyBjYXJkLmxpc3QgY2FyZC5jYXJkIiwic2lkIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5In0.jLpt6jY-gLA9LNcKrBG56RhNo3QTg7545NK_boKTdUY9Fi79jqdYNItgpdRSVSTUp6rVsZgkjmwhQQE-0tL_066bBtDBZ8ygVivm59kkads7zrwaIBwhIwkFV38Cyn_NzDmQCYIVY1oxvP2IlB3YTOMgiZU3Y-ur32JlCdc4-tf-SD4Jj75pgF-P687KSydIPEJLKVY9HX2-64rtnWcde_79py6SXGsYKzj4gJWjY7RfGEdKuB1P1VEtCJDTqqej4MvaTFd-7fYCjgqVbNppCCbontfj-QadCB1VV-b5yxGGE4YoMy_tk3GuETkoCN6ij7VVaRvaxWvKwfdgrYkIhg'
# 난수 생성
BUR_ID = 'ZVAA010101'
length_of_string = 12
TRAN_ID = BUR_ID+''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length_of_string))
# 헤더 조합
header = {
'Authorization':'Bearer '+TOKEN,
'x-api-tran-id':TRAN_ID,
'user_key':'456c704c049123a58e60b3501aa3147b'
}
# api 요청
response = requests.get("https://backend-api", verify=False, headers = header)
response.status_code
Ray
먼저 ray 사용을 위해 라이브러리를 설치해준다
pip install ray
pip install ray[default] # 대시보드를 위함
pip install redis # 클러스터링을 위함
conda Prompt에서 ~/anaconda3/Scripts/ 폴더로 이동한 뒤 ray cluster를 기동한다.
(바로 ray.init() 해서 사용해도 되는데, 대시보드 사용해보고 싶어서 라이브러리를 다운했더니 자꾸 cluster서버를 찾아버려서...이렇게 했다)
대시보드 default port 는 8265
ray start --head --num-cpus 8 --num-gpus 1 --include-dashboard 1 --dashboard-host 0.0.0.0 --dashboard-port 8265 --node-ip-address 127.0.0.1
이제 ray cluster에 붙여보자. 주피터 노트북으로 이동한 뒤 위 명령어를 기동!
#ray 초기화 1회만
import ray
ray.init(address='auto', _node_ip_address='127.0.0.1')
그럼 이렇게 ray 버전과, 대시보드 url 이 출력되고, 들어가보면 깔끔한(?) 페이지가 나온다
이 상태로 ray 를 적용시키려면 함수를 정의하고 @ray.remote 를 붙여주면 된다.
실행 방법은 함수명.remote()
@ray.remote
def api_access():
내용...
# 호출
api_access.remote()
반복문을 이용한 호출
[api_access.remote() for i in range(100)]
참고로 conda prompt 에서 ray status로 현재 상태를 볼 수 있다.
10000번 호출 눌러놨더니 8코어로 부족한... 팬딩상태의 프로세스들을 볼 수 있다.
테이블에 잘 쌓이고 있는지 카운트 체크..
최종 소스
import string
import random
import requests
import psycopg2
import math
import datetime
@ray.remote
def api_access():
# postgresql 접속
HOST = '127.0.0.1'
DBNAME = 'mydata'
USER = 'postgres'
PASS = 'joon1234!@#$'
PORT = '5432'
con = psycopg2.connect(host=HOST, dbname=DBNAME, user=USER, password=PASS, port=PORT)
cur = con.cursor()
# 난수 생성
BUR_ID = 'ZVAA010101'
length_of_string = 12
TRAN_ID = BUR_ID+''.join(random.choice(string.ascii_letters + string.digits) for _ in range(length_of_string))
# 샘플토큰(size 채우기용도)
TOKEN = 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJPbXhuSXYtRVNKY0o0NXM2MXFPWHBRanNTc1FfcTA1WG1pc2JpMm1CYUNnIn0.eyJleHAiOjE2NDk5OTY3MDksImlhdCI6MTY0OTk5NTQwOCwianRpIjoiMWEwY2M5Y2EtZWJlZi00OGQyLTg3NTQtN2FhMDUxNDNjNzc3IiwiaXNzIjoiaHR0cHM6Ly9rZXljbG9hay1yaHNzby5hcHBzLm9jcDQuY29mZmVlLm10cC9hdXRoL3JlYWxtcy9teWRhdGEiLCJhdWQiOiJhY2NvdW50Iiwic3ViIjoiMTc5NjE5ZmYtZWYzMS00OWZkLThmYjItNmY1MDI0N2VlMjMxIiwidHlwIjoiQmVhcmVyIiwiYXpwIjoiYjJiNGRmYWMiLCJzZXNzaW9uX3N0YXRlIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5IiwiYWNyIjoiMSIsInJlYWxtX2FjY2VzcyI6eyJyb2xlcyI6WyJvZmZsaW5lX2FjY2VzcyIsImRlZmF1bHQtcm9sZXMtbXlkYXRhIiwidW1hX2F1dGhvcml6YXRpb24iXX0sInJlc291cmNlX2FjY2VzcyI6eyJiMmI0ZGZhYyI6eyJyb2xlcyI6WyJ1bWFfcHJvdGVjdGlvbiJdfSwiYWNjb3VudCI6eyJyb2xlcyI6WyJtYW5hZ2UtYWNjb3VudCIsIm1hbmFnZS1hY2NvdW50LWxpbmtzIiwidmlldy1wcm9maWxlIl19fSwic2NvcGUiOiJvZmZsaW5lX2FjY2VzcyBjYXJkLmxpc3QgY2FyZC5jYXJkIiwic2lkIjoiOTMzMGRjOTUtNGVkOS00MzVhLWJjMTUtMmQ3M2QzMTFlMGQ5In0.jLpt6jY-gLA9LNcKrBG56RhNo3QTg7545NK_boKTdUY9Fi79jqdYNItgpdRSVSTUp6rVsZgkjmwhQQE-0tL_066bBtDBZ8ygVivm59kkads7zrwaIBwhIwkFV38Cyn_NzDmQCYIVY1oxvP2IlB3YTOMgiZU3Y-ur32JlCdc4-tf-SD4Jj75pgF-P687KSydIPEJLKVY9HX2-64rtnWcde_79py6SXGsYKzj4gJWjY7RfGEdKuB1P1VEtCJDTqqej4MvaTFd-7fYCjgqVbNppCCbontfj-QadCB1VV-b5yxGGE4YoMy_tk3GuETkoCN6ij7VVaRvaxWvKwfdgrYkIhg'
# 헤더 조합
header = {
'Authorization':'Bearer '+TOKEN,
'x-api-tran-id':TRAN_ID,
'user_key':'456c704c049123a58e60b3501aa3147b'
}
# api 요청
response = requests.get("https://backend-api", verify=False, headers = header)
cur.execute("INSERT INTO api_log (x_api_tran_id, status_code, token) VALUES (%s, %s, %s);",
(TRAN_ID,response.status_code,TOKEN))
con.commit()
# 할당 해제
cur.close()
con.close()
#print(con)
중지
conda prompt 에서 클러스터 중지
ray stop
주피터 노트북에서 ray 중지
ray.shutdown()
마치며
대학생 2학년때 python을 처음 배웠는데, 벌써 4년이나 지났다.
타 개발언어 대비 간단하게(?) 결과물을 도출할 수 있는 최고의 언어 파이썬!
좋은 시간이 되었다.