# developer: Taoshidev
# Copyright © 2023 Taoshi, LLC
import numpy as np
import tensorflow
import xgboost as xgb
from numpy import ndarray
class BaseMiningModel:
    """Wrapper around an XGBoost regressor used for time-series mining models.

    Exposes a fluent (chainable) configuration interface. Several settings
    (``neurons``, ``window_size``, ``batch_size``, ``learning_rate``) are
    remnants of an earlier Keras/LSTM implementation and are currently unused
    by the XGBoost training path; they are kept so existing callers continue
    to work.
    """

    def __init__(self, features):
        # Number of input feature columns expected per sample.
        self.features = features
        # Legacy LSTM stack spec [[units, dropout], ...] — unused by XGBoost.
        self.neurons = [[50, 0]]
        # Trained xgb.Booster, populated by load_model().
        self.loaded_model = None
        # Legacy sliding-window length — unused by the current training path.
        self.window_size = 100
        # Path of the serialized model file; set via set_model_dir().
        self.model_dir = None
        # Legacy Keras training knobs — unused by the XGBoost path.
        self.batch_size = 16
        self.learning_rate = 0.01

    def set_neurons(self, neurons):
        """Set the legacy LSTM stack spec. Returns self for chaining."""
        self.neurons = neurons
        return self

    def set_window_size(self, window_size):
        """Set the legacy sliding-window length. Returns self for chaining."""
        self.window_size = window_size
        return self

    def set_model_dir(self, model, stream_id=None):
        """Resolve the on-disk model path.

        Either an explicit *model* path is given, or a *stream_id* from which
        a default ``./mining_models/<stream_id>.model`` path is derived.
        Returns self for chaining.

        Raises:
            ValueError: if neither *model* nor *stream_id* is provided.
                (ValueError subclasses Exception, so callers with broad
                handlers still catch it.)
        """
        if model is None and stream_id is not None:
            self.model_dir = f'./mining_models/{stream_id}.model'
        elif model is not None:
            self.model_dir = model
        else:
            raise ValueError("stream_id is not provided to define model")
        return self

    def set_batch_size(self, batch_size):
        """Set the legacy batch size. Returns self for chaining."""
        self.batch_size = batch_size
        return self

    def set_learning_rate(self, learning_rate):
        """Set the legacy learning rate. Returns self for chaining."""
        self.learning_rate = learning_rate
        return self

    def load_model(self):
        """Load the serialized XGBoost booster from self.model_dir."""
        self.loaded_model = xgb.Booster()
        self.loaded_model.load_model(self.model_dir)
        return self

    def train(self, data: ndarray):
        """Train (or resume training of) an XGBoost regressor on *data*.

        Each row of *data* is used as the feature vector predicting the first
        column of the NEXT row (one-step-ahead target). The fitted model is
        saved to self.model_dir.

        NOTE(review): the original signature line was
        ``def train(self, data: ndarray)#, epochs: int = 100):`` — the inline
        comment swallowed the closing ``:`` and made the file unparseable;
        fixed here without the unused ``epochs`` parameter.
        """
        try:
            # Left over from the Keras implementation: probe for an existing
            # checkpoint so training can resume. TODO(review): confirm a Keras
            # model is ever expected here now that XGBoost is used.
            model = tensorflow.keras.models.load_model(self.model_dir)
        except OSError:
            model = None

        if model is None:
            model = xgb.XGBRegressor(random_state=1)

        # Build supervised pairs: row i predicts column 0 of row i + 1.
        x_rows, y_rows = [], []
        target = data[:, 0]  # first column is the prediction target
        for i in range(len(data) - 1):
            x_rows.append(data[i])
            y_rows.append(target[i + 1])

        # Preserved from the original: route the arrays through TensorFlow
        # tensors before handing them to XGBoost.
        x_train = tensorflow.convert_to_tensor(np.array(x_rows, dtype=np.float32))
        y_train = tensorflow.convert_to_tensor(np.array(y_rows, dtype=np.float32))

        model.fit(x_train, y_train)
        model.save_model(self.model_dir)

    def predict(self, data: ndarray):
        """Predict the next target value from the most recent row of *data*.

        Returns a single-element list holding the booster's prediction array,
        matching the original call convention.
        """
        predictions = []
        # Only the latest observation is used as the feature vector.
        window_data = data[-1:]
        dtest = xgb.DMatrix(window_data)
        predictions.append(self.loaded_model.predict(dtest))
        return predictions

    @staticmethod
    def base_model_dataset(samples):
        """Build a (T, 4) training matrix of close/high/low/volume columns.

        *samples* is assumed to be a 2-D array whose rows 1-4 hold the close,
        high, low and volume series (row 0 — presumably timestamps — is
        dropped); TODO confirm against the caller. NOTE(review): declared
        without ``self``; exposed as a @staticmethod so both class-level and
        instance-level calls work.
        """
        min_cutoff = 0
        # Convert once instead of four separate tolist() passes.
        rows = samples.tolist()
        close, high, low, volume = (rows[i][min_cutoff:] for i in range(1, 5))
        return np.array([close, high, low, volume]).T