WebashalarForML's picture
Update app.py
d3da744 verified
raw
history blame
16.4 kB
import os
import json
from flask import Flask, request, render_template, redirect, url_for, session, flash, send_from_directory, send_file
from werkzeug.utils import secure_filename
from utils.file_to_text import extract_text_based_on_format, preprocess_text
from utils.anoter_to_json import process_uploaded_json
from utils.json_to_spacy import convert_json_to_spacy
from utils.model import train_model
import zipfile
import shutil
app = Flask(__name__)
app.secret_key = 'your_secret_key'
os.umask(0o000)
# Folder paths
app.config['UPLOAD_FOLDER'] = 'uploads/'
app.config['JSON_FOLDER'] = 'JSON/'
app.config['DATA_FOLDER'] = 'data/'
app.config['MODELS_FOLDER'] = 'Models/'
app.config['MAIN_FOLDER'] = os.getcwd()
# Creating Folders if not exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
os.makedirs(app.config['JSON_FOLDER'], exist_ok=True)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
os.makedirs(app.config['MODELS_FOLDER'], exist_ok=True)
# Verify and check once again if the folders are created or not
if not os.path.exists(app.config['UPLOAD_FOLDER']):
os.makedirs(app.config['UPLOAD_FOLDER'])
if not os.path.exists(app.config['JSON_FOLDER']):
os.makedirs(app.config['JSON_FOLDER'])
if not os.path.exists(app.config['DATA_FOLDER']):
os.makedirs(app.config['DATA_FOLDER'])
if not os.path.exists(app.config['MODELS_FOLDER']):
os.makedirs(app.config['MODELS_FOLDER'])
# Create the empty file
file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt')
with open(file_path, 'w') as file:
pass
# Allowed file extensions
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'rsf', 'odt', 'png', 'jpg', 'jpeg', 'json'}
# Function to check file extensions
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
# HTML render routes (modify to fit your structure)
@app.route('/')
def index():
return render_template('upload.html')
@app.route('/guide')
def guide():
return render_template('guide.html')
@app.route('/ner_preview', methods=['GET'])
def ner_preview():
return render_template('anoter.html')
@app.route('/json', methods=['GET'])
def json_file():
return render_template('savejson.html')
@app.route('/spacy', methods=['GET'])
def spacy_file():
return render_template('saveSpacy.html')
@app.route('/text_preview', methods=['GET'])
def text_preview():
try:
resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt')
if not os.path.exists(resume_file_path):
flash('Resume text not found', 'error')
return redirect(url_for('index'))
with open(resume_file_path, 'r') as f:
text = f.read()
return render_template('text.html', text=text)
except Exception as e:
flash(f"Error loading text preview: {str(e)}", 'error')
return redirect(url_for('index'))
# API for uploading Resume files
@app.route('/upload',methods=['GET', 'POST'])
def upload_file():
try:
if 'file' not in request.files:
flash('No file part', 'error')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file', 'error')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(file_path)
# Handle text extraction for non-JSON files
if not filename.lower().endswith('.json'):
return process_other_files(file_path, filename)
flash('File type not allowed', 'error')
except Exception as e:
flash(f"Error: {str(e)}", 'error')
return redirect(request.url)
# Process non-JSON files, extract text and save to 'resume_text.txt'
def process_other_files(file_path, filename):
try:
extracted_text, _ = extract_text_based_on_format(file_path)
cleaned_text = preprocess_text(extracted_text)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt')
with open(resume_file_path, 'w', encoding='utf-8') as f:
f.write(cleaned_text)
session['uploaded_file'] = filename
return render_template('text.html', text=cleaned_text)
except Exception as e:
flash(f"Error processing file {filename}: {str(e)}", 'error')
return redirect(request.referrer)
# API to handle the text editing and saving
@app.route('/edit_text', methods=['POST'])
def edit_text():
try:
# Get the edited text from the form
edited_text = request.form['edited_text']
# Save the edited text back to 'resume_text.txt'
resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt')
with open(resume_file_path, 'w', encoding='utf-8') as f:
f.write(edited_text)
flash('Text edited successfully', 'success')
# Pass the edited text back to the template
return render_template('text.html', text=edited_text)
except Exception as e:
flash(f"Error saving edited text: {str(e)}", 'error')
return redirect(request.referrer)
# API for downloading the 'resume_text.txt' file
@app.route('/download', methods=['GET'])
def download_file():
try:
return send_from_directory(app.config['DATA_FOLDER'], 'resume_text.txt', as_attachment=True)
except Exception as e:
flash(f"Error downloading file: {str(e)}", 'error')
return redirect(request.referrer)
@app.route('/save_and_download', methods=['POST'])
def save_and_download():
try:
# Get the edited text from the form
edited_text = request.form['edited_text']
# Save the edited text back to 'resume_text.txt'
resume_file_path = os.path.join(app.config['DATA_FOLDER'], 'resume_text.txt')
with open(resume_file_path, 'w', encoding='utf-8') as f:
f.write(edited_text)
# flash('Text edited successfully', 'success')
print(f"Saved instace: {app.config['DATA_FOLDER']}/resume_text.txt")
# Now send the file as a download
return send_from_directory(app.config['DATA_FOLDER'], 'resume_text.txt', as_attachment=True)
except Exception as e:
flash(f"Error saving and downloading file: {str(e)}", 'error')
return redirect(request.referrer)
# API for uploading and processing JSON files
@app.route('/upload_json', methods=['POST'])
def upload_json_file():
try:
if 'file' not in request.files:
flash('No file part', 'error')
return redirect(request.url)
file = request.files['file']
if file.filename == '':
flash('No selected file', 'error')
return redirect(request.url)
if file and file.filename.lower().endswith('.json'):
filename = secure_filename(file.filename)
json_path = os.path.join(app.config['JSON_FOLDER'], filename)
os.makedirs(app.config['JSON_FOLDER'], exist_ok=True)
file.save(json_path)
session['uploaded_json'] = filename
flash(f'JSON file {filename} uploaded successfully')
print(f"Saved instace for JSon: {app.config['JSON_FOLDER']}/{filename}")
else:
flash('File type not allowed', 'error')
except Exception as e:
flash(f"Error: {str(e)}", 'error')
return redirect(request.referrer)
# Process uploaded JSON file and save formatted data
@app.route('/process_json', methods=['GET'])
def process_json_file():
try:
json_folder = app.config['JSON_FOLDER']
json_files = os.listdir(json_folder)
if not json_files:
flash('No JSON files found in the folder', 'error')
return redirect(request.referrer)
filename = json_files[0] # Modify logic if needed to handle multiple files
json_path = os.path.join(json_folder, filename)
if not os.path.exists(json_path):
flash(f'JSON file {filename} not found', 'error')
return redirect(request.referrer)
process_uploaded_json(json_path)
os.makedirs(app.config['DATA_FOLDER'], exist_ok=True)
processed_file_path = os.path.join(app.config['DATA_FOLDER'], f'Processed_{filename}')
flash(f'JSON file {filename} processed successfully')
except Exception as e:
flash(f"Error processing JSON file: {str(e)}", 'error')
return redirect(request.referrer)
# API for removing uploaded JSON files
@app.route('/remove_json', methods=['POST'])
def remove_all_json_files():
try:
json_folder = app.config['JSON_FOLDER']
for filename in os.listdir(json_folder):
file_path = os.path.join(json_folder, filename)
if os.path.isfile(file_path):
os.remove(file_path)
session.pop('uploaded_json', None)
flash('All JSON files removed successfully')
except Exception as e:
flash(f"Error removing files: {str(e)}", 'error')
return redirect(request.referrer)
# API for removing non-JSON files
@app.route('/remove', methods=['POST'])
def remove_file():
try:
upload_folder = app.config['UPLOAD_FOLDER']
# Check if the folder exists
if os.path.exists(upload_folder):
# Loop through all files in the upload folder and remove them
for filename in os.listdir(upload_folder):
file_path = os.path.join(upload_folder, filename)
# Check if it is a file and remove it
if os.path.isfile(file_path):
os.remove(file_path)
# Clear session data related to uploaded files
session.pop('uploaded_file', None)
flash('All files removed successfully')
else:
flash(f"Upload folder does not exist", 'error')
except Exception as e:
flash(f"Error removing files: {str(e)}", 'error')
return redirect(url_for('index'))
@app.route('/to_sapcy', methods=['POST'])
def to_sapcy():
try:
# Path to the JSON file
json_file_path = 'data/Json_Data.json'
# Convert the JSON file to a .spacy file
spacy_file_path = 'data/Spacy_data.spacy'
# Call the conversion function
convert_json_to_spacy(json_file_path, spacy_file_path)
flash('Model training data converted successfully', 'success')
except Exception as e:
flash(f"Error during conversion: {str(e)}", 'error')
return redirect(request.referrer)
@app.route('/train_model_endpoint', methods=['POST'])
def train_model_endpoint():
try:
# Get the number of epochs and model version from the request
epochs = int(request.form.get('epochs', 10)) # Default to 10 if not provided
version = request.form.get('model_version', 'v1') # Default to 'v1' if not provided
# Call the training function with user-defined parameters
model_path = f"./Models/ner_model_{version}"
train_model(epochs, model_path)
print(f"Saved instace for Model: app//Models/ner_model_{version}")
flash('Model training completed successfully', 'success')
except Exception as e:
flash(f"Error during training: {str(e)}", 'error')
return redirect(url_for('index'))
# API for removing all files from specific folders
@app.route('/remove_files', methods=['POST'])
def remove_files():
try:
# Define folders to clear
folders_to_clear = [app.config['UPLOAD_FOLDER'], app.config['JSON_FOLDER'], app.config['MODELS_FOLDER'] ]
for folder_path in folders_to_clear:
# Remove all files from the specified folder
for filename in os.listdir(folder_path):
file_path = os.path.join(folder_path, filename)
if os.path.isfile(file_path):
os.remove(file_path)
# Clear session variables related to the removed folders
session.pop('uploaded_file', None)
session.pop('uploaded_json', None)
flash('All files removed from folder successfully')
except Exception as e:
flash(f"Error removing files: {str(e)}", 'error')
return redirect(url_for('index'))
# API for downloading the latest trained model
'''
@app.route('/download_model', methods=['GET'])
def download_latest_model():
try:
models_dir = app.config['MODELS_FOLDER']
model_files = os.listdir(models_dir)
if not model_files:
flash('No model files found', 'error')
return redirect(request.referrer)
# Sort model files and get the latest one
latest_model_file = sorted(model_files, reverse=True)[0]
# Full path to the latest model file
model_path = os.path.join(models_dir, latest_model_file)
if not os.path.exists(model_path):
flash('Model file not found on the server', 'error')
return redirect(request.referrer)
# Create a zip file with the model
zip_filename = os.path.join(models_dir, f"{latest_model_file}.zip")
with zipfile.ZipFile(zip_filename, 'w') as zipf:
zipf.write(model_path, os.path.basename(model_path))
# Send the zip file as a download
return send_file(zip_filename, as_attachment=True)
except Exception as e:
flash(f"Error while downloading the model: {str(e)}", 'error')
return redirect(request.referrer)
'''
@app.route('/download_model', methods=['GET'])
def download_latest_model():
try:
models_dir = app.config['MODELS_FOLDER']
model_files = os.listdir(models_dir)
if not model_files:
flash('No model files found', 'error')
return redirect(request.referrer)
# Sort model files and get the latest one
latest_model_file = sorted(model_files, reverse=True)[0]
# Full path to the latest model file
model_path = os.path.join(models_dir, latest_model_file)
if not os.path.exists(model_path):
flash('Model file not found on the server', 'error')
return redirect(request.referrer)
# Create a zip file with the model
zip_filename = os.path.join(models_dir, f"{latest_model_file}.zip")
# Create a zip file that includes the whole directory structure of the model
with zipfile.ZipFile(zip_filename, 'w', zipfile.ZIP_DEFLATED) as zipf:
# If the model is a directory, zip the whole directory
if os.path.isdir(model_path):
for foldername, subfolders, filenames in os.walk(model_path):
for filename in filenames:
file_path = os.path.join(foldername, filename)
# Write the file, preserving the directory structure
zipf.write(file_path, os.path.relpath(file_path, models_dir))
else:
# If it's just a single file, zip it
zipf.write(model_path, os.path.basename(model_path))
# Send the zip file as a download
return send_file(zip_filename, as_attachment=True)
except Exception as e:
flash(f"Error while downloading the model: {str(e)}", 'error')
return redirect(request.referrer)
# Route to serve uploaded files
@app.route('/<foldername>/zip', methods=['GET'])
def zip_folder(foldername):
# Construct the folder path
folder_path = os.path.join(app.config['MAIN_FOLDER'], foldername)
# Check if the folder exists
if not os.path.exists(folder_path) or not os.path.isdir(folder_path):
abort(404, description="Folder not found")
# Define the zip file name and path
zip_filename = f"{foldername}.zip"
zip_path = os.path.join(app.config['MAIN_FOLDER'], zip_filename)
# Create a zip file of the folder
shutil.make_archive(zip_path[:-4], 'zip', folder_path)
# Send the zipped file to the client
return send_from_directory(app.config['MAIN_FOLDER'], zip_filename, as_attachment=True)
if __name__ == '__main__':
app.run(debug=True)