import inspect |
import io |
import os |
import shutil |
import tempfile |
import threading |
import uuid |
import warnings |
from datetime import datetime |
from typing import Callable, Dict |
import markdown |
import matplotlib.pyplot as plt |
import numpy as np |
import orjson |
import pandas as pd |
from flask import Flask, Response, render_template, request, send_file |
from selector.methods.distance import DISE, MaxMin, MaxSum, OptiSim |
from selector.methods.partition import GridPartition, Medoid |
from selector.methods.similarity import NSimilarity |
from selector.measures.diversity import compute_diversity |
from sklearn.metrics import pairwise_distances |
from werkzeug.utils import secure_filename |
# Directory that receives uploaded files, and the file extensions accepted for upload.
UPLOAD_FOLDER = "uploads"
ALLOWED_EXTENSIONS = {"txt", "npz", "xlsx", "xls"}

app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024  # 32 MiB cap on request bodies
# Every upload helper reads app.config["UPLOAD_FOLDER"], so it must be set before use.
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
file_lock = threading.Lock()

os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)

# Maps the algorithm name sent by the client to the selector class implementing it.
SELECTION_ALGORITHM_MAP = {
    "MaxMin": MaxMin,
    "MaxSum": MaxSum,
    "OptiSim": OptiSim,
    # DISE is imported and treated as distance-based in process_selection.
    "DISE": DISE,
    "GridPartition": GridPartition,
    "NSimilarity": NSimilarity,
}
def allowed_file(filename):
    """Return True when ``filename`` ends in an extension from ALLOWED_EXTENSIONS."""
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        # No "." anywhere in the name, so there is no extension to check.
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def get_unique_upload_dir():
    """Create and return a new UUID-named subdirectory of the upload folder."""
    session_name = str(uuid.uuid4())
    session_dir = os.path.join(app.config["UPLOAD_FOLDER"], session_name)
    os.makedirs(session_dir, exist_ok=True)
    # Open the directory to every user (read/write/execute).
    os.chmod(session_dir, 0o777)
    return session_dir
def clean_upload_dir(upload_dir):
    """Remove ``upload_dir`` and its contents; failures are printed, never raised."""
    try:
        if not os.path.exists(upload_dir):
            # Nothing on disk, nothing to clean.
            return
        shutil.rmtree(upload_dir)
    except Exception as exc:
        print(f"Error cleaning upload directory: {exc}")
def load_data(filepath):
    """Load an array from a ``.npz``, ``.txt``, ``.xlsx`` or ``.xls`` file.

    Parameters
    ----------
    filepath : str
        Path to the file. The format is chosen from the text after the last ".".

    Returns
    -------
    np.ndarray or None
        The loaded array, or None when the extension is not a supported format.

    Raises
    ------
    ValueError
        If the path has no "." or the file cannot be read or parsed. The underlying
        exception is attached as ``__cause__``.
    """
    try:
        ext = filepath.rsplit(".", 1)[1].lower()
        if ext == "npz":
            with np.load(filepath) as data:
                names = data.files
                if not names:
                    raise ValueError("archive contains no arrays")
                # Prefer the "arr_0" entry when present, otherwise the first stored array.
                return data["arr_0" if "arr_0" in names else names[0]]
        if ext == "txt":
            return np.loadtxt(filepath)
        if ext in ("xlsx", "xls"):
            return pd.read_excel(filepath).to_numpy()
        # Unsupported extension: callers test for None.
        return None
    except Exception as e:
        raise ValueError(f"Error loading file {filepath}: {str(e)}") from e
def create_json_response(data, status=200):
    """Serialize ``data`` with orjson (NumPy arrays supported) into a JSON ``Response``.

    Objects orjson cannot serialize natively are converted with ``str``.
    """
    body = orjson.dumps(data, default=str, option=orjson.OPT_SERIALIZE_NUMPY)
    return Response(body, status=status, mimetype="application/json")
def read_markdown_file(filename):
    """Read a file from the ``md_files`` directory next to this module and render it as HTML.

    Paragraphs that open or close with ``$$`` are rewrapped in a
    ``<div class="math-block">`` element.

    Parameters
    ----------
    filename : str
        Name of the markdown file inside ``md_files``.

    Returns
    -------
    str
        The rendered HTML, or a ``<p>`` element with the HTML-escaped error
        message when the file cannot be read or converted.
    """
    # Local import: only the error path needs it, and the file has no top-level html import.
    from html import escape

    filepath = os.path.join(os.path.dirname(__file__), "md_files", filename)
    try:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        md = markdown.Markdown(extensions=["tables", "fenced_code", "codehilite", "attr_list"])
        rendered = md.convert(content)
        rendered = rendered.replace("<p>$$", '<div class="math-block">$$')
        rendered = rendered.replace("$$</p>", "$$</div>")
        return rendered
    except Exception as e:
        print(f"Error reading markdown file {filename}: {e}")
        # The message embeds the requested path, so escape it before placing it in HTML.
        return f"<p>Error loading content: {escape(str(e))}</p>"
def get_default_parameters(func):
    """Return ``{name: default}`` for the parameters of ``func`` that declare a default.

    ``self`` and ``fun_dist`` are always left out.
    """
    skipped = ("self", "fun_dist")
    signature = inspect.signature(func)
    return {
        name: spec.default
        for name, spec in signature.parameters.items()
        if name not in skipped and spec.default is not inspect.Parameter.empty
    }
@app.route("/get_default_params/<algorithm>")
def get_default_params(algorithm):
    """Return the constructor defaults of ``algorithm`` as JSON.

    Responds with 400 for a name missing from SELECTION_ALGORITHM_MAP and with
    500 when the defaults cannot be extracted.
    """
    if algorithm in SELECTION_ALGORITHM_MAP:
        try:
            init_method = SELECTION_ALGORITHM_MAP[algorithm].__init__
            return create_json_response(get_default_parameters(init_method))
        except Exception as exc:
            return create_json_response({"error": f"Error getting parameters: {str(exc)}"}, 500)
    return create_json_response({"error": f"Unknown algorithm: {algorithm}"}, 400)
@app.route("/get_default_selection_params/<algorithm>")
def get_default_selection_params(algorithm):
    """API endpoint to get default parameters for a selection algorithm.

    Responds with 400 when ``algorithm`` is not in SELECTION_ALGORITHM_MAP and
    with 500 when the defaults cannot be extracted.
    """
    if algorithm not in SELECTION_ALGORITHM_MAP:
        return create_json_response({"error": f"Algorithm unsupported: {algorithm}"}, 400)
    try:
        # Read the defaults from the class constructor. Calling this view function
        # again here would recurse until RecursionError and always yield a 500.
        algorithm_class = SELECTION_ALGORITHM_MAP[algorithm]
        return create_json_response(get_default_parameters(algorithm_class.__init__))
    except Exception as e:
        return create_json_response({"error": f"Error getting parameters: {str(e)}"}, 500)
@app.route("/")
def home():
    """Render the landing page from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
@app.route("/md/<filename>")
def get_markdown(filename):
    """Return the named markdown document rendered to HTML, wrapped in JSON.

    A missing ``.md`` suffix is appended before the file is looked up.
    """
    md_name = filename if filename.endswith(".md") else filename + ".md"
    return create_json_response({"html": read_markdown_file(md_name)})
def process_selection(arr, algorithm, parameters, dist_metric):
    """
    Process feature matrix using the specified selection algorithm.

    Parameters
    ----------
    arr : np.ndarray
        Input feature matrix; ``arr.shape[0]`` is taken as the number of samples.
    algorithm : str
        Name of the selection algorithm to use (a key of SELECTION_ALGORITHM_MAP).
    parameters : dict
        Parameters for the algorithm. Must contain ``"size"``; the remaining
        entries are passed as keyword arguments to the algorithm class. The
        dict is copied, so the caller's object is not modified.
    dist_metric : str, optional
        Metric name handed to ``pairwise_distances`` for the distance-based
        algorithms. An empty or missing value means ``"euclidean"``.

    Returns
    -------
    dict
        ``{"success", "error", "warnings", "indices"}``. Failures are reported
        through ``"error"`` instead of being raised.
    """
    result = {"success": False, "error": None, "warnings": [], "indices": None}
    try:
        algorithm_class = SELECTION_ALGORITHM_MAP.get(algorithm)
        if algorithm_class is None:
            raise ValueError(f"Unknown algorithm: {algorithm}")

        # Work on a copy so popping/normalizing entries does not mutate the caller's dict.
        parameters = dict(parameters)

        size = parameters.pop("size", None)
        if size is None:
            raise ValueError("Subset size must be specified")
        try:
            size = int(size)
            if size < 1:
                raise ValueError
        except (TypeError, ValueError):
            raise ValueError("Subset size must be a positive integer")
        if size > arr.shape[0]:
            raise ValueError(
                f"Subset size ({size}) cannot be larger than the number of samples ({arr.shape[0]})"
            )

        is_distance_based = algorithm in ["MaxMin", "MaxSum", "OptiSim", "DISE"]
        arr_float = arr.astype(float)
        if is_distance_based:
            # These algorithms are fed a pairwise distance matrix instead of raw features.
            metric = dist_metric if dist_metric else "euclidean"
            try:
                arr_dist = pairwise_distances(arr_float, metric=metric)
            except Exception as e:
                raise ValueError(f"Error computing distance matrix: {str(e)}") from e
        else:
            arr_dist = arr_float

        if algorithm == "GridPartition":
            nbins_axis = parameters.get("nbins_axis")
            if nbins_axis is None:
                raise ValueError("nbins_axis must be specified for GridPartition")
            try:
                parameters["nbins_axis"] = int(nbins_axis)
                if parameters["nbins_axis"] < 1:
                    raise ValueError
            except (TypeError, ValueError):
                raise ValueError("nbins_axis must be a positive integer")

        try:
            collector = algorithm_class(**parameters)
            indices = collector.select(arr_dist, size=size)
            if indices is None:
                raise ValueError("Algorithm returned None instead of indices")
            if len(indices) != size:
                mismatch = f"Algorithm returned {len(indices)} indices but expected {size}"
                # warnings.warn() does not raise by default, so record the message
                # explicitly; otherwise it never reaches the response.
                result["warnings"].append(mismatch)
                warnings.warn(mismatch)
            indices_list = indices.tolist() if isinstance(indices, np.ndarray) else list(indices)
            if not all(
                isinstance(i, (int, np.integer)) and 0 <= i < arr.shape[0] for i in indices_list
            ):
                raise ValueError("Algorithm returned invalid indices")
            result["success"] = True
            result["indices"] = indices_list
        except Exception as e:
            import traceback

            print(f"Traceback: {traceback.format_exc()}")
            raise ValueError(f"Error executing algorithm: {str(e)}") from e
    except Warning as w:
        # Only reachable when a warning outside the inner try blocks is promoted to an error.
        result["warnings"].append(str(w))
    except Exception as e:
        result["error"] = str(e)
    return result
@app.route("/upload_selection", methods=["POST"])
def upload_selection_file():
    """Handle file upload and process selection.

    Expects multipart form data with a ``file`` part plus the ``algorithm`` and
    ``size`` fields; ``func_dist`` and a JSON-encoded ``parameters`` object are
    optional. The upload is stored in a per-request directory that is removed
    once processing finishes.

    Returns
    -------
    Response
        JSON result of ``process_selection``; 400 for invalid requests and 500
        when saving, loading or processing the file raises.
    """
    try:
        print("Debug - Starting upload_selection_file")
        if "file" not in request.files:
            return create_json_response({"error": "No file provided"}, 400)
        file = request.files["file"]
        if file.filename == "":
            return create_json_response({"error": "No file selected"}, 400)
        if not allowed_file(file.filename):
            return create_json_response({"error": "File type not allowed"}, 400)

        algorithm = request.form.get("algorithm")
        if not algorithm:
            return create_json_response({"error": "No algorithm specified"}, 400)
        size = request.form.get("size")
        if not size:
            return create_json_response({"error": "Subset size must be specified"}, 400)
        dist_metric = request.form.get("func_dist", "")

        # Best effort: malformed JSON falls back to no extra parameters.
        try:
            parameters = orjson.loads(request.form.get("parameters", "{}"))
        except (ValueError, TypeError):
            parameters = {}
        # Valid JSON that is not an object (e.g. [] or null) cannot carry named
        # parameters; treat it like malformed input instead of failing below.
        if not isinstance(parameters, dict):
            parameters = {}
        parameters["size"] = size

        upload_dir = get_unique_upload_dir()
        try:
            file_path = os.path.join(
                upload_dir, secure_filename(str(uuid.uuid4()) + "_" + file.filename)
            )
            # The module-level lock serializes saves across request threads.
            with file_lock:
                file.save(file_path)
            array = load_data(file_path)
            result = process_selection(array, algorithm, parameters, dist_metric)
            return create_json_response(result)
        except Exception as e:
            return create_json_response({"error": str(e)}, 500)
        finally:
            clean_upload_dir(upload_dir)
    except Exception as e:
        return create_json_response({"error": f"Error processing request: {str(e)}"}, 400)
@app.route("/download", methods=["POST"])
def download():
    """Send the posted indices back as a downloadable txt, npz or xlsx file.

    The JSON body must contain ``indices``; ``format`` (default ``"txt"``) and
    ``timestamp`` (default: current local time) are optional. Responds with 400
    for a missing payload or unknown format and 500 for any other failure.
    """
    try:
        payload = request.get_json()
        if not payload or "indices" not in payload:
            return create_json_response({"error": "No indices provided"}, 400)

        selected = payload["indices"]
        file_format = payload.get("format", "txt")
        stamp = payload.get("timestamp", datetime.now().strftime("%Y%m%d-%H%M%S"))
        out = io.BytesIO()

        def write_txt(stream, values):
            # One index per line.
            stream.write("\n".join(map(str, values)).encode())

        def write_npz(stream, values):
            np.savez_compressed(stream, indices=np.array(values))

        def write_xlsx(stream, values):
            pd.DataFrame({"selected_indices": values}).to_excel(stream, index=False)

        # format name -> (file extension, MIME type, writer)
        writers = {
            "txt": ("txt", "text/plain", write_txt),
            "npz": ("npz", "application/octet-stream", write_npz),
            "xlsx": (
                "xlsx",
                "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                write_xlsx,
            ),
        }
        if file_format not in writers:
            return create_json_response({"error": f"Unsupported format: {file_format}"}, 400)

        extension, mimetype, writer = writers[file_format]
        writer(out, selected)
        attachment_name = f"selected_indices_{stamp}.{extension}"
        # Rewind so send_file streams the buffer from its first byte.
        out.seek(0)
        return send_file(
            out, mimetype=mimetype, as_attachment=True, download_name=attachment_name
        )
    except Exception as exc:
        print(f"Error in download: {str(exc)}")
        return create_json_response({"error": str(exc)}, 500)
def _load_uploaded_array(uploaded_file, label):
    """Save an upload into a private directory, load it as a float array, then clean up.

    ``label`` ("feature subset" / "features") only appears in the error message.
    Raises ValueError when the file cannot be loaded.
    """
    # A per-request directory keeps concurrent uploads with the same filename apart,
    # and the finally clause removes the file even when loading fails.
    upload_dir = get_unique_upload_dir()
    try:
        saved_path = os.path.join(upload_dir, secure_filename(uploaded_file.filename))
        uploaded_file.save(saved_path)
        loaded = load_data(saved_path)
        if loaded is None:
            raise ValueError(f"Failed to read {label} file: {uploaded_file.filename}")
        return loaded.astype(float)
    finally:
        clean_upload_dir(upload_dir)


@app.route("/calculate_diversity", methods=["POST"])
def calculate_diversity():
    """Calculate diversity score for the given feature subset.

    Expects multipart form data with a required ``feature_subset`` file, an
    optional ``features`` file, an optional ``div_type`` field (default
    ``"shannon_entropy"``) and an optional JSON ``div_parameters`` object whose
    ``normalize``, ``truncation`` and ``cs`` entries are forwarded to
    ``compute_diversity``.

    Returns
    -------
    Response
        JSON with ``success`` and ``diversity_score``; 400 when a file is
        missing or unreadable or the computation fails, 500 otherwise.
    """
    try:
        feature_subset_file = request.files.get('feature_subset')
        features_file = request.files.get('features')
        if not feature_subset_file:
            return create_json_response({"error": "Feature subset file is required"}, 400)

        div_type = request.form.get('div_type', 'shannon_entropy')
        div_parameters = orjson.loads(request.form.get('div_parameters', '{}'))

        try:
            feature_subset = _load_uploaded_array(feature_subset_file, "feature subset")
        except Exception as e:
            return create_json_response({"error": f"Error reading feature subset file: {str(e)}"}, 400)

        features = None
        if features_file:
            try:
                features = _load_uploaded_array(features_file, "features")
            except Exception as e:
                return create_json_response({"error": f"Error reading features file: {str(e)}"}, 400)

        normalize = div_parameters.get('normalize', False)
        truncation = div_parameters.get('truncation', False)
        cs = div_parameters.get('cs', None)

        try:
            diversity_score = compute_diversity(
                feature_subset=feature_subset,
                div_type=div_type,
                normalize=normalize,
                truncation=truncation,
                features=features,
                cs=cs
            )
            return create_json_response({
                "success": True,
                "diversity_score": float(diversity_score)
            })
        except Exception as e:
            import traceback

            print(f"Error calculating diversity: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return create_json_response({"error": f"Error calculating diversity: {str(e)}"}, 400)
    except Exception as e:
        return create_json_response({"error": str(e)}, 500)
@app.route('/health')
def health_check():
    """Report liveness as a small JSON document (health check endpoint for Docker)."""
    status_report = {"status": "healthy"}
    return create_json_response(status_report)
if __name__ == "__main__":
    # Flask treats an empty host as "not given" and falls back to its loopback
    # default, which is unreachable from outside a container. Bind every
    # interface explicitly so the Docker health check and clients can connect.
    app.run(debug=False, host="0.0.0.0", port=7860)