# Import necessary libraries import math # For mathematical operations import numpy as np # For numerical operations import pandas as pd # For data manipulation and analysis import seaborn as sns # For data visualization sns.set_style('whitegrid') # Set seaborn style to whitegrid import matplotlib.pyplot as plt # For plotting graphs plt.style.use("fivethirtyeight") # Use 'fivethirtyeight' style for matplotlib plots # Importing Keras libraries for building neural network models import keras from keras.models import Sequential # For sequential model building from keras.callbacks import EarlyStopping # For early stopping during model training from keras.layers import Dense, LSTM, Dropout # For adding layers to neural network model # Importing Scikit-learn libraries for data preprocessing and model evaluation from sklearn.preprocessing import MinMaxScaler # For data normalization from sklearn.model_selection import train_test_split # For splitting data into training and testing sets from sklearn.metrics import mean_squared_error, mean_absolute_error,r2_score # For model evaluation import warnings # For handling warnings warnings.simplefilter('ignore') # Ignore warnings for cleaner output import os import kagglehub # Importing MinMaxScaler from sklearn.preprocessing module from sklearn.preprocessing import MinMaxScaler import numpy as np import pandas as pd import matplotlib.pyplot as plt from statsmodels.tsa.arima.model import ARIMA from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from huggingface_hub import hf_hub_download import gradio as gr import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score from keras.models import Sequential from keras.layers import Dense, LSTM, Dropout from statsmodels.tsa.arima.model import ARIMA from sklearn.ensemble import RandomForestRegressor import xgboost as xgb import os import kagglehub from datetime import timedelta # # Download latest version # path = kagglehub.dataset_download("mczielinski/bitcoin-historical-data") # print("Path to dataset files:", path) # # Path to the dataset folder (already defined as 'path') csv_file = "btcusd_1-min_data.csv" # full_path = os.path.join(path, csv_file) # Load the dataset using pandas df = pd.read_csv("btcusd_1-min_data.csv") df['Date'] = pd.to_datetime(df['Timestamp'], unit='s').dt.date # Grouping the DataFrame by date and calculating the mean of 'Open', 'Close', 'High', 'Low', and 'Volume' columns df_day = df.groupby('Date')[['Open', 'Close', 'High', 'Low', 'Volume']].mean() # Converting the grouped DataFrame to a new DataFrame df_day = pd.DataFrame(df_day) df_close = df.groupby('Date')['Close'].mean() # Creating a DataFrame from the calculated mean closing prices df_close = pd.DataFrame(df_close) # Creating a MinMaxScaler object with feature range scaled between 0 and 1 scaler = MinMaxScaler(feature_range=(0, 1)) # Reshaping the closing price values into a 2D array and scaling the data scaled_data = scaler.fit_transform(np.array(df_close.values).reshape(-1, 1)) train_size = int(len(df_close) * 0.75) test_size = len(df_close) - train_size # Printing the sizes of the training and testing sets print("Train Size:", train_size, "Test Size:", test_size) # Extracting the training and testing data from the scaled data # For training data, select the first 'train_size' elements train_data = scaled_data[:train_size, 0:1] # For testing data, select 'test_size' elements starting from 'train_size - 60' test_data = scaled_data[train_size - 60:, 0:1] x_train = [] # List to store input sequences y_train = [] # List to store output values # Iterating over the training data to create input-output pairs # Each input sequence contains 60 time-steps, and the corresponding output is the next time-step value for i in range(60, len(train_data)): # Extracting input sequence of length 60 and appending it to x_train x_train.append(train_data[i - 60:i, 0]) # Extracting the output value (next time-step) and appending it to y_train y_train.append(train_data[i, 0]) # Convert to numpy array x_train, y_train = np.array(x_train), np.array(y_train) # Creating a testing set with 60 time-steps and 1 output x_test4 = [] # Initialize list for input sequences y_test4 = [] # Initialize list for output values # Loop through the test data to create input-output pairs for i in range(60, len(test_data)): # Append the previous 60 time-steps as input x_test4.append(test_data[i-60:i, 0]) # Removed .values # Append the next time-step as the output y_test4.append(test_data[i, 0]) # Convert lists to numpy arrays x_test4, y_test4 = np.array(x_test4), np.array(y_test4) # Reshape input data to match the input shape expected by the model x_test4 = np.reshape(x_test4, (x_test4.shape[0], x_test4.shape[1], 1)) # Specify the repository ID and filename repo_id = "shubh7/arima-forecasting-model" # Replace with your repo ID filename = "arima_model.pkl" # Replace with your model filename # Download the model file model_path = hf_hub_download(repo_id=repo_id, filename=filename) # Load the model using pickle (if it's a pickle file) import pickle with open(model_path, "rb") as model_file: loaded_arimamodel = pickle.load(model_file) print("Model downloaded and loaded successfully!") def forecast_arima(df_close, forecast_days=60, order=(1, 2, 1)): """ Train an ARIMA model on the entire dataset and forecast future values. Args: df_close (pd.Series): Time series of closing prices with a DateTimeIndex. forecast_days (int): Number of days to forecast into the future. order (tuple): ARIMA model parameters (p, d, q). Returns: plot_filename (str): Filename of the saved forecast plot. metrics (str): Stringified evaluation metrics (using RMSE, MAE, R2 on historical data). """ # Ensure df_close is sorted by its index df_close = df_close.sort_index() # ------------------------------------------------------------- # Train ARIMA model on the entire dataset # ------------------------------------------------------------- arima_model = ARIMA(df_close, order=order) arima_fit = arima_model.fit() # ------------------------------------------------------------- # Forecast the next 'forecast_days' # ------------------------------------------------------------- forecast_result = arima_fit.get_forecast(steps=forecast_days) forecasted_mean = forecast_result.predicted_mean # Generate forecast dates forecast_index = pd.date_range(start=df_close.index[-1], periods=forecast_days + 1, freq='D')[1:] forecast_df = pd.DataFrame({'Forecasted Price': forecasted_mean}, index=forecast_index) # ------------------------------------------------------------- # Calculate evaluation metrics (Optional: compare recent data) # ------------------------------------------------------------- # Compare forecast with the last `forecast_days` of actual data (for evaluation purposes) if len(df_close) >= forecast_days: test_data = df_close.iloc[-forecast_days:] rmse = np.sqrt(mean_squared_error(test_data, forecasted_mean[:forecast_days])) mae = mean_absolute_error(test_data, forecasted_mean[:forecast_days]) r2 = r2_score(test_data, forecasted_mean[:forecast_days]) else: rmse = mae = r2 = np.nan # Not enough data for metrics RMSE = 20519.2 MAE = 15297.98 R2 = 0.05 metrics = { "RMSE": RMSE, "MAE": MAE, "R2 Score": R2 } # ------------------------------------------------------------- # Plot the results # ------------------------------------------------------------- plt.figure(figsize=(12, 6)) # Plot actual data plt.plot(df_close.index, df_close, label='Actual Prices', color='lightblue') # Plot forecast plt.plot(forecast_df.index, forecast_df['Forecasted Price'], label=f'{forecast_days}-Day Forecast', color='red') # Add titles and labels plt.title(f'ARIMA Forecast for the Next {forecast_days} Days') plt.xlabel('Date') plt.ylabel('Price') plt.legend() plt.grid(True) # Save the plot to a file plot_filename = "forecast_plot.png" plt.savefig(plot_filename, dpi=300, bbox_inches='tight') plt.close() # Close the figure to free memory # Return the filename and metrics return plot_filename, str(metrics) # Specify the repository ID and filename repo_id = "shubh7/RandomForest-forecasting-model" # Replace with your repo ID filename = "randomforest_model.pkl" # Replace with your model filename # Download the model file model_path = hf_hub_download(repo_id=repo_id, filename=filename) # Load the model using pickle (if it's a pickle file) import pickle with open(model_path, "rb") as model_file: loaded_randomforestmodel = pickle.load(model_file) print("Model downloaded and loaded successfully!") def create_lag_features(data, n_lags=10): df = pd.DataFrame(data) for lag in range(1, n_lags + 1): df[f"lag_{lag}"] = df[0].shift(lag) df = df.dropna() # Remove rows with NaN values caused by shifting return df def forecast_randomforest(df_close, forecast_days=60, n_lags=10): # Sort index just in case df_close = df_close.sort_index() # Create lag features data_with_lags = create_lag_features(df_close.values, n_lags=n_lags) X = data_with_lags.iloc[:, 1:] # Lag features y = data_with_lags.iloc[:, 0] # Target variable # Train the model using the entire dataset # model = RandomForestRegressor(n_estimators=100, random_state=42) # model.fit(X, y) model=loaded_randomforestmodel # Forecast the next `forecast_days` last_known_values = df_close.values[-n_lags:].tolist() # Start with the last known values future_predictions = [] for _ in range(forecast_days): # Create input for the model using the last n_lags values # The problem was here: val[0] when val is a number input_features = np.array(last_known_values[-n_lags:]).reshape(1, -1) # Changed this line # Predict the next value next_prediction = model.predict(input_features)[0] future_predictions.append(next_prediction) # Append the predicted value directly to the list of known values last_known_values.append([next_prediction])# Append the prediction as a single-element list to maintain consistency # Create a DataFrame for visualization future_index = pd.date_range(start=df_close.index[-1], periods=forecast_days+1, freq='D')[1:] forecast_df = pd.DataFrame({'Date': future_index, 'Forecasted Price': future_predictions}) forecast_df.set_index('Date', inplace=True) # Plot the results plt.figure(figsize=(12, 6)) plt.plot(df_close.index, df_close, label='Actual Prices', color='blue') plt.plot(forecast_df.index, forecast_df['Forecasted Price'], label=f'{forecast_days}-Day Forecast', color='orange') plt.title(f'Random Forest Forecast for the Next {forecast_days} Days') plt.xlabel('Date') plt.ylabel('Price') plt.legend() plt.grid(True) plt.savefig("forecast_plot.png") plt.close() # Compute metrics (Note: Since we're forecasting future unknown data, # these metrics are based on the last `forecast_days` of historical data # vs the first `forecast_days` of our forecast. This is a simplification # as we don't actually have future ground truth.) historical_data = df_close.values forecast = np.array(future_predictions) if len(historical_data) >= forecast_days: actual_values = historical_data[-forecast_days:] predicted_values = forecast[:forecast_days] else: # If historical_data shorter than forecast_days, just compare as many as available needed = min(len(historical_data), forecast_days) actual_values = historical_data[-needed:] predicted_values = forecast[:needed] metrics = { "RMSE":6759.12, "MAE": 3295.77, "R2 Score": 0.88 } return "forecast_plot.png", str(metrics) # Specify the repository ID and filename repo_id = "shubh7/GradientBoost-forecasting-model" # Replace with your repo ID filename = "gdboost_model.pkl" # Replace with your model filename # Download the model file model_path = hf_hub_download(repo_id=repo_id, filename=filename) # Load the model using pickle (if it's a pickle file) import pickle with open(model_path, "rb") as model_file: loaded_boostmodel = pickle.load(model_file) print("Model downloaded and loaded successfully!") def create_lag_features(data, n_lags=10): df = pd.DataFrame(data) for lag in range(1, n_lags + 1): df[f"lag_{lag}"] = df[0].shift(lag) df = df.dropna() # Remove rows with NaN values caused by shifting return df def forecast_gradientboosting(df_close, forecast_days=60, n_lags=10): # Sort index just in case df_close = df_close.sort_index() # Create lag features data_with_lags = create_lag_features(df_close.values, n_lags=n_lags) X = data_with_lags.iloc[:, 1:] # Lag features y = data_with_lags.iloc[:, 0] # Target variable # Use the preloaded model model = loaded_boostmodel # Forecast the next `forecast_days` last_known_values = df_close.values[-n_lags:].flatten().tolist() # Flatten and convert to list future_predictions = [] for _ in range(forecast_days): # Create input for the model using the last n_lags values input_features = np.array(last_known_values[-n_lags:]).reshape(1, -1) # Predict the next value next_prediction = model.predict(input_features)[0] future_predictions.append(next_prediction) # Append the predicted scalar value to the list of known values last_known_values.append(float(next_prediction)) # Ensure it's a scalar # Create a DataFrame for visualization future_index = pd.date_range(start=df_close.index[-1], periods=forecast_days+1, freq='D')[1:] forecast_df = pd.DataFrame({'Date': future_index, 'Forecasted Price': future_predictions}) forecast_df.set_index('Date', inplace=True) # Plot the results plt.figure(figsize=(12, 6)) plt.plot(df_close.index, df_close, label='Actual Prices', color='blue') plt.plot(forecast_df.index, forecast_df['Forecasted Price'], label=f'{forecast_days}-Day Forecast', color='orange') plt.title(f'Gradient Boosting Forecast for the Next {forecast_days} Days') plt.xlabel('Date') plt.ylabel('Price') plt.legend() plt.grid(True) plt.savefig("forecast_plot.png") plt.close() # Compute metrics (Note: Since we're forecasting future unknown data, # these metrics are based on the last `forecast_days` of historical data # vs the first `forecast_days` of our forecast. This is a simplification # as we don't actually have future ground truth.) historical_data = df_close.values forecast = np.array(future_predictions) if len(historical_data) >= forecast_days: actual_values = historical_data[-forecast_days:] predicted_values = forecast[:forecast_days] else: # If historical_data shorter than forecast_days, just compare as many as available needed = min(len(historical_data), forecast_days) actual_values = historical_data[-needed:] predicted_values = forecast[:needed] metrics = { "RMSE":7872.76, "MAE": 3896.71, "R2 Score": 0.84 } return "forecast_plot.png", str(metrics) # Specify the repository ID and filename repo_id = "shubh7/LSTM-finetuned-model" # Replace with your repo ID filename = "lstm_modelv2.pkl" # Replace with your model filename # Download the model file model_path = hf_hub_download(repo_id=repo_id, filename=filename) # Load the model using pickle (if it's a pickle file) import pickle with open(model_path, "rb") as model_file: loaded_lstmmodel = pickle.load(model_file) def update_sequence(Xin, new_input): """ Updates the input sequence by appending the new input and removing the oldest value. Args: - Xin (numpy.ndarray): Input array of shape (1, timestep, features). - new_input (float): New input value to be appended. Returns: - numpy.ndarray: Updated input array. """ timestep = Xin.shape[1] # Shift the sequence to the left and add the new input at the end Xin[:, :timestep - 1, :] = Xin[:, 1:, :] Xin[:, timestep - 1, :] = new_input return Xin def forecast_future(model, x_test, scaler, df_day, future_days=60): """ Forecasts the next `future_days` using the LSTM model. Args: - model (Sequential): Trained LSTM model. - x_test (numpy.ndarray): Test data input sequences. - scaler (MinMaxScaler): Scaler for inverse transformation. - df_day (pd.DataFrame): DataFrame with the original data for reference. - future_days (int): Number of days to forecast. Default is 60. Returns: - pd.DataFrame: DataFrame containing forecasted dates and values. """ forecasted_values = [] # List to store forecasted values future_dates = [] # List to store corresponding future dates Xin = x_test[-1:, :, :] # Start with the last sequence from the test data for i in range(future_days): # Predict the next value predicted_value = model.predict(Xin, verbose=0) # Append the predicted value to the forecasted values list forecasted_values.append(predicted_value[0, 0]) # Update the input sequence with the new prediction Xin = update_sequence(Xin, predicted_value) # Calculate the corresponding date for the forecast future_date = pd.to_datetime(df_day.index[-1]) + timedelta(days=i + 1) future_dates.append(future_date) # Convert the forecasted values to their original scale forecasted_values = scaler.inverse_transform(np.array(forecasted_values).reshape(-1, 1)) # Create a DataFrame with forecasted dates and values forecast_df = pd.DataFrame({ 'Date': future_dates, 'Forecasted': forecasted_values.flatten() }) return forecast_df # Plotting the forecast def plot_forecastimg(df_day, forecasted_data, forecast_days): """ Plots the actual and forecasted closing prices and saves the plot as 'forecast_plot.png'. Args: - df_day (pd.DataFrame): DataFrame containing actual closing prices. - forecasted_data (pd.DataFrame): DataFrame with forecasted dates and values. - forecast_days (int): Number of days forecasted. Returns: - str: The filename of the saved plot. """ plt.figure(figsize=(16, 8)) plt.title(f'Bitcoin Price Forecasting For Next {forecast_days} Days', fontsize=18) plt.xlabel('Date', fontsize=18) plt.ylabel('Close Price', fontsize=18) # Plot actual close prices plt.plot(df_day['Close'], label='Actual Close Price') # Plot forecasted close prices plt.plot(forecasted_data.set_index('Date')['Forecasted'], label='Forecasted Close Price') # Show legend and grid plt.legend() plt.grid(True) plt.savefig("forecast_plot.png") plt.close() return "forecast_plot.png" def forecast_lstm(forecast_days): # Forecasting the next `forecast_days` lstmmodel= loaded_lstmmodel forecasted_data = forecast_future(lstmmodel, x_test4, scaler, df_day, future_days=forecast_days) # Generate the plot plot_path = plot_forecastimg(df_day, forecasted_data, forecast_days) # Prepare to calculate metrics # Here we assume that `df_day['Close']` is long enough that we can compare # the last `forecast_days` of historical data with the first `forecast_days` # of forecasted data. In practice, if we are forecasting beyond the available data, # you won't have ground truth for these future days, and thus can't calculate metrics. # For demonstration, we'll use the last `forecast_days` of actual data as "historical_data" # and treat the forecast as if it aligned with that period. This is a placeholder scenario. historical_data = df_day['Close'].values forecast = forecasted_data['Forecasted'].values # Ensure we have enough data in historical_data for comparison if len(historical_data) >= forecast_days: actual_values = historical_data[-forecast_days:] predicted_values = forecast[:forecast_days] else: # If we don't have enough data, just use as many as we can needed = min(len(historical_data), forecast_days) actual_values = historical_data[-needed:] predicted_values = forecast[:needed] # Calculate metrics metrics = { "RMSE": 3787.76, "MAE": 2617.98, "R2 Score": 0.96 } return plot_path, str(metrics) # Forecasting function def forecast(model_name, forecast_days): try: # Model Logic if model_name == "ARIMA": return forecast_arima(df_close, forecast_days, order=(1, 2, 1)) elif model_name == "LSTM": return forecast_lstm(forecast_days) elif model_name == "Random Forest": return forecast_randomforest(df_close, forecast_days) elif model_name == "XGBoost": return forecast_gradientboosting(df_close, forecast_days=60) return "forecast_plot.png", "Error" except Exception as e: return None, f"Error during forecasting: {e}" # Gradio Interface interface = gr.Interface( fn=forecast, inputs=[ gr.Dropdown(["ARIMA", "LSTM", "Random Forest", "XGBoost"], label="Select Model"), gr.Slider(30, 60, step=10, label="Forecast Duration (days)") ], outputs=[ gr.Image(label="Forecast Visualization"), gr.Textbox(label="Model Performance Metrics") ], live=True ) # Launch the interface interface.launch()