저번 포스트에서 예고했듯이 snake game ai에 이용했던 강화학습 알고리즘을 비트코인 차트에 적용시켜 보겠습니다.
# game.py
비트코인 차트 환경이 game 환경이 될 것입니다.
본격적인 설명에 앞서 먼저 말씀드릴게 있습니다.
snake game과 비트코인 차트 학습은 다른 점이 있습니다.
snake game은 특정상황에서 해야할 행동에 대한 보상이 바로 정해집니다. 그리고 그 보상이 모델을 옳은 방향으로 학습시켜줍니다. 또한 발생되는 환경의 경우의 수가 그렇게 많지 않습니다.
그러나 차트는 그렇지 않습니다. 셀 수없이 많은 경우의 수가 있습니다.
제가 사용하는 한정된 차트 데이터는 overfitting을 유발할 수 있으므로 차트 데이터를 두 가지로 나눠줄 필요가 있습니다.
하나는 학습 데이터, 하나는 평가 데이터입니다.
학습 데이터로만 모델을 학습시키고 평가 데이터은 철저히 구분시켜주었습니다.
본격적인 내용은 아래의 코드를 통해 설명 드리겠습니다.
import random
import numpy as np
from binance.client import Client
import config
import pandas as pd
import datetime
import time
import matplotlib.pyplot as plt
client = Client(config.apiKey, config.apiSecurity)
class TradingAI:
def __init__(self, ticker= "BTCUSDT",interval= "5m", count_start= 1000, count_end= 0, isFuture= True, window_size= 50):
self.count = count_start - count_end
self.window_size = window_size
# 수익률을 reward로 설정하였는데, 수수료 때문에 모델이 매매 횟수 자체가 적어질 것을 염려하여
# 우선 수수료를 0으로 설정하였습니다.
# 또한 수익률이 이득이든 손실이든 똑같은 비중으로 모델을 학습시키기 위해 수수료를 0으로 설정하였습니다.
self.tf = 0.0 # 수수료
self.df = self._get_ohlcv(ticker, interval, count= count_start, isFuture= isFuture)
self.df = self.df.iloc[:count_start-count_end, :5].copy()
self.df['buy_condition'] = 0 # 0: no position, 1: buy, 2: sell
self.df['hpr'] = 1. # 누적 수익률
self.df['ror'] = 1. # 현재 포지션 수익률
self.df['buy_price'] = 0. # 평단가
self.df['dr'] = 1. # 각 봉에 대한 수익률
self.df["trading_number"] = 0
self.df["win"] = 0
self.df["lose"] = 0
# dataframe의 iloc 메소드 사용하기 위해 dataframe의 열 인덱스를 미리 따왔습니다.
self.open_idx = np.where(self.df.columns == 'open')[0][0]
self.high_idx = np.where(self.df.columns == 'high')[0][0]
self.low_idx = np.where(self.df.columns == 'low')[0][0]
self.close_idx = np.where(self.df.columns == 'close')[0][0]
self.buy_condition_idx = np.where(self.df.columns == 'buy_condition')[0][0]
self.hpr_idx = np.where(self.df.columns == 'hpr')[0][0]
self.ror_idx = np.where(self.df.columns == 'ror')[0][0]
self.buy_price_idx = np.where(self.df.columns == 'buy_price')[0][0]
self.dr_idx = np.where(self.df.columns == 'dr')[0][0]
self.trading_number_idx = np.where(self.df.columns == 'trading_number')[0][0]
self.win_idx = np.where(self.df.columns == 'win')[0][0]
self.lose_idx = np.where(self.df.columns == 'lose')[0][0]
self.reset()
def reset(self):
# init game state
self.iter = self.window_size - 1 # iter in df
self.score = 0
def get_window_df(self): # 한 윈도우가 한 state 덩어리가 될 것입니다.
self.window_df = self.df.iloc[self.iter-self.window_size+1 : self.iter+1,:5]
return self.window_df
def _get_ohlcv(self, ticker, interval, count, isFuture):
info = client.get_historical_klines(ticker, interval, limit=2)
dif = info[1][0]-info[0][0]
st = info[1][0] - dif*(count-1)
if isFuture:
info = client.futures_historical_klines(ticker,interval,start_str=st)
else:
info = client.get_historical_klines(ticker,interval,start_str=st)
length = len(info)
info = pd.DataFrame(info)
for i in range(length):
t = int(info.iloc[i,0])
t = t/1000
t = time.localtime(t)
dt = datetime.datetime(t.tm_year, t.tm_mon, t.tm_mday, t.tm_hour, t.tm_min, t.tm_sec)
info.iloc[i,0] = dt
info.set_index(keys=[info.loc[:,0].values], inplace=True)
info = info[[1,2,3,4,5]]
info[[1,2,3,4,5]] = info[[1,2,3,4,5]].astype('float')
info.columns = ['open','high','low','close','volume']
info
return info.copy()
# 데이터 정규화
def normalize(self, df):
open = np.where(df.columns == 'open')[0][0]
high = np.where(df.columns == 'high')[0][0]
low = np.where(df.columns == 'low')[0][0]
close = np.where(df.columns == 'close')[0][0]
volume = np.where(df.columns == 'volume')[0][0]
min_value = min(min(df['open']),min(df['high']),min(df['low']),min(df['close']))
max_value = max(max(df['open']),max(df['high']),max(df['low']),max(df['close']))
min_volume = min(df['volume'])
max_volume = max(df['volume'])
for i in range(len(df)):
df.iloc[i,open] = (df.iloc[i,open] - min_value) / (max_value - min_value)
df.iloc[i,high] = (df.iloc[i,high] - min_value) / (max_value - min_value)
df.iloc[i,low] = (df.iloc[i,low] - min_value) / (max_value - min_value)
df.iloc[i,close] = (df.iloc[i,close] - min_value) / (max_value - min_value)
df.iloc[i,volume] = (df.iloc[i,volume] - min_volume) / (max_volume - min_volume)
return df.copy()
# 모델 인풋 모양이 1차원이므로 데이터도 1차원으로 바꿔주는 함수입니다.
def make_lst(self, df):
temp_lst = []
for i in range(len(df)):
temp_lst.extend(df.iloc[i,:].to_list())
return temp_lst
def play_step(self, action):
## 1. check if game over (trading period is over)
game_over = False
# 마지막에서 2번째까지만 게임이 진행되도록 설정
# 마지막까지 게임 진행될 경우 play step이후 new state를 받을때 window df의 데이터가 1개 적게 나오는 것을 방지
if self.iter >= self.count-1:
game_over = True
reward = self.get_reward()
self.score = self.df.iloc[-2,self.hpr_idx]
return reward, game_over, self.score
## 2. move
for i, value in enumerate(action):
if value == 1:
self.df.iloc[self.iter,self.buy_condition_idx] = i
break
## 3. calculate rate of return(reward)
# if no position before
if self.df.iloc[self.iter-1, self.buy_condition_idx] == 0:
# no position
if self.df.iloc[self.iter, self.buy_condition_idx] == 0:
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
# buy
elif self.df.iloc[self.iter, self.buy_condition_idx] == 1:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price(=close price)
temp = self.df.iloc[self.iter, self.buy_price_idx]
self.df.iloc[self.iter, self.dr_idx] = (temp - temp*self.tf)/temp #dr
self.df.iloc[self.iter, self.ror_idx] = self.df.iloc[self.iter, self.dr_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] + 1 #tradubg_number+1
# sell
else:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price(=close price)
temp = self.df.iloc[self.iter, self.buy_price_idx]
self.df.iloc[self.iter, self.dr_idx] = (temp - temp*self.tf)/temp #dr
self.df.iloc[self.iter, self.ror_idx] = self.df.iloc[self.iter, self.dr_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] + 1 #tradubg_number+1
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose
# if buy position before
elif self.df.iloc[self.iter-1, self.buy_condition_idx] == 1:
# no position
if self.df.iloc[self.iter, self.buy_condition_idx] == 0:
end_price = self.df.iloc[self.iter, self.close_idx]
self.df.iloc[self.iter, self.dr_idx] = (end_price - end_price*self.tf)/self.df.iloc[self.iter-1, self.close_idx] #dr
self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf - self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf) / self.df.iloc[self.iter-1, self.buy_price_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
if self.df.iloc[self.iter, self.close_idx] > self.df.iloc[self.iter-1, self.buy_price_idx]: #win
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
else: #lose
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx]
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
# buy
elif self.df.iloc[self.iter, self.buy_condition_idx] == 1:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter-1, self.buy_price_idx] #buy_price
self.df.iloc[self.iter, self.dr_idx] = self.df.iloc[self.iter, self.close_idx]/self.df.iloc[self.iter-1, self.close_idx] #dr
self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter, self.close_idx]- self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf)/self.df.iloc[self.iter-1, self.buy_price_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #trading_number
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose
# sell
else:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price
end_price = self.df.iloc[self.iter, self.close_idx]
self.df.iloc[self.iter, self.dr_idx] = (end_price - end_price*self.tf*2)/self.df.iloc[self.iter-1, self.close_idx] #dr
self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf - self.df.iloc[self.iter-1, self.buy_price_idx]*self.tf) / self.df.iloc[self.iter-1, self.buy_price_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx]+1 #trading_number
if self.df.iloc[self.iter, self.ror_idx] > 1: #win
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
else: #lose
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx]
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
# if sell position before
else:
# no position
if self.df.iloc[self.iter, self.buy_condition_idx] == 0:
end_price = self.df.iloc[self.iter, self.close_idx]
self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1,self.buy_price_idx] - end_price - end_price*self.tf)/self.df.iloc[self.iter-1,self.buy_price_idx]+1 #dr
self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter-1,self.buy_price_idx] - end_price - end_price*self.tf - self.df.iloc[self.iter-1,self.buy_price_idx]*self.tf)/self.df.iloc[self.iter-1,self.buy_price_idx]+1 #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #tradubg_number
if self.df.iloc[self.iter, self.ror_idx] > 1: #win
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
else: #lose
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx]
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
# buy
elif self.df.iloc[self.iter, self.buy_condition_idx] == 1:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter, self.close_idx] #buy_price
end_price = self.df.iloc[self.iter, self.close_idx]
self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1, self.close_idx] - end_price - end_price*self.tf*2)/self.df.iloc[self.iter-1,self.close_idx]+1 #dr
self.df.iloc[self.iter, self.ror_idx] = (end_price - end_price*self.tf)/end_price #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx]+1 #trading_number
if self.df.iloc[self.iter, self.ror_idx] > 1: #win
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] + 1
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx]
else: #lose
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx]
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] + 1
# sell
else:
self.df.iloc[self.iter, self.buy_price_idx] = self.df.iloc[self.iter-1, self.buy_price_idx] #buy_price
self.df.iloc[self.iter, self.dr_idx] = (self.df.iloc[self.iter-1, self.close_idx] - self.df.iloc[self.iter, self.close_idx])/self.df.iloc[self.iter-1, self.close_idx] + 1 #dr
self.df.iloc[self.iter, self.ror_idx] = (self.df.iloc[self.iter-1, self.buy_price_idx] - self.df.iloc[self.iter, self.close_idx] - self.df.iloc[self.iter-1, self.buy_price_idx] * self.tf)/self.df.iloc[self.iter-1, self.buy_price_idx] #ror
self.df.iloc[self.iter, self.trading_number_idx] = self.df.iloc[self.iter-1, self.trading_number_idx] #trading_number
self.df.iloc[self.iter, self.win_idx] = self.df.iloc[self.iter-1, self.win_idx] #win
self.df.iloc[self.iter, self.lose_idx] = self.df.iloc[self.iter-1, self.lose_idx] #lose
self.df.iloc[self.iter, self.hpr_idx] = self.df.iloc[self.iter-1, self.hpr_idx] * self.df.iloc[self.iter, self.dr_idx] #hpr
reward = self.get_reward()
self.score = self.get_score()
self.iter += 1
# 5. return game over and score
return reward, game_over, self.score
def get_score(self):
if self.iter >= self.count:
return self.df.iloc[-1,self.hpr_idx]
return self.df.iloc[self.iter,self.hpr_idx]
# 수익률의 10배를 reward로 설정
# 예: 1.5% 이득 -> 15 reward
def get_reward(self):
if self.iter >= self.count:
return (self.df.iloc[-1,self.ror_idx]-1)*10
return (self.df.iloc[self.iter,self.ror_idx]-1)*10
# agent.py
import torch
import random
import numpy as np
from collections import deque
from game import TradingAI
from model import Linear_QNET, QTrainer
from helper import plot
import pandas as pd
import matplotlib.pyplot as plt
from IPython import display
import datetime
import time
MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001
class Agent:
def __init__(self, window_size=30)
self.n_games = 0
self.epsilon = 0 # randomness
self.gamma = 0.9 # discount
self.memory = deque(maxlen=MAX_MEMORY) # popleft()
self.model = Linear_QNET(window_size*5, 256, 256, 3) # 30일의 ohlcv
self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)
def get_state(self,game):
df = game.get_window_df()
normalized_df = game.normalize(df)
normalized_lst = game.make_lst(normalized_df)
return np.array(normalized_lst, dtype=float)
def remember(self, state, action, reward, next_state, done):
self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached
def train_long_memory(self):
if len(self.memory) > BATCH_SIZE:
mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
else:
mini_sample = self.memory
states, actions, rewards, next_states, dones = zip(*mini_sample)
self.trainer.train_step(states, actions, rewards, next_states, dones)
def train_short_memory(self, state, action, reward, next_state, done):
self.trainer.train_step(state, action, reward, next_state, done)
def get_action(self, state, t= 1):
# random moves: tradeoff exploration / exploitation
# epsilon: randomness of beginning
self.epsilon = 100 - self.n_games
final_move = [0,0,0]
if t == 1 and random.randint(0, 100) < self.epsilon: # more games, smaller epsilon
move = random.randint(0, 2) # 0,1,2
final_move[move] = 1
else:
state0 = torch.tensor(state, dtype=torch.float)
prediction = self.model(state0)
move = torch.argmax(prediction).item()
final_move[move] = 1
return final_move
# count1 == 학습 데이터 1000개
# count2 == 검증 데이터 500개
def train(count1= 1000, count2= 500, window_size= 30):
plot_scores = []
plot_mean_scores = []
plot_scores2 = []
total_score = 0
record = 0
record2 = 0
agent = Agent(window_size= window_size)
game1 = TradingAI(count_start= count1+count2, count_end= count2, window_size= window_size) # 1500 ~ 500 봉 전까지의 data
game2 = TradingAI(count_start= count2, count_end=0, window_size= window_size) # 500 ~ 현재까지의 data
train_level = True # True: train, False: estimate
while True:
if train_level:
# get old state
state_old = agent.get_state(game1)
# get move
final_move = agent.get_action(state_old)
# perform move and get new state
reward, done, score = game1.play_step(final_move)
state_new = agent.get_state(game1)
# train short memory
agent.train_short_memory(state_old, final_move, reward, state_new, done)
# remember
agent.remember(state_old, final_move, reward, state_new, done)
# print("train iter:",game1.iter-1,"/ scroe: ",score, " reward: ", reward)
if done:
# train long memory, plot result
agent.n_games += 1
agent.train_long_memory()
if score > record:
record = score
# agent.model.save()
print("======================")
print('Train Game', agent.n_games, 'Score', score, 'Record:', record)
plot_scores.append(score)
total_score += score
mean_score = total_score / agent.n_games
plot_mean_scores.append(mean_score)
train_level = False
else: # 학습된 모델을 검증 데이터를 통해 테스트
state_old2 = agent.get_state(game2)
final_move2 = agent.get_action(state_old2, t= 2)
reward2, done2, score2 = game2.play_step(final_move2)
# print("evaluation iter:",game2.iter,"/ scroe: ",score2, " reward: ", reward2)
if done2:
if score2 > record2:
record2 = score2
print('Evaluation Game', agent.n_games, 'Score', score2, 'Record:', record2)
print("======================")
plot_scores2.append(score2)
plot(plot_scores, plot_mean_scores, plot_scores2)
game1.reset()
game2.reset()
train_level = True
# model.py
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os
class Linear_QNET(nn.Module):
def __init__(self, input_size, hidden_size1, hidden_size2, output_size):
super().__init__()
self.linear1 = nn.Linear(input_size, hidden_size1)
self.linear2 = nn.Linear(hidden_size1, hidden_size2)
self.linear3 = nn.Linear(hidden_size2, output_size)
def forward(self, x):
x = self.linear1(x)
x = self.linear2(x)
x = self.linear3(x)
return x
def save(self, file_name='model.pth'):
model_folder_path = './model'
if not os.path.exists(model_folder_path):
os.makedirs(model_folder_path)
file_name = os.path.join(model_folder_path)
torch.save(self.state_dict(), file_name)
class QTrainer():
def __init__(self, model, lr, gamma):
self.lr = lr
self.gamma = gamma
self.model = model
self.optimizer = optim.Adam(model.parameters(), lr=self.lr)
self.criterion = nn.MSELoss()
def train_step(self, state, action, reward, next_state, done):
# (n, x)
state = torch.tensor(state, dtype=torch.float)
next_state = torch.tensor(next_state, dtype=torch.float)
action = torch.tensor(action, dtype=torch.long)
reward = torch.tensor(reward, dtype=torch.float)
# (1, x)
if len(state.shape) == 1:
state = torch.unsqueeze(state, 0)
next_state = torch.unsqueeze(next_state, 0)
action = torch.unsqueeze(action, 0)
reward = torch.unsqueeze(reward, 0)
done = (done, )
# 1: predicted Q values with current state
pred = self.model(state)
target = pred.clone()
for idx in range(len(done)):
Q_new = reward[idx]
if not done[idx]:
Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))
target[idx][torch.argmax(action).item()] = Q_new
# 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
# pred.clone()
# preds[argmax(action)] = Q_new
self.optimizer.zero_grad()
loss = self.criterion(target, pred)
loss.backward()
self.optimizer.step()
# helper.py
import matplotlib.pyplot as plt
from IPython import display
plt.ion()
def plot(scores, mean_scores, scores2):
display.clear_output(wait=True)
display.display(plt.gcf())
plt.clf()
plt.figure(1)
plt.title('Training...')
plt.xlabel('Number of Games')
plt.ylabel('Score')
plt.plot(scores, label= "current scores")
plt.plot(mean_scores, label= "mean scores")
plt.plot(scores2, label= "evaluation scores")
plt.ylim(ymin=0)
plt.text(len(scores)-1, scores[-1], str(round(scores[-1],3)))
plt.text(len(mean_scores)-1, mean_scores[-1], str(round(mean_scores[-1],3)))
plt.text(len(scores2)-1, scores2[-1], str(round(scores2[-1],3)))
plt.show(block=False)
plt.legend()
plt.pause(.1)
# 결과
100회 이상 게임을 돌렸을때의 결과입니다. 수익률이 score입니다.
train data set에서의 수익률은 천천히 우상향 하는 모습을 보이지만 evaluation data set에서의 수익률은 1을 벗어나지 못하는 모습입니다.
이러한 나쁜 결과가 나온 것의 원인은 아래의 경우가 될 수 있습니다.
1. input 데이터를 1차원으로 변환
실제 데이터는 2차원인데 1차원으로 바꾸면 데이터의 손실이 있을 수 있습니다.
모델 레이어를 2차원으로 바꿀 필요가 있을 듯합니다.
2. 데이터 부족
1000개 봉의 차트 데이털을 학습데이터로 사용했습니다. 데이터가 적어 보입니다.
그런데 천개 데이터를 학습돌릴 때에도 꽤 많은 시간이 걸렸습니다.
결과를 출력할때 0.1 초씩 멈췄는데, 빠른 연산을 위해 결과 출력을 서른 번의 한 번으로든 줄여야 할 것 같습니다.
빠르게 학습시키기 위해서는 전체적인 코드 구조도 바꿔야 할 수도 있습니다.
3. hidden layer의 단순함
모델 레이어는 150 * 256 * 256* 3 이었습니다. 어쩌면 인풋 대비 적은 hidden layer의 파라미터 였을듯 합니다.
이전 snake game 에서는 인풋이 11개 였어서 256으로도 충분했을지 모릅니다.
150개 인풋에서는 적어도 지금의 10~100배는 되어야 할 듯합니다.
그렇게 되면 당연히 계산시간도 커질것입니다. 지금까지는 gpu사용 안했지만 gpu를 사용해야할듯 합니다.
4. 모델
순방향 신경말 모델을 사용하였는데 이것이 주식 데이터를 학습하는데에 부적합할 수 있습니다.
보통 주식데이터 같은 시계열 데이터를 학습할 때에는 rnn을 사용한다고 하는데 수정해야 하겠습니다.
5. 인풋 데이터의 단순함
인풋 데이터를 5분봉만 이용하였습니다. 더 입체적으로 15분, 30분, 1시간, 3시간, 1일 봉의 데이터를 이용하여
3차원의 입체적인 데이터를 인풋으로 사용하면 어땠을까 하는 생각이 듭니다.
실제로 사람이 차트를 보고 투자를 할 때에도 여러 시계열 단위 데이터를 비교해 보면서 매매하니까 말입니다.
'파이썬으로 퀀트 프로그램 만들기 project > 강화학습' 카테고리의 다른 글
강화학습으로 snake game 학습시키기_2 (0) | 2024.03.02 |
---|---|
강화학습으로 snake game 학습시키기_1 (0) | 2024.02.16 |