NGMsoftware

NGMsoftware
로그인 회원가입
  • 매뉴얼
  • 학습
  • 매뉴얼

    학습


    딥러닝 딥러닝으로 로또 당첨 번호를 예측할 수 있을까?

    페이지 정보

    본문

    안녕하세요. 엔지엠소프트웨어입니다. 여러분들도 일정 금액을 받고 매주 로또 당첨 번호를 보내주는 광고를 접한 경험이 있을겁니다. 사실 저처럼 제조 환경에서 수율을 개선하고, 통계쪽 개발하는 개발자는 로또 당첨 번호를 예측한다는게 얼마나 말이 안되는것인지 잘 알고 있을겁니다. 하지만~ 많은 사람들이 유료로 결제해서 이런 서비스를 이용하고 있다는걸 알게 되었습니다. 물론, 이런 업체들이 내세우는 알고리즘 또는 인공지능 기술이 어떤것인지는 모르겠지만 로또와 같은 인과 관계가 없는 연속된 데이타를 예측한다는건 불가능합니다. 로또 번호와 같이 독립적 사건은 언제나 한 번호가 뽑힐 확률은 45/1이기 때문입니다. 저번주에 나온 번호와 이번주에 나올 번호와는 그 어떠한 인과 관계가 존재하지 않는다는 뜻입니다. 만약, 통계적으로 예측이 가능하다면 세계의 모든 수학자들은 이미 부자가 되어 있어야 합니다. 실상은 그렇지 않죠^^;

    ※ 로또 보다는 카지노에 가서 덱(보통 6덱 사용)에서 빠진 카드를 계산하는게 돈 버는 확률은 더 높습니다. 카드 카운팅이 불법은 아니기 때문이죠. 머리가 좋아야 가능한 부분이다보니 누구나 다 적용되는건 아닙니다. 6덱에서 카드 카운팅 후 확률을 계산해서 베팅해야 하기 때문에 암기와 암산 능력이 탁월한 또는 훈련된 사람만 가능합니다. CRM이라는 카지노 운영 시스템을 개발 및 관리한 경험 때문에 알고 있는 내용입니다-_-;

     

    유튜브에 검색해보면 수많은 자료를 볼 수 있을겁니다. 요즘은 파이썬과 텐서플로우로 인해 많은 개발자가 빠르고 쉽게 머신 러닝을 테스트하고 평가해볼 수 있는 환경이 되었습니다. 엔지엠소프트웨어에서도 이런 딥러닝으로 학습한 모델을 이용하여 그동안 매크로로 할 수 없었던 많은 것들을 처리할 수 있게 되었습니다. 요즘은 RPA라는 용어로 자동화를 진행하고 있습니다. 이 내용은 재미로만 봐주시고, 문제점에 대해서는 언제든지 피드백을 주시면 수정하도록 하겠습니다.

    ※ 이 글은 유튜버 조코딩님의 강좌를 참고했습니다.

     

    딥러닝을 하기 위해서는 학습할 데이타셋(그동안 1등 당첨된 번호들)이 필요하고, 이 데이타셋을 어떻게 학습시킬건지 설정해야 합니다. 그리고, 데이타셋을 기반으로 학습된 모델을 이용해서 다음 회차에 어떤 번호가 1등으로 당첨될지 예측하는 순서로 진행됩니다. 자~ 첫번째로 해야할 일은 그동안 누적된 1등 당첨 번호를 알아내야 합니다. 다행스럽게도 로또645에서는 그동안 1등 번호들을 정리해서 제공하고 있습니다. 아래 링크를 클릭하여 로또645 페이지로 이동하세요.

    [ 로또645 ]

    ck5ic92.png

     

     

    마지막으로 스크롤을 내려보면 회차별로 엑셀 파일을 다운로드 받을 수 있습니다. 아래 그림과같이 1부터 마지막 회차까지 선택하고, "엑셀다운로드"를 클릭하세요.

    owQv2p0.png

     

     

    다운로드 받은 엑셀 파일을 열어보면 1회차부터 선택한 마지막 회차까지 모든 정보가 정리되어 있는걸 확인할 수 있습니다.

    RugfoxM.png

     

     

    이 데이타를 직접 사용할수는 없습니다. 프로그래밍으로 데이타를 정리하고 파싱해서 사용해도 되지만, 이런 코딩까지 작성하려면 많은 시간과 노력이 필요합니다. 데이타 정제에 대한 내용이 아니므로 이 부분은 수동으로 진행해야 합니다. 엑셀에서 불필요한 데이타를 모두 삭제하고 CSV로 만들어서 첨부해두었으니 여러분들은 첨부된 파일을 다운로드해서 테스트 해보세요. "로또645.csv" 파일이 다운로드 되었으면 Visual Stduio Code에서 열어보세요. 쉼표(Comma)로 구분된 데이타셋을 확인할 수 있습니다.

    PG9GxIM.png

     

     

    데이타셋을 보면 당첨 금액쪽에 쉼표 사이에 공백이 있는걸 알 수 있습니다. 이 공백들도 모두 제거해줘야 합니다. 아래 동영상처럼 공백을 하나 선택하고 Ctrl+H를 눌러서 모두 바꾸기를 클릭하세요. 여러분들도 모든 공백이 사라졌을겁니다. 만약, 단축키로 동작하지 않으면 메뉴의 편집 > 바꾸기를 이용하셔도 됩니다.

    prv1N3Z.gif

     

     

    범주형 데이타를 딥러닝에서 이용하기 위해 원핫인코딩으로 변형해서 진행해야 합니다. 원핫인코딩은 단어 집합의 크기를 벡터의 차원으로 하고, 표현하고 싶은 단어의 인덱스에 1의 값을 부여하고, 다른 인덱스에는 0을 부여하는 단어의 벡터 표현 방식입니다. 이렇게 표현된 벡터를 원핫 벡터(One Hot Vector)라고 합니다. 사실 어떤 데이타를 어떻게 전처리할지는 분석하려는 대상에 따라 달라질 수 있다는걸 알아야 합니다. 우리가 분석할 데이타는 야구에서 보면 출루율, 타율, 방어율과 같은 수치형 데이타와 지역, 성씨, 정세등과 같은 범주형 데이타로 나눌 수 있습니다. 로또 당첨 번호는 숫자지만 연속된 인과 관계가 없으므로 매번 출현하는 번호는 범주형으로 분류할 수 있습니다. Visual Studio Code를 실행하고, "lotto645.py" 파일을 추가합니다.

    BkpaNJn.gif

     

     

    첨부되어 있는 "로또645.csv" 파일을 워크스페이스의 dataset에 복사해줍니다.

    ZVkPH47.png

     

     

    아래 코드를 붙여넣기하고, 저장(Ctrl+S) 후 실행(F5) 해보세요.

    ※ 출처는 [ 조코딩님 깃헙 ]입니다. 보통은 내용을 보고 직접 타이핑하지만... 이건 내용이 길어서 복붙 후 로컬 환경에서 동작하도록 약간 수정했습니다.

    import os
    os.add_dll_directory('C:\\Program Files\\NVIDIA GPU Computing Toolkit\\CUDA\\v11.0\\bin')
    
    import numpy as np
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    from tensorflow.keras import models
    import matplotlib.pyplot as plt
    from matplotlib.ticker import MaxNLocator
    
    rows = np.loadtxt("./dataset/로또645.csv", delimiter=",")
    row_count = len(rows)
    
    # 당첨번호를 원핫인코딩벡터(ohbin)으로 변환
    def numbers2ohbin(numbers):
    
        ohbin = np.zeros(45) #45개의 빈 칸을 만듬
    
        for i in range(6): #여섯개의 당첨번호에 대해서 반복함
            ohbin[int(numbers[i])-1] = 1 #로또번호가 1부터 시작하지만 벡터의 인덱스 시작은 0부터 시작하므로 1을 뺌
        
        return ohbin
    
    # 원핫인코딩벡터(ohbin)를 번호로 변환
    def ohbin2numbers(ohbin):
    
        numbers = []
        
        for i in range(len(ohbin)):
            if ohbin[i] == 1.0: # 1.0으로 설정되어 있으면 해당 번호를 반환값에 추가한다.
                numbers.append(i+1)
        
        return numbers
    
    numbers = rows[:, 1:7]
    ohbins = list(map(numbers2ohbin, numbers))
    
    x_samples = ohbins[0:row_count-1]
    y_samples = ohbins[1:row_count]
    
    #원핫인코딩으로 표시
    print("ohbins")
    print("X[0]: " + str(x_samples[0]))
    print("Y[0]: " + str(y_samples[0]))
    
    #번호로 표시
    print("numbers")
    print("X[0]: " + str(ohbin2numbers(x_samples[0])))
    print("Y[0]: " + str(ohbin2numbers(y_samples[0])))
    
    train_idx = (0, 800)
    val_idx = (801, 900)
    test_idx = (901, len(x_samples))
    
    print("train: {0}, val: {1}, test: {2}".format(train_idx, val_idx, test_idx))
    
    # 모델을 정의합니다.
    model = keras.Sequential([
        keras.layers.LSTM(128, batch_input_shape=(1, 1, 45), return_sequences=False, stateful=True),
        keras.layers.Dense(45, activation='sigmoid')
    ])
    
    # 모델을 컴파일합니다.
    model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
    
    # 매 에포크마다 훈련과 검증의 손실 및 정확도를 기록하기 위한 변수
    train_loss = []
    train_acc = []
    val_loss = []
    val_acc = []
    
    # 최대 100번 에포크까지 수행
    for epoch in range(100):
    
        model.reset_states() # 중요! 매 에포크마다 1회부터 다시 훈련하므로 상태 초기화 필요
    
        batch_train_loss = []
        batch_train_acc = []
        
        for i in range(train_idx[0], train_idx[1]):
            
            xs = x_samples[i].reshape(1, 1, 45)
            ys = y_samples[i].reshape(1, 45)
            
            loss, acc = model.train_on_batch(xs, ys) #배치만큼 모델에 학습시킴
    
            batch_train_loss.append(loss)
            batch_train_acc.append(acc)
    
        train_loss.append(np.mean(batch_train_loss))
        train_acc.append(np.mean(batch_train_acc))
    
        batch_val_loss = []
        batch_val_acc = []
    
        for i in range(val_idx[0], val_idx[1]):
    
            xs = x_samples[i].reshape(1, 1, 45)
            ys = y_samples[i].reshape(1, 45)
            
            loss, acc = model.test_on_batch(xs, ys) #배치만큼 모델에 입력하여 나온 답을 정답과 비교함
            
            batch_val_loss.append(loss)
            batch_val_acc.append(acc)
    
        val_loss.append(np.mean(batch_val_loss))
        val_acc.append(np.mean(batch_val_acc))
    
        print('epoch {0:4d} train acc {1:0.3f} loss {2:0.3f} val acc {3:0.3f} loss {4:0.3f}'.format(epoch, np.mean(batch_train_acc), np.mean(batch_train_loss), np.mean(batch_val_acc), np.mean(batch_val_loss)))
    
        
    fig, loss_ax = plt.subplots()
    
    acc_ax = loss_ax.twinx()
    
    loss_ax.plot(train_loss, 'y', label='train loss')
    loss_ax.plot(val_loss, 'r', label='val loss')
    
    acc_ax.plot(train_acc, 'b', label='train acc')
    acc_ax.plot(val_acc, 'g', label='val acc')
    
    loss_ax.set_xlabel('epoch')
    loss_ax.set_ylabel('loss')
    acc_ax.set_ylabel('accuray')
    
    loss_ax.legend(loc='upper left')
    acc_ax.legend(loc='lower left')
    
    plt.show()
    
    # 88회부터 지금까지 1등부터 5등까지 상금의 평균낸다.
    mean_prize = [ np.mean(rows[87:, 8]),
               np.mean(rows[87:, 9]),
               np.mean(rows[87:, 10]),
               np.mean(rows[87:, 11]),
               np.mean(rows[87:, 12])]
    
    print(mean_prize)
    
    # 등수와 상금을 반환함
    # 순위에 오르지 못한 경우에는 등수가 0으로 반환함
    def calc_reward(true_numbers, true_bonus, pred_numbers):
    
        count = 0
    
        for ps in pred_numbers:
            if ps in true_numbers:
                count += 1
    
        if count == 6:
            return 0, mean_prize[0]
        elif count == 5 and true_bonus in pred_numbers:
            return 1, mean_prize[1]
        elif count == 5:
            return 2, mean_prize[2]
        elif count == 4:
            return 3, mean_prize[3]
        elif count == 3:
            return 4, mean_prize[4]
    
        return 5, 0
        
    
    def gen_numbers_from_probability(nums_prob):
        
        ball_box = []
    
        for n in range(45):
            ball_count = int(nums_prob[n] * 100 + 1)
            ball = np.full((ball_count), n+1) #1부터 시작
            ball_box += list(ball)
    
        selected_balls = []
    
        while True:
            
            if len(selected_balls) == 6:
                break
            
            ball_index = np.random.randint(len(ball_box), size=1)[0]
            ball = ball_box[ball_index]
    
            if ball not in selected_balls:
                selected_balls.append(ball)
    
        return selected_balls
    
        train_total_reward = []
    train_total_grade = np.zeros(6, dtype=int)
    
    val_total_reward = []
    val_total_grade = np.zeros(6, dtype=int)
    
    test_total_reward = []
    test_total_grade = np.zeros(6, dtype=int)
    
    model.reset_states()
    
    print('[No. ] 1st 2nd 3rd 4th 5th 6th Rewards')
    
    for i in range(len(x_samples)):
        xs = x_samples[i].reshape(1, 1, 45)
        ys_pred = model.predict_on_batch(xs) # 모델의 출력값을 얻음
        
        sum_reward = 0
        sum_grade = np.zeros(6, dtype=int) # 6등까지 변수
    
        for n in range(10): # 10판 수행
            numbers = gen_numbers_from_probability(ys_pred[0])
            
            #i회차 입력 후 나온 출력을 i+1회차와 비교함
            grade, reward = calc_reward(rows[i+1,1:7], rows[i+1,7], numbers) 
            
            sum_reward += reward
            sum_grade[grade] += 1
    
            if i >= train_idx[0] and i < train_idx[1]:
                train_total_grade[grade] += 1
            elif i >= val_idx[0] and i < val_idx[1]:
                val_total_grade[grade] += 1
            elif i >= test_idx[0] and i < test_idx[1]:
                val_total_grade[grade] += 1
        
        if i >= train_idx[0] and i < train_idx[1]:
            train_total_reward.append(sum_reward)
        elif i >= val_idx[0] and i < val_idx[1]:
            val_total_reward.append(sum_reward)
        elif i >= test_idx[0] and i < test_idx[1]:
            test_total_reward.append(sum_reward)
                            
        print('[{0:4d}] {1:3d} {2:3d} {3:3d} {4:3d} {5:3d} {6:3d} {7:15,d}'.format(i+1, sum_grade[0], sum_grade[1], sum_grade[2], sum_grade[3], sum_grade[4], sum_grade[5], int(sum_reward)))
    
    print('Total') 
    print('==========')    
    print('Train {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(train_total_grade[0], train_total_grade[1], train_total_grade[2], train_total_grade[3], train_total_grade[4], train_total_grade[5], int(sum(train_total_reward))))
    print('Val   {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(val_total_grade[0], val_total_grade[1], val_total_grade[2], val_total_grade[3], val_total_grade[4], val_total_grade[5], int(sum(val_total_reward))))
    print('Test  {0:5d} {1:5d} {2:5d} {3:5d} {4:5d} {5:5d} {6:15,d}'.format(test_total_grade[0], test_total_grade[1], test_total_grade[2], test_total_grade[3], test_total_grade[4], test_total_grade[5], int(sum(test_total_reward))))
    print('==========')
    
    total_reward = train_total_reward + val_total_reward + test_total_reward
    
    plt.plot(total_reward)
    plt.ylabel('rewards')
    plt.show()
    
    ax = plt.figure().gca()
    ax.xaxis.set_major_locator(MaxNLocator(integer=True))
    
    rewards = [sum(train_total_reward), sum(val_total_reward), sum(test_total_reward)]
    
    class_color=['green', 'blue', 'red']
    
    plt.bar(['train', 'val', 'test'], rewards, color=class_color)
    plt.ylabel('rewards')
    plt.show()
    
    # 최대 100번 에포크까지 수행
    for epoch in range(100):
    
        model.reset_states() # 중요! 매 에포크마다 1회부터 다시 훈련하므로 상태 초기화 필요
    
        for i in range(len(x_samples)):
            
            xs = x_samples[i].reshape(1, 1, 45)
            ys = y_samples[i].reshape(1, 45)
            
            loss, acc = model.train_on_batch(xs, ys) #배치만큼 모델에 학습시킴
    
            batch_train_loss.append(loss)
            batch_train_acc.append(acc)
    
        train_loss.append(np.mean(batch_train_loss))
        train_acc.append(np.mean(batch_train_acc))
    
        print('epoch {0:4d} train acc {1:0.3f} loss {2:0.3f}'.format(epoch, np.mean(batch_train_acc), np.mean(batch_train_loss)))
    
        # 마지막 회차까지 학습한 모델로 다음 회차 추론
    
    print('receive numbers')
    
    xs = x_samples[-1].reshape(1, 1, 45)
    
    ys_pred = model.predict_on_batch(xs)
    
    list_numbers = []
    
    for n in range(10):
        numbers = gen_numbers_from_probability(ys_pred[0])
        numbers.sort()
        print('{0} : {1}'.format(n, numbers))
        list_numbers.append(numbers)

     

    훈련셋(train loss)에 대해서는 에포크¹가 늘어날수록 정확도가 올라가고 손실은 떨어지는것을 확인할 수 있습니다. 하지만, 검증셋은 일반적인 학습 모델과는 다른 양상을 보여줍니다. 

    1) 에포크란?

    CNN과 같은 딥러닝 모델을 훈련시키다보면 꼭 만나게 되는 것이 배치(batch), 에포크(epoch)라는 단어입니다. 이 두 단어가 무엇을 지칭하는 것인지를 알아야 모델을 제대로 훈련시킬 수 있습니다. 먼저 batch의 사전적 의미에는 "집단, 무리, 한 회분, (일괄 처리를 위해) 함께 묶다" 등이 있습니다. 딥러닝에서 배치는 모델의 가중치를 한번 업데이트 시킬 때 사용되는 샘플들의 묶음을 의미합니다. 만약에 총 1000개의 훈련 샘플이 있는데, 배치 사이즈가 20이라면 20개의 샘플 단위마다 모델의 가중치를 한번씩 업데이트 시킵니다. 그러니까 총 50번(=1000/20) 가중치가 업데이트 되겠죠. 하나의 데이터셋을 총 50개의 배치로 나눠서 훈련을 진행한겁니다. epoch의 사전적 의미는 "시대(중요한 사건, 변화들이 일어난)"입니다. 딥러닝에서 에포크는 학습의 횟수를 의미합니다. 만약 에포크가 10이고 배치 사이즈가 20이면, 가중치를 50번 업데이트하는 것을 총 10번 반복합니다. 각 데이터 샘플이 총 10번씩 사용되는 것입니다. 결과적으로 가중치가 총 500번 업데이트 됩니다.

    QRkUfLA.png

     

     

    학습 데이타로 훈련한 구간이 아닌 800회부터는 당첨이 안되는걸 알 수 있습니다.

    wCCw6HK.png

     

     

    다른 그래프로 확인해봐도 동일한 결과가 나오는 것을 알 수 있습니다.

    DClcZpv.png

     

     

    딥러닝으로 학습한 후 예측한 결과 값은 아래와 같습니다.

    yHZdCrw.png

     

     

    오늘이 토요일이라서 저 결과로 로또를 사보겠습니다-_-; 사실 로또는 운입니다. 기대하면 안됩니다. 반도체 공정쪽에서 수율을 증가시키기 위한 여러가지 활동들이 있습니다. 장비의 상태나 날씨, PM, 재료, RMS, 시간, 두께, 습도등등... 많은 변수를 y값으로 통계적으로 계산합니다. 그래도~ 예측이 어렵습니다. 이전 Raw data가 상관 관계를 가지고 있을것이라고 생각하고 여러가지 방법으로 전처리를 한 후 가우스 분석, 변량 분석, 아노바 분석, 페르마나 프랙탈, 회귀, 분산분포등등... 이런 분석에는 다양한 외부 환경 요인도 가중치를 계산해서 넣어줘야 합니다. 다시 로또 당첨 번호 예측을 하려면 단순히 1등 번호만으로는 불가능하다는걸 여러분들도 이제 알게 되었을겁니다. 여기서, 로또 번호를 추첨할 당시에 기상 상황, 스테이지의 컨디션, 추첨 기계의 노후나 정비 상태, 건물의 떨림 척도를 계측할 수 있는 진동 스펙트럼등등... 수많은 변수를 대입해서 처리하면 될수도 있지 않을까 하는 생각을 해봅니다. 1번 공이 나올때와 100프로 일치하는 시공간적 환경을 만들어준다면 똑같이 1이 나오지 않을까요? 하지만, 저런 환경을 만든다는것 자체가 불가능하기 때문에 즐거운 상상으로만 만족하시길 바랍니다^^

     

    개발자에게 후원하기

    MGtdv7r.png

     

    추천, 구독, 홍보 꼭~ 부탁드립니다.

    여러분의 후원이 빠른 귀농을 가능하게 해줍니다~ 답답한 도시를 벗어나 귀농하고 싶은 개발자~

    감사합니다~

    • 네이버 공유하기
    • 페이스북 공유하기
    • 트위터 공유하기
    • 카카오스토리 공유하기
    추천0 비추천1

    댓글목록

    등록된 댓글이 없습니다.