<파이썬을 이용한 주식 변동성 돌파 전략 분석을 위한 최적화 변수 계산(2)>

부제 : 멀티 프로세싱과 비동기 프로그래밍을 이용한 파이썬 코드




이전 포스트 "파이썬을 이용한 주식 변동성 돌파 전략 분석을 위한 최적화 변수 계산(1)" 에 이어서 아직 설명하지 못한 Utility.py와 MainFunctions.py 에 대해 살펴보자. 참고로 코드에는 비동기와 멀티프로세싱 방식을 사용했으며 이에 대한 자세한 설명은 하지 않겠다. 궁금한 점은 댓글로 남겨주면 바로 답해주겠다.


먼저 Utility.py를 살펴보자. Utility.py는 이름 그대로 일반적인 기능을 가진 간단한 함수들로 구성된다. "str_to_date()" 메서드는 str 객체를 date 객체로 바꿔준다. "get_next_year_date()" 메서드는 인자로 받은 날짜의 다음 년도 1월 1일 객체를 반환한다. "get_date_range()" 메서드는 during을 기준으로 start일부터 end일까지의 날을 나눈 zip 타입 데이터를 반환한다. 이전 포스트와 전전 포스트인 "퀀트 데이터 수집" 포스트를 읽었다고 가정하에 예를 들어보겠다. 우리의 최종 목적은 최적화 변수 계산이다. 최적화 변수는 과거 데이터를 기반으로 계산된다. 2개월 단위로 계산한다면 현재 시점에서 두 달동안의 과거 데이터를 토대로 최적화 변수를 계산하고 해당 변수를 현재 시점에서 미래 두 달동안의 매수/매도를 위한 최적화 변수로 활용된다. 즉 특정 "during" 개월 단위로 계산하다면 현재 시점에서 "during" 개월 전 날짜부터 현재까지의 데이터와 현재부터 "during" 개월 후까지의 데이터가 있어야 한다. 따라서 현재 날짜, "during" 개월 전 날짜, "during" 개월 후 날짜를 계산하기 위해 해당 메서드를 만들었다. 아래 사진에 해당 메서드 실행 결과가 있다. 

import datetime from dateutil.relativedelta import relativedelta import numpy as np KOSPI = "kospi" KOSDAQ = "kosdaq" DEFAULT_SCOPE = 0.4 def str_to_date(str): return datetime.datetime.strptime(str, '%Y-%m-%d').date() def get_date_range(start,end, during): if(start>=end): raise ValueError now_dates = np.arange(np.datetime64(start), np.datetime64(end), np.timedelta64(during, 'M'), dtype='datetime64[M]').astype('datetime64[D]').tolist() pre_dates = list(map(lambda date: str(date - relativedelta(months=during)), now_dates)) next_dates = list(map(lambda date: str(date + relativedelta(months=during)), now_dates)) return zip(pre_dates,list(map(str,now_dates)), next_dates) def get_next_year_date(str_date): date = str_to_date(str_date) next_date = datetime.date(date.year+1,1,1) return str(next_date)


print(list(get_date_range("2007-01-01","2009-01-01",4)))




다음은 MainFunctions.py에 대해 살펴볼 것이다. MainFuctions.py는 최종적으로 최적화 변수를 계산한다. 계산하는 데 있어 성능 향상을 위해 비동기와 멀티 프로세싱 방식을 이용한다. 전체 코드는 다음과 같고 각 메서드에 대해 알아보자.

import StockDB as db import pandas as pd import asyncio as asy import sys import logging as log import Utility from functools import reduce from multiprocessing import Pool log.basicConfig(stream=sys.stdout, level=log.DEBUG) class MainFunctions(): def __init__(self, dB,market): self.scopes = (0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4) self.market = market self.durings = (3,6,12, 24) self.dB = dB async def db_init(self, loop): self.dB = db.StockDB() await self.dB.init_pool(loop) def req_cal_profit(self, data, scope): log.info("Calculating daily profit") try: data['Sub'] = (data['High'] - data['Low']).shift(1) * scope data['Criteria'] = data['Sub'] + data['Open'] data['buy'] = (data['High'] >= data['Criteria']) & data['Sub'] > 0 data['Next_Open'] = data['Open'].shift(-1) data = data[data['buy']] data = data.dropna(axis=0) data['Profit'] = round(data['Next_Open'] / data['Criteria'] - 1, 4) profit = pd.DataFrame({scope: data['Profit']}) profit.index.name = 'Date' except Exception as e: log.warning("Calculating daily profit Error : {}".format(repr(e))) raise return profit def req_max_profit_scope(self, data): log.info("Requesting max profit scope") try: cul_profit = (data + 1).cumprod() scope = cul_profit.iloc[-1].idxmax() except Exception as e: log.warning("Requesting max profit scope Error : {}".format(repr(e))) raise return float(scope) async def cal_profit(self, code,prev, now, next, during): log.info("Calculating and Inserting {} ~ {} {} profit".format(now, next, code)) try: stock_data = await self.dB.req_stock_daily_data(self.market, code, prev, next) test_data = stock_data[stock_data.index.values < Utility.str_to_date(now)] data = stock_data[stock_data.index.values >= Utility.str_to_date(now)] test_profit_list = [self.req_cal_profit(test_data, scope) for scope in self.scopes] test_profits = reduce(lambda df1, df2: pd.merge(df1, df2, how='outer', left_index=True, right_index=True), test_profit_list).fillna(0) if test_profits.empty == False: optimum_scope = self.req_max_profit_scope(test_profits) else: optimum_scope = Utility.DEFAULT_SCOPE profit = self.req_cal_profit(data, optimum_scope) profit['Scope'] = optimum_scope profit['Code'] = code await self.dB.insert_profit(self.market, str(during), profit) except Exception as e: log.info("Calculating and Inserting {} ~ {} {} profit Error : {}".format(now, next, code,repr(e))) log.info("Success {} ~ {} {} cal_profit function".format(now, next, code)) async def cal_code_data(self, code, start, end): log.info("Calculating and Inserting {} stock data".format(code)) try: db_start_date = await self.dB.req_min_date(self.market, code) if db_start_date > Utility.str_to_date(start): start_date = Utility.get_next_year_date(str(db_start_date)) else: start_date = start date_range_list = [Utility.get_date_range(start_date, end, during) for during in self.durings] for i, date_range in enumerate(date_range_list): futures = [asy.ensure_future(self.cal_profit(code, date[0], date[1], date[2], self.durings[i])) for date in date_range] await asy.gather(*futures) except ValueError as e: log.warning("date range Error : {} ~ {} data is not exist".format(start,end)) return except Exception as e: log.warning("Calculating and Inserting {} stock data Error : {}".format(code,repr(e))) log.info("****************Finished Calculating and Inserting {} stock data******************".format(code)) async def main_function(loop, index): dB = db.StockDB() await dB.init_pool(loop) code_list = await dB.req_code_list("kospi") divide = int(len(code_list)/4)+1 code_list = code_list[index * divide:(index + 1) * divide] functions = MainFunctions(dB, "kospi") for code in code_list: await functions.cal_code_data(code[0], '2008-01-01', '2018-01-01') code_list = await dB.req_code_list("kosdaq") divide = int(len(code_list)/4)+1 code_list = code_list[index * divide:(index + 1) *divide] functions = MainFunctions(dB, "kosdaq") for code in code_list: await functions.cal_code_data(code[0], '2008-01-01', '2018-01-01') def main(index): loop = asy.get_event_loop() loop.run_until_complete(main_function(loop,index)) if __name__ == '__main__': ranges = [0,1,2,3] pool = Pool(processes=4) pool.map(main,ranges)


다음은 초기화 코드이다. "scopes"는 최적화 변수로 대입할 값을 나타낸다. 변동성 돌파 전략에서 (고가 - 저가)에 곱할 변수 값들이다. 각 변수 값들에 대해 계산한 후 가장 높은 수익률을 기록한 변수를 최종 최적화 변수로 정한다. "market"은 계산할 코드들이 코스피 종목인지 코스닥 종목인지 나타낸다. "durings"는 최적화 변수를 계산할 기간이다. 위에서 설명한 "during"을 일컷는다. 2,3,4,6,12 개월 단위로 최적화 변수를 계산하고 이를 "kospi_profit_X", "kosdaq_profit_X" 테이블에 저장한다. X는 2,3,4,6,12 기간이다. "db_init" 메서드는 DB를 비동기적으로 접근하기 위해 초기화한다.    

class MainFunctions():
    def __init__(self, dB,market):
        self.scopes = (0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4)
        self.market = market
        self.durings = (2,3,4,6,12)
        self.dB = dB

    async def db_init(self, loop):
        self.dB = db.StockDB()
        await self.dB.init_pool(loop)

다음 코드는 입력 받은 코드에 대해서 모든 during 값을 기준으로 start일에서 end일까지 최적화 변수를 계산하는 코드이다. 만약 "cal_code_data("005930", "2009-01-01", "2011-01-01")을 호출했다면 삼성전자의 2,3,4,6,12 개월 기준 2009년 1월 1일부터 2011년 1월 1일까지의 최적화 변수를 구하고 DB에 저장한다.  
    async def cal_code_data(self, code, start, end):
        log.info("Calculating and Inserting {} stock data".format(code))

        try:
            db_start_date = await self.dB.req_min_date(self.market, code)
            print(type(db_start_date))
            if db_start_date > Utility.str_to_date(start):
                start_date = Utility.get_next_year_date(str(db_start_date))
            else:
                start_date = start

            date_range_list = [Utility.get_date_range(start_date, end, during) for during in self.durings]

            for i, date_range in enumerate(date_range_list):
                futures = [asy.ensure_future(self.cal_profit(code, date[0], date[1], date[2], self.durings[i])) for date
                           in date_range]
                await asy.gather(*futures)
        except ValueError as e:
            log.warning("date range Error : {} ~ {} data is not exist".format(start,end))
            return
        except Exception as e:
            log.warning("Calculating and Inserting {} stock data Error : {}".format(code,repr(e)))

        log.info("****************Finished Calculating and Inserting {} stock data******************".format(code))

먼저 최적화 변수를 계산할 일자 범위를 계산한다. 이 때 이전 포스트에서 설명했던 req_min_date() 함수를 사용한다. 일자 범위가 정해 졌다면 during 기준으로 date 범위 zip을 get_date_range() 메서드로 구한다. 그리고 각 date 범위 별로 self.cal_profit를 호출한다. cal_profit 메서드는 code에 대해 date[1]에서 date[2] 기간 동안 사용할 최적화 변수를 during 별로 계산하고 DB에 저장한다. asyncio.ensure_future() 메서드를 이용해 비동기로 작동하게 한다.  이 함수가 모두 완료되면 해당 코드에 대한 최적화 변수 계산 및 DB 저장이 끝난다. 


그러면 cal_profit 메서드에 대해 알아보자. 먼저 prev부터 next 일까지의 일봉데이터를 불러온다. 이를 prev~now와 now~next로 나눈다. 각 scope에 대해 req_cal_profit를 호출한다. 이 메서드는 인자로 받은 scope를 이용해 매수/매도 했을 때의 수익률 계산하고 리턴한다. 각 scope에 대한 수익률을 reduce() 함수와 merge()를 이용해 합친다. 이를 req_max_profit_scope()의 인자로 사용한다. req_max_profit_scope()는 각 scope()에 대한 수익률을 누적곱해 최종 수익률을 구한 후 가장 높은 수익률을 기록한  scope 값을 리턴한다. 마지막으로 해당 scope를 적용한 수익률 데이터를 db에 저장한다. 

참고로 해당 포스트를 쓰면서 코드를 리뷰해보니 아래 방식처럼 scope()에 대한 각 수익률을 계산해 조인한 후 최종 수익률을 구하는 것보다는 scope에 대한 수익률 계산 후 바로 최종 수익률을 계산해 리턴한 다음 최대값을 구하는 게 더 효율적인 것 같다. 일부로 각 수익률과 최종 수익률을 구하는 기능을 나눌려고 아래 방식으로 했는 데 효율성이 조금은 떨어져 보인다.   

    async def cal_profit(self, code,prev, now, next, during):
        log.info("Calculating and Inserting {} ~ {} {} profit".format(now, next, code))

        try:
            stock_data = await self.dB.req_stock_daily_data(self.market, code, prev, next)
            test_data = stock_data[stock_data.index.values < Utility.str_to_date(now)]
            data = stock_data[stock_data.index.values >= Utility.str_to_date(now)]

            test_profit_list = [self.req_cal_profit(test_data, scope) for scope in self.scopes]
            test_profits = reduce(lambda df1, df2: pd.merge(df1, df2, how='outer', left_index=True, right_index=True),
                                  test_profit_list).fillna(0)
            if test_profits.empty == False:
                optimum_scope = self.req_max_profit_scope(test_profits)
            else:
                optimum_scope = Utility.DEFAULT_SCOPE

            profit = self.req_cal_profit(data, optimum_scope)
            profit['Scope'] = optimum_scope
            profit['Code'] = code

            await self.dB.insert_profit(self.market, str(during), profit)

        except Exception as e:
            log.info("Calculating and Inserting {} ~ {} {} profit Error : {}".format(now, next, code,repr(e)))

        log.info("Success {} ~ {} {} cal_profit function".format(now, next, code))



다음으로 수익률을 계산하는 req_cal_profit()에 대해 알아보자. 수익률 계산은 매수 시점과 매도 시점을 결정하는 것과 같다. 변동성 돌파 전략의 매수 시점은 (전일 고가 - 전일 저가)*Scope+시가가 현재가 보다 같거나 클 때이고 매도 시점은 다음날 시가이다. 이를 코드로 나타냈다. 이해하기 쉬울 것이다. 다만 data['buy']를 계산할 때 data['Sub']>0을 추가한 이유는 어떤 주식이 특정 이유로 거래가 정지되어있을 때 data['Sub']가 0이기 때문이다. 그러면 거래가 잠시 정지된 주식은 계속 매수하게 된다. 

    def req_cal_profit(self, data, scope):
        log.info("Calculating daily profit")

        try:
            data['Sub'] = (data['High'] - data['Low']).shift(1) * scope
            data['Criteria'] = data['Sub'] + data['Open']
            data['buy'] = (data['High'] >= data['Criteria']) & data['Sub'] > 0
            data['Next_Open'] = data['Open'].shift(-1)
            data = data[data['buy']]
            data = data.dropna(axis=0)
            data['Profit'] = round(data['Next_Open'] / data['Criteria'] - 1, 4)

            profit = pd.DataFrame({scope: data['Profit']})
            profit.index.name = 'Date'
        except Exception as e:
            log.warning("Calculating daily profit Error : {}".format(repr(e)))
            raise

        return profit


다음은 각 매수/매도에 대한 수익률을 토대로 최종 수익률을 계산한 후 최대 수익률을 가진 Scope 값을 리턴하는 코드이다. 최종 수익율을 구하기 위해 cumprod() 즉, 누적곱을 이용했다. 

    def req_max_profit_scope(self, data):
        log.info("Requesting max profit scope")

        try:
            cul_profit = (data + 1).cumprod()
            scope = cul_profit.iloc[-1].idxmax()

        except Exception as e:
            log.warning("Requesting max profit scope Error : {}".format(repr(e)))
            raise

        return float(scope)


나머지 부분을 살펴보자. 마지막 부분을 보면 Pool 클래스를 이용해 4개의 프로세스를 생성한다. 필자의 노트북이 쿼드코어 이기에 4로 했다. ranges는 프로세스 별로 작업을 나누기 위해 사용한다. main_function() 메서드를 보면 알다시피 코스피 또는 코스닥에 상장된 전체 주식 코드 리스트를 불러오고 이를 4부분으로 쪼갠다. 각 쪼개진 부분은 4개의 프로세스가 하나씩 할당 받아 같이 일하게 된다. 이는 각 작업이 서로 충돌되는 부분이 없고 독립적이기에 쉽게 구현 가능했다.  

async def main_function(loop, index):
    dB = db.StockDB()
    await dB.init_pool(loop)

    code_list = await dB.req_code_list("kospi")
    divide = int(len(code_list)/4)+1
    code_list = code_list[index * divide:(index + 1) * divide]
    functions = MainFunctions(dB, "kospi")

    for code in code_list:
        await functions.cal_code_data(code[0], '2008-01-01', '2018-01-01')

    code_list = await dB.req_code_list("kosdaq")
    divide = int(len(code_list)/4)+1
    code_list = code_list[index * divide:(index + 1) *divide]
    functions = MainFunctions(dB, "kosdaq")

    for code in code_list:
        await functions.cal_code_data(code[0], '2008-01-01', '2018-01-01')

def main(index):

    loop = asy.get_event_loop()
    loop.run_until_complete(main_function(loop,index))


if __name__ == '__main__':
    ranges = [0,1,2,3]
    pool = Pool(processes=4)
    pool.map(main,ranges)


여기까지가 최적화 변수를 계산하는 부분이였다. 해당 최적화 변수를 계산하는 데는 많은 시간이 걸린다. 물론 필자 노트북이 안 좋은 측면도 있다. 5년 전에 출시된 저가용 노트북이기 때문이다. 그래서 최종적으로 모든 데이터를 모으는 데 4일 정도 걸렸다. 노트북 코어 전부에 프로세스를 할당했기에 다른 작업을 수행하는 데 제약이 있어 그 동안 노트북 사용도 못 했다. 만약 해당 데이터가 필요하면 댓글 남겨주시길 바란다. 


필자는 현업 프로그래머가 아닌 평범한 컴퓨터 공학과 대학생입니다. 따라서 개발 코드에 비효율적인 부분이 많을 수 있습니다. 수정해야할 부분이 있다면 댓글 달아주시면 감사하겠습니다.   

+ Recent posts