|
import spaces
|
|
import gradio as gr
|
|
from huggingface_hub import HfApi, ModelInfo, DatasetInfo, SpaceInfo
|
|
from typing import Union
|
|
import gc
|
|
import pandas as pd
|
|
import datetime
|
|
import json
|
|
import re
|
|
from hfconstants import DS_SIZE_CATEGORIES, SPACE_HARDWARES, SPACE_STAGES
|
|
|
|
@spaces.GPU
|
|
def dummy_gpu():
|
|
pass
|
|
|
|
RESULT_ITEMS = {
|
|
"Type": [1, "str", True],
|
|
"ID": [2, "markdown", True, "40%"],
|
|
"Status": [4, "markdown", True],
|
|
"Gated": [6, "str", True],
|
|
"Likes": [10, "number", True],
|
|
"DLs": [12, "number", True],
|
|
"AllDLs": [13, "number", False],
|
|
"Trending": [16, "number", True],
|
|
"LastMod.": [17, "str", True],
|
|
"Library": [20, "markdown", False],
|
|
"Pipeline": [21, "markdown", True],
|
|
"Hardware": [25, "str", False],
|
|
"Stage": [26, "str", False],
|
|
"NFAA": [40, "str", False],
|
|
}
|
|
|
|
try:
|
|
with open("tags.json", encoding="utf-8") as f:
|
|
TAGS = json.load(f)
|
|
with open("subtags.json", encoding="utf-8") as f:
|
|
SUBTAGS = json.load(f)
|
|
except Exception as e:
|
|
TAGS = []
|
|
SUBTAGS = {}
|
|
print(e)
|
|
|
|
def get_tags():
|
|
return TAGS[0:1000]
|
|
|
|
def get_subtag_categories():
|
|
return list(SUBTAGS.keys())
|
|
|
|
def update_subtag_items(category: str):
|
|
choices=[""] + list(SUBTAGS.get(category, []))
|
|
return gr.update(choices=choices, value=choices[0])
|
|
|
|
def update_subtags(tags: str, category: str, item: str):
|
|
addtag = f"{category}:{item}" if item else ""
|
|
newtags = f"{tags}\n{addtag}" if tags else addtag
|
|
return newtags
|
|
|
|
def update_tags(tags: str, item: str):
|
|
newtags = f"{tags}\n{item}" if tags else item
|
|
return newtags
|
|
|
|
def str_to_list(s: str):
|
|
try:
|
|
m = re.split("\n", s)
|
|
return [s.strip() for s in list(m)]
|
|
except Exception:
|
|
return []
|
|
|
|
def is_valid_arg(s: str):
|
|
return len(str_to_list(s)) > 0
|
|
|
|
def get_labels():
|
|
return list(RESULT_ITEMS.keys())
|
|
|
|
def get_valid_labels():
|
|
return [k for k in list(RESULT_ITEMS.keys()) if RESULT_ITEMS[k][2]]
|
|
|
|
def date_to_str(dt: datetime.datetime):
|
|
return dt.strftime('%Y-%m-%d %H:%M')
|
|
|
|
class Labels():
|
|
VALID_DTYPE = ["str", "number", "bool", "date", "markdown"]
|
|
|
|
def __init__(self):
|
|
self.types = {}
|
|
self.orders = {}
|
|
self.widths = {}
|
|
|
|
def set(self, label: str):
|
|
if not label in RESULT_ITEMS.keys(): raise Exception(f"Invalid item: {label}")
|
|
item = RESULT_ITEMS.get(label)
|
|
if item[1] not in self.VALID_DTYPE: raise Exception(f"Invalid data type: {type}")
|
|
self.types[label] = item[1]
|
|
self.orders[label] = item[0]
|
|
if len(item) > 3: self.widths[label] = item[3]
|
|
else: self.widths[label] = "10%"
|
|
|
|
def get(self):
|
|
labels = list(self.types.keys())
|
|
labels.sort(key=lambda x: self.orders[x])
|
|
label_types = [self.types[s] for s in labels]
|
|
return labels, label_types
|
|
|
|
def get_widths(self):
|
|
labels = list(self.types.keys())
|
|
label_widths = [self.widths[s] for s in labels]
|
|
return label_widths
|
|
|
|
def get_null_value(self, type: str):
|
|
if type == "bool": return False
|
|
elif type == "number" or type == "date": return 0
|
|
else: return "None"
|
|
|
|
|
|
|
|
class HFSearchResult():
|
|
def __init__(self):
|
|
self.labels = Labels()
|
|
self.current_item = {}
|
|
self.current_item_info = None
|
|
self.item_list = []
|
|
self.item_info_list = []
|
|
self.item_hide_flags = []
|
|
self.hide_labels = []
|
|
self.show_labels = []
|
|
self.filter_items = None
|
|
self.filters = None
|
|
gc.collect()
|
|
|
|
def reset(self):
|
|
self.__init__()
|
|
|
|
def _set(self, data, label: str):
|
|
self.labels.set(label)
|
|
self.current_item[label] = data
|
|
|
|
def _next(self):
|
|
self.item_list.append(self.current_item.copy())
|
|
self.current_item = {}
|
|
self.item_info_list.append(self.current_item_info)
|
|
self.current_item_info = None
|
|
self.item_hide_flags.append(False)
|
|
|
|
def add_item(self, i: Union[ModelInfo, DatasetInfo, SpaceInfo]):
|
|
self.current_item_info = i
|
|
if isinstance(i, ModelInfo): type = "model"
|
|
elif isinstance(i, DatasetInfo): type = "dataset"
|
|
elif isinstance(i, SpaceInfo): type = "space"
|
|
else: return
|
|
self._set(type, "Type")
|
|
self._set(i.id, "ID")
|
|
if i.likes is not None: self._set(i.likes, "Likes")
|
|
if i.last_modified is not None: self._set(date_to_str(i.last_modified), "LastMod.")
|
|
if i.trending_score is not None: self._set(int(i.trending_score), "Trending")
|
|
if i.tags is not None: self._set("True" if "not-for-all-audiences" in i.tags else "False", "NFAA")
|
|
if type in ["model", "dataset"]:
|
|
if i.gated is not None: self._set(i.gated if i.gated else "off", "Gated")
|
|
if i.downloads is not None: self._set(i.downloads, "DLs")
|
|
if i.downloads_all_time is not None: self._set(i.downloads_all_time, "AllDLs")
|
|
if type == "model":
|
|
if i.inference is not None: self._set(i.inference, "Status")
|
|
if i.library_name is not None: self._set(i.library_name, "Library")
|
|
if i.pipeline_tag is not None: self._set(i.pipeline_tag, "Pipeline")
|
|
if type == "space":
|
|
if i.runtime is not None:
|
|
self._set(i.runtime.hardware, "Hardware")
|
|
self._set(i.runtime.stage, "Stage")
|
|
self._next()
|
|
|
|
def search(self, repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str, tags: str, infer: str, gated: str, appr: list[str],
|
|
size_categories: list, limit: int, hardware: list, stage: list, fetch_detail: list, show_labels: list):
|
|
try:
|
|
self.reset()
|
|
self.show_labels = show_labels.copy()
|
|
api = HfApi()
|
|
kwargs = {}
|
|
mkwargs = {}
|
|
dkwargs = {}
|
|
skwargs = {}
|
|
if filter_str: kwargs["filter"] = str_to_list(filter_str)
|
|
if search_str: kwargs["search"] = search_str
|
|
if author: kwargs["author"] = author
|
|
if tags and is_valid_arg(tags):
|
|
mkwargs["tags"] = str_to_list(tags)
|
|
dkwargs["tags"] = str_to_list(tags)
|
|
if limit > 0: kwargs["limit"] = limit
|
|
if sort_method == "descending order": kwargs["direction"] = -1
|
|
if gated == "gated":
|
|
mkwargs["gated"] = True
|
|
dkwargs["gated"] = True
|
|
elif gated == "non-gated":
|
|
mkwargs["gated"] = False
|
|
dkwargs["gated"] = False
|
|
mkwargs["sort"] = sort
|
|
if len(size_categories) > 0: dkwargs["size_categories"] = size_categories
|
|
if infer != "all": mkwargs["inference"] = infer
|
|
if "model" in repo_types:
|
|
models = api.list_models(full=True, cardData=True, **kwargs, **mkwargs)
|
|
for model in models:
|
|
if model.gated is not None and model.gated and model.gated not in appr: continue
|
|
self.add_item(model)
|
|
if "dataset" in repo_types:
|
|
datasets = api.list_datasets(full=True, **kwargs, **dkwargs)
|
|
for dataset in datasets:
|
|
if dataset.gated is not None and dataset.gated and dataset.gated not in appr: continue
|
|
self.add_item(dataset)
|
|
if "space" in repo_types:
|
|
if "Space Runtime" in fetch_detail:
|
|
spaces = api.list_spaces(expand=["cardData", "datasets", "disabled", "lastModified", "createdAt",
|
|
"likes", "models", "private", "runtime", "sdk", "sha", "tags", "trendingScore"], **kwargs, **skwargs)
|
|
else: spaces = api.list_spaces(full=True, **kwargs, **skwargs)
|
|
for space in spaces:
|
|
if space.gated is not None and space.gated and space.gated not in appr: continue
|
|
if space.runtime is not None:
|
|
if len(hardware) > 0 and space.runtime.stage == "RUNNING" and space.runtime.hardware not in hardware: continue
|
|
if len(stage) > 0 and space.runtime.stage not in stage: continue
|
|
self.add_item(space)
|
|
if sort == "downloads" and ("space" not in repo_types): self.sort("DLs")
|
|
elif sort == "downloads_all_time" and ("space" not in repo_types): self.sort("AllDLs")
|
|
elif sort == "likes": self.sort("Likes")
|
|
elif sort == "trending_score": self.sort("Trending")
|
|
else: self.sort("LastMod.")
|
|
except Exception as e:
|
|
raise Exception(f"Search error: {e}") from e
|
|
|
|
def get(self):
|
|
labels, label_types = self.labels.get()
|
|
self._do_filter()
|
|
dflist = [[item.get(l, self.labels.get_null_value(t)) for l, t in zip(labels, label_types)] for item, is_hide in zip(self.item_list, self.item_hide_flags) if not is_hide]
|
|
df = self._to_pandas(dflist, labels)
|
|
show_label_types = [t for l, t in zip(labels, label_types) if l not in self.hide_labels and l in self.show_labels]
|
|
show_labels = [l for l in labels if l not in self.hide_labels and l in self.show_labels]
|
|
return df, show_labels, show_label_types
|
|
|
|
def _to_pandas(self, dflist: list, labels: list):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def rank_df(sdf: pd.DataFrame, df: pd.DataFrame, col: str):
|
|
ranks = [(0.5, "gold"), (0.75, "orange"), (0.9, "orangered")]
|
|
for t, color in ranks:
|
|
sdf.loc[df[col] >= df[col].quantile(q=t), [col]] = f'color: {color}'
|
|
return sdf
|
|
|
|
def highlight_df(x: pd.DataFrame, df: pd.DataFrame):
|
|
sdf = pd.DataFrame("", index=x.copy().index, columns=x.copy().columns)
|
|
columns = df.columns
|
|
if "Trending" in columns: sdf = rank_df(sdf, df, "Trending")
|
|
if "Likes" in columns: sdf = rank_df(sdf, df, "Likes")
|
|
if "AllDLs" in columns: sdf = rank_df(sdf, df, "AllDLs")
|
|
if "DLs" in columns: sdf = rank_df(sdf, df, "DLs")
|
|
if "Status" in columns:
|
|
sdf.loc[df["Status"] == "warm", ["Type"]] = 'color: orange'
|
|
sdf.loc[df["Status"] == "cold", ["Type"]] = 'color: dodgerblue'
|
|
if "Gated" in columns:
|
|
sdf.loc[df["Gated"] == "auto", ["Gated"]] = 'color: dodgerblue'
|
|
sdf.loc[df["Gated"] == "manual", ["Gated"]] = 'color: crimson'
|
|
if "Stage" in columns and "Hardware" in columns:
|
|
sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] != "zero-a10g") & (df["Hardware"] != "cpu-basic") & (df["Hardware"] != "None") & (df["Hardware"]), ["Hardware", "Type"]] = 'color: lime'
|
|
sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] == "zero-a10g"), ["Hardware", "Type"]] = 'color: green'
|
|
sdf.loc[(df["Type"] == "space") & (df["Stage"] != "RUNNING")] = 'opacity: 0.5'
|
|
sdf.loc[(df["Type"] == "space") & (df["Stage"] != "RUNNING"), ["Type"]] = 'color: crimson'
|
|
sdf.loc[df["Stage"] == "RUNNING", ["Stage"]] = 'color: lime'
|
|
if "NFAA" in columns: sdf.loc[df["NFAA"] == "True", ["Type"]] = 'background-color: hotpink'
|
|
show_columns = x.copy().columns
|
|
style_columns = sdf.columns
|
|
drop_columns = [c for c in style_columns if c not in show_columns]
|
|
sdf = sdf.drop(drop_columns, axis=1)
|
|
return sdf
|
|
|
|
def id_to_md(df: pd.DataFrame):
|
|
if df["Type"] == "dataset": return f'[{df["ID"]}](https://hf.co/datasets/{df["ID"]})'
|
|
elif df["Type"] == "space": return f'[{df["ID"]}](https://hf.co/spaces/{df["ID"]})'
|
|
else: return f'[{df["ID"]}](https://hf.co/{df["ID"]})'
|
|
|
|
def format_md_df(df: pd.DataFrame):
|
|
df["ID"] = df.apply(id_to_md, axis=1)
|
|
return df
|
|
|
|
hide_labels = [l for l in labels if l in self.hide_labels or l not in self.show_labels]
|
|
df = format_md_df(pd.DataFrame(dflist, columns=labels))
|
|
ref_df = df.copy()
|
|
df = df.drop(hide_labels, axis=1).style.apply(highlight_df, axis=None, df=ref_df)
|
|
return df
|
|
|
|
def set_hide(self, hide_labels: list):
|
|
self.hide_labels = hide_labels.copy()
|
|
|
|
def set_filter(self, filter_item1: str, filter1: str):
|
|
if not filter_item1 and not filter1:
|
|
self.filter_items = None
|
|
self.filters = None
|
|
else:
|
|
self.filter_items = [filter_item1]
|
|
self.filters = [filter1]
|
|
|
|
def _do_filter(self):
|
|
if self.filters is None or self.filter_items is None:
|
|
self.item_hide_flags = [False] * len(self.item_list)
|
|
return
|
|
labels, label_types = self.labels.get()
|
|
types = dict(zip(labels, label_types))
|
|
flags = []
|
|
for item in self.item_list:
|
|
flag = False
|
|
for i, f in zip(self.filter_items, self.filters):
|
|
if i not in item.keys(): continue
|
|
t = types[i]
|
|
if item[i] == self.labels.get_null_value(t):
|
|
flag = True
|
|
break
|
|
if t in set(["str", "markdown"]):
|
|
if f in item[i]: flag = False
|
|
else:
|
|
flag = True
|
|
break
|
|
flags.append(flag)
|
|
self.item_hide_flags = flags
|
|
|
|
def sort(self, key="Likes"):
|
|
if len(self.item_list) == 0: raise Exception("No item found.")
|
|
if not key in self.labels.get()[0]: key = "Likes"
|
|
self.item_list, self.item_hide_flags, self.item_info_list = zip(*sorted(zip(self.item_list, self.item_hide_flags, self.item_info_list), key=lambda x: x[0][key], reverse=True))
|
|
|
|
def get_gr_df(self):
|
|
df, labels, label_types = self.get()
|
|
widths = self.labels.get_widths()
|
|
return gr.update(type="pandas", value=df, headers=labels, datatype=label_types, column_widths=widths, wrap=True)
|
|
|
|
def get_gr_hide_labels(self):
|
|
return gr.update(choices=self.labels.get()[0], value=[], visible=True)
|
|
|
|
def get_gr_filter_item(self, filter_item: str=""):
|
|
labels, label_types = self.labels.get()
|
|
choices = [s for s, t in zip(labels, label_types) if t in set(["str", "markdown"])]
|
|
if len(choices) == 0: choices = [""]
|
|
return gr.update(choices=choices, value=filter_item if filter_item else choices[0], visible=True)
|
|
|
|
def get_gr_filter(self, filter_item: str=""):
|
|
labels = self.labels.get()[0]
|
|
if not filter_item or filter_item not in set(labels): return gr.update(choices=[""], value="", visible=True)
|
|
d = {}
|
|
for item in self.item_list:
|
|
if filter_item not in item.keys(): continue
|
|
v = item[filter_item]
|
|
if v in d.keys(): d[v] += 1
|
|
else: d[v] = 1
|
|
return gr.update(choices=[""] + [t[0] for t in sorted(d.items(), key=lambda x : x[1])][:100], value="", visible=True)
|
|
|
|
def search(repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str, tags: str, infer: str,
|
|
gated: str, appr: list[str], size_categories: list, limit: int, hardware: list, stage: list, fetch_detail: list, show_labels: list, r: HFSearchResult):
|
|
try:
|
|
r.search(repo_types, sort, sort_method, filter_str, search_str, author, tags, infer, gated, appr, size_categories,
|
|
limit, hardware, stage, fetch_detail, show_labels)
|
|
return r.get_gr_df(), r.get_gr_hide_labels(), r
|
|
except Exception as e:
|
|
raise gr.Error(e)
|
|
|
|
def update_df(hide_labels: list, filter_item1: str, filter1: str, r: HFSearchResult):
|
|
r.set_hide(hide_labels)
|
|
r.set_filter(filter_item1, filter1)
|
|
return r.get_gr_df(), r
|
|
|
|
def update_filter(filter_item1: str, r: HFSearchResult):
|
|
return r.get_gr_filter_item(filter_item1), r.get_gr_filter(filter_item1), gr.update(visible=True), r
|
|
|