< 4. 실 거래 후 보완된 ETF 변동성 돌파 전략 - 최적 Scope 계산>
아래 그림은 'profit_date_10' 테이블에 저장된 일부 정보이다. '069500' Kodex 200은 2011년 1월 17일 기준 이전 10일동안 가장 많은 수익율을 준 Scope가 0.8인 걸 알 수 있고 0.8을 적용해 당일 변동성 돌파 전략을 수행했을 때 1, 즉 0퍼의 수익율을 보인다.(변동성 돌파 전략의 목표 매수가까지 도달하지 않았기 때문에 0퍼) 2011년 1월 19일은 scope가 0.8로 적용됬고 0.0019퍼의 손실을 봤다. 1월 20일 이후부터는 이전 10일 최대 수익율을 주는 Scope가 1.2이다.
위에서 보인 데이터를 모든 종목에 대해 2011년 1월 이후부터 최근 10일, 20일, 40일, 60일, 120일 최적 Scope를 구해 변동성 돌파 전략을 적용할 것이다.
이전 포스트까지는 데이터 프레임을 이용해 전체 데이터를 한번에 벡터 연산을 했기에 실행 시간이 짧았다. 이번에는 각 종목 별로 데이터를 계산하기 때문에 실행 시간이 길어진다. 따라서 실행 시간을 최소화 하기 위해 비동기와 멀티 프로세싱 방식을 이용한다.
먼저 DB 관련 전체 코드이다. 코드 아래 이에 대한 설명이 있다.
import aiomysql as aio import logging as log import pymysql pymysql.install_as_MySQLdb() import pandas as pd class StockDB(): async def init(self,loop, date): log.info("Connection to Connection Pool") try: self.__pool = await aio.create_pool(host='127.0.0.1', port=3306, user='root', password='qhdks12#$', db='etf1',loop=loop, maxsize=64) self.date = date except: log.warning("Connecting Pool Error {}".format(repr(0))) raise # 이전 거래 대금에 따른 거래 가능 유무 데이터를 가져온다. async def req_is_trade_data(self,code ): log.info("Selecting {} is_trade data".format(code)) sql = "select Date,is_trade from etf_market where Code = '"+ code + "' and Date >= '"+self.date+"'" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() if rows == (): return pd.DataFrame() result = pd.DataFrame.from_records(list(rows)) result.columns = ['Date','is_trade'] result = result.set_index('Date').astype('float') except aio.Error as e: log.warning("Selecting {} is_trade data Aiomysql Error : {}".format(code, repr(e))) raise except Exception as e: log.warning("Selecting {} is_trade data Error : {}".format(code, repr(e))) raise return result # scope, 오버나잇 유무에 따른 데이터를 가져온다. async def req_profit_sope_data(self,code, scope, over ): log.info("Selecting {} profit_scope_{} data".format(code, scope)) log.info("is overnight : {}".format(over)) if over==True: sql = "select Code, Date, Profit from profit_scope_"+scope+"_over where Code = '"+ code + "' and Date >= '"+self.date+"'" else: sql = "select Code, Date, Profit from profit_scope_" + scope + " where Code = '" + code + "' and Date >= '" +self.date + "'" print(sql) try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() if rows == (): return pd.DataFrame() result = pd.DataFrame.from_records(list(rows)) result.columns = ['Code', 'Date','Profit'] result = result.set_index('Date').astype('float') except aio.Error as e: log.warning("Selecting {} profit_scope_{} data Aiomysql Error : {}".format(code, scope, repr(e))) raise except Exception as e: log.warning("Selecting {} profit_scope_{} data Error : {}".format(code, scope, repr(e))) raise return result # 최적 Scope로 계산한 수익율 데이터를 insert async def insert_backtest_profit(self,data, during, over): log.info("Insert data {}".format(data.iloc[0]['Code'])) log.info("During : {} , Over_night : {}".format(during, over)) if over == True: sql = "insert into profit_date_" + during + "_over (Scope, Date, Profit, Code) values(%s,%s,%s,%s)" else: sql = "insert into profit_date_" + during + "(Scope, Date, Profit, Code) values(%s,%s,%s,%s)" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.executemany(sql, data.values.tolist()) await conn.commit() except Exception as e: print(data) log.warning("Insert Error : {} {}".format(data['Code'][0],repr(e))) # 주식 종목 전체 코드를 가져온다. async def req_code_list(self): log.info("Selecting code list") sql = "select DISTINCT Code from etf_market;" try: async with self.__pool.acquire() as conn: async with conn.cursor() as cur: await cur.execute(sql) rows = await cur.fetchall() except aio.Error as e: log.warning("Selecting code list Aiomysql Error : {}".format(repr(e))) raise except Exception as e: log.warning("Selecting code list Error".format(repr(e))) raise print(rows) return list(rows)
"req_is_trade_data()" 함수는 "Code" 인자에 해당하는 종목의 일별 거래 가능 유무 데이터를 가져온다. 이 데이터는 "2. 데이터 수집" 포스트에서 구했다. 해당 데이터가 true 이면 전날 거래 대금 또는 이전 3일 평균 거래 대금이 1억 이상이다는 뜻이고 이를 기반으로 당일날 해당 종목을 거래할 지 판별한다. true이면 당일날 거래할 수 있는 종목으로 분류한다.
"req_profit_sope_data()" 함수는 "3. 개별 종목 수익율 계산" 포스트에서 계산한 "Code" 인자에 해당하는 개별 종목 수익율 데이터를 불러온다. scope와 over을 통해 "3. 개별 종목 수익율 계산" 포스트에서 설명한 특정 테이블에 접근한다.
"insert_backtest_profit()" 함수는 이번에 계산할 거래 가능한 종목들 대상 최적 Scope 적용 종목별 일별 수익율을 계산해 저장한다. 저장한 결과는 위에서 설명한 사진과 같이 된다.
"req_code_list()" 함수는 수집한 주식 종목 코드를 가져온다.
다음 코드는 최적 Scope로 계산한 수익율을 구하는 코드이다. 먼저 전체 코드를 보자.
import pandas as pd import DB as db import asyncio as asy import logging as log import sys from multiprocessing import Pool log.basicConfig(stream=sys.stdout, level=log.DEBUG) date = "2011-01-01" over_nights = [True,False] durings = [10,20,40,60,120] dB = db.StockDB() async def cal_scope(code, during, over_night): is_trade = await dB.req_is_trade_data(code) data_scope_4 = await dB.req_profit_sope_data(code, "04", over_night) data_scope_8 = await dB.req_profit_sope_data(code, "08", over_night) data_scope_12 = await dB.req_profit_sope_data(code, "12", over_night) profit_data = pd.DataFrame({'04':data_scope_4['Profit'], '08':data_scope_8['Profit'], '12':data_scope_12['Profit']}) # during 평균 곱을 구한다. cum_profit_data = profit_data.rolling(window=during).apply(lambda x: x.prod()) # 곱이 가장 큰(수익율이 좋은) scope 값을 구한다. max_scope = cum_profit_data.idxmax(axis=1) # shift 해서 다음날 구한 scope를 적용하게 한다. max_scope = max_scope.shift(1) max_scope.name='Scope' # 최적 scope와 join profit_data = pd.merge(profit_data, pd.DataFrame(max_scope), how='outer',left_index=True, right_index=True) # 거래량에 따른 거래 가능 여부 join profit_data = pd.merge(profit_data, is_trade, how='outer',left_index=True, right_index=True) # 20일 평균이라면 초기 19일은 scope가 NA임 profit_data = profit_data.dropna() # 거래량에 따른 거래 가능 데이터만 추출 profit_data = profit_data[profit_data['is_trade']==True] profit_data = profit_data.drop('is_trade',1) profit_data.index = profit_data.index.set_names(['Date']) profit_data = profit_data.reset_index() # melt를 해서 최적 scope와 같은 scope 값을 가지는 수익율을 구한다. profit_data = pd.melt(profit_data, id_vars=['Scope','Date']) profit_data = profit_data[profit_data['Scope'] == profit_data['variable']] profit_data = profit_data.drop('variable',1) profit_data['Scope'] = profit_data['Scope'].astype(float)/10 profit_data['Code'] = code if len(profit_data) == 0: return await dB.insert_backtest_profit(profit_data, str(during), over_night) async def main_function(loop, index): await dB.init(loop, date) code_list = await dB.req_code_list() divide = int(len(code_list)/4)+1 code_list = code_list[index*divide:(index+1)*divide] for over_night in over_nights: for during in durings: futures = [asy.ensure_future(cal_scope(code[0], during, over_night)) for code in code_list] await asy.gather(*futures) def process_thread( index): loop = asy.get_event_loop() loop.run_until_complete(main_function(loop, index)) if __name__ == '__main__': ranges = [0,1,2,3] pool = Pool(processes=4) pool.map(process_thread, ranges)
is_trade = await dB.req_is_trade_data(code) data_scope_4 = await dB.req_profit_sope_data(code, "04", over_night) data_scope_8 = await dB.req_profit_sope_data(code, "08", over_night) data_scope_12 = await dB.req_profit_sope_data(code, "12", over_night) profit_data = pd.DataFrame({'04':data_scope_4['Profit'], '08':data_scope_8['Profit'], '12':data_scope_12['Profit']})
# during 평균 곱을 구한다. cum_profit_data = profit_data.rolling(window=during).apply(lambda x: x.prod()) # 곱이 가장 큰(수익율이 좋은) scope 값을 구한다. max_scope = cum_profit_data.idxmax(axis=1) # shift 해서 다음날 구한 scope를 적용하게 한다. max_scope = max_scope.shift(1) max_scope.name='Scope' # 최적 scope와 join profit_data = pd.merge(profit_data, pd.DataFrame(max_scope), how='outer',left_index=True, right_index=True)
"during" 인자에 따른 누적곱을 먼저 구한다. "during"이 "10"이라면 10일동안 수익율을 곱해 10일간 누적 수익율을 구한다. 그 다음 "idxmax()" 메서드를 사용해 일별로 누적곱의 최대 값을 가지는 칼럼명을 구한다. 즉 2018년 4월 4일의 10일 누적 수익율이 가장 큰 Scope 값이 0.8이라면 "08"이 계산된다. 그리고 shift() 메서드를 이용해 계산된 칼럼명을 아래 행으로 움직인다. 즉 "08"이 2018년 4월 5일 데이터 행으로 옮겨진다. 그래야 "08" 데이터를 2018년 4월 5일의 변동성 돌파 전략 Scope로 사용 가능하다. 마지막으로 merge를 이용해 계산된 칼럼명을 실제 수익율 데이터와 join 한다. 즉, "08" 데이터를 2018년 4월 5일 데이터와 결합시킨다.
# 거래량에 따른 거래 가능 여부 join profit_data = pd.merge(profit_data, is_trade, how='outer',left_index=True, right_index=True) # 20일 평균이라면 초기 19일은 scope가 NA임 profit_data = profit_data.dropna() # 거래량에 따른 거래 가능 데이터만 추출 profit_data = profit_data[profit_data['is_trade']==True] profit_data = profit_data.drop('is_trade',1)
"profit_data" 데이터와 거래 가능 유무 데이터를 merge 한다. 'is_trade' 값이 'false'인 것은 drop 한다. 'true'인 데이터만 거래할 수 있기 때문이다.
profit_data.index = profit_data.index.set_names(['Date']) profit_data = profit_data.reset_index() # melt를 해서 최적 scope와 같은 scope 값을 가지는 수익율을 구한다. profit_data = pd.melt(profit_data, id_vars=['Scope','Date']) profit_data = profit_data[profit_data['Scope'] == profit_data['variable']] profit_data = profit_data.drop('variable',1) profit_data['Scope'] = profit_data['Scope'].astype(float)/10 profit_data['Code'] = code if len(profit_data) == 0: return await dB.insert_backtest_profit(profit_data, str(during), over_night)
위 코드는 이해를 위해 별도의 예를 준비했다.
위 예제를 이해하면 위 코드도 바로 이해할 수 있을 것이다. 위 예제에서 최종적으로 각 행별로 "04", "08" 칼럼 중 큰 값을 추출한다. 위 예제에 제시한 로직이 위 코드와 같다. 따라서 최적 Scope 에 해당하는 수익율을 구할 수 있고 이를 DB에 저장한다.
나머지 코드는 비동기와 멀티 프로세싱 방식에 관한 부분이므로 생략한다.
'주식 프로그래밍(시스템 트레이딩)' 카테고리의 다른 글
6. 실 거래 후 보완된 ETF 변동성 돌파 전략(파이썬) (18) | 2018.04.05 |
---|---|
5. 실 거래 후 보완된 ETF 변동성 돌파 전략(파이썬) (0) | 2018.04.05 |
3. 실 거래 후 보완된 ETF 변동성 돌파 전략(파이썬) (0) | 2018.04.04 |
2. 실 거래 후 보완된 ETF 변동성 돌파 전략(파이썬) (1) | 2018.04.04 |
1. 실 거래 후 보완된 ETF 변동성 돌파 전략 (0) | 2018.04.04 |