|
import inspect |
|
import io |
|
import os |
|
import shutil |
|
import tempfile |
|
import threading |
|
import uuid |
|
import warnings |
|
from datetime import datetime |
|
from typing import Callable, Dict |
|
|
|
import markdown |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import orjson |
|
import pandas as pd |
|
from flask import Flask, Response, render_template, request, send_file |
|
from selector.methods.distance import DISE, MaxMin, MaxSum, OptiSim |
|
from selector.methods.partition import GridPartition, Medoid |
|
from selector.methods.similarity import NSimilarity |
|
from selector.measures.diversity import compute_diversity |
|
from sklearn.metrics import pairwise_distances |
|
from werkzeug.utils import secure_filename |
|
|
|
|
|
# Root directory (relative to the working directory) under which uploads are stored.
UPLOAD_FOLDER = "uploads"

# File extensions accepted by allowed_file(); load_data() has a reader for each.
# (This constant used to be assigned twice with the same value; one definition suffices.)
ALLOWED_EXTENSIONS = {"txt", "npz", "xlsx", "xls"}

app = Flask(__name__)
# Upper bound on the size of an incoming request body: 32 MiB.
app.config["MAX_CONTENT_LENGTH"] = 32 * 1024 * 1024
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER

# Held while an uploaded file is written to disk (see upload_selection_file).
file_lock = threading.Lock()

# Make sure the upload root exists before any request is handled.
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
|
|
|
|
|
# Maps the algorithm name sent by the client to the selector class that implements it.
# NOTE(review): Medoid is imported from selector.methods.partition but is not
# registered here -- confirm whether that omission is intentional.
SELECTION_ALGORITHM_MAP = {
    # selector.methods.distance
    "MaxMin": MaxMin,
    "MaxSum": MaxSum,
    "OptiSim": OptiSim,
    "DISE": DISE,
    # selector.methods.partition
    "GridPartition": GridPartition,
    # selector.methods.similarity
    "NSimilarity": NSimilarity,
}
|
|
|
|
|
def allowed_file(filename):
    """Return True when *filename* ends in an extension listed in ALLOWED_EXTENSIONS.

    The extension is the text after the last "." and is compared
    case-insensitively; a name without any "." is rejected.
    """
    if "." not in filename:
        return False
    _, extension = filename.rsplit(".", 1)
    return extension.lower() in ALLOWED_EXTENSIONS
|
|
|
def get_unique_upload_dir():
    """Create a UUID-named directory under the configured upload folder and return its path."""
    upload_root = app.config["UPLOAD_FOLDER"]
    session_dir = os.path.join(upload_root, str(uuid.uuid4()))
    os.makedirs(session_dir, exist_ok=True)
    # NOTE(review): the directory is made world-writable; presumably required by the
    # deployment environment -- confirm before tightening the mode.
    os.chmod(session_dir, 0o777)
    return session_dir
|
|
|
def clean_upload_dir(upload_dir):
    """Delete *upload_dir* and its contents, if it exists.

    Best-effort: any failure is printed and swallowed so that cleanup never
    masks the result of the request that triggered it.
    """
    try:
        if not os.path.exists(upload_dir):
            return
        shutil.rmtree(upload_dir)
    except Exception as exc:
        print(f"Error cleaning upload directory: {exc}")
|
|
|
def load_data(filepath):
    """Load an array from a ``.npz``, ``.txt``, ``.xlsx`` or ``.xls`` file.

    Parameters
    ----------
    filepath : str
        Path of the file to read. The reader is chosen from the text after the
        last "." in the path (case-insensitive).

    Returns
    -------
    np.ndarray or None
        The loaded array. For ``.npz`` archives the array named ``arr_0`` is
        preferred, otherwise the first array stored in the archive is used.
        ``None`` is returned when the extension is not one of the supported ones.

    Raises
    ------
    ValueError
        If the path has no extension or the underlying reader fails. The
        original exception is chained as ``__cause__``.
    """
    try:
        ext = filepath.rsplit(".", 1)[1].lower()

        if ext == "npz":
            with np.load(filepath) as archive:
                # ``files`` lists the stored array names in archive order.
                names = archive.files
                if not names:
                    raise ValueError("archive contains no arrays")
                key = "arr_0" if "arr_0" in names else names[0]
                return archive[key]

        if ext == "txt":
            return np.loadtxt(filepath)

        if ext in ("xlsx", "xls"):
            # NOTE(review): read_excel is called with its defaults, so the first row
            # is presumably consumed as column headers -- confirm this is intended.
            return pd.read_excel(filepath).to_numpy()

        # Unsupported extension: callers treat None as "could not read".
        return None
    except Exception as e:
        raise ValueError(f"Error loading file {filepath}: {str(e)}") from e
|
|
|
|
|
def create_json_response(data, status=200):
    """Build a JSON ``Response`` for *data* with the given HTTP *status*.

    Serialization goes through orjson with numpy support enabled; objects
    orjson cannot encode natively are converted with ``str``.
    """
    payload = orjson.dumps(data, option=orjson.OPT_SERIALIZE_NUMPY, default=str)
    return Response(payload, status=status, mimetype="application/json")
|
|
|
|
|
def read_markdown_file(filename):
    """Render a markdown file from the ``md_files`` directory as an HTML string.

    Parameters
    ----------
    filename : str
        Bare file name (no directory part) of a file inside the ``md_files``
        directory that sits next to this module.

    Returns
    -------
    str
        The rendered HTML. Paragraphs that start or end with ``$$`` are turned
        into ``<div class="math-block">`` wrappers. On any failure an HTML
        paragraph describing the error is returned instead of raising; the
        error text is HTML-escaped because it can contain the caller-supplied
        file name.
    """
    # Imported under its own name so it cannot clash with locals called ``html``.
    from html import escape

    try:
        # Accept only plain file names: anything with a directory component could
        # point outside of md_files.
        has_separator = any(sep in filename for sep in ("/", "\\"))
        if not filename or has_separator or filename in (".", ".."):
            raise ValueError("invalid markdown file name")

        filepath = os.path.join(os.path.dirname(__file__), "md_files", filename)
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()

        md = markdown.Markdown(extensions=["tables", "fenced_code", "codehilite", "attr_list"])
        rendered = md.convert(content)

        # Re-wrap display-math paragraphs so the front end can style them as blocks.
        rendered = rendered.replace("<p>$$", '<div class="math-block">$$')
        rendered = rendered.replace("$$</p>", "$$</div>")

        return rendered
    except Exception as e:
        print(f"Error reading markdown file {filename}: {e}")
        return f"<p>Error loading content: {escape(str(e))}</p>"
|
|
|
|
|
def get_default_parameters(func):
    """Collect the default values declared in the signature of *func*.

    Parameters named ``self`` or ``fun_dist`` are skipped, as are parameters
    that have no default. Returns a dict mapping parameter name to default.
    """
    skipped = ("self", "fun_dist")
    signature = inspect.signature(func)
    return {
        name: spec.default
        for name, spec in signature.parameters.items()
        if name not in skipped and spec.default is not inspect.Parameter.empty
    }
|
|
|
|
|
@app.route("/get_default_params/<algorithm>")
def get_default_params(algorithm):
    """Return, as JSON, the constructor defaults of the requested algorithm.

    Responds with 400 for a name that is not in SELECTION_ALGORITHM_MAP and
    with 500 if the defaults cannot be collected or serialized.
    """
    if algorithm not in SELECTION_ALGORITHM_MAP:
        return create_json_response({"error": f"Unknown algorithm: {algorithm}"}, 400)

    try:
        selector_cls = SELECTION_ALGORITHM_MAP[algorithm]
        defaults = get_default_parameters(selector_cls.__init__)
        return create_json_response(defaults)
    except Exception as exc:
        return create_json_response({"error": f"Error getting parameters: {str(exc)}"}, 500)
|
|
|
|
|
@app.route("/get_default_selection_params/<algorithm>")
def get_default_selection_params(algorithm):
    """Return, as JSON, the default parameters of a selection algorithm.

    The defaults are read from the ``__init__`` signature of the class
    registered for *algorithm* in SELECTION_ALGORITHM_MAP (the same source the
    ``/get_default_params`` endpoint uses). Previously this handler called
    itself, which recursed without bound instead of computing anything.

    Responds with 400 for an unregistered algorithm name and with 500 if the
    defaults cannot be collected or serialized.
    """
    if algorithm not in SELECTION_ALGORITHM_MAP:
        return create_json_response({"error": f"Algorithm unsupported: {algorithm}"}, 400)

    try:
        algorithm_class = SELECTION_ALGORITHM_MAP[algorithm]
        return create_json_response(get_default_parameters(algorithm_class.__init__))
    except Exception as e:
        return create_json_response({"error": f"Error getting parameters: {str(e)}"}, 500)
|
|
|
|
|
@app.route("/")
def home():
    """Serve the application's landing page (the ``index.html`` template)."""
    page = render_template("index.html")
    return page
|
|
|
|
|
@app.route("/md/<filename>")
def get_markdown(filename):
    """Return a markdown document rendered to HTML, wrapped as ``{"html": ...}``.

    A ``.md`` suffix is appended when the requested name does not already end
    with one.
    """
    md_name = filename if filename.endswith(".md") else filename + ".md"
    return create_json_response({"html": read_markdown_file(md_name)})
|
|
|
|
|
def _parse_positive_int(value, message):
    """Convert *value* to an int >= 1, raising ``ValueError(message)`` otherwise."""
    try:
        number = int(value)
    except (TypeError, ValueError):
        raise ValueError(message) from None
    if number < 1:
        raise ValueError(message)
    return number


def process_selection(arr, algorithm, parameters, dist_metric):
    """
    Process feature matrix using the specified selection algorithm.

    Parameters
    ----------
    arr : np.ndarray
        Input feature matrix; rows are the items to select from.
    algorithm : str
        Name of the selection algorithm to use (a key of SELECTION_ALGORITHM_MAP).
    parameters : dict
        Parameters for the algorithm. Must contain ``size`` (the subset size);
        the remaining entries are passed to the algorithm's constructor. The
        dict passed in is not modified.
    dist_metric : str
        Metric name handed to ``pairwise_distances`` for the distance-based
        algorithms (MaxMin, MaxSum, OptiSim, DISE). An empty or missing value
        means ``"euclidean"``. Ignored by the other algorithms.

    Returns
    -------
    dict
        ``{"success": bool, "error": str or None, "warnings": list of str,
        "indices": list of int or None}``. Failures are reported through
        ``error`` rather than raised.
    """
    result = {"success": False, "error": None, "warnings": [], "indices": None}

    try:
        algorithm_class = SELECTION_ALGORITHM_MAP.get(algorithm)
        if algorithm_class is None:
            raise ValueError(f"Unknown algorithm: {algorithm}")

        # Work on a copy so that removing "size" does not alter the caller's dict.
        parameters = dict(parameters)

        size = parameters.pop("size", None)
        if size is None:
            raise ValueError("Subset size must be specified")
        size = _parse_positive_int(size, "Subset size must be a positive integer")

        if size > arr.shape[0]:
            raise ValueError(
                f"Subset size ({size}) cannot be larger than the number of samples ({arr.shape[0]})"
            )

        arr_float = arr.astype(float)

        # Distance-based selectors are fed a pairwise distance matrix; the others
        # receive the (float) feature matrix itself.
        if algorithm in ("MaxMin", "MaxSum", "OptiSim", "DISE"):
            try:
                arr_dist = pairwise_distances(arr_float, metric=dist_metric or "euclidean")
            except Exception as e:
                raise ValueError(f"Error computing distance matrix: {str(e)}") from e
        else:
            arr_dist = arr_float

        if algorithm == "GridPartition":
            nbins_axis = parameters.get("nbins_axis")
            if nbins_axis is None:
                raise ValueError("nbins_axis must be specified for GridPartition")
            parameters["nbins_axis"] = _parse_positive_int(
                nbins_axis, "nbins_axis must be a positive integer"
            )

        try:
            collector = algorithm_class(**parameters)
            indices = collector.select(arr_dist, size=size)

            if indices is None:
                raise ValueError("Algorithm returned None instead of indices")

            if len(indices) != size:
                # Report the mismatch to the caller; a bare warnings.warn() would only
                # go to stderr and never show up in the response.
                result["warnings"].append(
                    f"Algorithm returned {len(indices)} indices but expected {size}"
                )

            indices_list = indices.tolist() if isinstance(indices, np.ndarray) else list(indices)
            if not all(
                isinstance(i, (int, np.integer)) and 0 <= i < arr.shape[0] for i in indices_list
            ):
                raise ValueError("Algorithm returned invalid indices")

            result["success"] = True
            result["indices"] = indices_list

        except Exception as e:
            import traceback

            print(f"Traceback: {traceback.format_exc()}")
            raise ValueError(f"Error executing algorithm: {str(e)}") from e

    except Warning as w:
        # Only reachable when a warning is raised as an exception (e.g. by a
        # warnings filter) outside of the inner try blocks above.
        result["warnings"].append(str(w))
    except Exception as e:
        result["error"] = str(e)

    return result
|
|
|
|
|
@app.route("/upload_selection", methods=["POST"])
def upload_selection_file():
    """Handle file upload and process selection.

    Expects a multipart form with:

    - ``file``: the feature matrix (extension must pass ``allowed_file``),
    - ``algorithm``: name of the selection algorithm,
    - ``size``: requested subset size,
    - ``func_dist`` (optional): distance metric name,
    - ``parameters`` (optional): JSON object with extra algorithm parameters;
      text that is not valid JSON is treated as an empty object.

    The file is stored in a per-request directory that is removed again before
    the response is returned. Responds with the dict produced by
    ``process_selection``; 400 for invalid requests and 500 when loading or
    processing the file raises.
    """
    try:
        print("Debug - Starting upload_selection_file")

        if "file" not in request.files:
            return create_json_response({"error": "No file provided"}, 400)

        file = request.files["file"]
        if file.filename == "":
            return create_json_response({"error": "No file selected"}, 400)

        if not allowed_file(file.filename):
            return create_json_response({"error": "File type not allowed"}, 400)

        algorithm = request.form.get("algorithm")
        if not algorithm:
            return create_json_response({"error": "No algorithm specified"}, 400)

        size = request.form.get("size")
        if not size:
            return create_json_response({"error": "Subset size must be specified"}, 400)

        dist_metric = request.form.get("func_dist", "")

        # Unparseable parameter text deliberately falls back to "no extra parameters".
        try:
            parameters = orjson.loads(request.form.get("parameters", "{}"))
        except (ValueError, TypeError):
            parameters = {}

        # Valid JSON that is not an object (list, number, null, ...) cannot carry
        # named parameters; reject it with a clear message.
        if not isinstance(parameters, dict):
            return create_json_response({"error": "Parameters must be a JSON object"}, 400)

        parameters["size"] = size

        upload_dir = get_unique_upload_dir()

        try:
            file_path = os.path.join(
                upload_dir, secure_filename(str(uuid.uuid4()) + "_" + file.filename)
            )

            with file_lock:
                file.save(file_path)

            array = load_data(file_path)
            result = process_selection(array, algorithm, parameters, dist_metric)

            return create_json_response(result)

        except Exception as e:
            return create_json_response({"error": str(e)}, 500)

        finally:
            # Always drop the per-request directory, whatever the outcome.
            clean_upload_dir(upload_dir)

    except Exception as e:
        return create_json_response({"error": f"Error processing request: {str(e)}"}, 400)
|
|
|
|
|
@app.route("/download", methods=["POST"])
def download():
    """Download selected indices in specified format.

    Expects a JSON body with:

    - ``indices``: list of selected indices (required),
    - ``format``: ``"txt"`` (default), ``"npz"`` or ``"xlsx"``,
    - ``timestamp`` (optional): text used in the download file name; it is
      passed through ``secure_filename`` and replaced by the current time when
      missing or when nothing usable remains.

    Returns the file as an attachment named
    ``selected_indices_<timestamp>.<extension>``. Responds with 400 for a
    missing/invalid body, non-list indices or an unsupported format, and with
    500 if writing the file fails.
    """
    try:
        # silent=True yields None for a missing or malformed JSON body instead of
        # raising, so such requests get the 400 below rather than the generic 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict) or "indices" not in data:
            return create_json_response({"error": "No indices provided"}, 400)

        indices = data["indices"]
        if not isinstance(indices, list):
            return create_json_response({"error": "Indices must be a list"}, 400)

        file_format = data.get("format", "txt")

        # The timestamp comes from the client and ends up in a file name.
        raw_timestamp = data.get("timestamp")
        timestamp = secure_filename(str(raw_timestamp)) if raw_timestamp else ""
        if not timestamp:
            timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")

        buffer = io.BytesIO()

        # Per-format file extension, MIME type and writer(buffer, indices).
        format_settings = {
            "txt": {
                "extension": "txt",
                "mimetype": "text/plain",
                "processor": lambda b, d: b.write("\n".join(map(str, d)).encode()),
            },
            "npz": {
                "extension": "npz",
                "mimetype": "application/octet-stream",
                "processor": lambda b, d: np.savez_compressed(b, indices=np.array(d)),
            },
            "xlsx": {
                "extension": "xlsx",
                "mimetype": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                "processor": lambda b, d: pd.DataFrame({"selected_indices": d}).to_excel(
                    b, index=False
                ),
            },
        }

        # The isinstance check keeps unhashable values (e.g. a list) from raising
        # on the dict lookup.
        if not isinstance(file_format, str) or file_format not in format_settings:
            return create_json_response({"error": f"Unsupported format: {file_format}"}, 400)

        settings = format_settings[file_format]
        settings["processor"](buffer, indices)

        filename = f'selected_indices_{timestamp}.{settings["extension"]}'

        # Rewind so send_file streams the buffer from its start.
        buffer.seek(0)

        return send_file(
            buffer, mimetype=settings["mimetype"], as_attachment=True, download_name=filename
        )

    except Exception as e:
        print(f"Error in download: {str(e)}")
        return create_json_response({"error": str(e)}, 500)
|
|
|
|
|
def _load_uploaded_matrix(uploaded_file, label):
    """Store an upload in its own directory, load it as a float array, then clean up.

    *label* ("feature subset" / "features") is only used in the error message.
    Raises whatever saving, loading or the float conversion raises, and
    ValueError when ``load_data`` returns None (unsupported file type).
    """
    # A per-request directory plus a UUID prefix keeps concurrent requests that
    # upload identically named files from overwriting each other.
    upload_dir = get_unique_upload_dir()
    try:
        file_path = os.path.join(
            upload_dir, secure_filename(str(uuid.uuid4()) + "_" + uploaded_file.filename)
        )
        uploaded_file.save(file_path)

        matrix = load_data(file_path)
        if matrix is None:
            raise ValueError(f"Failed to read {label} file: {uploaded_file.filename}")

        return matrix.astype(float)
    finally:
        # Remove the stored file even when loading fails.
        clean_upload_dir(upload_dir)


@app.route("/calculate_diversity", methods=["POST"])
def calculate_diversity():
    """Calculate diversity score for the given feature subset.

    Expects a multipart form with:

    - ``feature_subset``: file with the selected subset's features (required),
    - ``features`` (optional): file with the full feature matrix,
    - ``div_type``: diversity measure name (default ``"shannon_entropy"``),
    - ``div_parameters`` (optional): JSON object; the keys ``normalize``,
      ``truncation`` and ``cs`` are forwarded to ``compute_diversity``.

    Responds with ``{"success": True, "diversity_score": float}``; 400 for
    invalid input or a failed computation, 500 for unexpected errors.
    """
    try:
        feature_subset_file = request.files.get("feature_subset")
        features_file = request.files.get("features")

        if not feature_subset_file:
            return create_json_response({"error": "Feature subset file is required"}, 400)

        div_type = request.form.get("div_type", "shannon_entropy")

        # Malformed parameter JSON is a client error, not a server failure.
        try:
            div_parameters = orjson.loads(request.form.get("div_parameters", "{}"))
        except (ValueError, TypeError) as e:
            return create_json_response({"error": f"Invalid div_parameters: {str(e)}"}, 400)
        if not isinstance(div_parameters, dict):
            return create_json_response({"error": "div_parameters must be a JSON object"}, 400)

        try:
            feature_subset = _load_uploaded_matrix(feature_subset_file, "feature subset")
        except Exception as e:
            return create_json_response(
                {"error": f"Error reading feature subset file: {str(e)}"}, 400
            )

        features = None
        if features_file:
            try:
                features = _load_uploaded_matrix(features_file, "features")
            except Exception as e:
                return create_json_response(
                    {"error": f"Error reading features file: {str(e)}"}, 400
                )

        normalize = div_parameters.get("normalize", False)
        truncation = div_parameters.get("truncation", False)
        cs = div_parameters.get("cs", None)

        try:
            diversity_score = compute_diversity(
                feature_subset=feature_subset,
                div_type=div_type,
                normalize=normalize,
                truncation=truncation,
                features=features,
                cs=cs,
            )

            return create_json_response(
                {"success": True, "diversity_score": float(diversity_score)}
            )

        except Exception as e:
            import traceback

            print(f"Error calculating diversity: {str(e)}")
            print(f"Traceback: {traceback.format_exc()}")
            return create_json_response({"error": f"Error calculating diversity: {str(e)}"}, 400)

    except Exception as e:
        return create_json_response({"error": str(e)}, 500)
|
|
|
@app.route("/health")
def health_check():
    """Health check endpoint (e.g. for Docker): always answers ``{"status": "healthy"}``."""
    status_payload = {"status": "healthy"}
    return create_json_response(status_payload)
|
|
|
if __name__ == "__main__":
    # Run the Flask server directly: listen on all interfaces (0.0.0.0),
    # port 7860, with debug mode off.
    app.run(host="0.0.0.0", port=7860, debug=False)
|
|
|
|