MASOCON 2019 데모 자료 입니다.
발표 내용을 참고해주시면 됩니다.
- 사이트 : https://www.imaso.co.kr/masocon2019
- 발표자료 : https://www.slideshare.net/MinJunKim5/masocon-2019-serverless-kimminjun (슬라이드쉐어)
https://speakerdeck.com/microsoftware/masokon-2019-seobeoriseureul-hwalyonghan-bunsan-ceori-gimminjun (스피커덱 - 마소콘공식)
데모를 무엇으로 준비할지 고민이 엄청 많았습니다.
공공기관의 API는 대부분 한도가 있다보니 적절한 예시를 찾아야 했는데...
관세청에서 찾았습니다.
관세청API에서 '관세환율'을 이용했습니다.
먼저 관세청API에 대해 설명을 하자면, 1:N 의 데이터 입니다. XML로 리턴받으며 GET방식으로 1개씩 받아올 수 있습니다.
아래 이미지는 관세청 API 문서의 일부입니다.
(출처 : https://unipass.customs.go.kr)
형태는 일반적인 공공기관의 API와 크게 다르지는 않습니다.
요청메시지는 단순하게 날짜와 수출입 구분만 넣습니다.
응답 메시지는 국가부호/화폐단위명/환율/통화부호/적용개시일자/수출입구분 총 6개의 필드로 구성되어 있습니다.
요청/응답에 대한 예시입니다.
인증키는 개별적으로 신청하여 받으시면 됩니다. 과정에 대해서는 생략합니다.
구조도입니다.
1. AWS Console을 이용하여 실행.
2~5. API Gateway로 처리, Lambda에서 데이터(날짜범위)값을 갖고 옴.
6~8. SNS를 통해 Invoke된 Lambda는 각각 할당 받은 만큼 처리한다.
9. 처리가 완료된 데이터는 DynamoDB로 저장한다.
Lambda에 접근을 하면 좌측 메뉴중에 가장 하단에 '계층'이 존재합니다.
이미지는 제가 이미 등록해둔 라이브러리 패키징이네요. 등록한 'python36-lib'를 계속해서 재사용하여, Lambda를 실행합니다.
우선 위의 API에 필요한 날짜값을 갖고오는 Lambda를 준비합니다.
Designer에 Lambda이름 밑에 Layers를 등록되어 있는모습이 보입니다.
Layers는 최대 5개를 등록할수 있어요. (라이브러리가 중복되거나 하는 문제가 발생할 수 있습니다.)
그리고 API Gateway도 확인할 수 있어요. API Gateway를 생성하여 Lambda와 연결한 상태입니다.
API Gateway를 이용하여 처음 시작하는 Lambda가 위의 이미지의 Lambda를 호출해서 날짜값을 갖고 올꺼예요.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
#-*- coding:utf-8 -*-
import datetime
from datetime import date
import json
def lambda_handler(event, context):
strdate = ''
for i in range(1, 3660):
# for i in range(1, 31):
datedata = datetime.date.today() + datetime.timedelta(-i)
if (i != 1):
strdate = strdate + ',' + datedata.strftime('%Y%m%d')
else :
strdate = datedata.strftime('%Y%m%d')
# print(datedata.strftime('%Y%m%d'))
return strdate
# if __name__ == '__main__':
# lambda_handler('','');
|
cs |
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
|
#-*- coding:utf-8 -*-
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
import requests
import os
import re
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
def errorCheck(msg) :
print(msg)
def get_unipass_data(list):
"""
# get_unipass_data
# (list)
"""
print('# [get_unipass_data] Count : ', len(list))
# print(list)
print(os.environ['ARN_SCRAPING'])
try:
print('# [SNS 전송 시작]')
client = boto3.client('sns')
response = client.publish(
TargetArn = os.environ['ARN_SCRAPING'],
Message = json.dumps({'default': json.dumps(list)}),
Subject = 'PY36_MASOCON_DEMO_UNIPASS_URL',
MessageStructure='json'
)
print('# [SNS 전송 끝]')
except Exception as e:
print('# [SNS 전송 오류]')
print(e)
return errorCheck(e,'')
return 'OK'
def url_list():
try:
headers = {
'x-api-key': os.environ['xapikey'],
'Content-type':'application/json;charset=utf-8',
'Accept':'*/*'
}
data = {}
resp = requests.post(os.environ['api_url'], headers=headers, data=json.dumps(data))
dict = resp.text.split(',')
url_list = []
if len(dict) > 0 :
for i in dict:
url_list.append(os.environ['unipass_url'] + i.replace("\"",""))
print('# 총 건수 : ', len(url_list))
data = []
if len(url_list) > 0 :
count = 0
total_group_count = 0
for i in url_list:
data.append(url_list[count])
count = count + 1
if (len(data)/36) == 1 :
get_unipass_data(data)
total_group_count = total_group_count + 1
data.clear()
#break
if len(data) > 0 :
total_group_count = total_group_count + 1
get_unipass_data(data)
data.clear()
print('# 36개씩 나눈 작업 개수 : ', total_group_count)
return url_list
except Exception as e:
print(e)
return errorCheck(e)
def lambda_handler(event, context):
url_list()
# if __name__ == '__main__':
# lambda_handler('','')
|
cs |
그럼 Amazon SNS를 통해 전달받은 Lambda를 보여드릴게요.
SNS가 연결되어 있는것을 볼 수 있습니다. 활성화도 되어 있구요.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
#-*- coding:utf-8 -*-
######################
############## lib
######################
import json
import zeep
import urllib3
import xml.etree.ElementTree as ET
import time
import ast
import boto3
from urllib.request import urlopen
from requests.auth import HTTPBasicAuth
from requests import Session
from zeep import Client
from zeep.transports import Transport
from bs4 import BeautifulSoup
from multiprocessing import Process
def dynamo_insert(list):
"""
# get_unipass_data
# (list)
"""
print('# [DynamoDB] Count : ', len(list))
try:
print('# [SNS 전송 시작]')
client = boto3.client('sns')
response = client.publish(
TargetArn = 'ARN값을 넣습니다...',
Message = json.dumps({'default': json.dumps(list)}),
Subject = 'PY36_MASOCON_DEMO_UNIPASS_SCRAPING',
MessageStructure='json'
)
print('# [SNS 전송 끝]')
except Exception as e:
print('# [SNS 전송 오류]')
print(e)
return errorCheck(e,'')
return 'OK'
def errorCheck(msg) :
print(msg)
def change_date_format(content):
if content != None:
return content[0:4]+'-'+ content[4:6] + '-' + content[6:8]
def Unipass_scraping(list):
count = 0
print('# [Unipass_scraping] Count : ', len(list))
err_url = ''
try:
if len(list) > 0 :
for url in list:
err_url = url
count = count + 1
tree = ET.ElementTree(file=urlopen(url))
tCnt = 0
tCnt = int(tree.find('tCnt').text)
print(tCnt)
list_trifFxrtInfoQryRsltVo = []
if tCnt > 0:
for item in tree.findall('trifFxrtInfoQryRsltVo'):
if item.find('cntySgn').text == "KR" :
trifFxrtInfoQryRsltVo = {
'cntySgn' : item.find('cntySgn').text if item.find('cntySgn').text != None else None,
'mtryUtNm' : item.find('mtryUtNm').text if item.find('mtryUtNm').text != None else None,
'fxrt' :item.find('fxrt').text if item.find('fxrt').text != None else None,
'aplyBgnDt' : change_date_format(item.find('aplyBgnDt').text) if item.find('aplyBgnDt').text != None else None,
'currSgn':item.find('currSgn').text if item.find('currSgn').text != None else None,
'imexTp' : 2
}
dynamo_insert(trifFxrtInfoQryRsltVo)
err_url = ''
print('success')
except Exception as e:
print('# [Unipass_scraping] Error : ', e)
print('# [Unipass_scraping] err_url : ', err_url)
# pass
return errorCheck(e,err_url)
def lambda_handler(event, context):
if 'API' in event :
print('# 작업명 (Event == API)')
job_type = event['API'][0]['Type']
message = json.dumps(event['API'][0]['Data'])
messageList = ast.literal_eval(message)
Unipass_scraping(messageList)
return job_type
elif 'Records' in event :
print('# 작업명 (Event == Records)')
job_type = event['Records'][0]['Sns']['Subject']
message = event['Records'][0]['Sns']['Message']
messageList = ast.literal_eval(message)
Unipass_scraping(messageList)
return job_type
else :
return 'not working'
|
cs |
핸들러는 이벤트 값을 받고, 이벤트 값에 제가 API 인지 Records인지 구분을 합니다.
발표내용을 보면 'API Gateway를 통해서 직접 실행할수 있게 구현했다.' 라는 내용이 있는데요. 그 내용입니다.
간단하게 구분값을 API / Records 로 했어요. 그리고 Subject에 타입을 넣어서 던졌어요.
실제 구현한 소스에는 Subject를 기준으로 분기처리도 했었어요.
message에는 데이터를 담았어요. 그래서 message 를 list로 만들어서 Unipass_scraping으로 던집니다.
처음에 언급한바와 같이 관세청 API는 1:N 이예요. 1개씩 던져서 여러행을 받습니다.
list 를 반복하여 관세청 API를 통신하고, 결과 값을 받아서 DynamoDB에 만든 TEST테이블에 Insert합니다.
(저는 리턴받은 데이터중 KR만 뽑아서 담았어요)
DynamoDB를 Insert하는 Lambda에 SNS를 이용해서 던집니다.
DynamoDB에서 테이블은 간단하게 1개의 키로만 구성했어요. (DynamoDB는 NoSQL입니다.)
DynamoDB에 Insert하는 Lambda의 소스입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
|
import json
import ast
import boto3
def dynamoDB_INSERT(data):
dynamodb = boto3.resource('dynamodb', region_name='ap-northeast-2')
dynamodb.Table('MASOCON_DEMO').put_item(
Item={
'DATA': data
}
)
return 'success'
def lambda_handler(event, context):
if 'API' in event :
print('# 작업명 (Event == API)')
job_type = event['API'][0]['Type']
message = json.dumps(event['API'][0]['Data'])
messageList = ast.literal_eval(message)
dynamoDB_INSERT(message)
return 'success'
elif 'Records' in event :
print('# 작업명 (Event == Records)')
job_type = event['Records'][0]['Sns']['Subject']
message = event['Records'][0]['Sns']['Message']
messageList = ast.literal_eval(message)
print(message)
dynamoDB_INSERT(message)
return 'success'
else :
# data = 'aaaa'
# dynamoDB_INSERT(data)
return 'not working'
|
cs |
'DEV > AWS' 카테고리의 다른 글
Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #2 (0) | 2019.05.14 |
---|---|
Pyhton(v3.6)으로 AWS를 활용한 분산 처리 #1 (0) | 2019.05.11 |
Visual Studio Code 로 .NET Core 2.1 사용하여 AWS Lambda 만들기 (0) | 2019.05.08 |