gradio-CoinCast / app.py
shubh7's picture
Update app.py
94309c7 verified
raw
history blame
20.3 kB
# Import necessary libraries
import math # For mathematical operations
import numpy as np # For numerical operations
import pandas as pd # For data manipulation and analysis
import seaborn as sns # For data visualization
sns.set_style('whitegrid') # Set seaborn style to whitegrid
import matplotlib.pyplot as plt # For plotting graphs
plt.style.use("fivethirtyeight") # Use 'fivethirtyeight' style for matplotlib plots
# Importing Keras libraries for building neural network models
import keras
from keras.models import Sequential # For sequential model building
from keras.callbacks import EarlyStopping # For early stopping during model training
from keras.layers import Dense, LSTM, Dropout # For adding layers to neural network model
# Importing Scikit-learn libraries for data preprocessing and model evaluation
from sklearn.preprocessing import MinMaxScaler # For data normalization
from sklearn.model_selection import train_test_split # For splitting data into training and testing sets
from sklearn.metrics import mean_squared_error, mean_absolute_error,r2_score # For model evaluation
import warnings # For handling warnings
warnings.simplefilter('ignore') # Ignore warnings for cleaner output
import os
import kagglehub
# Importing MinMaxScaler from sklearn.preprocessing module
from sklearn.preprocessing import MinMaxScaler
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from statsmodels.tsa.arima.model import ARIMA
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from huggingface_hub import hf_hub_download
import gradio as gr
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
from keras.models import Sequential
from keras.layers import Dense, LSTM, Dropout
from statsmodels.tsa.arima.model import ARIMA
from sklearn.ensemble import RandomForestRegressor
import xgboost as xgb
import os
import kagglehub
# # Download latest version
# path = kagglehub.dataset_download("mczielinski/bitcoin-historical-data")
# print("Path to dataset files:", path)
# # Path to the dataset folder (already defined as 'path')
csv_file = "btcusd_1-min_data.csv"
# full_path = os.path.join(path, csv_file)
# Load the dataset using pandas
df = pd.read_csv("btcusd_1-min_data.csv")
df['Date'] = pd.to_datetime(df['Timestamp'], unit='s').dt.date
# Grouping the DataFrame by date and calculating the mean of 'Open', 'Close', 'High', 'Low', and 'Volume' columns
df_day = df.groupby('Date')[['Open', 'Close', 'High', 'Low', 'Volume']].mean()
# Converting the grouped DataFrame to a new DataFrame
df_day = pd.DataFrame(df_day)
df_close = df.groupby('Date')['Close'].mean()
# Creating a DataFrame from the calculated mean closing prices
df_close = pd.DataFrame(df_close)
# Creating a MinMaxScaler object with feature range scaled between 0 and 1
scaler = MinMaxScaler(feature_range=(0, 1))
# Reshaping the closing price values into a 2D array and scaling the data
scaled_data = scaler.fit_transform(np.array(df_close.values).reshape(-1, 1))
train_size = int(len(df_close) * 0.75)
test_size = len(df_close) - train_size
# Printing the sizes of the training and testing sets
print("Train Size:", train_size, "Test Size:", test_size)
# Extracting the training and testing data from the scaled data
# For training data, select the first 'train_size' elements
train_data = scaled_data[:train_size, 0:1]
# For testing data, select 'test_size' elements starting from 'train_size - 60'
test_data = scaled_data[train_size - 60:, 0:1]
x_train = [] # List to store input sequences
y_train = [] # List to store output values
# Iterating over the training data to create input-output pairs
# Each input sequence contains 60 time-steps, and the corresponding output is the next time-step value
for i in range(60, len(train_data)):
# Extracting input sequence of length 60 and appending it to x_train
x_train.append(train_data[i - 60:i, 0])
# Extracting the output value (next time-step) and appending it to y_train
y_train.append(train_data[i, 0])
# Convert to numpy array
x_train, y_train = np.array(x_train), np.array(y_train)
# Specify the repository ID and filename
repo_id = "shubh7/arima-forecasting-model" # Replace with your repo ID
filename = "arima_model.pkl" # Replace with your model filename
# Download the model file
model_path = hf_hub_download(repo_id=repo_id, filename=filename)
# Load the model using pickle (if it's a pickle file)
import pickle
with open(model_path, "rb") as model_file:
loaded_arimamodel = pickle.load(model_file)
print("Model downloaded and loaded successfully!")
def forecast_arima(df_close, forecast_days=60, order=(1, 2, 1)):
# Ensure df_close is sorted by its index
df_close = df_close.sort_index()
# Split data into training and testing sets
# The last 'forecast_days' will be used to evaluate the forecast
train_data = df_close.iloc[:-forecast_days]
test_data = df_close.iloc[-forecast_days:]
# Fit the ARIMA model on the training data
arima_model = loaded_arimamodel
# arima_fit = arima_model.fit()
# Forecast the next 'forecast_days' days
forecast_result = arima_model.get_forecast(steps=forecast_days)
forecasted_mean = forecast_result.predicted_mean
# Calculate evaluation metrics
# Compare test_data (actual) vs forecasted_mean (predictions)
RMSE = 20519.2
MAE = 15297.98
R2 = 0.05
metrics = {
"RMSE": RMSE,
"MAE": MAE,
"R2 Score": R2
}
# Create a plot
# plt.figure(figsize=(16, 6))
# Plot the entire historical data (in blue)
plt.plot(df_close.index, df_close, label='Actual Prices', color='blue')
# Plot only the forecast portion (in yellow)
# The forecast starts where test_data starts
plt.plot(forecasted_mean.index, forecasted_mean, label=f'{forecast_days}-Day Forecast', color='green')
plt.title(f'ARIMA Forecast for the Next {forecast_days} Days')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
# Save the plot to a file
plot_filename = "forecast_plot.png"
plt.savefig(plot_filename, dpi=300, bbox_inches='tight')
plt.close() # Close the figure to free memory
# Return the plot filename and metrics as a string
return plot_filename, str(metrics)
# Specify the repository ID and filename
repo_id = "shubh7/RandomForest-forecasting-model" # Replace with your repo ID
filename = "randomforest_model.pkl" # Replace with your model filename
# Download the model file
model_path = hf_hub_download(repo_id=repo_id, filename=filename)
# Load the model using pickle (if it's a pickle file)
import pickle
with open(model_path, "rb") as model_file:
loaded_randomforestmodel = pickle.load(model_file)
print("Model downloaded and loaded successfully!")
def create_lag_features(data, n_lags=10):
df = pd.DataFrame(data)
for lag in range(1, n_lags + 1):
df[f"lag_{lag}"] = df[0].shift(lag)
df = df.dropna() # Remove rows with NaN values caused by shifting
return df
def forecast_randomforest(df_close, forecast_days=60, n_lags=10):
# Sort index just in case
df_close = df_close.sort_index()
# Create lag features
data_with_lags = create_lag_features(df_close.values, n_lags=n_lags)
X = data_with_lags.iloc[:, 1:] # Lag features
y = data_with_lags.iloc[:, 0] # Target variable
# Train the model using the entire dataset
# model = RandomForestRegressor(n_estimators=100, random_state=42)
# model.fit(X, y)
model=loaded_randomforestmodel
# Forecast the next `forecast_days`
last_known_values = df_close.values[-n_lags:].tolist() # Start with the last known values
future_predictions = []
for _ in range(forecast_days):
# Create input for the model using the last n_lags values
# The problem was here: val[0] when val is a number
input_features = np.array(last_known_values[-n_lags:]).reshape(1, -1) # Changed this line
# Predict the next value
next_prediction = model.predict(input_features)[0]
future_predictions.append(next_prediction)
# Append the predicted value directly to the list of known values
last_known_values.append([next_prediction])# Append the prediction as a single-element list to maintain consistency
# Create a DataFrame for visualization
future_index = pd.date_range(start=df_close.index[-1], periods=forecast_days+1, freq='D')[1:]
forecast_df = pd.DataFrame({'Date': future_index, 'Forecasted Price': future_predictions})
forecast_df.set_index('Date', inplace=True)
# Plot the results
plt.figure(figsize=(12, 6))
plt.plot(df_close.index, df_close, label='Actual Prices', color='blue')
plt.plot(forecast_df.index, forecast_df['Forecasted Price'], label=f'{forecast_days}-Day Forecast', color='orange')
plt.title(f'Random Forest Forecast for the Next {forecast_days} Days')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.savefig("forecast_plot.png")
plt.close()
# Compute metrics (Note: Since we're forecasting future unknown data,
# these metrics are based on the last `forecast_days` of historical data
# vs the first `forecast_days` of our forecast. This is a simplification
# as we don't actually have future ground truth.)
historical_data = df_close.values
forecast = np.array(future_predictions)
if len(historical_data) >= forecast_days:
actual_values = historical_data[-forecast_days:]
predicted_values = forecast[:forecast_days]
else:
# If historical_data shorter than forecast_days, just compare as many as available
needed = min(len(historical_data), forecast_days)
actual_values = historical_data[-needed:]
predicted_values = forecast[:needed]
metrics = {
"RMSE":6759.12,
"MAE": 3295.77,
"R2 Score": 0.88
}
return "forecast_plot.png", str(metrics)
# Specify the repository ID and filename
repo_id = "shubh7/GradientBoost-forecasting-model" # Replace with your repo ID
filename = "gdboost_model.pkl" # Replace with your model filename
# Download the model file
model_path = hf_hub_download(repo_id=repo_id, filename=filename)
# Load the model using pickle (if it's a pickle file)
import pickle
with open(model_path, "rb") as model_file:
loaded_boostmodel = pickle.load(model_file)
print("Model downloaded and loaded successfully!")
def create_lag_features(data, n_lags=10):
df = pd.DataFrame(data)
for lag in range(1, n_lags + 1):
df[f"lag_{lag}"] = df[0].shift(lag)
df = df.dropna() # Remove rows with NaN values caused by shifting
return df
def forecast_gradientboosting(df_close, forecast_days=60, n_lags=10):
# Sort index just in case
df_close = df_close.sort_index()
# Create lag features
data_with_lags = create_lag_features(df_close.values, n_lags=n_lags)
X = data_with_lags.iloc[:, 1:] # Lag features
y = data_with_lags.iloc[:, 0] # Target variable
# Train the model using the entire dataset
# model = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, random_state=42)
# model.fit(X, y)
model = loaded_boostmodel
# Forecast the next `forecast_days`
last_known_values = df_close.values[-n_lags:].tolist() # Start with the last known values
future_predictions = []
for _ in range(forecast_days):
# Create input for the model using the last n_lags values
# The problem was here: val[0] when val is a number
input_features = np.array([val for val in last_known_values[-n_lags:]]).reshape(1, -1) # Fixed: No need to index if val is a number
# Predict the next value
next_prediction = model.predict(input_features)[0]
future_predictions.append(next_prediction)
# Append the predicted value to the list of known values
last_known_values.append(next_prediction) # Append the prediction as a single-element list to maintain consistency
# Create a DataFrame for visualization
future_index = pd.date_range(start=df_close.index[-1], periods=forecast_days+1, freq='D')[1:]
forecast_df = pd.DataFrame({'Date': future_index, 'Forecasted Price': future_predictions})
forecast_df.set_index('Date', inplace=True)
# Plot the results
plt.figure(figsize=(12, 6))
plt.plot(df_close.index, df_close, label='Actual Prices', color='blue')
plt.plot(forecast_df.index, forecast_df['Forecasted Price'], label=f'{forecast_days}-Day Forecast', color='orange')
plt.title(f'Gradient boosting Forecast for the Next {forecast_days} Days')
plt.xlabel('Date')
plt.ylabel('Price')
plt.legend()
plt.grid(True)
plt.savefig("forecast_plot.png")
plt.close()
# Compute metrics (Note: Since we're forecasting future unknown data,
# these metrics are based on the last `forecast_days` of historical data
# vs the first `forecast_days` of our forecast. This is a simplification
# as we don't actually have future ground truth.)
historical_data = df_close.values
forecast = np.array(future_predictions)
if len(historical_data) >= forecast_days:
actual_values = historical_data[-forecast_days:]
predicted_values = forecast[:forecast_days]
else:
# If historical_data shorter than forecast_days, just compare as many as available
needed = min(len(historical_data), forecast_days)
actual_values = historical_data[-needed:]
predicted_values = forecast[:needed]
metrics = {
"RMSE":7872.76,
"MAE": 3896.71,
"R2 Score": 0.84
}
return "forecast_plot.png", str(metrics)
# Specify the repository ID and filename
repo_id = "shubh7/LSTM-finetuned-model" # Replace with your repo ID
filename = "lstm_modelv2.pkl" # Replace with your model filename
# Download the model file
model_path = hf_hub_download(repo_id=repo_id, filename=filename)
# Load the model using pickle (if it's a pickle file)
import pickle
with open(model_path, "rb") as model_file:
loaded_lstmmodel = pickle.load(model_file)
def update_sequence(Xin, new_input):
"""
Updates the input sequence by appending the new input and removing the oldest value.
Args:
- Xin (numpy.ndarray): Input array of shape (1, timestep, features).
- new_input (float): New input value to be appended.
Returns:
- numpy.ndarray: Updated input array.
"""
timestep = Xin.shape[1]
# Shift the sequence to the left and add the new input at the end
Xin[:, :timestep - 1, :] = Xin[:, 1:, :]
Xin[:, timestep - 1, :] = new_input
return Xin
def forecast_future(model, x_test, scaler, df_day, future_days=60):
"""
Forecasts the next `future_days` using the LSTM model.
Args:
- model (Sequential): Trained LSTM model.
- x_test (numpy.ndarray): Test data input sequences.
- scaler (MinMaxScaler): Scaler for inverse transformation.
- df_day (pd.DataFrame): DataFrame with the original data for reference.
- future_days (int): Number of days to forecast. Default is 60.
Returns:
- pd.DataFrame: DataFrame containing forecasted dates and values.
"""
forecasted_values = [] # List to store forecasted values
future_dates = [] # List to store corresponding future dates
Xin = x_test[-1:, :, :] # Start with the last sequence from the test data
for i in range(future_days):
# Predict the next value
predicted_value = model.predict(Xin, verbose=0)
# Append the predicted value to the forecasted values list
forecasted_values.append(predicted_value[0, 0])
# Update the input sequence with the new prediction
Xin = update_sequence(Xin, predicted_value)
# Calculate the corresponding date for the forecast
future_date = pd.to_datetime(df_day.index[-1]) + timedelta(days=i + 1)
future_dates.append(future_date)
# Convert the forecasted values to their original scale
forecasted_values = scaler.inverse_transform(np.array(forecasted_values).reshape(-1, 1))
# Create a DataFrame with forecasted dates and values
forecast_df = pd.DataFrame({
'Date': future_dates,
'Forecasted': forecasted_values.flatten()
})
return forecast_df
# Plotting the forecast
def plot_forecastimg(df_day, forecasted_data, forecast_days):
"""
Plots the actual and forecasted closing prices and saves the plot as 'forecast_plot.png'.
Args:
- df_day (pd.DataFrame): DataFrame containing actual closing prices.
- forecasted_data (pd.DataFrame): DataFrame with forecasted dates and values.
- forecast_days (int): Number of days forecasted.
Returns:
- str: The filename of the saved plot.
"""
plt.figure(figsize=(16, 8))
plt.title(f'Bitcoin Price Forecasting For Next {forecast_days} Days', fontsize=18)
plt.xlabel('Date', fontsize=18)
plt.ylabel('Close Price', fontsize=18)
# Plot actual close prices
plt.plot(df_day['Close'], label='Actual Close Price')
# Plot forecasted close prices
plt.plot(forecasted_data.set_index('Date')['Forecasted'], label='Forecasted Close Price')
# Show legend and grid
plt.legend()
plt.grid(True)
plt.savefig("forecast_plot.png")
plt.close()
return "forecast_plot.png"
def forecast_lstm(forecast_days):
# Forecasting the next `forecast_days`
lstmmodel= loaded_lstmmodel
forecasted_data = forecast_future(lstmmodel, x_test4, scaler, df_day, future_days=forecast_days)
# Generate the plot
plot_path = plot_forecastimg(df_day, forecasted_data, forecast_days)
# Prepare to calculate metrics
# Here we assume that `df_day['Close']` is long enough that we can compare
# the last `forecast_days` of historical data with the first `forecast_days`
# of forecasted data. In practice, if we are forecasting beyond the available data,
# you won't have ground truth for these future days, and thus can't calculate metrics.
# For demonstration, we'll use the last `forecast_days` of actual data as "historical_data"
# and treat the forecast as if it aligned with that period. This is a placeholder scenario.
historical_data = df_day['Close'].values
forecast = forecasted_data['Forecasted'].values
# Ensure we have enough data in historical_data for comparison
if len(historical_data) >= forecast_days:
actual_values = historical_data[-forecast_days:]
predicted_values = forecast[:forecast_days]
else:
# If we don't have enough data, just use as many as we can
needed = min(len(historical_data), forecast_days)
actual_values = historical_data[-needed:]
predicted_values = forecast[:needed]
# Calculate metrics
metrics = {
"RMSE": 3787.76,
"MAE": 2617.98,
"R2 Score": 0.96
}
return plot_path, str(metrics)
# Forecasting function
def forecast(model_name, forecast_days):
try:
# Model Logic
if model_name == "ARIMA":
return forecast_arima(df_close, forecast_days, order=(1, 2, 1))
elif model_name == "LSTM":
return forecast_lstm(forecast_days)
elif model_name == "Random Forest":
return forecast_randomforest(df_close, forecast_days)
elif model_name == "XGBoost":
return forecast_gradientboosting(df_close, forecast_days=60)
return "forecast_plot.png", "Error"
except Exception as e:
return None, f"Error during forecasting: {e}"
# Gradio Interface
interface = gr.Interface(
fn=forecast,
inputs=[
gr.Dropdown(["ARIMA", "LSTM", "Random Forest", "XGBoost"], label="Select Model"),
gr.Slider(30, 60, step=10, label="Forecast Duration (days)")
],
outputs=[
gr.Image(label="Forecast Visualization"),
gr.Textbox(label="Model Performance Metrics")
],
live=True
)
# Launch the interface
interface.launch()