import os, glob import json from datetime import datetime, timezone from dataclasses import dataclass from datasets import load_dataset, Dataset import pandas as pd import gradio as gr from huggingface_hub import HfApi, snapshot_download, ModelInfo, list_models from enum import Enum OWNER = "AIEnergyScore" COMPUTE_SPACE = f"{OWNER}/launch-computation-example" TOKEN = os.environ.get("DEBUG") API = HfApi(token=TOKEN) task_mappings = { 'automatic speech recognition': 'automatic-speech-recognition', 'Object Detection': 'object-detection', 'Text Classification': 'text-classification', 'Image to Text': 'image-to-text', 'Question Answering': 'question-answering', 'Text Generation': 'text-generation', 'Image Classification': 'image-classification', 'Sentence Similarity': 'sentence-similarity', 'Image Generation': 'image-generation', 'Summarization': 'summarization' } @dataclass class ModelDetails: name: str display_name: str = "" symbol: str = "" # emoji def start_compute_space(): API.restart_space(COMPUTE_SPACE) gr.Info(f"Okay! {COMPUTE_SPACE} should be running now!") def get_model_size(model_info: ModelInfo): """Gets the model size from the configuration, or the model name if the configuration does not contain the information.""" try: model_size = round(model_info.safetensors["total"] / 1e9, 3) except (AttributeError, TypeError): return 0 # Unknown model sizes are indicated as 0, see NUMERIC_INTERVALS in app.py return model_size def add_docker_eval(zip_file): new_fid_list = zip_file.split("/") new_fid = new_fid_list[-1] if new_fid.endswith('.zip'): API.upload_file( path_or_fileobj=zip_file, repo_id="AIEnergyScore/tested_proprietary_models", path_in_repo='submitted_models/' + new_fid, repo_type="dataset", commit_message="Adding logs via submission Space.", token=TOKEN ) gr.Info('Uploaded logs to dataset! We will validate their validity and add them to the next version of the leaderboard.') else: gr.Info('You can only upload .zip files here!') def add_new_eval(repo_id: str, task: str): model_owner = repo_id.split("/")[0] model_name = repo_id.split("/")[1] current_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") requests = load_dataset("AIEnergyScore/requests_debug", split="test", token=TOKEN) requests_dset = requests.to_pandas() model_list = requests_dset[requests_dset['status'] == 'COMPLETED']['model'].tolist() task_models = list(API.list_models(filter=task_mappings[task])) task_model_names = [m.id for m in task_models] if repo_id in model_list: gr.Info('This model has already been run!') elif repo_id not in task_model_names: gr.Info("This model isn't compatible with the chosen task! Pick a different model-task combination") else: # Is the model info correctly filled? try: model_info = API.model_info(repo_id=repo_id) model_size = get_model_size(model_info=model_info) likes = model_info.likes except Exception: gr.Info("Could not find information for model %s" % (model_name)) model_size = None likes = None gr.Info("Adding request") request_dict = { "model": repo_id, "status": "PENDING", "submitted_time": pd.to_datetime(current_time), "task": task_mappings[task], "likes": likes, "params": model_size, "leaderboard_version": "v0", } print("Writing out request file to dataset") df_request_dict = pd.DataFrame([request_dict]) print(df_request_dict) df_final = pd.concat([requests_dset, df_request_dict], ignore_index=True) updated_dset = Dataset.from_pandas(df_final) updated_dset.push_to_hub("AIEnergyScore/requests_debug", split="test", token=TOKEN) gr.Info("Starting compute space at %s " % COMPUTE_SPACE) return start_compute_space() def print_existing_models(): requests = load_dataset("AIEnergyScore/requests_debug", split="test", token=TOKEN) requests_dset = requests.to_pandas() model_df = requests_dset[['model', 'status']] model_df = model_df[model_df['status'] == 'COMPLETED'] return model_df def highlight_cols(x): df = x.copy() df[df['status'] == 'COMPLETED'] = 'color: green' df[df['status'] == 'PENDING'] = 'color: orange' df[df['status'] == 'FAILED'] = 'color: red' return df # Applying the style function existing_models = print_existing_models() formatted_df = existing_models.style.apply(highlight_cols, axis=None) def get_leaderboard_models(): path = r'leaderboard_v0_data/energy' filenames = glob.glob(path + "/*.csv") data = [] for filename in filenames: data.append(pd.read_csv(filename)) leaderboard_data = pd.concat(data, ignore_index=True) return leaderboard_data[['model', 'task']] # A placeholder for get_zip_data_link() -- replace with your actual implementation if available. def get_zip_data_link(): return ( 'Download Logs' ) with gr.Blocks() as demo: # --- Header Links (at the very top, evenly spaced) --- gr.HTML("""
""") # --- Logo (centered) --- gr.HTML("""