# developer: Taoshidev
# Copyright © 2023 Taoshi, LLC

import numpy as np
from numpy import ndarray

# NOTE(review): tensorflow was only used by a fully commented-out keras/LSTM
# implementation and by a model-load attempt that could never succeed on the
# xgboost-format file this class writes; the dependency has been removed.
# xgboost is imported lazily inside the methods that need it so the module
# stays importable (e.g. for base_model_dataset) without xgboost installed.


class BaseMiningModel:
    """Thin fluent wrapper around an XGBoost regressor.

    The model is trained on a 2-D feature matrix whose FIRST column is the
    prediction target; each full feature row at index i is used to predict
    the target value at index i + 1 (one-step-ahead supervised setup).
    """

    def __init__(self, features):
        """Create an untrained model wrapper.

        :param features: number of input features per data row.
        """
        # neurons / window_size / batch_size / learning_rate are legacy
        # keras/LSTM hyper-parameters kept for interface compatibility with
        # existing callers; the xgboost training path does not use them.
        self.neurons = [[50, 0]]
        self.features = features
        self.loaded_model = None   # populated by load_model()
        self.window_size = 100
        self.model_dir = None      # set by set_model_dir()
        self.batch_size = 16
        self.learning_rate = 0.01

    def set_neurons(self, neurons):
        """Set the (legacy) layer spec. Returns self for chaining."""
        self.neurons = neurons
        return self

    def set_window_size(self, window_size):
        """Set the (legacy) lookback window. Returns self for chaining."""
        self.window_size = window_size
        return self

    def set_model_dir(self, model, stream_id=None):
        """Resolve the on-disk model path.

        Either an explicit path is given via ``model``, or a path is derived
        from ``stream_id``. Returns self for chaining.

        :raises ValueError: when neither ``model`` nor ``stream_id`` is given.
        """
        if model is not None:
            self.model_dir = model
        elif stream_id is not None:
            self.model_dir = f'./mining_models/{stream_id}.model'
        else:
            # ValueError is a subclass of Exception, so callers catching the
            # old bare Exception keep working.
            raise ValueError("stream_id is not provided to define model")
        return self

    def set_batch_size(self, batch_size):
        """Set the (legacy) batch size. Returns self for chaining."""
        self.batch_size = batch_size
        return self

    def set_learning_rate(self, learning_rate):
        """Set the (legacy) learning rate. Returns self for chaining."""
        self.learning_rate = learning_rate
        return self

    def load_model(self):
        """Load the persisted booster from self.model_dir into
        self.loaded_model. Returns self for chaining."""
        import xgboost as xgb  # lazy: keep module importable without xgboost

        self.loaded_model = xgb.Booster()
        self.loaded_model.load_model(self.model_dir)
        return self

    def train(self, data: ndarray):
        """Fit an XGBoost regressor on ``data`` and save it to self.model_dir.

        ``data`` is 2-D; column 0 is the target series. Row i's full feature
        vector is paired with the target at row i + 1.

        :raises ValueError: if ``data`` has fewer than two rows (no pair can
            be formed).
        """
        import xgboost as xgb  # lazy: keep module importable without xgboost

        if len(data) < 2:
            raise ValueError("need at least two rows of data to train")

        # NOTE(review): the previous code first attempted
        # tensorflow.keras.models.load_model(self.model_dir), which can never
        # succeed on the xgboost-format file written by save_model below and
        # crashed on any non-OSError failure — the regressor is now always
        # built fresh.
        model = xgb.XGBRegressor(random_state=1)

        target = data[:, 0]  # first column is the target
        feature_rows, labels = [], []
        for i in range(len(data) - 1):
            feature_rows.append(data[i])
            labels.append(target[i + 1])

        # Plain float32 numpy arrays; the previous round-trip through
        # tensorflow.convert_to_tensor added nothing for xgboost.
        x_train = np.array(feature_rows, dtype=np.float32)
        y_train = np.array(labels, dtype=np.float32)

        model.fit(x_train, y_train)
        model.save_model(self.model_dir)

    def predict(self, data: ndarray):
        """Predict the next target value from the LAST row of ``data``.

        Requires load_model() to have been called first. Returns a list
        containing a single prediction array (shape preserved from the
        original implementation).
        """
        import xgboost as xgb  # lazy: keep module importable without xgboost

        window_data = data[-1:]  # keep 2-D shape: (1, n_features)
        dtest = xgb.DMatrix(window_data)
        predicted_value = self.loaded_model.predict(dtest)
        return [predicted_value]

    @staticmethod
    def base_model_dataset(samples):
        """Build the (n_samples, 4) training matrix from raw sample rows.

        Rows 1–4 of ``samples`` are taken as the close, high, low and volume
        series (judging by the variable names; row 0 — presumably timestamps —
        is dropped. TODO(review): confirm the row layout against the caller).
        """
        min_cutoff = 0
        rows = samples.tolist()  # hoisted: was converted four times
        cutoff_close = rows[1][min_cutoff:]
        cutoff_high = rows[2][min_cutoff:]
        cutoff_low = rows[3][min_cutoff:]
        cutoff_volume = rows[4][min_cutoff:]
        return np.array([cutoff_close, cutoff_high, cutoff_low, cutoff_volume]).T