본문 바로가기

파이썬으로 퀀트 프로그램 만들기 project/강화학습

강화학습으로 비트코인 차트 학습시키기

728x90

저번 포스트에서 예고했듯이 snake game ai에 이용했던 강화학습 알고리즘을 비트코인 차트에 적용시켜 보겠습니다.

 

# game.py

비트코인 차트 환경이 game 환경이 될 것입니다.

본격적인 설명에 앞서 먼저 말씀드릴게 있습니다.

snake game과 비트코인 차트 학습은 다른 점이 있습니다.

snake game은 특정상황에서 해야할 행동에 대한 보상이 바로 정해집니다. 그리고 그 보상이 모델을 옳은 방향으로 학습시켜줍니다. 또한 발생되는 환경의 경우의 수가 그렇게 많지 않습니다.

그러나 차트는 그렇지 않습니다. 셀 수없이 많은 경우의 수가 있습니다.

제가 사용하는 한정된 차트 데이터는 overfitting을 유발할 수 있으므로 차트 데이터를 두 가지로 나눠줄 필요가 있습니다.

하나는 학습 데이터, 하나는 평가 데이터입니다.

학습 데이터로만 모델을 학습시키고 평가 데이터은 철저히 구분시켜주었습니다.

 

본격적인 내용은 아래의 코드를 통해 설명 드리겠습니다.

import random
import numpy as np
from binance.client import Client
import config
import pandas as pd
import datetime
import time
import matplotlib.pyplot as plt


client = Client(config.apiKey, config.apiSecurity)



class TradingAI:   
    def __init__(self, ticker= "BTCUSDT",interval= "5m", count_start= 1000, count_end= 0, isFuture= True, window_size= 50):
        self.count = count_start - count_end
        self.window_size = window_size
        # 수익률을 reward로 설정하였는데, 수수료 때문에 모델이 매매 횟수 자체가 적어질 것을 염려하여
        # 우선 수수료를 0으로 설정하였습니다.
        # 또한 수익률이 이득이든 손실이든 똑같은 비중으로 모델을 학습시키기 위해 수수료를 0으로 설정하였습니다. 
        self.tf = 0.0 # 수수료
        self.df = self._get_ohlcv(ticker, interval, count= count_start, isFuture= isFuture)
        self.df = self.df.iloc[:count_start-count_end, :5].copy()

        self.df['buy_condition'] = 0 # 0: no position, 1: buy, 2: sell 
        self.df['hpr'] = 1. # 누적 수익률
        self.df['ror'] = 1. # 현재 포지션 수익률
        self.df['buy_price'] = 0. # 평단가
        self.df['dr'] = 1. # 각 봉에 대한 수익률
        self.df["trading_number"] = 0
        self.df["win"] = 0
        self.df["lose"] = 0
		
        # dataframe의 iloc 메소드 사용하기 위해 dataframe의 열 인덱스를 미리 따왔습니다.
        self.open_idx = np.where(self.df.columns == 'open')[0][0]
        self.high_idx = np.where(self.df.columns == 'high')[0][0]
        self.low_idx = np.where(self.df.columns == 'low')[0][0]
        self.close_idx = np.where(self.df.columns == 'close')[0][0]
        self.buy_condition_idx = np.where(self.df.columns == 'buy_condition')[0][0]
        self.hpr_idx = np.where(self.df.columns == 'hpr')[0][0]
        self.ror_idx = np.where(self.df.columns == 'ror')[0][0]
        self.buy_price_idx = np.where(self.df.columns == 'buy_price')[0][0]
        self.dr_idx = np.where(self.df.columns == 'dr')[0][0]
        self.trading_number_idx = np.where(self.df.columns == 'trading_number')[0][0]
        self.win_idx = np.where(self.df.columns == 'win')[0][0]
        self.lose_idx = np.where(self.df.columns == 'lose')[0][0]
        
        self.reset()
    
    def reset(self): 
        # init game state
        self.iter = self.window_size - 1 # iter in df
        self.score = 0       

    def get_window_df(self): # 한 윈도우가 한 state 덩어리가 될 것입니다.
        self.window_df = self.df.iloc[self.iter-self.window_size+1 : self.iter+1,:5]
        return self.window_df

    def _get_ohlcv(self, ticker, interval, count, isFuture):
        info = client.get_historical_klines(ticker, interval, limit=2)
        dif = info[1][0]-info[0][0]
        st = info[1][0] - dif*(count-1)
        if isFuture:
            info = client.futures_historical_klines(ticker,interval,start_str=st)
        else:
            info = client.get_historical_klines(ticker,interval,start_str=st)

        length = len(info) 
        info = pd.DataFrame(info)
        
        for i in range(length):
            t = int(info.iloc[i,0])
            t = t/1000
            t = time.localtime(t)
            dt = datetime.datetime(t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
            info.iloc[i,0] = dt


        
        info.set_index(keys=[info.loc[:,0].values], inplace=True)
            
        info = info[[1,2,3,4,5]] 
        info[[1,2,3,4,5]] = info[[1,2,3,4,5]].astype('float')
        info.columns = ['open','high','low','close','volume']
        info
        
        return info.copy()   
    
    # 데이터 정규화
    def normalize(self, df):
        open = np.where(df.columns == 'open')[0][0]
        high = np.where(df.columns == 'high')[0][0]
        low = np.where(df.columns == 'low')[0][0]
        close = np.where(df.columns == 'close')[0][0]
        volume = np.where(df.columns == 'volume')[0][0]

        min_value = min(min(df['open']),min(df['high']),min(df['low']),min(df['close']))
        max_value = max(max(df['open']),max(df['high']),max(df['low']),max(df['close']))
        min_volume = min(df['volume'])
        max_volume = max(df['volume'])
        
        for i in range(len(df)):
            df.iloc[i,open] = (df.iloc[i,open] - min_value) / (max_value - min_value)
            df.iloc[i,high] = (df.iloc[i,high] - min_value) / (max_value - min_value)
            df.iloc[i,low] = (df.iloc[i,low] - min_value) / (max_value - min_value)
            df.iloc[i,close] = (df.iloc[i,close] - min_value) / (max_value - min_value)
            df.iloc[i,volume] = (df.iloc[i,volume] - min_volume) / (max_volume - min_volume)
        
        return df.copy()

	# 모델 인풋 모양이 1차원이므로 데이터도 1차원으로 바꿔주는 함수입니다.
    def make_lst(self, df):
        temp_lst = []
        for i in range(len(df)):
            temp_lst.extend(df.iloc[i,:].to_list())

        return temp_lst
    
    def play_step(self, action): 
        ## 1. check if game over (trading period is over)
        game_over = False
        # 마지막에서 2번째까지만 게임이 진행되도록 설정
        # 마지막까지 게임 진행될 경우 play step이후 new state를 받을때 window df의 데이터가 1개 적게 나오는 것을 방지
        if self.iter >= self.count-1: 
            game_over = True
            reward = self.get_reward()
            self.score = self.df.iloc[-2,self.hpr_idx]
            return reward, game_over, self.score  


        ## 2. move
        for i, value in enumerate(action):
            if value == 1:
                self.df.iloc[self.iter,self.buy_condition_idx] = i
                break

                
        ## 3. calculate rate of return(reward)
            
        # if no position before
        if self.df.iloc[self.iter-1, self.buy_condition_idx] == 0: 
            # no position
            if self.df.iloc[self.iter, self.buy_condition_idx] == 0: 
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
            # buy 
            elif self.df.iloc[self.iter, self.buy_condition_idx] == 1: 
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price(=close price)
                temp = self.df.iloc[self.iter, self.buy_price_idx]
                self.df.iloc[self.iter, self.dr_idx] = (temp - temp*self.tf)/temp #dr
                self.df.iloc[self.iter, self.ror_idx] = self.df.iloc[self.iter, self.dr_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] + 1 #tradubg_number+1
            # sell
            else: 
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price(=close price)
                temp = self.df.iloc[self.iter, self.buy_price_idx]
                self.df.iloc[self.iter, self.dr_idx] = (temp - temp*self.tf)/temp #dr
                self.df.iloc[self.iter, self.ror_idx] = self.df.iloc[self.iter, self.dr_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] + 1 #tradubg_number+1
            self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
            self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose

        # if buy position before
        elif self.df.iloc[self.iter-1, self.buy_condition_idx] == 1: 
            # no position
            if self.df.iloc[self.iter, self.buy_condition_idx] == 0: 
                end_price = self.df.iloc[self.iter, self.close_idx]
                self.df.iloc[self.iter, self.dr_idx] = (end_price - end_price*self.tf)/self.df.iloc[self.iter-1, self.close_idx] #dr
                self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf - self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf) / self.df.iloc[self.iter-1, self.buy_price_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
                if self.df.iloc[self.iter, self.close_idx] > self.df.iloc[self.iter-1, self.buy_price_idx]: #win
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
                else: #lose
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] 
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1                  
            # buy
            elif self.df.iloc[self.iter, self.buy_condition_idx] == 1:  
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter-1, self.buy_price_idx] #buy_price
                self.df.iloc[self.iter, self.dr_idx] = self.df.iloc[self.iter, self.close_idx]/self.df.iloc[self.iter-1, self.close_idx] #dr
                self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter, self.close_idx]- self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf)/self.df.iloc[self.iter-1, self.buy_price_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #trading_number
                self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
                self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose
            # sell
            else: 
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price
                end_price = self.df.iloc[self.iter, self.close_idx]
                self.df.iloc[self.iter, self.dr_idx] = (end_price - end_price*self.tf*2)/self.df.iloc[self.iter-1, self.close_idx] #dr
                self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf - self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf) / self.df.iloc[self.iter-1, self.buy_price_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx]+1 #trading_number
                if self.df.iloc[self.iter, self.ror_idx] > 1: #win
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
                else: #lose
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] 
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1  

        # if sell position before        
        else: 
            # no position
            if self.df.iloc[self.iter, self.buy_condition_idx] == 0: 
                end_price = self.df.iloc[self.iter, self.close_idx]
                self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1,self.buy_price_idx] - end_price - end_price*self.tf)/self.df.iloc[self.iter-1,self.buy_price_idx]+1 #dr
                self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter-1,self.buy_price_idx] - end_price - end_price*self.tf - self.df.iloc[self.iter-1,self.buy_price_idx]*self.tf)/self.df.iloc[self.iter-1,self.buy_price_idx]+1 #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
                if self.df.iloc[self.iter, self.ror_idx] > 1: #win
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
                else: #lose
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] 
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
            # buy 
            elif self.df.iloc[self.iter, self.buy_condition_idx] == 1: 
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price
                end_price = self.df.iloc[self.iter, self.close_idx]
                self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1, self.close_idx] - end_price - end_price*self.tf*2)/self.df.iloc[self.iter-1,self.close_idx]+1 #dr
                self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf)/end_price #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx]+1 #trading_number
                if self.df.iloc[self.iter, self.ror_idx] > 1: #win
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
                else: #lose
                    self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] 
                    self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
                    
            # sell
            else: 
                self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter-1, self.buy_price_idx] #buy_price
                self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1, self.close_idx] - self.df.iloc[self.iter, self.close_idx])/self.df.iloc[self.iter-1, self.close_idx] + 1 #dr
                self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter-1, self.buy_price_idx] - self.df.iloc[self.iter, self.close_idx] - self.df.iloc[self.iter-1, self.buy_price_idx] * self.tf)/self.df.iloc[self.iter-1, self.buy_price_idx] #ror
                self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #trading_number
                self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
                self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose
        
        self.df.iloc[self.iter, self.hpr_idx] = self.df.iloc[self.iter-1, self.hpr_idx] * self.df.iloc[self.iter, self.dr_idx] #hpr

        reward = self.get_reward()
        self.score = self.get_score()
        self.iter += 1

        # 5. return game over and score
        return reward, game_over, self.score
    

    def get_score(self):
        if self.iter >= self.count:
            return self.df.iloc[-1,self.hpr_idx]
        return self.df.iloc[self.iter,self.hpr_idx]     

	# 수익률의 10배를 reward로 설정
    # 예: 1.5% 이득 -> 15 reward
    def get_reward(self):
        if self.iter >= self.count:
            return (self.df.iloc[-1,self.ror_idx]-1)*10  
        return (self.df.iloc[self.iter,self.ror_idx]-1)*10

 

 

# agent.py

import torch
import random
import numpy as np
from collections import deque
from game import TradingAI
from model import Linear_QNET, QTrainer
from helper import plot
import pandas as pd
import matplotlib.pyplot as plt
from IPython import display
import datetime
import time

MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001 

class Agent:

    def __init__(self, window_size=30)
        self.n_games = 0 
        self.epsilon = 0 # randomness
        self.gamma = 0.9 # discount
        self.memory = deque(maxlen=MAX_MEMORY) # popleft()
        self.model = Linear_QNET(window_size*5, 256, 256, 3) # 30일의 ohlcv
        self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)

    def get_state(self,game):
        df = game.get_window_df()
        normalized_df = game.normalize(df)
        normalized_lst = game.make_lst(normalized_df)
        
        return np.array(normalized_lst, dtype=float)


    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached

    def train_long_memory(self):
        if len(self.memory) > BATCH_SIZE:
            mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
        else:
            mini_sample = self.memory
        
        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)
    
    def train_short_memory(self, state, action, reward, next_state, done):
        self.trainer.train_step(state, action, reward, next_state, done)

    def get_action(self, state, t= 1):
        # random moves: tradeoff exploration / exploitation
        # epsilon: randomness of beginning
        self.epsilon = 100 - self.n_games 
        final_move = [0,0,0] 
        if t == 1 and random.randint(0, 100) < self.epsilon: # more games, smaller epsilon
            move = random.randint(0, 2) # 0,1,2
            final_move[move] = 1
        else:
            state0 = torch.tensor(state, dtype=torch.float)
            prediction = self.model(state0)
            move = torch.argmax(prediction).item()
            final_move[move] = 1
        
        return final_move
    
# count1 == 학습 데이터 1000개
# count2 == 검증 데이터 500개
def train(count1= 1000, count2= 500, window_size= 30):
    plot_scores = []
    plot_mean_scores = []
    plot_scores2 = []
    total_score = 0
    record = 0
    record2 = 0
    agent = Agent(window_size= window_size)
    game1 = TradingAI(count_start= count1+count2, count_end= count2, window_size= window_size) # 1500 ~ 500 봉 전까지의 data
    game2 = TradingAI(count_start= count2, count_end=0, window_size= window_size) # 500 ~ 현재까지의 data
    train_level = True # True: train, False: estimate

    while True:
        if train_level:
            # get old state
            state_old = agent.get_state(game1)

            # get move
            final_move = agent.get_action(state_old)

            # perform move and get new state
            reward, done, score = game1.play_step(final_move)
            state_new = agent.get_state(game1)

            # train short memory
            agent.train_short_memory(state_old, final_move, reward, state_new, done)

            # remember
            agent.remember(state_old, final_move, reward, state_new, done)
            # print("train iter:",game1.iter-1,"/ scroe: ",score, " reward: ", reward)

            if done:
                # train long memory, plot result
                
                agent.n_games += 1
                agent.train_long_memory()

                if score > record:
                    record = score
                    # agent.model.save()
                    
                print("======================")
                print('Train Game', agent.n_games, 'Score', score, 'Record:', record)

                plot_scores.append(score)
                total_score += score
                mean_score = total_score / agent.n_games 
                plot_mean_scores.append(mean_score)
                
                
                train_level = False

        else: # 학습된 모델을 검증 데이터를 통해 테스트
            state_old2 = agent.get_state(game2)
            final_move2 = agent.get_action(state_old2, t= 2)
            reward2, done2, score2 = game2.play_step(final_move2)
            # print("evaluation iter:",game2.iter,"/ scroe: ",score2, " reward: ", reward2)
            if done2:
                if score2 > record2:
                    record2 = score2
                print('Evaluation Game', agent.n_games, 'Score', score2, 'Record:', record2)
                print("======================")
                plot_scores2.append(score2)        
                plot(plot_scores, plot_mean_scores, plot_scores2)
                game1.reset()
                game2.reset()
                train_level = True

 

 

# model.py

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os

class Linear_QNET(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size1)
        self.linear2 = nn.Linear(hidden_size1, hidden_size2)
        self.linear3 = nn.Linear(hidden_size2, output_size)

    def forward(self, x):
        x = self.linear1(x)
        x = self.linear2(x)
        x = self.linear3(x)
        return x
    
    def save(self, file_name='model.pth'):
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_name = os.path.join(model_folder_path)
        torch.save(self.state_dict(), file_name)

class QTrainer():
    def __init__(self, model, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
        self.criterion = nn.MSELoss()

    def train_step(self, state, action, reward, next_state, done):
        # (n, x)
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)

        # (1, x)
        if len(state.shape) == 1:
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done, )
            
        # 1: predicted Q values with current state
        pred = self.model(state)

        target = pred.clone()
        for idx in range(len(done)):
            Q_new = reward[idx]
            if not done[idx]:
                Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action).item()] = Q_new

        # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
        # pred.clone()
        # preds[argmax(action)] = Q_new
        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()

 

 

# helper.py

import matplotlib.pyplot as plt
from IPython import display


plt.ion()
def plot(scores, mean_scores, scores2):
    
    display.clear_output(wait=True)
    display.display(plt.gcf())
    plt.clf()
    plt.figure(1)
    plt.title('Training...')
    plt.xlabel('Number of Games')
    plt.ylabel('Score')
    plt.plot(scores, label= "current scores")
    plt.plot(mean_scores, label= "mean scores")
    plt.plot(scores2, label= "evaluation scores")   
    plt.ylim(ymin=0)
    plt.text(len(scores)-1, scores[-1], str(round(scores[-1],3)))
    plt.text(len(mean_scores)-1, mean_scores[-1], str(round(mean_scores[-1],3)))
    plt.text(len(scores2)-1, scores2[-1], str(round(scores2[-1],3)))
    plt.show(block=False)
    plt.legend()
    plt.pause(.1)

 

 

# 결과

100회 이상 게임을 돌렸을때의 결과입니다. 수익률이 score입니다.

train data set에서의 수익률은 천천히 우상향 하는 모습을 보이지만 evaluation data set에서의 수익률은 1을 벗어나지 못하는 모습입니다.

이러한 나쁜 결과가 나온 것의 원인은 아래의 경우가 될 수 있습니다.

 

1. input 데이터를 1차원으로 변환

실제 데이터는 2차원인데 1차원으로 바꾸면 데이터의 손실이 있을 수 있습니다.

모델 레이어를 2차원으로 바꿀 필요가 있을 듯합니다.

 

2. 데이터 부족

1000개 봉의 차트 데이털을 학습데이터로 사용했습니다. 데이터가 적어 보입니다.

그런데 천개 데이터를 학습돌릴 때에도 꽤 많은 시간이 걸렸습니다.

결과를 출력할때 0.1 초씩 멈췄는데, 빠른 연산을 위해 결과 출력을 서른 번의 한 번으로든 줄여야 할 것 같습니다.

빠르게 학습시키기 위해서는 전체적인 코드 구조도 바꿔야 할 수도 있습니다.

 

3. hidden layer의 단순함

  모델 레이어는 150 * 256 * 256* 3 이었습니다. 어쩌면 인풋 대비 적은 hidden layer의 파라미터 였을듯 합니다.

이전 snake game 에서는 인풋이 11개 였어서 256으로도 충분했을지 모릅니다. 

150개 인풋에서는 적어도 지금의 10~100배는 되어야 할 듯합니다.

그렇게 되면 당연히 계산시간도 커질것입니다. 지금까지는 gpu사용 안했지만 gpu를 사용해야할듯 합니다.

 

4. 모델 

순방향 신경말 모델을 사용하였는데 이것이 주식 데이터를 학습하는데에 부적합할 수 있습니다.

보통 주식데이터 같은 시계열 데이터를 학습할 때에는 rnn을 사용한다고 하는데 수정해야 하겠습니다.

 

5. 인풋 데이터의 단순함

인풋 데이터를 5분봉만 이용하였습니다. 더 입체적으로 15분, 30분, 1시간, 3시간, 1일 봉의 데이터를 이용하여

3차원의 입체적인 데이터를 인풋으로 사용하면 어땠을까 하는 생각이 듭니다.

실제로 사람이 차트를 보고 투자를 할 때에도 여러 시계열 단위 데이터를 비교해 보면서 매매하니까 말입니다.