본문 바로가기

파이썬으로 퀀트 프로그램 만들기 project/강화학습

강화학습으로 snake game 학습시키기_2

728x90

해당 블로그는 유튜브 "freeCodeCampe.org" 님의 https://youtu.be/L8ypSXwyBds?si=GTo6sJqoXQU25LM6 영상을 참고했습니다.

관련 코드는 https://github.com/patrickloeber/snake-ai-pytorch 를 참고해 주세요.

 

# 필요 라이브러리 다운

이번 포스팅에서는 강화학습에 필요한 라이브러리를 다운해보겠습니다.

터미널에서 pip install pygame을 통해 pygame을 다운로드하여주세요.

그리고 강화학습에 필요한 pytorch를 다운로드받아 주겠습니다.

https://pytorch.org/get-started/locally/ 해당 파이토치 다운 사이트로 가서 본인에게 맞는 os를 선택한 후

pip, python, cpu을 선택해 주세요. 그러면 아래와 같은 command가 나올 것입니다. 실행시켜 주세요.

(torchaudio는 사용하지 않을 것이니 다운 안 받으셔도 됩니다.)

pip3 install torch torchvision

 

그리고 위의 github 사이트에 가서 snake_game_human.py을 다운받아주세요.

이제 snake game을 실행시킬 수 있을 실 겁니다.

 

이제 저희는 강화학습에 맞게 snake_game_human.py 코드를 수정할 필요가 있습니다.

저번 포스트에서 설명했듯이 저희는 3가지 파트를 만들 것인데, 우선 Game 파트를 만들겠습니다.

snake_game_human.py 를 수정해 주시면 됩니다.

 

 

# game.py 

game파일에서 수정해야 할 것들을 먼저 말씀드리겠습니다.

1. reset

게임마다 에이전트가 게임을 리셋하고 새 게임을 시작할 수 있도록 reset함수가 필요합니다.

2. reward

에이전트가 받는 보상을 구현해야 합니다.

3. play(action) -> direction

 play함수를 수정해서 action을 받아들이고 뱀의 방향을 계산하여 return 하도록 합니다.

4. frame_iteration

현재 게임 프레임을 추적하여야 합니다. 게임이 얼마나 진행되었는지 확인할 수 있는 변수입니다.

뱀이 점수를 못 얻은 채 뱅뱅 돌기만 하는 것을 방지해 주는 변수입니다.

5. is_collision

벽이나 뱀의 몸에 대한 충돌 여부를 알려주는 함수입니다.



이제 수정된 snake_game.py 코드를 보여드리며 설명해 드리겠습니다.

import pygame
import random
from enum import Enum
from collections import namedtuple
import numpy as np

pygame.init()
font = pygame.font.SysFont('arial', 25)

class Direction(Enum):
    RIGHT = 1
    LEFT = 2
    UP = 3
    DOWN = 4

Point = namedtuple('Point', 'x, y')

# rgb colors
WHITE = (255, 255, 255)
RED = (200,0,0)
BLUE1 = (0, 0, 255)
BLUE2 = (0, 100, 255)
BLACK = (0,0,0)

BLOCK_SIZE = 20
SPEED = 40

class SnakeGameAI: ## SnakeGame -> SnakeGameAI

    def __init__(self, w=640, h=480):
        self.w = w
        self.h = h
        # init display
        self.display = pygame.display.set_mode((self.w, self.h))
        pygame.display.set_caption('Snake')
        self.clock = pygame.time.Clock()
        self.reset()


    def reset(self): ## reset 함수 추가
        # init game state
        self.direction = Direction.RIGHT

        self.head = Point(self.w/2, self.h/2)
        self.snake = [self.head,
                      Point(self.head.x-BLOCK_SIZE, self.head.y),
                      Point(self.head.x-(2*BLOCK_SIZE), self.head.y)]

        self.score = 0
        self.food = None
        self._place_food()
        self.frame_iteration = 0 ## 게임 몇번 반복됐는지 체크


    def _place_food(self):
        x = random.randint(0, (self.w-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
        y = random.randint(0, (self.h-BLOCK_SIZE )//BLOCK_SIZE )*BLOCK_SIZE
        self.food = Point(x, y)
        if self.food in self.snake:
            self._place_food()


    def play_step(self, action): ## agent로 부터 action을 받으므로 action을 argument로 지정
        self.frame_iteration += 1 ## 한 프레임 지날때 마다 +1
        # 1. collect user input
        for event in pygame.event.get():
            if event.type == pygame.QUIT:
                pygame.quit()
                quit()
        ## 기존에 있던 키보드 입력 받는 부분 삭제
        
        # 2. move
        self._move(action) # update the head ## direction대신 action받음
        self.snake.insert(0, self.head)
        
        # 3. check if game over
        reward = 0 ## reward 변수 초기화
        game_over = False
        
        ## 100*len(self.snake) 프레임 이상 게임 진행되는경우도 gameover
        if self.is_collision() or self.frame_iteration > 100*len(self.snake): 
            game_over = True
            reward = -10 ## reward 업데이트
            return reward, game_over, self.score ## reward 변수 리턴 추가

        # 4. place new food or just move
        if self.head == self.food:
            self.score += 1
            reward = 10 ## reward 업데이트
            self._place_food()
        else:
            self.snake.pop()
        
        # 5. update ui and clock
        self._update_ui()
        self.clock.tick(SPEED)
        # 6. return game over and score
        return reward, game_over, self.score ## reward 변수 리턴 추가

	## state 변수에서 danger를 체크해야 하는 부분을 위해 pt 변수 추가
    def is_collision(self, pt=None):
        if pt is None:
            pt = self.head
        # hits boundary
        if pt.x > self.w - BLOCK_SIZE or pt.x < 0 or pt.y > self.h - BLOCK_SIZE or pt.y < 0:
            return True
        # hits itself
        if pt in self.snake[1:]:
            return True

        return False


    def _update_ui(self):
        self.display.fill(BLACK)

        for pt in self.snake:
            pygame.draw.rect(self.display, BLUE1, pygame.Rect(pt.x, pt.y, BLOCK_SIZE, BLOCK_SIZE))
            pygame.draw.rect(self.display, BLUE2, pygame.Rect(pt.x+4, pt.y+4, 12, 12))

        pygame.draw.rect(self.display, RED, pygame.Rect(self.food.x, self.food.y, BLOCK_SIZE, BLOCK_SIZE))

        text = font.render("Score: " + str(self.score), True, WHITE)
        self.display.blit(text, [0, 0])
        pygame.display.flip()


    def _move(self, action): ## user input이었던 direction 안 받고 action을 받음
    	## 기존 user input을 받던 방식과 다른 방식이므로 그에 따른 추가적이 계산을 해줌
        ## 예를 들어, 오른쪽 가고 있을때 action이 straight이면 그대로 오른쪽
        ## 오른쪽 가고 있을때 action이 right이면 아래, left면 위쪽이 된다.
        # [straight, right, left]

        clock_wise = [Direction.RIGHT, Direction.DOWN, Direction.LEFT, Direction.UP]
        idx = clock_wise.index(self.direction)

        if np.array_equal(action, [1, 0, 0]):
            new_dir = clock_wise[idx] # no change
        elif np.array_equal(action, [0, 1, 0]):
            next_idx = (idx + 1) % 4
            new_dir = clock_wise[next_idx] # right turn r -> d -> l -> u
        else: # [0, 0, 1]
            next_idx = (idx - 1) % 4
            new_dir = clock_wise[next_idx] # left turn r -> u -> l -> d

        self.direction = new_dir

        x = self.head.x
        y = self.head.y
        if self.direction == Direction.RIGHT:
            x += BLOCK_SIZE
        elif self.direction == Direction.LEFT:
            x -= BLOCK_SIZE
        elif self.direction == Direction.DOWN:
            y += BLOCK_SIZE
        elif self.direction == Direction.UP:
            y -= BLOCK_SIZE

        self.head = Point(x, y)
        
 ## game실행하는 반복문은 agent 파일에서 진행할 것이므로 삭제

## 로 시작되는 부분은 기존의 game파일의 코드와 달라진 부분에 대한 설명입니다.

 

 

# agent.py 

이제 게임을 플레이하는 agent를 만들어 보겠습니다.

저번 블로그 포스트를 보시면 아시겠지만 agent가 하는 일을 다시 간략히 말씀드리겠습니다.

get_state함수를 통해 현재 진행 중인 game을 받아들여서  state를 계산합니다.

get_move함수를 통해 게임을 진행시킵니다.

get_move 함수 내부에서 model.predcit 함수를 호출하여 action을 얻어내고

play_step함수에 action을 대입하여 게임을 진행합니다. 그다음 다시 state를 계산하고 

이 일련의 과정을 remember함수를 통해 기억한 후, model.train을 통해 모델을 학습합니다.

그러므로 agent 클래스에서 모델과 게임을 저장해야 합니다.

 

이제 agent.py 파일에 대해 말씀드리겠습니다.

game 파일과 동일한 저장소에 agent.py 파일을 만들어줍니다.

코드의 내용은 아래와 같습니다.

import torch
import random
import numpy as np
from collections import deque
from game import SnakeGameAI, Direction, Point
 ## model, helper 파일은 추후에 만들것입니다.
from model import Linear_QNet, QTrainer
from helper import plot 

MAX_MEMORY = 100_000
BATCH_SIZE = 1000
LR = 0.001

class Agent:

    def __init__(self):
        self.n_games = 0
        self.epsilon = 0 # randomness
        self.gamma = 0.9 # discount rate
        self.memory = deque(maxlen=MAX_MEMORY) # popleft()
        ## hidden layer는 조정 가능하나, input과 output은 정해져있으므로 고정입니다.
        self.model = Linear_QNet(11, 256, 3) ## input layer, hidden layer, output layer
        self.trainer = QTrainer(self.model, lr=LR, gamma=self.gamma)

	## 앞서 보여준 11개의 state를 계산하여 얻어냅니다.
    ## [danger straight, danger right, danger left,
    ## direction left, direction right, direction up, direction down,
    ## food left, food right, food up, food, down]
    def get_state(self, game):
        head = game.snake[0]
        point_l = Point(head.x - 20, head.y)
        point_r = Point(head.x + 20, head.y)
        point_u = Point(head.x, head.y - 20)
        point_d = Point(head.x, head.y + 20)
        
        dir_l = game.direction == Direction.LEFT
        dir_r = game.direction == Direction.RIGHT
        dir_u = game.direction == Direction.UP
        dir_d = game.direction == Direction.DOWN

        state = [
            # Danger straight
            (dir_r and game.is_collision(point_r)) or 
            (dir_l and game.is_collision(point_l)) or 
            (dir_u and game.is_collision(point_u)) or 
            (dir_d and game.is_collision(point_d)),

            # Danger right
            (dir_u and game.is_collision(point_r)) or 
            (dir_d and game.is_collision(point_l)) or 
            (dir_l and game.is_collision(point_u)) or 
            (dir_r and game.is_collision(point_d)),

            # Danger left
            (dir_d and game.is_collision(point_r)) or 
            (dir_u and game.is_collision(point_l)) or 
            (dir_r and game.is_collision(point_u)) or 
            (dir_l and game.is_collision(point_d)),
            
            # Move direction
            dir_l,
            dir_r,
            dir_u,
            dir_d,
            
            # Food location 
            game.food.x < game.head.x,  # food left
            game.food.x > game.head.x,  # food right
            game.food.y < game.head.y,  # food up
            game.food.y > game.head.y  # food down
            ]

        return np.array(state, dtype=int)
	
    ## state, action, reward of this action, next_state에 대한 정보를 기억합니다
    ## init함수에서 만들었던 메모리를 deque형태로 저장합니다.
    ## MAX_MEMORY를 넘어서면 가장 오래됐던 기억은 삭제됩니다.
    def remember(self, state, action, reward, next_state, done):
        self.memory.append((state, action, reward, next_state, done)) # popleft if MAX_MEMORY is reached

	## train on long memory
    ## 최대 BACTH_SIZE 크기의 데이터를 학습합니다.
    def train_long_memory(self):
        if len(self.memory) > BATCH_SIZE: # BATCH_SIZE크기를 초과할 경우 랜덤으로 그 사이즈만큼의 데이터를 뽑아냅니다.
            mini_sample = random.sample(self.memory, BATCH_SIZE) # list of tuples
        else:
            mini_sample = self.memory

        states, actions, rewards, next_states, dones = zip(*mini_sample)
        self.trainer.train_step(states, actions, rewards, next_states, dones)
        ## 아래 주석의 반복문 과정을 위의 2줄의 코드로 작성 가능합니다. 또한 위의 과정이 pytorch에서 더 빠릅니다. 
        #for state, action, reward, nexrt_state, done in mini_sample:
        #    self.trainer.train_step(state, action, reward, next_state, done)

	## train of short memory wiht one step
    def train_short_memory(self, state, action, reward, next_state, done):
        self.trainer.train_step(state, action, reward, next_state, done)

    def get_action(self, state):
    	## 처음에는 random한 움직으로 exploration 하겠지만
        ## 학습하게된 나중에는 exploitation 하는 비율을 늘려 나갈 것입니다.
        # random moves: tradeoff exploration / exploitation
        
        ## game 횟수가 늘어날수록 epsilon은 줄어들 것입니다.
        self.epsilon = 80 - self.n_games
        final_move = [0,0,0]
        
        ## eplsilon이 줄어들수록 확률적으로 아래 조건문에 달성할 확률 또한 줄어듭니다
        ## 즉, epsilon이 줄어들수록 랜덤한 움직임이 줄어듭니다.
        if random.randint(0, 200) < self.epsilon:
            move = random.randint(0, 2)
            final_move[move] = 1
        else:
        ## exploitation 합니다. model.predict를 통해 다음 action을 return합니다.
            state0 = torch.tensor(state, dtype=torch.float)
            prediction = self.model(state0) # model의 forward 함수가 실행될것입니다. 
            move = torch.argmax(prediction).item() ##predction의 raw값 중 가장 큰 값만 뽑아냅니다.
            final_move[move] = 1

        return final_move


# global function인 train함수를 통해 에이전트한테 게임을 학습시킬 것입니다.
def train():
    plot_scores = [] ## train결과를 plot할 score 리스트입니다.
    plot_mean_scores = [] ## train결과를 plot할 평균 score 리스트입니다.
    total_score = 0
    record = 0
    agent = Agent()
    game = SnakeGameAI()
    ## 바로 이전 블로그 포스트에 코드 구성분분 슬라이드에서 보았던 반복문 구현 부분입니다.
    ## agent 의 train 핵심 부분이라 할 수 있습니다.
    while True:
        # get old state
        state_old = agent.get_state(game)

        # get move
        final_move = agent.get_action(state_old)

        # perform move and get new state
        reward, done, score = game.play_step(final_move)
        state_new = agent.get_state(game)

        # train short memory
        agent.train_short_memory(state_old, final_move, reward, state_new, done)

        # remember
        agent.remember(state_old, final_move, reward, state_new, done)

        if done:
            # train long memory, plot result
            ## train_long_memory 함수는 이전의 moves와 게임들을 모두 학습하여 agent의 play에 엄청난 도움이 될 것입니다.
            game.reset() # game이 done 됐으므로 reset 해줘야합니다. 
            agent.n_games += 1 # game number를 +1 해줍니다.
            agent.train_long_memory()

            if score > record: ## 최고 기록을 갱신하면 record를 갱신해줍니다.
                record = score
                #agent.model.save() ## 이때 원하면 주석을 제거하여 model을 save 가능합니다.

            print('Game', agent.n_games, 'Score', score, 'Record:', record)

            plot_scores.append(score)
            total_score += score
            mean_score = total_score / agent.n_games
            plot_mean_scores.append(mean_score)
            plot(plot_scores, plot_mean_scores)


if __name__ == '__main__':
    train()

마찬가지로 ## 표시로 제가 추가 설명을 넣었습니다.

주석하나(#)은 원본 github파일의 원본 주석 설명입니다.

 

# model.py

이제 model.py파일 만들어 보겠습니다. 같은 작업파일에 model.py를 만들어주세요.

 

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import os

class Linear_QNet(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.linear2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        x = F.relu(self.linear1(x))
        x = self.linear2(x)
        return x

    def save(self, file_name='model.pth'):
        model_folder_path = './model'
        if not os.path.exists(model_folder_path):
            os.makedirs(model_folder_path)

        file_name = os.path.join(model_folder_path, file_name)
        torch.save(self.state_dict(), file_name)


class QTrainer:
    def __init__(self, model, lr, gamma):
        self.lr = lr
        self.gamma = gamma
        self.model = model
        self.optimizer = optim.Adam(model.parameters(), lr=self.lr) ## 최적화함수
        self.criterion = nn.MSELoss() ## 손실함수

    def train_step(self, state, action, reward, next_state, done):
        state = torch.tensor(state, dtype=torch.float)
        next_state = torch.tensor(next_state, dtype=torch.float)
        action = torch.tensor(action, dtype=torch.long)
        reward = torch.tensor(reward, dtype=torch.float)
        # (n, x)
		
        ## train short memory 일 경우
        if len(state.shape) == 1:
            # (1, x)
            state = torch.unsqueeze(state, 0)
            next_state = torch.unsqueeze(next_state, 0)
            action = torch.unsqueeze(action, 0)
            reward = torch.unsqueeze(reward, 0)
            done = (done, )

        # 1: predicted Q values with current state
        ## Bellman방정식의 간단화 버전 이용(지난 포스트 참고)
        pred = self.model(state)

        target = pred.clone()
        for idx in range(len(done)):
            Q_new = reward[idx]
            if not done[idx]:
                Q_new = reward[idx] + self.gamma * torch.max(self.model(next_state[idx]))

            target[idx][torch.argmax(action[idx]).item()] = Q_new
    
        # 2: Q_new = r + y * max(next_predicted Q value) -> only do this if not done
        # pred.clone()
        # preds[argmax(action)] = Q_new
        self.optimizer.zero_grad()
        loss = self.criterion(target, pred)
        loss.backward()

        self.optimizer.step()

 

 

# helper.py

결과를 plot해주는 함수입니다.

import matplotlib.pyplot as plt
from IPython import display

plt.ion()

def plot(scores, mean_scores):
    display.clear_output(wait=True)
    display.display(plt.gcf())
    plt.clf()
    plt.title('Training...')
    plt.xlabel('Number of Games')
    plt.ylabel('Score')
    plt.plot(scores)
    plt.plot(mean_scores)
    plt.ylim(ymin=0)
    plt.text(len(scores)-1, scores[-1], str(scores[-1]))
    plt.text(len(mean_scores)-1, mean_scores[-1], str(mean_scores[-1]))
    plt.show(block=False)
    plt.pause(.1)

 

 

# 결과

 

약 100회 정도 시도하니 상당히 고득점을 달성하는 모습을 볼 수 있습니다. 학습이 잘 된 것 같습니다.

 

다음 포스트에서는 snake game에 이용되었던 강화학습 알고리즘을 차트투자에 적용시켜 보겠습니다.