<5. 지표 혼합 수익률 백테스팅 프로그래밍 - (2) >


1. 시가 총액/PER/PBR 기준 변동성 돌파 전략 분석 프로그래밍 - 개요

2. 각 지표 순위별 수익률 백테스팅 프로그래밍 - (1)

3. 각 지표 순위별 수익률 백테스팅 프로그래밍 - (2)


해당 포스트를 읽기 전에 위 포스트를 읽길 추천한다.


이전 포스트에서 "지표 혼합 수익률 백테스팅" 프로그램과 Utitlity.py 코드에 대해 알아봤다. 이번 포스트에서는 해당 프로그램의 DB 관련 코드를 살펴볼 것이다. 먼저 전체 소스를 보고 세세한 부분을 알아가보자. 

import aiomysql as aio import logging as log import pandas as pd import Utility class StockDB(): def __init__(self, during, type, divide): self.during = during self.type = type self.divide = divide if type == Utility.TMV_PBR: self.get_bactesting_sql = self.__get_tmv_and_pbr_sql elif type == Utility.TMV_PER: self.get_bactesting_sql = self.__get_tmv_and_per_sql elif type == Utility.TMV_PSR: self.get_bactesting_sql = self.__get_tmv_and_psr_sql elif type == Utility.PER_PBR: self.get_bactesting_sql = self.__get_pbr_and_per_sql elif type == Utility.PBR_PSR: self.get_bactesting_sql = self.__get_pbr_and_psr_sql elif type == Utility.TMV_PBR_PER: self.get_bactesting_sql = self.__get_tmv_and_pbr_and_per_sql async def init_pool(self,loop): log.info("Connection to Connection Pool") try: self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='stock',loop=loop) except: log.warning("Connecting Pool Error {}".format(repr(0))) raise async def req_stock_index_count(self, sql): log.debug("Selecting market index row count") try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) result = await cur.fetchone() except aio.Error as e: log.warning("Selecting stock index Error : {}".format(repr(e))) raise log.debug("Stock index count - {}".format(result)) return result[0] async def req_backtesting_data(self, market, start, end, quarter): log.info("Selecting backtesting data") sql = "select kos.Code, kos.Date, kos.Profit from " \ "(select Code, Date, Profit from " + market + "_profit_" + self.during + " where Date>\'" + start + "\' and Date<\'" + end + "\') " \ "as kos join " sql = sql + await self.get_bactesting_sql(market, quarter) try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() result = pd.DataFrame.from_records(list(rows)) if result.empty is True: raise ResourceWarning result.columns = ['Code','Date', 'Profit'] except aio.Error as e: log.warning("Selecting backtesting data Error : {}".format(repr(e))) raise except ResourceWarning as re: log.warning("No Data - sql : {}".format(sql)) raise ResourceWarning return result def __get_tmv_count_sql(self, quarter): log.debug("Creating total market value row count sql") sql = "select count(Code) from tmvalue where " + quarter["_1qt"] + " != 0 " return sql def __get_pbr_count_sql(self, quarter): log.debug("Creating pbr row count sql") sql = "select count(Code) from pbr where " + quarter["_3qt"] + " >= 0.2 " return sql def __get_per_count_sql(self, quarter): log.debug("Creating per row count sql") sql = "select count(Code) from per where " + quarter["_3qt"] + " >= 3 " return sql def __get_psr_count_sql(self, quarter): log.debug("Creating psr row count sql") sql = "select count(Code) from psr where " + quarter["_3qt"] + " > 0 " return sql async def __get_tmv_and_pbr_sql(self,market, quarter): tmv_count = await self.req_stock_index_count(self.__get_tmv_count_sql(market, quarter)) pbr_count = await self.req_stock_index_count(self.__get_pbr_count_sql(market, quarter)) tmv_offset = str(int(tmv_count/self.divide)) pbr_offset = str(int(pbr_count/self.divide)) sql = "(select tmv.Code from (select Code from tmvalue where " + quarter["_1qt"] + " != 0 order by " \ + quarter["_1qt"] + " limit 0 , " + tmv_offset + ")" \ " as tmv join (select Code from pbr where " + quarter["_3qt"] + " >=0.2 order by " \ + quarter["_3qt"] + " limit 0 , " + pbr_offset + ") " \ "as pb on tmv.Code = pb.Code) as res on kos.Code=res.Code;" return sql async def __get_tmv_and_per_sql(self,market, quarter): tmv_count = await self.req_stock_index_count(self.__get_tmv_count_sql(market, quarter)) per_count = await self.req_stock_index_count(self.__get_per_count_sql(market, quarter)) tmv_offset = str(int(tmv_count/self.divide)) per_offset = str(int(per_count/self.divide)*5) sql = "(select tmv.Code from (select Code from tmvalue where " + quarter["_1qt"] + " != 0 order by " \ + quarter["_1qt"] + " desc limit 0 , " + tmv_offset + ")" \ " as tmv join (select Code from per where " + quarter["_3qt"] + " >=3 order by " \ + quarter["_3qt"] + " desc limit 0 , " + per_offset + ") " \ "as pe on tmv.Code = pe.Code) as res on kos.Code=res.Code;" return sql async def __get_tmv_and_psr_sql(self,market, quarter): tmv_count = await self.req_stock_index_count(self.__get_tmv_count_sql(market, quarter)) psr_count = await self.req_stock_index_count(self.__get_psr_count_sql(market, quarter)) tmv_offset = str(int(tmv_count/self.divide)) psr_offset = str(int(psr_count/self.divide)) sql = "(select tmv.Code from (select Code from tmvalue where " + quarter["_1qt"] + " != 0 order by " \ + quarter["_1qt"] + " limit 0 , " + tmv_offset + ")" \ " as tmv join (select Code from psr where " + quarter["_3qt"] + " >0 order by " \ + quarter["_3qt"] + " limit 0 , " + psr_offset + ") " \ "as pe on tmv.Code = pe.Code) as res on kos.Code=res.Code;" return sql async def __get_pbr_and_per_sql(self,market, quarter): pbr_count = await self.req_stock_index_count(self.__get_pbr_count_sql(market, quarter)) per_count = await self.req_stock_index_count(self.__get_per_count_sql(market, quarter)) pbr_offset = str(int(pbr_count/self.divide)) per_offset = str(int(per_count/self.divide)) sql = "(select pb.Code from (select Code from pbr where " + quarter["_3qt"] + " >=0.2 order by " \ + quarter["_3qt"] + " limit 0 , " + pbr_offset + ") " \ " as pb join (select Code from per where " + quarter["_3qt"] + " >=3 order by " \ + quarter["_3qt"] + " limit 0 , " + per_offset + ") " \ "as pe on pb.Code = pe.Code) as res on kos.Code=res.Code;" return sql async def __get_pbr_and_psr_sql(self,market, quarter): pbr_count = await self.req_stock_index_count(self.__get_pbr_count_sql(market, quarter)) psr_count = await self.req_stock_index_count(self.__get_per_count_sql(market, quarter)) pbr_offset = str(int(pbr_count/self.divide)) psr_offset = str(int(psr_count/self.divide)) sql = "(select pb.Code from (select Code from pbr where " + quarter["_3qt"] + " >=0.2 order by " \ + quarter["_3qt"] + " limit 0 , " + pbr_offset + ") " \ " as pb join (select Code from per where " + quarter["_3qt"] + " >0 order by " \ + quarter["_3qt"] + " limit 0 , " + psr_offset + ") " \ "as pe on pb.Code = pe.Code) as res on kos.Code=res.Code;" return sql async def __get_tmv_and_pbr_and_per_sql(self,market , quarter): tmv_count = await self.req_stock_index_count(self.__get_tmv_count_sql(market, quarter)) pbr_count = await self.req_stock_index_count(self.__get_pbr_count_sql(market, quarter)) per_count = await self.req_stock_index_count(self.__get_per_count_sql(market, quarter)) pbr_offset = str(int(pbr_count / self.divide)) per_offset = str(int(per_count / self.divide)) tmv_offset = str(int(tmv_count/self.divide)) sql = "(select distinct(tmv.Code) from (select Code from tmvalue where " + quarter["_1qt"] + " != 0 " \ "order by " + quarter["_1qt"] + " limit 0 , " + tmv_offset + ") as tmv join " \ "(select pb.Code from (select Code from pbr where " + quarter["_3qt"] + " >=0.2 " \ "order by " + quarter["_3qt"] + " limit 0 , " + pbr_offset + ") as pb join " \ "(select Code from per where " + quarter["_3qt"] + " >= 3 " \ "order by " + quarter["_3qt"] + " limit 0 , " + per_offset + ") as pe on pb.Code = pe.Code) " \ "as p on tmv.Code=p.Code) as res on kos.Code=res.Code" return sql async def req_kospi_kosdaq_data(self, start, end): log.info("Selecting kospi/kosdaq data") kospi_sql = "select Date, Close from kospi where Date<\'"+end+"\' and Date >=\'"+start+"\';" kosdaq_sql = "select Date, Close from kosdaq where Date<\'"+end+"\' and Date >=\'"+start+"\';" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(kospi_sql) rows = await cur.fetchall() kospi = pd.DataFrame.from_records(list(rows)) kospi.columns = ['Date', 'Close'] kospi = kospi.set_index('Date') await cur.execute(kosdaq_sql) rows = await cur.fetchall() kosdaq = pd.DataFrame.from_records(list(rows)) kosdaq.columns = ['Date', 'Close'] kosdaq = kosdaq.set_index('Date') except aio.Error as e: log.warning("Selecting backtesting data Error : {}".format(repr(e))) raise return [kospi, kosdaq]


먼저 초기화 부분을 알아보자. init_pool()은 디비와의 커낵션 풀을 생성한다. during은 이전 포스트들에서 계속 말했듯이 Scope 값을 결정할 때 사용한다. divide는 지표를 기준으로 주식을 divide 등분으로 나눠 최상위 또는 최하위 주식들을 선택하게 한다. type은 백테스팅하는 데 사용한 지표이다. 이전 포스트에서 본 Utility.py의 상수로 값을 대입한다. 그리고 초기화할 때 받은 type 값으로 self.get_bactesting_sql 을 설정한다. 설정될 각각의 __get~() 메서드는 sql 문을 생성하는 함수이다. 선택하는 각 지표마다 DB에서 접근하는 SQL문이 다르다. 또한 sql문 안에 들어가는 변수 값도 실행할 때마다 다르다. 따라서 초기화 할 때 어떤 sql문을 이용할 지 결정해 매번 DB에 접근할 때마다 어떤 sql문을 이용할 지 계산하는 번거로움을 없앴다. 

    def __init__(self, during, type, divide):
        self.during = during
        self.type = type
        self.divide = divide

        if type == Utility.TMV_PBR:
            self.get_bactesting_sql = self.__get_tmv_and_pbr_sql
        elif type == Utility.TMV_PER:
            self.get_bactesting_sql = self.__get_tmv_and_per_sql
        elif type == Utility.TMV_PSR:
            self.get_bactesting_sql = self.__get_tmv_and_psr_sql
        elif type == Utility.PER_PBR:
            self.get_bactesting_sql = self.__get_pbr_and_per_sql
        elif type == Utility.PBR_PSR:
            self.get_bactesting_sql = self.__get_pbr_and_psr_sql
        elif type == Utility.TMV_PBR_PER:
            self.get_bactesting_sql = self.__get_tmv_and_pbr_and_per_sql

    async def init_pool(self,loop):
        log.info("Connection to Connection Pool")
        try:
            self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='stock',loop=loop)
        except:
            log.warning("Connecting Pool Error {}".format(repr(0)))
            raise



다음 메서드는 인자로 받은 sql문을 실행한다. 해당 sql문은 특정 지표에 대한 주식 개수를 리턴한다. 해당 함수 인자로 받을 sql문 생성 함수를 알아보자. 

    async def req_stock_index_count(self, sql):
        log.debug("Selecting market index row count")

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    result = await cur.fetchone()
        except aio.Error as e:
            log.warning("Selecting stock index Error : {}".format(repr(e)))
            raise

        log.debug("Stock index count - {}".format(result))

        return result[0]


아래 4개의 메서드는 시가총액, pbr, per, psr 기준으로 총 주식 개수를 반환한다. __get_tmv_count_sql()은 인자로 받은 분기에 시가총액이 0이 아닌 주식 개수를 반환한다. pbr은 0.2 이상인 주식 개수, per은 3 이상인 주식 개수, psr은 0보다 큰 주식 개수를 반환한다. 

def __get_tmv_count_sql(self, quarter): log.debug("Creating total market value row count sql") sql = "select count(Code) from tmvalue where " + quarter["_1qt"] + " != 0 " return sql def __get_pbr_count_sql(self, quarter): log.debug("Creating pbr row count sql") sql = "select count(Code) from pbr where " + quarter["_3qt"] + " >= 0.2 " return sql def __get_per_count_sql(self, quarter): log.debug("Creating per row count sql") sql = "select count(Code) from per where " + quarter["_3qt"] + " >= 3 " return sql def __get_psr_count_sql(self, quarter): log.debug("Creating psr row count sql") sql = "select count(Code) from psr where " + quarter["_3qt"] + " > 0 " return sql



아래 메서드는 설정한 지표들의 상위 또는 하위 n% 내에 모두 들어가는 주식의 매수/매도 수익율 데이터를 DB에서 불러오는 메서드이다. sql문을 생성하는 부분을 제외하면 이전 포스트에서 설명했기에 쉽다. 아래 메서드에서 사용하는 sql 문에 대해 알아보자. 아래 sql 문을 보자.

    async def req_backtesting_data(self, market, start, end, quarter):
        log.info("Selecting backtesting data")

        sql = "select kos.Code, kos.Date, kos.Profit from " \
              "(select Code, Date, Profit from " + market + "_profit_" + self.during + " where Date>\'" + start + "\' and Date<\'" + end + "\') " \
              "as kos join "


        sql = sql + await self.get_bactesting_sql(market, quarter)


        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
                    result = pd.DataFrame.from_records(list(rows))
                    if result.empty is True:
                        raise  ResourceWarning
                    result.columns = ['Code','Date', 'Profit']
        except aio.Error as e:
            log.warning("Selecting backtesting data Error : {}".format(repr(e)))
            raise
        except ResourceWarning as re:
            log.warning("No Data - sql : {}".format(sql))
            raise ResourceWarning

        return result


아래 sql 문은 17년 1분기 시가 총액이 0이 아닌 종목들 중 하위 1위에서 115등까지의 종목을 불러온다. 또한 16년도 pbr이 0.2 이상인 종목들 중 하위 1위에서 112등까지 종목들을 불러온다. 시가총액과 pbr에서 불러온 종목들을 join해서 공통으로 들어 있는 종목들을 찾는다. 그 종목들 중에 kosdaq_profit_6에 있는 종목들의 17년 1월 1일부터 18년 1월 1일까지 종목들을 불러온다. 아래는 최하위 종목들만 불러온다. 만약 최상위로 바꾸고 싶다면 간단히 "order by 분기 " 바로 뒤에 desc를 넣으면 된다. desc가 없으면 오름차순으로 불러오고 desc가 있다면 내림차순으로 데이터를 불러오기 때문이다.

select kos.Code, kos.Date, kos.Profit from 
(select Code, Date, Profit from kosdaq_profit_6 where Date>'2017-01-01' and Date<'2018-01-01') 
as kos join 
(select tmv.Code from (select Code from tmvalue where 17_1Q != 0 order by 17_1Q limit 0 , 115)
 as tmv join 
(select Code  from pbr where 16_3Q >=0.2 order by 16_3Q limit 0 , 112) as pb on tmv.Code = pb.Code) 
as res on kos.Code=res.Code;


다른 sql 문들도 위와 마찬가지이다. 아래 sql 생성 함수를 보자. 생성함수의 로직이 비슷해서 모든 메서드는 위에서 참조하길 바란다. 아래는 위 sql문을 생성하는 메서드이다. req_stock_index_count로 특정 지표의 종목 수를 불러온다. 결과를 divide로 나눠 n개의 종목을 선택하게 한다. 그리고  해당 값으로 sql문을 생성한다. 

    async def __get_tmv_and_pbr_sql(self,market, quarter):
        tmv_count = await self.req_stock_index_count(self.__get_tmv_count_sql(market, quarter))
        pbr_count = await self.req_stock_index_count(self.__get_pbr_count_sql(market, quarter))

        tmv_offset = str(int(tmv_count/self.divide))
        pbr_offset = str(int(pbr_count/self.divide))

        sql = "(select tmv.Code from (select Code from tmvalue where " + quarter["_1qt"] + " != 0 order by " \
              + quarter["_1qt"] + " limit 0 , " + tmv_offset + ")" \
              " as tmv join (select Code  from pbr where " + quarter["_3qt"] + " >=0.2 order by " \
              + quarter["_3qt"] + " limit 0 , " + pbr_offset + ") " \
              "as pb on tmv.Code = pb.Code) as res on kos.Code=res.Code;"
        return sql


마지막으로 볼 함수는 코스피와 코스닥 지수 종가 데이터를 불러온다. 이는 'buy and hold' 수익율을 구하기 위해 사용한다. 

    async def req_kospi_kosdaq_data(self, start, end):
        log.info("Selecting kospi/kosdaq data")

        kospi_sql = "select Date, Close from kospi where Date<\'"+end+"\' and Date >=\'"+start+"\';"
        kosdaq_sql = "select Date, Close from kosdaq where Date<\'"+end+"\' and Date >=\'"+start+"\';"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(kospi_sql)
                    rows = await cur.fetchall()
                    kospi = pd.DataFrame.from_records(list(rows))
                    kospi.columns = ['Date', 'Close']
                    kospi = kospi.set_index('Date')


                    await cur.execute(kosdaq_sql)
                    rows = await cur.fetchall()
                    kosdaq = pd.DataFrame.from_records(list(rows))
                    kosdaq.columns = ['Date', 'Close']
                    kosdaq = kosdaq.set_index('Date')
        except aio.Error as e:
            log.warning("Selecting backtesting data Error : {}".format(repr(e)))
            raise

        return [kospi, kosdaq]


이것으로 DB 관련 코드에 대한 설명을 마쳤다. 다음 포스트에서는 DB 관련 코드를 이용해 실제 수익율을 계산해 볼 것이다. 수익율 계산 부분은 "각 지표 순위별 수익률 백테스팅 프로그래밍 "을 봤다고 전제하에 설명할 것이다. 해당 포스트를 이해하면 다음 포스트에서 볼 내용은 바로 이해할 수 있기 때문이다. 참고로 다음 포스트 내용이 더 쉽다. 




참고로 필자는 컴퓨터 공학과를 재학 중인 대학생입니다. 따라서 코드가 완벽할 수 없습니다. 알고리즘이나 코드가 비효율적이거나 오류가 있다면 댓글 달아주세요..

+ Recent posts