<파이썬을 이용한 주식 변동성 돌파 전략 분석을 위한 최적화 변수 계산(1)>

부제 : 멀티 프로세싱과 비동기 프로그래밍을 이용한 파이썬 코드


이전 포스트 "퀀트를 위한 주식 데이터 수집(2007년~2018/01/10년 데이터)" 에서 "변동성 돌파 전략" 분석을 위해 수집했던 데이터를 보여줬다. 이번 포스트는 지난 포스트에서 살펴본 변동성 돌파 전략의 특정 변수 "kospi_profit_X", "kosdaq_profit_X" 테이블에 들어갈 데이터를 계산하는 방법을 알아본다. 


계산하는 데는 파이썬을 사용했다. 계산하기 위해 상장된 모든 주식의 일봉 데이터가 필요하기에 이전 포스트에서 보았던 "kospi_market", "kosdaq_market" 테이블을 이용했다. 또한 계산 속도 향상을 위해 비동기 프로그래밍 방식과 멀티 프로세싱 방법을 사용한다. 그 이유는 현재 상장한 모든 주식의 2007년부터 일봉 데이터를 다루기 때문이다. 아래에서 이번 포스트에서 다룰 코스피, 코스닥 일봉 데이터의 총 개수를 볼 수 있다. 약 4 백만개 정도 된다. 또한 이 4백 만개의 데이터에 대해 적어도 6번 정도 반복해서 계산을 수행한다. 따라서 어떤 측면에선 2천 4백 만개의 데이터를 이용한다고 할 수 있다. 이렇게 방대한 양의 데이터를 가지고 계산을 수행하기에 속도가 중요하다. 그래서 비동기와 멀티프로세싱 방식을 추가로 사용한다.  



멀티프로세싱 방식으로는 Pool 클래스를 사용한다. 비동기 방식은 asyncio를 사용하며 asyncio를 사용해 mysql을 접근할 수 있도록 만든 aiomysql 패키지를 이용한다.  



"주식 변동성 돌파 전략 분석"을 위한 최적화 변수 계산에는 3개의 파이썬 파일을 사용했다. 

- StockDB.py : 비동기적으로 MySQL 데이터베이스에 acceess 한다.

- Utility.py : 일반적인 기능(날짜 계산 등)을 가진다.

- MainFunctions.py : StockDB와 Utitlity 클래스를 이용해 최종 계산을 수행하는 main을 수행한다.


이번 포스트에서는 StockDB.py에 대해 알아본다. 다음 포스트에서 나머지 코드와 소스 첨부파일을 개재한다. StockDB.py는 아래와 같다.


import aiomysql as aio
import logging as log
import pymysql
pymysql.install_as_MySQLdb()
import pandas as pd


class StockDB():

    async def init_pool(self,loop):
        log.info("Connection to Connection Pool")
        try:
            self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='stock',loop=loop, maxsize=64)
        except:
            log.warning("Connecting Pool Error {}".format(repr(0)))
            raise

    async def req_stock_daily_data(self, market,code, start, end ):
        log.info("Selecting {} daily data {} ~ {}".format(code, start, end))

        sql = "select Date, Open, High, Low from "+market+"_market where Code = '"+ code + "' and Date >= '"+start+"' and Date < '"+end+"';"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
                    if rows == ():
                        return pd.DataFrame()
                    result = pd.DataFrame.from_records(list(rows))
                    result.columns = ['Date', 'Open','High','Low']
                    result = result.set_index('Date').astype('float')
        except aio.Error as e:
            log.warning("Selecting {} daily data {} ~ {} Aiomysql  Error : {}".format(code, start, end, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} daily data {} ~ {}  Error : {}".format(code, start, end, repr(e)))
            raise

        return result


    async def req_min_date(self, market, code):
        log.info("Selecting {}'s min Date".format(code))

        sql = "select min(Date) from "+market+"_market where Code= '"+code+"';"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    result =  await cur.fetchone()

        except aio.Error as e:
            log.warning("Selecting {}'s min Date Aiomysql Error : {}".format(code ,repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {}'s min Date Error : {}".format(code ,repr(e)))
            raise

        return result[0]


    async def req_code_list(self, market):
        log.info("Selecting {} code list".format(market))

        sql = "select DISTINCT Code from "+market+"_market;"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
        except aio.Error as e:
            log.warning("Selecting {} code list Aiomysql Error : {}".format(market, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} code list Error".format(market, repr(e)))
            raise

        return list(rows)

    async def insert_profit(self,market,type, data):
        log.info("Inserting {}_profit_{} data".format(market,type))

        sql = "insert into "+market+"_profit_"+type+" (Date, Profit, Scope, Code ) values (%s,%s,%s,%s)"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.executemany(sql, data.reset_index().values.tolist())
                    await conn.commit()
        except aio.Error as e:
            log.warning("Inserting {}_profit_{} data Aiomysql Error : {}".format(market, repr(e),type))
            raise
        except Exception as e:
            log.warning("Inserting {}_profit_{} data Error".format(market, repr(e),type))
            raise


aiomysql 패키지를 사용해 Connection Pool을 만들어 DB와 많은 Connection을 생성한 후 비동기적으로 접근하도록 했다. "maxsize"를 64로 해 한 StockDB 객체당 최대 64개의 Connection을 가진 pool을 생성한다. 


StockDB 클래스가 가진 각 메서드에 대해 알아보자. 


다음 메서드는 비동기적으로 주식 일봉 데이터를 가지고 온다. 인자로 받은 "market"은 코스피 시장에 상장된 주식인지 코스닥 시장에 상장된 주식인지 나타낸다. code는 특정 주식의 코드를 가리킨다. 해당 code의 start 에서 end 기간까지 주식 일봉데이터를 DB에 요청한다. 변동성 돌파 전략의 매수 시점을 결정하는 데 고가, 저가, 다음날 시가만을 이용하기에 DB에서 해당 데이터들만 가지고 온다. 그 값을 데이터 프레임 형식으로 바꾸어 리턴한다.

    async def req_stock_daily_data(self, market,code, start, end ):
        log.info("Selecting {} daily data {} ~ {}".format(code, start, end))

        sql = "select Date, Open, High, Low from "+market+"_market where Code = '"+ code + "' and Date >= '"+start+"' and Date < '"+end+"';"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
                    if rows == ():
                        return pd.DataFrame()
                    result = pd.DataFrame.from_records(list(rows))
                    result.columns = ['Date', 'Open','High','Low']
                    result = result.set_index('Date').astype('float')
        except aio.Error as e:
            log.warning("Selecting {} daily data {} ~ {} Aiomysql  Error : {}".format(code, start, end, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} daily data {} ~ {}  Error : {}".format(code, start, end, repr(e)))
            raise

        return result


다음 메서드는 DB에 저장된 주식 일봉 데이터의 최소 날짜를 반환한다. 자세히 말하면 2007년 이전에 상장한 주식들은 2007년 주식 개장 첫 날짜가 리턴된다. 2007년 이후 상장한 주식들은 상장한 날 즉, 최초의 일봉 데이터가 나타난 날짜를 반환한다. 이 함수가 존재하는 이유는 만약 2008년 중순에 주식이 상장됬다면 2009년부터의 수익률 데이터만 필요하기 때문이다. 시가총액, PER, PBR 등의 지표로 연도별 리밸런싱을 하는 데 2008년에 상장한 주식이 상장하기 전에 해당 연도 주식 리밸런싱이 끝났기 때문이다. 또한 2007년 이전 상장한 주식들이 2007년 주식 개장 첫 날짜를 반환하는 이유는 DB에 2007년도 일봉 데이터부터 있기 때문이다. 
    async def req_min_date(self, market, code):
        log.info("Selecting {}'s min Date".format(code))

        sql = "select min(Date) from "+market+"_market where Code= '"+code+"';"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    result =  await cur.fetchone()

        except aio.Error as e:
            log.warning("Selecting {}'s min Date Aiomysql Error : {}".format(code ,repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {}'s min Date Error : {}".format(code ,repr(e)))
            raise

        return result[0]


다음 메서드는 "kospi_market"과 "kosdaq_market" 테이블에 있는 모든 주식의 코드 리스트를 반환한다. 즉, 코스피 또는 코스닥에 현재 상장된 모든 주식의 코드를 반환한다. 변동성 돌파 전략 분석을 위한 최적화 변수를 계산할 때 전체 주식 코드가 필요하다. 

    async def req_code_list(self, market):
        log.info("Selecting {} code list".format(market))

        sql = "select DISTINCT Code from "+market+"_market;"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
        except aio.Error as e:
            log.warning("Selecting {} code list Aiomysql Error : {}".format(market, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} code list Error".format(market, repr(e)))
            raise

        return list(rows)

다음 메서드는 최종적으로 계산한 최적화 변수를 "kospi_profit_X" 또는 "kosdaq_profit_X" 테이블에 저장한다. 
    async def insert_profit(self,market,type, data):
        log.info("Inserting {}_profit_{} data".format(market,type))

        sql = "insert into "+market+"_profit_"+type+" (Date, Profit, Scope, Code ) values (%s,%s,%s,%s)"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.executemany(sql, data.reset_index().values.tolist())
                    await conn.commit()
        except aio.Error as e:
            log.warning("Inserting {}_profit_{} data Aiomysql Error : {}".format(market, repr(e),type))
            raise
        except Exception as e:
            log.warning("Inserting {}_profit_{} data Error".format(market, repr(e),type))
            raise


여기까지가 StockDB.py 에 대한 설명이였다. 다음 포스트에서는 비동기 방식과 멀티 프로세싱을 이용해 최종적으로 최적화 변수를 계산하는 과정을 살펴볼 예정이다.

+ Recent posts