<파이썬을 이용한 주식 변동성 돌파 전략 분석을 위한 최적화 변수 계산(1)>
부제 : 멀티 프로세싱과 비동기 프로그래밍을 이용한 파이썬 코드
이전 포스트 "퀀트를 위한 주식 데이터 수집(2007년~2018/01/10년 데이터)" 에서 "변동성 돌파 전략" 분석을 위해 수집했던 데이터를 보여줬다. 이번 포스트는 지난 포스트에서 살펴본 변동성 돌파 전략의 특정 변수 "kospi_profit_X", "kosdaq_profit_X" 테이블에 들어갈 데이터를 계산하는 방법을 알아본다.
계산하는 데는 파이썬을 사용했다. 계산하기 위해 상장된 모든 주식의 일봉 데이터가 필요하기에 이전 포스트에서 보았던 "kospi_market", "kosdaq_market" 테이블을 이용했다. 또한 계산 속도 향상을 위해 비동기 프로그래밍 방식과 멀티 프로세싱 방법을 사용한다. 그 이유는 현재 상장한 모든 주식의 2007년부터 일봉 데이터를 다루기 때문이다. 아래에서 이번 포스트에서 다룰 코스피, 코스닥 일봉 데이터의 총 개수를 볼 수 있다. 약 4 백만개 정도 된다. 또한 이 4백 만개의 데이터에 대해 적어도 6번 정도 반복해서 계산을 수행한다. 따라서 어떤 측면에선 2천 4백 만개의 데이터를 이용한다고 할 수 있다. 이렇게 방대한 양의 데이터를 가지고 계산을 수행하기에 속도가 중요하다. 그래서 비동기와 멀티프로세싱 방식을 추가로 사용한다.
멀티프로세싱 방식으로는 Pool 클래스를 사용한다. 비동기 방식은 asyncio를 사용하며 asyncio를 사용해 mysql을 접근할 수 있도록 만든 aiomysql 패키지를 이용한다.
"주식 변동성 돌파 전략 분석"을 위한 최적화 변수 계산에는 3개의 파이썬 파일을 사용했다.
- StockDB.py : 비동기적으로 MySQL 데이터베이스에 acceess 한다.
- Utility.py : 일반적인 기능(날짜 계산 등)을 가진다.
- MainFunctions.py : StockDB와 Utitlity 클래스를 이용해 최종 계산을 수행하는 main을 수행한다.
이번 포스트에서는 StockDB.py에 대해 알아본다. 다음 포스트에서 나머지 코드와 소스 첨부파일을 개재한다. StockDB.py는 아래와 같다.
import aiomysql as aio import logging as log import pymysql pymysql.install_as_MySQLdb() import pandas as pd class StockDB(): async def init_pool(self,loop): log.info("Connection to Connection Pool") try: self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='stock',loop=loop, maxsize=64) except: log.warning("Connecting Pool Error {}".format(repr(0))) raise async def req_stock_daily_data(self, market,code, start, end ): log.info("Selecting {} daily data {} ~ {}".format(code, start, end)) sql = "select Date, Open, High, Low from "+market+"_market where Code = '"+ code + "' and Date >= '"+start+"' and Date < '"+end+"';" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() if rows == (): return pd.DataFrame() result = pd.DataFrame.from_records(list(rows)) result.columns = ['Date', 'Open','High','Low'] result = result.set_index('Date').astype('float') except aio.Error as e: log.warning("Selecting {} daily data {} ~ {} Aiomysql Error : {}".format(code, start, end, repr(e))) raise except Exception as e: log.warning("Selecting {} daily data {} ~ {} Error : {}".format(code, start, end, repr(e))) raise return result async def req_min_date(self, market, code): log.info("Selecting {}'s min Date".format(code)) sql = "select min(Date) from "+market+"_market where Code= '"+code+"';" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) result = await cur.fetchone() except aio.Error as e: log.warning("Selecting {}'s min Date Aiomysql Error : {}".format(code ,repr(e))) raise except Exception as e: log.warning("Selecting {}'s min Date Error : {}".format(code ,repr(e))) raise return result[0] async def req_code_list(self, market): log.info("Selecting {} code list".format(market)) sql = "select DISTINCT Code from "+market+"_market;" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() except aio.Error as e: log.warning("Selecting {} code list Aiomysql Error : {}".format(market, repr(e))) raise except Exception as e: log.warning("Selecting {} code list Error".format(market, repr(e))) raise return list(rows) async def insert_profit(self,market,type, data): log.info("Inserting {}_profit_{} data".format(market,type)) sql = "insert into "+market+"_profit_"+type+" (Date, Profit, Scope, Code ) values (%s,%s,%s,%s)" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.executemany(sql, data.reset_index().values.tolist()) await conn.commit() except aio.Error as e: log.warning("Inserting {}_profit_{} data Aiomysql Error : {}".format(market, repr(e),type)) raise except Exception as e: log.warning("Inserting {}_profit_{} data Error".format(market, repr(e),type)) raise
aiomysql 패키지를 사용해 Connection Pool을 만들어 DB와 많은 Connection을 생성한 후 비동기적으로 접근하도록 했다. "maxsize"를 64로 해 한 StockDB 객체당 최대 64개의 Connection을 가진 pool을 생성한다.
StockDB 클래스가 가진 각 메서드에 대해 알아보자.
다음 메서드는 비동기적으로 주식 일봉 데이터를 가지고 온다. 인자로 받은 "market"은 코스피 시장에 상장된 주식인지 코스닥 시장에 상장된 주식인지 나타낸다. code는 특정 주식의 코드를 가리킨다. 해당 code의 start 에서 end 기간까지 주식 일봉데이터를 DB에 요청한다. 변동성 돌파 전략의 매수 시점을 결정하는 데 고가, 저가, 다음날 시가만을 이용하기에 DB에서 해당 데이터들만 가지고 온다. 그 값을 데이터 프레임 형식으로 바꾸어 리턴한다.
async def req_stock_daily_data(self, market,code, start, end ): log.info("Selecting {} daily data {} ~ {}".format(code, start, end)) sql = "select Date, Open, High, Low from "+market+"_market where Code = '"+ code + "' and Date >= '"+start+"' and Date < '"+end+"';" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() if rows == (): return pd.DataFrame() result = pd.DataFrame.from_records(list(rows)) result.columns = ['Date', 'Open','High','Low'] result = result.set_index('Date').astype('float') except aio.Error as e: log.warning("Selecting {} daily data {} ~ {} Aiomysql Error : {}".format(code, start, end, repr(e))) raise except Exception as e: log.warning("Selecting {} daily data {} ~ {} Error : {}".format(code, start, end, repr(e))) raise return result
async def req_min_date(self, market, code): log.info("Selecting {}'s min Date".format(code)) sql = "select min(Date) from "+market+"_market where Code= '"+code+"';" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) result = await cur.fetchone() except aio.Error as e: log.warning("Selecting {}'s min Date Aiomysql Error : {}".format(code ,repr(e))) raise except Exception as e: log.warning("Selecting {}'s min Date Error : {}".format(code ,repr(e))) raise return result[0]
다음 메서드는 "kospi_market"과 "kosdaq_market" 테이블에 있는 모든 주식의 코드 리스트를 반환한다. 즉, 코스피 또는 코스닥에 현재 상장된 모든 주식의 코드를 반환한다. 변동성 돌파 전략 분석을 위한 최적화 변수를 계산할 때 전체 주식 코드가 필요하다.
async def req_code_list(self, market): log.info("Selecting {} code list".format(market)) sql = "select DISTINCT Code from "+market+"_market;" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() except aio.Error as e: log.warning("Selecting {} code list Aiomysql Error : {}".format(market, repr(e))) raise except Exception as e: log.warning("Selecting {} code list Error".format(market, repr(e))) raise return list(rows)
async def insert_profit(self,market,type, data): log.info("Inserting {}_profit_{} data".format(market,type)) sql = "insert into "+market+"_profit_"+type+" (Date, Profit, Scope, Code ) values (%s,%s,%s,%s)" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.executemany(sql, data.reset_index().values.tolist()) await conn.commit() except aio.Error as e: log.warning("Inserting {}_profit_{} data Aiomysql Error : {}".format(market, repr(e),type)) raise except Exception as e: log.warning("Inserting {}_profit_{} data Error".format(market, repr(e),type)) raise
여기까지가 StockDB.py 에 대한 설명이였다. 다음 포스트에서는 비동기 방식과 멀티 프로세싱을 이용해 최종적으로 최적화 변수를 계산하는 과정을 살펴볼 예정이다.
'주식 프로그래밍(시스템 트레이딩)' 카테고리의 다른 글
1. 시가 총액/PER/PBR 기준 변동성 돌파 전략 분석 프로그래밍(파이썬) (0) | 2018.01.23 |
---|---|
파이썬을 이용한 주식 변동성 돌파 전략 분석을 위한 최적화 변수 계산(2) (1) | 2018.01.19 |
퀀트를 위한 주식 데이터 수집(2007년~2018년/01/10 데이터) (144) | 2018.01.12 |
주식 변동성 돌파 전략 백테스팅 프로그램(키움 OPEN API 활용) (3) | 2018.01.09 |
주가 일봉 데이터 저장 프로그램 코드 및 파일 (3) | 2018.01.09 |