< 주가 일봉 데이터 저장 프로그램 개발 - 키움 API 구현>


1. "프로그램 개발 개요"

2. "프로그램 DB 설계 및 구현"


해당 포스트를 읽기 전에 이전 포스트를 읽기 바란다.


이번 포스트는 키움 OPEN API를 이용해 주가 일봉 데이터를 불러올 것이다. 이를 위해서 키움 OPEN API 사용법을 알아야 한다. 사용법은 아래 URL에서 자세히 알 수 있다. 기본적인 키움 OPEN API 사용법은 여기서 설명하지 않는다. 

2. "주식 데이터 불러오기(키움 open api 주식 알고리즘 테스트 프로그래밍)"



여기서 구현할 내용은 간단하다. 주식 종목 일봉 데이터와 코스피/코스닥 지수 일봉 데이터를 불러오는 기능만 구현하면 된다. 


주식 종목 일봉 데이터를 불러오는 키움 API TR은 'opt10081', 주식 일봉 차트 조회 요청이다. 종목 코드와 기준일자, 수정주가 구분을 입력 받는다. 주식 일봉 데이터는 기준일자로부터 내림차순으로 전송 받는다. 

 


코스피/코스닥 지수 일봉 데이터는 'opt20006', 업종 일봉 조회 요청 TR로 얻을 수 있다. 업종 코드로 '001'을 입력하면 코스피, '101'을 입력하면 코스닥 일봉 데이터를 얻을 수 있다. 

 


추가로 일봉 데이터를 불러올 때 별도의 조건식이 필요하다. 이전 포스트 "2. DB 설계 및 구현" 에서 "select_max_date" 메서드를 이용해 특정 종목의 가장 최근에 INSERT된 데이터의 날짜를 계산했다.(계산한 이유는 이전 포스트에서 설명했다.)  이 날짜를 이용해 해당 날짜보다 최근의 일봉 데이터만 추출해야 한다. 따라서 주가 일봉 데이터를 불러올 때 데이터에 대한 조건식이 붙는다. 이에 대해 차차 알아보자.


먼저 키움 OPEN API를 구현한 전체 코드를 보자. 파일명은 'Kiwoom.py'이다. 참고로 일봉 데이터는 프로그램을 실행한 날의 하루 전 데이터부터 불러온다. 따라서 어제 날짜를 계산해야 한다. 이는 '__init__(self)' 메서드에서 계산해 'self.day' 변수에 대입한다.

from PyQt5.QAxContainer import *
from PyQt5.QtCore import *
import time
import pandas as pd
from datetime import date, timedelta


class Kiwoom(QAxWidget):
    def __init__(self):
        super().__init__()

        self.day = (date.today() - timedelta(1)).strftime('%Y%m%d')
        self.stock_info = pd.DataFrame(columns=("Open","High","Low","Close","Volume"))


        self._create_kiwoom_instance()
        self._set_signal_slots()
        self._comm_connect()

# COM을 사용하기 위한 메서드
    def _create_kiwoom_instance(self):
        self.setControl("KHOPENAPI.KHOpenAPICtrl.1")

    def _set_signal_slots(self):
        # 로그인할 시 OnEventConnect 이벤트 발생
        self.OnEventConnect.connect(self._event_connect)
        # tr후 이벤트 발생
        self.OnReceiveTrData.connect(self._receive_tr_data)

    # 로그인 메서드, 로그인 과정에서 프로그램이 진행되면 안 되기 때문에
    # 이벤트 루프 생성
    def _comm_connect(self):
        self.dynamicCall("CommConnect()")
        self.login_event_loop = QEventLoop()
        self.login_event_loop.exec_()

    # 로그인 성공 여부 메서드
    def _event_connect(self, err_code):
        if err_code == 0:
            print("connected")
        else:
            print("disconnected")
        self.login_event_loop.exit()

    # tr을 서버에 전송한다
    def comm_rq_data(self, rqname, trcode, next, screen_no):
        self.dynamicCall("CommRqData(QString, QString, int, QString)", rqname, trcode, next, screen_no)
        self.tr_event_loop = QEventLoop()
        self.tr_event_loop.exec_()

    # 서버한테 받은 데이터를 반환한다.
    def _comm_get_data(self, code, real_type, field_name, index, item_name):
        ret = self.dynamicCall("CommGetData(QString, QString, QString, int, QString)", code,
                               real_type, field_name, index, item_name)
        return ret.strip()

    # 서버한테 받은 데이터의 갯수를 반환한다.
    def _get_repeat_cnt(self, trcode, rqname):
        ret = self.dynamicCall("GetRepeatCnt(QString, QString)", trcode, rqname)
        return ret

    # 종목 이름 반환. 반환 값이 ""이면 code에 해당하는 종목 없음
    def is_stock(self, code):
        ret = self.dynamicCall("GetMasterCodeName(QString)", [code])
        return ret

    # tr 입력값을 서버 통신 전에 입력
    # ex. SetInputValue("종목코드","000660")
    def set_input_value(self, id, value):
        self.dynamicCall("SetInputValue(QString,QString)", id, value)

    def _receive_tr_data(self, screen_no, rqname, trcode, record_name, next, unused1, unused2, unused3, unused4):
        if next == '2':
            self.remained_data = True
        else:
            self.remained_data = False

        if rqname == "opt10081_req":
            self._opt10081_20006(rqname, trcode)
        elif rqname == "opt20006_req":
            self._opt10081_20006(rqname, trcode)

        try:
            self.tr_event_loop.exit()
        except AttributeError:
            pass


    # 종목의 일자, 시가, 종가 데이터를 요청
    def req_stock_daily_value(self, code, day):
        self.stock_info = pd.DataFrame(columns=("Open", "High", "Low", "Close","Volume"))

        print("종목 일봉 데이터 요청")
        self.set_input_value("종목코드", code)
        self.set_input_value("기준일자", self.day)
        self.set_input_value("수정주가구분", "1")
        self.comm_rq_data("opt10081_req", "opt10081", 0, "2000")

        if day is not None:
            day = day.strftime('%Y%m%d')
            temp_data = self.stock_info[self.stock_info.index.values <= day]
            if temp_data.empty is False:
                self.stock_info = self.stock_info[self.stock_info.index.values>day]
                self.remained_data = False


        while self.remained_data:
            print("종목 일봉 데이터 추가 요청")
            time.sleep(0.2)
            self.set_input_value("종목코드", code)
            self.set_input_value("기준일자", self.day)
            self.set_input_value("수정주가구분", "1")
            self.comm_rq_data("opt10081_req", "opt10081", 2, "2000")
            if day is not None:
                temp_data = self.stock_info[self.stock_info.index.values <= day]
                if temp_data.empty is False:
                    self.stock_info = self.stock_info[self.stock_info.index.values > day]
                    break

        self.stock_info.index.name = "Date"
        return self.stock_info


    # 종목/지수 일자, 시가,저가,고가, 종가 데이터를 서버에서 가져오고 stock_info 에 저장
    def _opt10081_20006(self,rqname,trcode):
        data_cnt = self._get_repeat_cnt(trcode, rqname)
        for i in range(data_cnt):
            date = self._comm_get_data(trcode, "", rqname, i, "일자")
            open = self._comm_get_data(trcode, "", rqname, i, "시가")
            low = self._comm_get_data(trcode, "", rqname, i, "저가")
            High = self._comm_get_data(trcode, "", rqname, i, "고가")
            close = self._comm_get_data(trcode, "", rqname, i, "현재가")
            volume = self._comm_get_data(trcode, "", rqname, i, "거래량")
            self.stock_info = self.stock_info.append(pd.DataFrame({'Open':open, 'Low':low, 'High':High,'Close':close, 'Volume':volume}, index=[date]))


    # 코스피, 코스닥의 일자, 시가, 종가 데이터를 요청
    def req_index_daily_value(self, code, day):
        self.stock_info = pd.DataFrame(columns=("Open", "High", "Low", "Close", "Volume"))
        print("업종 일봉 데이터 요청")
        self.set_input_value("업종코드", code)
        self.set_input_value("기준일자", self.day)
        self.set_input_value("수정주가구분", "1")
        self.comm_rq_data("opt20006_req", "opt20006", 0, "1999")

        if day is not None:
            day = day.strftime('%Y%m%d')
            temp_data = self.stock_info[self.stock_info.index.values <= day]
            if temp_data.empty is False:
                self.stock_info = self.stock_info[self.stock_info.index.values>day]
                self.remained_data = False

        while self.remained_data:
            print("업종 일봉 데이터 추가 요청")
            time.sleep(0.2)
            self.set_input_value("업종코드", code)
            self.set_input_value("기준일자", self.day)
            self.set_input_value("수정주가구분", "1")
            self.comm_rq_data("opt20006_req", "opt20006", 2, "1999")

            if day is not None:
                temp_data = self.stock_info[self.stock_info.index.values <= day]
                if temp_data.empty is False:
                    self.stock_info = self.stock_info[self.stock_info.index.values > day]
                    break

        self.stock_info.index.name = "Date"
        return self.stock_info



먼저 종목 일봉 데이터를 요청하는 소스를 보자. 매개변수 'code'는 주식 종목/ETF의 코드 값이다. 'day'는 위에서 설명한 DB의 "select_max_date" 메서드의 결과값이 들어온다. 해당 변수를 이용해 조건식을 사용한다. 일봉 데이터는 'self.stock_info' DataFrame 객체에 넣는다. 매번 요청할 때마다 초기화가 필요하다. 여러 번 요청했을 시 초기화 하지 않으면 여러 종목의 데이터가 하나의 변수에 들어간다. TR로는 위에서 설명한 'opt10081'을 이용한다.   'self.comm_rq_data()'를 이용해 일봉 데이터를 요청한다. 요청 후 인자로 받은 'day'값이 'None'인지 확인한다. 'None'이라면 주식의 상장일까지 전체 일봉 데이터를 불러온다. 'None'이 아니라면 'day'일 이후 데이터만 필요하다. 따라서 'day'일 이후 데이터만 추출한다. 


'day'일 이후 데이터를 추출하는 방법으로 여러가지가 있다. 대표적으로 종목 일봉 데이터 하나 불러올 때마다 조건식을 이용해 'day'를 확인하는 방법과 종목 데이터를 한 번에 여러개 불러오는 대신 조건식 이용을 최소화하는 방법이 있다. 필자는 후자를 선택했다. 예로 1990년 이후부터 오늘까지 코스피 지수 일봉 데이터를 불러온다고 가정해보자. 약 8000개의 데이터를 불러온다. 만약 첫 번째 방법을 선택했을 시 조건문을 8000번 돌려야 한다. 그러나 후자의 방법을 사용하면 파이썬의 벡터 특성을 이용해 약 5번 정도만 조건문을 사용한다. 이에 대해 알아보자. 

'self.stock_info' 변수는 일봉 데이터의 날짜를 인덱스로 가진다.  self.stock_info[self.stock_info.index.values<=day)를 이용하면 day 이전 데이터만 추출할 수 있다. 'day' 변수를 'day.strftime()'으로 'str' 타입으로 바꾸는 이유는 'self.stock_info.index.values'가 'str' 타입으로 되있기 때문이다. 

추출한 temp_data 값이 empty인지 확인한다. empty가 아니라면 'day' 이전 데이터이 있다는 의미이므로 데이터 전송을 멈추고 'day' 이후 데이터만 추출한 후 최종 데이터를 리턴한다.

    # 종목의 일자, 시가, 종가 데이터를 요청
    def req_stock_daily_value(self, code, day):
        self.stock_info = pd.DataFrame(columns=("Open", "High", "Low", "Close","Volume"))

        print("종목 일봉 데이터 요청")
        self.set_input_value("종목코드", code)
        self.set_input_value("기준일자", self.day)
        self.set_input_value("수정주가구분", "1")
        self.comm_rq_data("opt10081_req", "opt10081", 0, "2000")

        if day is not None:
            day = day.strftime('%Y%m%d')
            temp_data = self.stock_info[self.stock_info.index.values <= day]
            if temp_data.empty is False:
                self.stock_info = self.stock_info[self.stock_info.index.values>day]
                self.remained_data = False


        while self.remained_data:
            print("종목 일봉 데이터 추가 요청")
            time.sleep(0.2)
            self.set_input_value("종목코드", code)
            self.set_input_value("기준일자", self.day)
            self.set_input_value("수정주가구분", "1")
            self.comm_rq_data("opt10081_req", "opt10081", 2, "2000")
            if day is not None:
                temp_data = self.stock_info[self.stock_info.index.values <= day]
                if temp_data.empty is False:
                    self.stock_info = self.stock_info[self.stock_info.index.values > day]
                    break

        self.stock_info.index.name = "Date"
        return self.stock_info


코스피/코스닥 지수 일봉 데이터를 요청하는 소스는 아래와 같다. 위에서 본 주가 종목 지수 일봉 데이터 요청 코드와 거의 같다. 단지 tr 요청 값과 tr 매개변수만 다르다.

    # 코스피, 코스닥의 일자, 시가, 종가 데이터를 요청
    def req_index_daily_value(self, code, day):
        self.stock_info = pd.DataFrame(columns=("Open", "High", "Low", "Close", "Volume"))
        print("업종 일봉 데이터 요청")
        self.set_input_value("업종코드", code)
        self.set_input_value("기준일자", self.day)
        self.set_input_value("수정주가구분", "1")
        self.comm_rq_data("opt20006_req", "opt20006", 0, "1999")

        if day is not None:
            day = day.strftime('%Y%m%d')
            temp_data = self.stock_info[self.stock_info.index.values <= day]
            if temp_data.empty is False:
                self.stock_info = self.stock_info[self.stock_info.index.values>day]
                self.remained_data = False

        while self.remained_data:
            print("업종 일봉 데이터 추가 요청")
            time.sleep(0.2)
            self.set_input_value("업종코드", code)
            self.set_input_value("기준일자", self.day)
            self.set_input_value("수정주가구분", "1")
            self.comm_rq_data("opt20006_req", "opt20006", 2, "1999")

            if day is not None:
                temp_data = self.stock_info[self.stock_info.index.values <= day]
                if temp_data.empty is False:
                    self.stock_info = self.stock_info[self.stock_info.index.values > day]
                    break

        self.stock_info.index.name = "Date"
        return self.stock_info


마지막으로 TR 요청 후 데이터를 전송받을 때 호출되는 함수를 살펴보자. 키움 open api를 사용할 줄 안다면 아래 코드는 바로 이해 가능할 것이다. 서버에서 데이터를 받을 때마다 self.stock_info.append()를 통해 데이터를 추가하고 index는 date 값을 넣는다.

    # 종목/지수 일자, 시가,저가,고가, 종가 데이터를 서버에서 가져오고 stock_info 에 저장
    def _opt10081_20006(self,rqname,trcode):
        data_cnt = self._get_repeat_cnt(trcode, rqname)
        for i in range(data_cnt):
            date = self._comm_get_data(trcode, "", rqname, i, "일자")
            open = self._comm_get_data(trcode, "", rqname, i, "시가")
            low = self._comm_get_data(trcode, "", rqname, i, "저가")
            High = self._comm_get_data(trcode, "", rqname, i, "고가")
            close = self._comm_get_data(trcode, "", rqname, i, "현재가")
            volume = self._comm_get_data(trcode, "", rqname, i, "거래량")
            self.stock_info = self.stock_info.append(pd.DataFrame({'Open':open, 'Low':low, 'High':High,'Close':close, 'Volume':volume}, index=[date]))


우리는 키움 API를 이용해서 코스닥/코스피, 주식 종목/ETF 일봉 데이터를 요청하고 서버에서 받는 기능을 구현했다. 해당 전체 소스 파일은 첨부파일에 있다. 


Kiwoom.py

 

< 주가 일봉 데이터 저장 프로그램 개발 - DB 설계 및 구현>


1. 주가 일봉 데이터 저장 프로그램 개발(개요)


해당 포스트를 읽기 전에 이전 포스트를 읽기 바란다.


이번 포스트에서는 DB를 설계하고 DB 관련 코드를 파이썬으로 구현할 것이다. 우리는 MySQL을 사용한다. 주가 일봉 데이터 저장 프로그램에서 DB에 저장하는 데이터는 '일자', '시가', '고가', '저가', '종가',' 거래량'으로 구성된다.  테이블은 각 종목으로 한다. 따라서 DB는 종목 테이블로 구성되고 테이블 내에 '일자', '시가', '고가', '저가', '종가',' 거래량' 칼럼이 있다. 코스피와 코스닥 지수의 경우 테이블 명을 'kospi', 'kosdaq'으로 일반 종목들과 etf는 'a+종목코드'로 한다. 테이블의 구조는 아래와 같다.



따라서 테이블을 생성 sql은 다음과 같다. 

sql = 'create table ' + code + '(Date date primary key,Open Decimal,High Decimal,Low Decimal,Close Decimal, Volume Decimal);'


DB를 설계해봤다. 다음으로 DB 관련 코드를 구현해보자. DB 코드에 필요한 기능은 아래와 같다.


1. 데이터 베이스 생성(이미 생성되어 있는 지 확인 필요)

2. 테이블 생성(이미 생성되어 있는 지 확인 필요)

3. 종목의 주가 일봉 데이터 INSERT

4. 테이블 내 특정 종목 데이터의 가장 최근 INSERT 날짜 반환 


'4'번 기능을 구현하는 이유는 효율성 때문이다. 예로 한 달 전에 코스피 지수 데이터를 DB에 저장하고 오늘 코스피 지수 데이터를 사용한다고 하자. 최근 한 달동안 코스피 지수 데이터가 없기 때문에 DB에 코스피 지수 데이터를 INSERT를 해야 한다. 하지만 그냥 INSERT 하면 우리나라 주식 시장 최초 개장일부터의 데이터가 다시 INSERT 된다. 테이블의 'Date' 필드를 primary key로 했기 때문에 에러가 난다. primary key로 하지 않았더라도 어마어마한 데이터 중복이 나타난다. 코스피 개장일 1985년부터 2017년까지 데이터 약 8700개 행이 중복된다. 


또한 코스피 테이블 전체 데이터를 지우고 다시 INSERT 하는 방법도 있다. 하지만 많은 양의 데이터를 다시 INSERT 하기는 시간 효율성이 떨어진다. 그래서 '4'번 기능을 구현했다. 가장 최근에 INSERT 된 데이터의 Date(날짜)를 알면 그 이후 데이터만 INSERT 하면 된다. 


그러면 위에서 제시한 4가지 기능의 코드를 보자. 파일명은 StockDB.py 이다.  전체 코드는 아래와 같고 순차적으로 기능 하나씩 알아보자. 필자는 파이썬에서 MySQL에 접근하는 API로 pymysql을 사용했다.

from sqlalchemy import create_engine import pymysql pymysql.install_as_MySQLdb() import pandas as pd class StockDB(): def init(self, password): if self._create_database(password) is False: return False self.engine = create_engine("mysql+mysqldb://root:"+password+"@localhost/stock", encoding='utf-8') self.conn = pymysql.connect(host='localhost', user='root', password=password, db='stock', charset='utf8') self.cursor = self.conn.cursor() return True def _create_database(self,password): try: conn = pymysql.connect(host='localhost', user='root', password=password, charset='utf8') cursor = conn.cursor() sql = 'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = \'stock\'' result = cursor.execute(sql) if result == 0: sql = 'CREATE DATABASE stock' cursor.execute(sql) conn.commit() except: return False return True def close(self): self.conn.close() def select_max_date(self,table_name): sql = 'select max(Date) from ' + table_name self.cursor.execute(sql) result = self.cursor.fetchone() return result[0] def insert_chart(self,data, table_name): data.to_sql(name=table_name, con=self.engine, if_exists='append') self.conn.commit() def create_table(self,table_name): sql = 'SHOW TABLES LIKE \'' + table_name + '\'' result = self.cursor.execute(sql) if result == 0: sql = 'create table ' + table_name + '(Date date primary key,Open Decimal,High Decimal,Low Decimal,Close Decimal, Volume Decimal);' self.cursor.execute(sql) self.conn.commit()



'1'번, 데이터 베이스 생성(이미 생성되 있는 지 확인) 기능 코드를 보자. DB를 사용하려면 당연히 데이터 베이스를 생성해야 한다. 사용자의 DB 패스워드를 인자로 받으면 'pymysql.connect'를 통해 DB에 접속한다. 패스워드가 틀리면 예외가 발생해 except문으로 넘어가 False를 리턴한다. 다음 행의 sql 문으로 데이터 베이스가 현재 있는 지 확인한다. 이미 생성되었으면 데이터 베이스 생성을 취소해야 하기 때문이다. 

'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = \'stock\''


위 sql을 사용해 'stock' 데이터베이스가 있는 지 확인한다. 'cursor.execute(sql)'을 호출하면 해당 sql 문이 접속된 DB에 실행된다. 반환 값이 '0'이면 'stock' 데이터베이스가 DB 상에 없다는 의미이기 때문에 'CREATE DATABASE stock'으로 데이터 베이스를 생성한다.  

    def _create_database(self,password):
        try:
            conn = pymysql.connect(host='localhost', user='root', password=password, charset='utf8')
            cursor = conn.cursor()
            sql = 'SELECT SCHEMA_NAME FROM INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = \'stock\''
            result = cursor.execute(sql)

            if result == 0:
                sql = 'CREATE DATABASE stock'
                cursor.execute(sql)
                conn.commit()
        except:
            return False
        return True


'2' 번 기능은 테이블을 생성한다. 테이블이 이미 있는 지도 확인한다. '1'번 기능과 로직은 같다. ( SHOW TABLE LIKE \''+table_name+ '\'' ) sql 문을 이용해 인자로 받은 테이블 네임과 동일한 테이블이 있는 지 확인하다. 없다면 위에서 설계한 테이블 스펙대로 테이블을 생성한다. 

    def create_table(self,table_name):
        sql = 'SHOW TABLES LIKE \'' + table_name + '\''
        result = self.cursor.execute(sql)
        if result == 0:
            sql = 'create table ' + table_name + '(Date date primary key,Open Decimal,High Decimal,Low Decimal,Close Decimal, Volume Decimal);'
            self.cursor.execute(sql)
            self.conn.commit()


'3' 번 기능은 인자로 받은 주가 일봉 데이터를 테이블에 INSERT 한다. 인자로 받은 'data'가 주가 일봉 데이터다. 'data' 변수 타입은 'DataFrame'이다. DataFrame 변수는 'to_sql' 메서드로 한 번에 DataFrame 내부 데이터를 INSERT 할 수 있다. 

    def insert_chart(self,data, table_name):
        data.to_sql(name=table_name, con=self.engine, if_exists='append')
        self.conn.commit()


'4'번 기능은 특정 종목(테이블) 데이터의 가장 최근 INSERT된 데이터의 Date(날짜) 칼럼를 반환한다. 이 기능을 구현하는 이유는 위에서 설명했다.  ( 'select max(Date) from '+ table_name ) sql 문을 이용해서 테이블 내 데이터의 Date 칼럼 최댓값을 리턴한다. 'self.cursor.execute(sql)'을 통해 DB에서 sql문을 실행하고 'self.cursor.fetchone()'을 이용해 입력한 SELECT 문의 리턴 값을 전달 받는다. 리턴 값의 타입은 튜플이기 때문에 'result[0]' 을 이용해 실제 max(Date) 값에 접근한다. 

    def select_max_date(self,table_name):
        sql = 'select max(Date) from ' + table_name
        self.cursor.execute(sql)
        result = self.cursor.fetchone()
        return result[0]


이 것으로 DB 설계 및 구현을 마쳤다. 사실상 주가 일봉 데이터 저장 프로그램에서는 단순히 종목의 일봉 데이터만 저장하기에 매우 간단하다. DB에 대한 전체 소스파일은 첨부파일에 있다.


StockDB.py


+ Recent posts