# Subnet8 / xgb-mining-model.py
# Author: Asif Ahmad
# Update xgb-mining-model.py (commit c7a9443)
# developer: Taoshidev
# Copyright © 2023 Taoshi, LLC
import os

import numpy as np
import tensorflow
import xgboost as xgb
from numpy import ndarray
class BaseMiningModel:
    """One-step-ahead regression model backed by XGBoost.

    The class originally wrapped a Keras LSTM; it now trains an
    ``xgb.XGBRegressor`` where each input row predicts the first column
    of the following row. Several attributes (``neurons``, ``window_size``,
    ``batch_size``, ``learning_rate``) are kept for interface
    compatibility with the earlier LSTM configuration even though the
    XGBoost path does not use all of them.
    """

    def __init__(self, features):
        # Legacy LSTM stack spec as [[units, dropout], ...]; unused by
        # the XGBoost training path but preserved for callers.
        self.neurons = [[50, 0]]
        # Number of input features per sample (informational here;
        # XGBoost infers the width from the training matrix).
        self.features = features
        # Booster populated by load_model(); None until then.
        self.loaded_model = None
        # Legacy sequence-window length; unused by the XGBoost path.
        self.window_size = 100
        # Filesystem path of the saved model (.model file).
        self.model_dir = None
        # Legacy Keras hyperparameters; unused by the XGBoost path.
        self.batch_size = 16
        self.learning_rate = 0.01

    def set_neurons(self, neurons):
        """Set the legacy neuron-stack spec. Returns self (fluent)."""
        self.neurons = neurons
        return self

    def set_window_size(self, window_size):
        """Set the legacy window size. Returns self (fluent)."""
        self.window_size = window_size
        return self

    def set_model_dir(self, model, stream_id=None):
        """Resolve the model path from an explicit path or a stream id.

        :param model: explicit model path, or None to derive from stream_id.
        :param stream_id: identifier used to build the default path.
        :raises Exception: if neither model nor stream_id is provided.
        :returns: self (fluent).
        """
        if model is None and stream_id is not None:
            self.model_dir = f'./mining_models/{stream_id}.model'
        elif model is not None:
            self.model_dir = model
        else:
            raise Exception("stream_id is not provided to define model")
        return self

    def set_batch_size(self, batch_size):
        """Set the legacy batch size. Returns self (fluent)."""
        self.batch_size = batch_size
        return self

    def set_learning_rate(self, learning_rate):
        """Set the legacy learning rate. Returns self (fluent)."""
        self.learning_rate = learning_rate
        return self

    def load_model(self):
        """Load the saved booster from self.model_dir for prediction."""
        self.loaded_model = xgb.Booster()
        self.loaded_model.load_model(self.model_dir)
        return self

    def train(self, data: ndarray):
        """Train (or resume training) on ``data`` and save to model_dir.

        The first column of ``data`` is the target series: row ``i`` is
        used to predict ``data[i + 1, 0]``.

        BUGFIX: the previous version (a) had its ``def`` colon commented
        out along with the removed ``epochs`` parameter (syntax error),
        (b) tried to resume via ``tensorflow.keras.models.load_model``
        on an XGBoost file — and would later have crashed on
        ``save_model`` had that ever succeeded — and (c) needlessly
        converted the NumPy arrays to TensorFlow tensors before handing
        them to XGBoost.
        """
        model = xgb.XGBRegressor(random_state=1)
        # Resume from a previously saved model if one exists on disk.
        if self.model_dir is not None and os.path.exists(self.model_dir):
            model.load_model(self.model_dir)
        target = data[:, 0]  # first column is the prediction target
        # One-step-ahead supervised pairs: row i -> target at i + 1.
        X_train = np.asarray(data[:-1], dtype=np.float32)
        Y_train = np.asarray(target[1:], dtype=np.float32)
        model.fit(X_train, Y_train)
        model.save_model(self.model_dir)

    def predict(self, data: ndarray):
        """Predict the next target value from the last row of ``data``.

        :returns: list containing one prediction array from the booster.
        """
        predictions = []
        window_data = data[-1:]  # keep 2-D shape: (1, features)
        dtest = xgb.DMatrix(window_data)
        predicted_value = self.loaded_model.predict(dtest)
        predictions.append(predicted_value)
        return predictions

    @staticmethod
    def base_model_dataset(samples):
        """Build the (n_samples, 4) training matrix from raw samples.

        Rows 1-4 of ``samples`` are taken as close, high, low, and
        volume series (row 0 — presumably timestamps — is dropped);
        the result is transposed so each output row is one time step.
        """
        min_cutoff = 0  # no leading samples are trimmed
        rows = samples.tolist()
        cutoff_close = rows[1][min_cutoff:]
        cutoff_high = rows[2][min_cutoff:]
        cutoff_low = rows[3][min_cutoff:]
        cutoff_volume = rows[4][min_cutoff:]
        return np.array([cutoff_close,
                         cutoff_high,
                         cutoff_low,
                         cutoff_volume]).T