All Articles

Predicting Bitcoin with PyTorch

First, install the correct libraries

!pip install chart_studio plotly==4.9.0 statsmodels==0.11.0 pmdarima ipdb wandb pyarrow==2.0.0
!pip install pytorch-lightning==1.0.4

Import everything we need

import numpy as np
import pandas as pd
import plotly.graph_objects as go
import os
import torch.nn as nn
import torch
from torch.autograd import Variable
import ipdb
from torch.utils.data import TensorDataset, DataLoader, Dataset
# Prefer the first CUDA GPU, but fall back to the CPU so the code still runs
# on machines without one (a hard-coded "cuda:0" makes every later
# `.to(device)` call fail there).
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
import shutil
from IPython.display import clear_output 
import time
import urllib

I've separately collected three years of Bitcoin data at 15-minute intervals from the Binance API. It is stored as Parquet because that format loads faster and preserves type information.

# `import urllib` on its own does not load the `urllib.request` submodule;
# import it explicitly so the download does not depend on some other
# library having imported it first.
import urllib.request

bitcoin_data_url = "https://drive.google.com/u/0/uc?id=14iEVdVtBaVfN6dMg0bO4QrfSaUoeXJ4Y&export=download"
# Download the dataset into the working directory as data.parquet.
urllib.request.urlretrieve(bitcoin_data_url, "data.parquet")
('data.parquet', <http.client.HTTPMessage at 0x7fed6e1bf7b8>)

Now let's create the model. I'm using a GRU instead of an LSTM because it's a little simpler to implement.

class GRU(nn.Module):
    """Stacked GRU whose last time step is projected by a linear layer.

    Args:
        i_size: number of features per time step.
        h_size: width of the GRU hidden state.
        n_layers: number of stacked GRU layers.
        o_size: number of outputs produced by the linear head.
    """

    def __init__(self, i_size, h_size, n_layers, o_size):
        super().__init__()

        self.hidden_size = h_size
        self.num_layers = n_layers

        # batch_first=True: inputs are laid out as (batch, seq, feature).
        gru_config = dict(
            input_size=i_size,
            hidden_size=h_size,
            num_layers=n_layers,
            batch_first=True,
        )
        self.rnn = nn.GRU(**gru_config)
        self.out = nn.Linear(h_size, o_size)

    def init_hidden(self, batch_size=32):
        """Return an all-zero hidden state of shape (num_layers, batch_size, hidden_size)."""
        shape = (self.num_layers, batch_size, self.hidden_size)
        return torch.zeros(*shape)

    def forward(self, x, hidden=None):
        """Run the GRU over `x` and project the output of the last time step.

        Args:
            x: input tensor, (batch, seq, feature).
            hidden: optional initial hidden state; when omitted a zero state
                sized to the batch is created and cast to match `x`.

        Returns:
            Tuple of (linear projection of the last time step, final hidden state).
        """
        if hidden is None:
            # Size the zero state from the batch and match x's dtype/device.
            hidden = self.init_hidden(x.shape[0]).type_as(x)

        sequence_out, next_hidden = self.rnn(x, hidden)
        last_step = sequence_out[:, -1, :]
        return self.out(last_step), next_hidden

I'm going to wrap the training and evaluation in a function, because functions are nice: they're reusable, and if you want to move this code out of Jupyter, they make life a lot easier. Actually, if I weren't writing a blog post, I would keep most of my code outside of the ipynb file, importing just the functions I need and using the notebook merely as a way to display graphs inline. There's more to this tangent, but I'll leave that for another blog post.

def _sliding_windows(series, width):
    """Return every length-`width` window of 1-D `series` that has a successor.

    The result has shape (len(series) - width, 1, width): window j holds
    series[j:j + width] and is the model input used to predict series[j + width].
    """
    windows = [series[i - width:i] for i in range(width, len(series))]
    return np.array(windows).reshape(-1, 1, width)


def run(model_klass,
        dataset_train,
        dataset_test,
        sc,
        name='test',
        input_size=15,
        hidden_size=64,
        num_layers=2,
        output_size=1,
        num_epochs=3,
        batch_size=32,
        learning_rate=.001):
    """Train a recurrent model on a price series, plot and score its predictions.

    The scaler is fitted on the training data only. Each sample is a window of
    `input_size` consecutive scaled values (column 0) fed as a single time step
    with `input_size` features; the target is the value that follows the window.
    After training, the model predicts over train + test combined, the result is
    plotted against the real series, and RMSE is computed for each part.

    Uses the module-level `device` for the model and all tensors.

    Args:
        model_klass: model class, called as
            model_klass(input_size, hidden_size, num_layers, output_size); its
            forward pass must return (output, hidden).
        dataset_train: 2-D array of training values, (n_train, 1).
        dataset_test: 2-D array of held-out values that follow the training data.
        sc: scaler with fit_transform / transform / inverse_transform; it is
            fitted in place on `dataset_train`.
        name: label printed at start and returned with the errors.
        input_size: window length, i.e. number of past values per prediction.
        hidden_size: hidden width passed to the model.
        num_layers: layer count passed to the model.
        output_size: output width passed to the model.
        num_epochs: number of passes over the training windows.
        batch_size: mini-batch size; incomplete final batches are dropped.
        learning_rate: Adam learning rate.

    Returns:
        Tuple (name, train_rmse, test_rmse), errors in the original price scale.

    Raises:
        ValueError: if `dataset_train` has no more than `input_size` rows (no
            training window can be built) or `dataset_test` is empty.
    """
    print(f"RUNNING {name}")

    if len(dataset_train) <= input_size:
        raise ValueError(
            "dataset_train needs more than input_size "
            f"({input_size}) rows, got {len(dataset_train)}"
        )
    if len(dataset_test) == 0:
        raise ValueError("dataset_test must not be empty")

    # --- Training data: fit the scaler on the training split only. ---
    training_set_scaled = sc.fit_transform(dataset_train)
    X_train = _sliding_windows(training_set_scaled[:, 0], input_size)
    y_train = training_set_scaled[input_size:, 0]

    rnn = model_klass(input_size, hidden_size, num_layers, output_size).to(device)
    optimiser = torch.optim.Adam(rnn.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    train_inputs = torch.from_numpy(X_train).float().to(device)
    train_labels = torch.from_numpy(y_train).float().to(device)
    loader = DataLoader(
        TensorDataset(train_inputs, train_labels),
        batch_size=batch_size,
        shuffle=True,
        drop_last=True,
    )

    # --- Training loop. ---
    rnn.train()
    for epoch in range(num_epochs):
        losses = []
        for batch_inputs, batch_labels in loader:
            optimiser.zero_grad()
            # Batches are shuffled, so no hidden state is carried between
            # them; the model starts each batch from its default state.
            output, _ = rnn(batch_inputs)
            loss = criterion(output.view(-1), batch_labels)
            loss.backward()
            optimiser.step()
            losses.append(loss.item())
        if epoch % 5 == 0:
            print('epoch {}, loss {}'.format(epoch, np.mean(losses)))

    # --- Prediction over the full series (train followed by test). ---
    dataset_total = np.concatenate((dataset_train, dataset_test), axis=0)
    scaled_total = sc.transform(dataset_total.reshape(-1, 1))
    X_test = _sliding_windows(scaled_total[:, 0], input_size)
    test_inputs = torch.from_numpy(X_test).float().to(device)

    # Inference only: switch to eval mode and skip building the autograd graph.
    rnn.eval()
    with torch.no_grad():
        predicted, _ = rnn(test_inputs)
    predicted_stock_price = np.reshape(
        predicted.cpu().numpy(), (test_inputs.shape[0], 1)
    )
    predicted_stock_price = sc.inverse_transform(predicted_stock_price)[:, 0]

    # Prediction j targets dataset_total[j + input_size], so the real series
    # is aligned by dropping its first input_size values.
    real_stock_price_all = dataset_total[input_size:, 0]

    # The first test target is dataset_total[len(dataset_train)], which sits
    # at prediction index len(dataset_train) - input_size.
    test_start = len(dataset_train) - input_size

    # --- Visualising the results. ---
    # NOTE(review): the y-range of the split marker and the label position are
    # hard-coded; presumably tuned to this dataset's price range.
    fig = go.Figure()
    fig.add_trace(go.Scatter(y=real_stock_price_all, name='Real'))
    fig.add_trace(go.Scatter(y=predicted_stock_price, name='Pred'))
    fig.add_shape(type="line",
                  x0=test_start, y0=0, x1=test_start, y1=20000,
                  line=dict(color="RoyalBlue", width=1))
    fig.add_trace(go.Scatter(
        x=[test_start - 5000], y=[15000],
        text=["Train/Test Split"],
        mode="text",
    ))
    fig.show()

    # --- Root-mean-squared error on each side of the split. ---
    # np.sqrt(MSE) instead of the `squared=False` keyword, which newer
    # scikit-learn releases no longer accept.
    train_rse = np.sqrt(mean_squared_error(
        real_stock_price_all[:test_start], predicted_stock_price[:test_start]))
    test_rse = np.sqrt(mean_squared_error(
        real_stock_price_all[test_start:], predicted_stock_price[test_start:]))
    return name, train_rse, test_rse

OK, now that everything is set up, let's run it all

# Time the whole experiment: data loading, training and evaluation.
start = time.time()

# Use only the 'open' column; shuffle=False makes the split positional
# (first 75% of rows for training, last 25% for testing).
cols = ['open']
df = pd.read_parquet('./data.parquet')
data_train, data_test = train_test_split(
    df[cols].values,
    test_size=0.25,
    shuffle=False,
)

# NOTE: the run label below says "0,1" but the scaler range is (-1, 1).
sc = MinMaxScaler(feature_range=(-1, 1))
err = run(GRU, data_train, data_test, sc, 'gru minmax 0,1')
errors = [err]

total_time = time.time() - start
print("Total time:", total_time)

# One row per run: (name, train error, test error).
df = pd.DataFrame(errors, columns=['name', 'train error', 'test error'])
print(df)
RUNNING gru minmax 0,1
epoch 0, loss 0.001470431994195942
Total time: 39.861626863479614
             name  train error  test error
0  gru minmax 0,1    62.584384    42.73687

From the graph we can see that the model does a good job of predicting the price after just a few epochs.