< 4. 실 거래 후 보완된 ETF 변동성 돌파 전략 - 최적 Scope 계산>


1. 개요

2. 데이터 수집

3. 개별 종목 수익율 계산


지난 포스트까지 모든 종목의 일별 변동성 돌파 전략 수익율을 계산했다. 이번 포스트는 최적 Scope를 계산해 이에 해당하는 수익율을 저장할 것이다. 우리는 Scope를 0.4, 0.8, 1.2로 해서 각 종목 일별 수익율을 구했다. 이 데이터를 이용해서 지난 10일, 20일, 40일, 60일, 120일동안 가장 높은 수익율을 준 Scope 값을 찾아서 이에 해당하는 수익율 데이터를 구할 것이다. 
예로 지난 10일동안 매매동안 Kodex 200 종목이 Scope가 0.8일때 변동성 돌파 전략의 수익율이 좋았다고 하면 오늘은 Scope를 0.8로 적용해 변동성 돌파 전략을 수행한다. 
참고로 2011년 1월 1일이후 데이터만 사용할 예정이다. 그 이전에는 ETF 종목 개수가 30개도 안 되서 충분한 데이터가 아니다고 판단해 2007년부터 2010년 데이터는 사용하지 않는다. 
"1.개요"와 "2.데이터 수집' 포스트에서 언급한 거래량 문제를 여기서 해결한다. 수익율 데이터를 저장할 때 "2.데이터 수집"에서 구했던 "is_trade" 값(전날 거래대금 또는 이전 3일 평균 거래대금이 1억이 넘는 지) 이 true인지 확인한다. 
 
이러한 정보를 저장할 테이블 이름과 스키마는 아래와 같다.


아래 그림은 'profit_date_10' 테이블에 저장된 일부 정보이다. '069500' Kodex 200은 2011년 1월 17일 기준 이전 10일동안 가장 많은 수익율을 준 Scope가 0.8인 걸 알 수 있고 0.8을 적용해 당일 변동성 돌파 전략을 수행했을 때 1, 즉 0퍼의 수익율을 보인다.(변동성 돌파 전략의 목표 매수가까지 도달하지 않았기 때문에 0퍼) 2011년 1월 19일은 scope가 0.8로 적용됬고 0.0019퍼의 손실을 봤다. 1월 20일 이후부터는 이전 10일 최대 수익율을 주는 Scope가 1.2이다.


  


위에서 보인 데이터를 모든 종목에 대해 2011년 1월 이후부터 최근 10일, 20일, 40일, 60일, 120일 최적 Scope를 구해 변동성 돌파 전략을 적용할 것이다.


이전 포스트까지는 데이터 프레임을 이용해 전체 데이터를 한번에 벡터 연산을 했기에 실행 시간이 짧았다. 이번에는 각 종목 별로 데이터를 계산하기 때문에 실행 시간이 길어진다. 따라서 실행 시간을 최소화 하기 위해 비동기와 멀티 프로세싱 방식을 이용한다. 


먼저 DB 관련 전체 코드이다. 코드 아래 이에 대한 설명이 있다.

import aiomysql as aio
import logging as log
import pymysql
pymysql.install_as_MySQLdb()
import pandas as pd


class StockDB():
    async def init(self,loop, date):
        log.info("Connection to Connection Pool")
        try:
            self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='etf1',loop=loop, maxsize=64)
            self.date = date
        except:
            log.warning("Connecting Pool Error {}".format(repr(0)))
            raise

    # 이전 거래 대금에 따른 거래 가능 유무 데이터를 가져온다.
    async def req_is_trade_data(self,code ):
        log.info("Selecting {} is_trade data".format(code))

        sql = "select Date,is_trade from etf_market where Code = '"+ code + "' and Date >= '"+self.date+"'"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
                    if rows == ():
                        return pd.DataFrame()
                    result = pd.DataFrame.from_records(list(rows))
                    result.columns = ['Date','is_trade']
                    result = result.set_index('Date').astype('float')
        except aio.Error as e:
            log.warning("Selecting {} is_trade data Aiomysql  Error : {}".format(code, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} is_trade data Error : {}".format(code, repr(e)))
            raise

        return result

    # scope, 오버나잇 유무에 따른 데이터를 가져온다.
    async def req_profit_sope_data(self,code, scope, over ):
        log.info("Selecting {} profit_scope_{} data".format(code, scope))
        log.info("is overnight : {}".format(over))

        if over==True:
            sql = "select Code, Date, Profit from profit_scope_"+scope+"_over where Code = '"+ code + "' and Date >= '"+self.date+"'"
        else:
            sql = "select Code, Date, Profit from profit_scope_" + scope + " where Code = '" + code + "' and Date >= '" +self.date + "'"

        print(sql)
        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
                    if rows == ():
                        return pd.DataFrame()
                    result = pd.DataFrame.from_records(list(rows))
                    result.columns = ['Code', 'Date','Profit']
                    result = result.set_index('Date').astype('float')
        except aio.Error as e:
            log.warning("Selecting {} profit_scope_{} data Aiomysql  Error : {}".format(code, scope, repr(e)))
            raise
        except Exception as e:
            log.warning("Selecting {} profit_scope_{} data  Error : {}".format(code, scope, repr(e)))
            raise

        return result

    # 최적 Scope로 계산한 수익율 데이터를 insert
    async def insert_backtest_profit(self,data, during, over):
        log.info("Insert data {}".format(data.iloc[0]['Code']))
        log.info("During : {} , Over_night : {}".format(during, over))

        if over == True:
            sql = "insert into profit_date_" + during + "_over (Scope, Date, Profit, Code) values(%s,%s,%s,%s)"
        else:
            sql = "insert into profit_date_" + during + "(Scope, Date, Profit, Code) values(%s,%s,%s,%s)"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.executemany(sql, data.values.tolist())
                    await conn.commit()
        except Exception as e:
            print(data)
            log.warning("Insert Error : {} {}".format(data['Code'][0],repr(e)))


    # 주식 종목 전체 코드를 가져온다.
    async def req_code_list(self):
        log.info("Selecting code list")

        sql = "select DISTINCT Code from etf_market;"

        try:
            async with self.__pool.acquire() as conn:
                async with conn.cursor() as cur:
                    await cur.execute(sql)
                    rows = await cur.fetchall()
        except aio.Error as e:
            log.warning("Selecting code list Aiomysql Error : {}".format(repr(e)))

            raise
        except Exception as e:
            log.warning("Selecting code list Error".format(repr(e)))
            raise
        print(rows)
        return list(rows)

"req_is_trade_data()" 함수는 "Code" 인자에 해당하는 종목의 일별 거래 가능 유무 데이터를 가져온다. 이 데이터는 "2. 데이터 수집" 포스트에서 구했다. 해당 데이터가 true 이면 전날 거래 대금 또는 이전 3일 평균 거래 대금이 1억 이상이다는 뜻이고 이를 기반으로 당일날 해당 종목을 거래할 지 판별한다. true이면 당일날 거래할 수 있는 종목으로 분류한다.


"req_profit_sope_data()" 함수는 "3. 개별 종목 수익율 계산" 포스트에서 계산한 "Code" 인자에 해당하는 개별 종목 수익율 데이터를 불러온다. scope와 over을 통해 "3. 개별 종목 수익율 계산" 포스트에서 설명한 특정 테이블에 접근한다.


"insert_backtest_profit()" 함수는 이번에 계산할 거래 가능한 종목들 대상 최적 Scope 적용 종목별 일별 수익율을 계산해 저장한다. 저장한 결과는 위에서 설명한 사진과 같이 된다. 


"req_code_list()" 함수는 수집한 주식 종목 코드를 가져온다. 



다음 코드는 최적 Scope로 계산한 수익율을 구하는 코드이다. 먼저 전체 코드를 보자.

import pandas as pd
import DB as db
import asyncio as asy
import logging as log
import sys
from multiprocessing import Pool

log.basicConfig(stream=sys.stdout, level=log.DEBUG)

date = "2011-01-01"
over_nights = [True,False]
durings = [10,20,40,60,120]
dB = db.StockDB()

async def cal_scope(code, during, over_night):

    is_trade = await dB.req_is_trade_data(code)

    data_scope_4 = await dB.req_profit_sope_data(code, "04", over_night)
    data_scope_8 = await dB.req_profit_sope_data(code, "08", over_night)
    data_scope_12 = await dB.req_profit_sope_data(code, "12", over_night)

    profit_data = pd.DataFrame({'04':data_scope_4['Profit'], '08':data_scope_8['Profit'], '12':data_scope_12['Profit']})

    # during 평균 곱을 구한다.
    cum_profit_data = profit_data.rolling(window=during).apply(lambda  x: x.prod())
    # 곱이 가장 큰(수익율이 좋은) scope 값을 구한다.
    max_scope = cum_profit_data.idxmax(axis=1)
    # shift 해서 다음날 구한 scope를 적용하게 한다.
    max_scope = max_scope.shift(1)
    max_scope.name='Scope'

    # 최적 scope와 join
    profit_data = pd.merge(profit_data, pd.DataFrame(max_scope), how='outer',left_index=True, right_index=True)

    # 거래량에 따른 거래 가능 여부 join
    profit_data = pd.merge(profit_data, is_trade, how='outer',left_index=True, right_index=True)
    # 20일 평균이라면 초기 19일은 scope가 NA임
    profit_data = profit_data.dropna()
    # 거래량에 따른 거래 가능 데이터만 추출
    profit_data = profit_data[profit_data['is_trade']==True]
    profit_data = profit_data.drop('is_trade',1)

    profit_data.index = profit_data.index.set_names(['Date'])
    profit_data = profit_data.reset_index()

    # melt를 해서 최적 scope와 같은 scope 값을 가지는 수익율을 구한다.
    profit_data = pd.melt(profit_data, id_vars=['Scope','Date'])
    profit_data = profit_data[profit_data['Scope'] == profit_data['variable']]
    profit_data = profit_data.drop('variable',1)


    profit_data['Scope'] = profit_data['Scope'].astype(float)/10
    profit_data['Code'] = code

    if len(profit_data) == 0:
        return
    await dB.insert_backtest_profit(profit_data, str(during), over_night)


async def main_function(loop, index):

    await dB.init(loop, date)
    code_list = await dB.req_code_list()

    divide = int(len(code_list)/4)+1
    code_list = code_list[index*divide:(index+1)*divide]

    for over_night in over_nights:
        for during in durings:
            futures = [asy.ensure_future(cal_scope(code[0], during, over_night)) for code in code_list]
            await asy.gather(*futures)

def process_thread( index):
    loop = asy.get_event_loop()
    loop.run_until_complete(main_function(loop, index))

if __name__ == '__main__':
    ranges = [0,1,2,3]
    pool = Pool(processes=4)
    pool.map(process_thread, ranges)

"cal_scope()"가 최적 Scope를 이용해 수익율을 계산하고 DB에 저장하는 메서드이다.  
    is_trade = await dB.req_is_trade_data(code)

    data_scope_4 = await dB.req_profit_sope_data(code, "04", over_night)
    data_scope_8 = await dB.req_profit_sope_data(code, "08", over_night)
    data_scope_12 = await dB.req_profit_sope_data(code, "12", over_night)

    profit_data = pd.DataFrame({'04':data_scope_4['Profit'], '08':data_scope_8['Profit'], '12':data_scope_12['Profit']})
인자로 받은 'Code' 종목의 거래 가능 유무 데이터와 오버나잇 유무에 따른 0.4, 0.8, 1.2 Scope를 적용한 일별 수익율 데이터를 DB에서 가져온 후 "profit_data" 변수에 0.4, 0.8, 1.2 수익율 데이터를 저장한다. 
 
    # during 평균 곱을 구한다.
    cum_profit_data = profit_data.rolling(window=during).apply(lambda  x: x.prod())
    # 곱이 가장 큰(수익율이 좋은) scope 값을 구한다.
    max_scope = cum_profit_data.idxmax(axis=1)
    # shift 해서 다음날 구한 scope를 적용하게 한다.
    max_scope = max_scope.shift(1)
    max_scope.name='Scope'

    # 최적 scope와 join
    profit_data = pd.merge(profit_data, pd.DataFrame(max_scope), how='outer',left_index=True, right_index=True)

"during" 인자에 따른 누적곱을 먼저 구한다. "during"이 "10"이라면 10일동안 수익율을 곱해 10일간 누적 수익율을 구한다. 그 다음 "idxmax()" 메서드를 사용해 일별로 누적곱의 최대 값을 가지는 칼럼명을 구한다. 즉 2018년 4월 4일의 10일 누적 수익율이 가장 큰 Scope 값이 0.8이라면 "08"이 계산된다. 그리고 shift() 메서드를 이용해 계산된 칼럼명을 아래 행으로 움직인다. 즉 "08"이 2018년 4월 5일 데이터 행으로 옮겨진다. 그래야 "08" 데이터를 2018년  4월 5일의 변동성 돌파 전략 Scope로 사용 가능하다. 마지막으로 merge를 이용해 계산된 칼럼명을 실제 수익율 데이터와 join 한다. 즉, "08" 데이터를 2018년 4월 5일 데이터와 결합시킨다. 

 

    # 거래량에 따른 거래 가능 여부 join
    profit_data = pd.merge(profit_data, is_trade, how='outer',left_index=True, right_index=True)
    # 20일 평균이라면 초기 19일은 scope가 NA임
    profit_data = profit_data.dropna()
    # 거래량에 따른 거래 가능 데이터만 추출
    profit_data = profit_data[profit_data['is_trade']==True]
    profit_data = profit_data.drop('is_trade',1)

"profit_data" 데이터와 거래 가능 유무 데이터를 merge 한다. 'is_trade' 값이 'false'인 것은 drop 한다. 'true'인 데이터만 거래할 수 있기 때문이다. 


    profit_data.index = profit_data.index.set_names(['Date'])
    profit_data = profit_data.reset_index()

    # melt를 해서 최적 scope와 같은 scope 값을 가지는 수익율을 구한다.
    profit_data = pd.melt(profit_data, id_vars=['Scope','Date'])
    profit_data = profit_data[profit_data['Scope'] == profit_data['variable']]
    profit_data = profit_data.drop('variable',1)


    profit_data['Scope'] = profit_data['Scope'].astype(float)/10
    profit_data['Code'] = code

    if len(profit_data) == 0:
        return
    await dB.insert_backtest_profit(profit_data, str(during), over_night)

위 코드는 이해를 위해 별도의 예를 준비했다. 


위 예제를 이해하면 위 코드도 바로 이해할 수 있을 것이다. 위 예제에서 최종적으로 각 행별로 "04", "08" 칼럼 중 큰 값을 추출한다. 위 예제에 제시한 로직이 위 코드와 같다. 따라서 최적 Scope 에 해당하는 수익율을 구할 수 있고 이를 DB에 저장한다. 


나머지 코드는 비동기와 멀티 프로세싱 방식에 관한 부분이므로 생략한다.  

+ Recent posts