testwarm / hfsearch.py
John6666's picture
Upload 8 files
a8f6f3c verified
import spaces
import gradio as gr
from huggingface_hub import HfApi, ModelInfo, DatasetInfo, SpaceInfo
from typing import Union
import gc
import pandas as pd
import datetime
import json
import re
from hfconstants import DS_SIZE_CATEGORIES, SPACE_HARDWARES, SPACE_STAGES
@spaces.GPU
def dummy_gpu():
    """Do nothing.

    NOTE(review): presumably exists only so that at least one function is
    registered through ``@spaces.GPU`` - confirm against the Space setup.
    """
# Column catalogue for the result table.
# Each value is [order, dtype, flag, width]:
#   order - position used by Labels.get() to sort the columns
#   dtype - must be one of Labels.VALID_DTYPE
#   flag  - labels with a true flag are the ones returned by get_valid_labels()
#           (presumably the columns shown by default - confirm against the UI)
#   width - optional; Labels.set() falls back to "10%" when it is omitted
RESULT_ITEMS = {
    "Type":     [1,  "str",      True],
    "ID":       [2,  "markdown", True, "40%"],
    "Status":   [4,  "markdown", True],
    "Gated":    [6,  "str",      True],
    "Likes":    [10, "number",   True],
    "DLs":      [12, "number",   True],
    "AllDLs":   [13, "number",   False],
    "Trending": [16, "number",   True],
    "LastMod.": [17, "str",      True],
    "Library":  [20, "markdown", False],
    "Pipeline": [21, "markdown", True],
    "Hardware": [25, "str",      False],
    "Stage":    [26, "str",      False],
    "NFAA":     [40, "str",      False],
}
def _load_json_or_default(path: str, default):
    """Return the parsed JSON content of *path*, or *default* if it cannot be loaded.

    Loading is best-effort: a missing/unreadable file or malformed JSON is
    reported on stdout and the fallback value is used instead.
    """
    try:
        with open(path, encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and decoding errors
        print(e)
        return default


# Each file is loaded on its own so that a problem with one of them does not
# throw away the data successfully read from the other.
TAGS = _load_json_or_default("tags.json", [])
SUBTAGS = _load_json_or_default("subtags.json", {})
def get_tags():
    """Return at most the first 1000 entries of the module-level ``TAGS``."""
    max_tags = 1000
    return TAGS[:max_tags]
def get_subtag_categories():
    """Return the keys of the module-level ``SUBTAGS`` mapping as a list."""
    return [category for category in SUBTAGS.keys()]
def update_subtag_items(category: str):
    """Build a Gradio update listing the sub-tags of *category*.

    The choice list always starts with an empty entry, which is also the
    selected value; an unknown category yields only that empty entry.
    """
    blank = ""
    subtag_items = list(SUBTAGS.get(category, []))
    return gr.update(choices=[blank] + subtag_items, value=blank)
def update_subtags(tags: str, category: str, item: str):
    """Append ``"<category>:<item>"`` to the newline-separated *tags* text.

    Args:
        tags: Current newline-separated tag text (may be empty).
        category: Sub-tag category used as the prefix.
        item: Selected sub-tag; when empty, *tags* is returned unchanged.

    Returns:
        The updated tag text.
    """
    if not item:
        # Nothing selected: do not append a bare newline, which would later be
        # parsed as an empty tag.
        return tags if tags else ""
    addtag = f"{category}:{item}"
    return f"{tags}\n{addtag}" if tags else addtag
def update_tags(tags: str, item: str):
    """Append *item* to the newline-separated *tags* text.

    Args:
        tags: Current newline-separated tag text (may be empty).
        item: Tag to add; when empty or ``None``, *tags* is returned unchanged.

    Returns:
        The updated tag text.
    """
    if not item:
        # Avoid appending an empty line (or the text "None") to the tag list.
        return tags if tags else ""
    return f"{tags}\n{item}" if tags else item
def str_to_list(s: str):
    """Split newline-separated text into a list of stripped, non-empty entries.

    Args:
        s: Text with one entry per line.

    Returns:
        The entries in order, with surrounding whitespace removed and blank
        lines dropped. Anything that is not a ``str`` yields ``[]``.
    """
    if not isinstance(s, str):
        return []
    stripped = (line.strip() for line in s.split("\n"))
    # Blank lines are skipped so that no empty tag/filter value is produced.
    return [entry for entry in stripped if entry]
def is_valid_arg(s: str):
    """Return True when ``str_to_list(s)`` yields at least one entry."""
    entries = str_to_list(s)
    return bool(entries)
def get_labels():
    """Return every label defined in ``RESULT_ITEMS``, in definition order."""
    return [label for label in RESULT_ITEMS]
def get_valid_labels():
    """Return the labels of ``RESULT_ITEMS`` whose third field is truthy."""
    return [label for label, spec in RESULT_ITEMS.items() if spec[2]]
def date_to_str(dt: datetime.datetime):
    """Format *dt* as ``YYYY-MM-DD HH:MM``."""
    stamp_format = '%Y-%m-%d %H:%M'
    return dt.strftime(stamp_format)
class Labels():
    """Registry of the result-table columns that have been populated so far.

    For every registered label it remembers the dtype, the display order and
    the column width taken from ``RESULT_ITEMS``.
    """
    VALID_DTYPE = ["str", "number", "bool", "date", "markdown"]

    def __init__(self):
        self.types = {}   # label -> dtype string
        self.orders = {}  # label -> display order
        self.widths = {}  # label -> column width string

    def _ordered_labels(self):
        """Return the registered labels sorted by their display order."""
        return sorted(self.types, key=lambda label: self.orders[label])

    def set(self, label: str):
        """Register *label* using its definition in ``RESULT_ITEMS``.

        Raises:
            Exception: If the label is unknown or its dtype is not listed in
                ``VALID_DTYPE``.
        """
        if label not in RESULT_ITEMS: raise Exception(f"Invalid item: {label}")
        item = RESULT_ITEMS.get(label)
        # Report the offending dtype (the message used to print the builtin `type`).
        if item[1] not in self.VALID_DTYPE: raise Exception(f"Invalid data type: {item[1]}")
        self.types[label] = item[1]
        self.orders[label] = item[0]
        # The width is optional in RESULT_ITEMS.
        self.widths[label] = item[3] if len(item) > 3 else "10%"

    def get(self):
        """Return ``(labels, dtypes)`` for the registered labels in display order."""
        labels = self._ordered_labels()
        label_types = [self.types[s] for s in labels]
        return labels, label_types

    def get_widths(self):
        """Return the column widths in the same order as the labels from ``get()``."""
        return [self.widths[s] for s in self._ordered_labels()]

    def get_null_value(self, type: str):
        """Return the placeholder used for a missing value of dtype *type*."""
        if type == "bool": return False
        if type in ("number", "date"): return 0
        return "None"
# https://huggingface.co/docs/huggingface_hub/package_reference/hf_api
# https://huggingface.co/docs/huggingface_hub/package_reference/hf_api#huggingface_hub.ModelInfo
class HFSearchResult():
    """Accumulates Hugging Face Hub search hits and renders them for Gradio.

    Every hit is flattened into a ``{label: value}`` dict (labels are keys of
    ``RESULT_ITEMS``) and stored next to the original info object. The rows
    can then be sorted, filtered and converted into a styled pandas table.
    """

    def __init__(self):
        self.labels = Labels()          # columns seen so far
        self.current_item = {}          # row under construction
        self.current_item_info = None   # info object of the row under construction
        self.item_list = []             # finished rows
        self.item_info_list = []        # info objects, parallel to item_list
        self.item_hide_flags = []       # True = row is filtered out, parallel to item_list
        self.hide_labels = []           # columns hidden through set_hide()
        self.show_labels = []           # columns requested by the last search()
        self.filter_items = None        # labels to filter on, or None
        self.filters = None             # filter texts, parallel to filter_items
        gc.collect()

    def reset(self):
        """Drop all collected state by re-running ``__init__``."""
        self.__init__()

    def _set(self, data, label: str):
        """Store *data* under *label* in the current row and register the label."""
        self.labels.set(label)
        self.current_item[label] = data

    def _next(self):
        """Commit the row under construction and start a new, empty one."""
        self.item_list.append(self.current_item.copy())
        self.current_item = {}
        self.item_info_list.append(self.current_item_info)
        self.current_item_info = None
        self.item_hide_flags.append(False)

    def add_item(self, i: Union[ModelInfo, DatasetInfo, SpaceInfo]):
        """Flatten one Hub info object into a row; other object types are ignored."""
        self.current_item_info = i
        if isinstance(i, ModelInfo): repo_type = "model"
        elif isinstance(i, DatasetInfo): repo_type = "dataset"
        elif isinstance(i, SpaceInfo): repo_type = "space"
        else: return
        self._set(repo_type, "Type")
        self._set(i.id, "ID")
        if i.likes is not None: self._set(i.likes, "Likes")
        if i.last_modified is not None: self._set(date_to_str(i.last_modified), "LastMod.")
        if i.trending_score is not None: self._set(int(i.trending_score), "Trending")
        if i.tags is not None: self._set("True" if "not-for-all-audiences" in i.tags else "False", "NFAA")
        if repo_type in ["model", "dataset"]:
            if i.gated is not None: self._set(i.gated if i.gated else "off", "Gated")
            if i.downloads is not None: self._set(i.downloads, "DLs")
            if i.downloads_all_time is not None: self._set(i.downloads_all_time, "AllDLs")
        if repo_type == "model":
            if i.inference is not None: self._set(i.inference, "Status")
            if i.library_name is not None: self._set(i.library_name, "Library")
            if i.pipeline_tag is not None: self._set(i.pipeline_tag, "Pipeline")
        if repo_type == "space":
            if i.runtime is not None:
                self._set(i.runtime.hardware, "Hardware")
                self._set(i.runtime.stage, "Stage")
        self._next()

    def search(self, repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str,
               tags: str, infer: str, gated: str, appr: list[str], size_categories: list, limit: int,
               hardware: list, stage: list, fetch_detail: list, show_labels: list):
        """Query the Hub for the selected repo types and collect the hits.

        Previous results are discarded first. Gated repos are kept only when
        their gating mode is listed in *appr*; spaces are additionally filtered
        by *hardware* and *stage* when runtime information is available.

        Raises:
            Exception: ``"Search error: ..."`` wrapping any failure, including
                the case where nothing was found.
        """
        try:
            self.reset()
            self.show_labels = show_labels.copy()
            api = HfApi()
            kwargs = {}   # arguments shared by all repo types
            mkwargs = {}  # model-only arguments
            dkwargs = {}  # dataset-only arguments
            skwargs = {}  # space-only arguments
            if filter_str: kwargs["filter"] = str_to_list(filter_str)
            if search_str: kwargs["search"] = search_str
            if author: kwargs["author"] = author
            if tags and is_valid_arg(tags):
                mkwargs["tags"] = str_to_list(tags)
                dkwargs["tags"] = str_to_list(tags)
            if limit > 0: kwargs["limit"] = limit
            if sort_method == "descending order": kwargs["direction"] = -1
            if gated == "gated":
                mkwargs["gated"] = True
                dkwargs["gated"] = True
            elif gated == "non-gated":
                mkwargs["gated"] = False
                dkwargs["gated"] = False
            mkwargs["sort"] = sort
            if len(size_categories) > 0: dkwargs["size_categories"] = size_categories
            if infer != "all": mkwargs["inference"] = infer
            if "model" in repo_types:
                models = api.list_models(full=True, cardData=True, **kwargs, **mkwargs)
                for model in models:
                    if model.gated and model.gated not in appr: continue
                    self.add_item(model)
            if "dataset" in repo_types:
                datasets = api.list_datasets(full=True, **kwargs, **dkwargs)
                for dataset in datasets:
                    if dataset.gated and dataset.gated not in appr: continue
                    self.add_item(dataset)
            if "space" in repo_types:
                if "Space Runtime" in fetch_detail:
                    space_infos = api.list_spaces(
                        expand=["cardData", "datasets", "disabled", "lastModified", "createdAt",
                                "likes", "models", "private", "runtime", "sdk", "sha", "tags", "trendingScore"],
                        **kwargs, **skwargs)
                else: space_infos = api.list_spaces(full=True, **kwargs, **skwargs)
                for space in space_infos:
                    if space.gated and space.gated not in appr: continue
                    if space.runtime is not None:
                        # The hardware filter is only applied to running spaces.
                        if len(hardware) > 0 and space.runtime.stage == "RUNNING" and space.runtime.hardware not in hardware: continue
                        if len(stage) > 0 and space.runtime.stage not in stage: continue
                    self.add_item(space)
            # Download counts are not collected for spaces, so they are only used
            # as sort key when no spaces were requested.
            if sort == "downloads" and ("space" not in repo_types): self.sort("DLs")
            elif sort == "downloads_all_time" and ("space" not in repo_types): self.sort("AllDLs")
            elif sort == "likes": self.sort("Likes")
            elif sort == "trending_score": self.sort("Trending")
            else: self.sort("LastMod.")
        except Exception as e:
            raise Exception(f"Search error: {e}") from e

    def get(self):
        """Return ``(styled table, shown labels, shown dtypes)`` for the visible rows."""
        labels, label_types = self.labels.get()
        self._do_filter()
        null_values = [self.labels.get_null_value(t) for t in label_types]
        dflist = [[item.get(l, nv) for l, nv in zip(labels, null_values)]
                  for item, is_hide in zip(self.item_list, self.item_hide_flags) if not is_hide]
        df = self._to_pandas(dflist, labels)
        show_label_types = [t for l, t in zip(labels, label_types) if l not in self.hide_labels and l in self.show_labels]
        show_labels = [l for l in labels if l not in self.hide_labels and l in self.show_labels]
        return df, show_labels, show_label_types

    def _to_pandas(self, dflist: list, labels: list):
        """Build the pandas Styler for *dflist*: markdown IDs, hidden columns dropped, cells coloured."""
        # https://pandas.pydata.org/docs/reference/api/pandas.io.formats.style.Styler.apply.html
        # https://stackoverflow.com/questions/41654949/pandas-style-function-to-highlight-specific-columns
        # https://stackoverflow.com/questions/69832206/pandas-styling-with-conditional-rules
        # https://stackoverflow.com/questions/41203959/conditionally-format-python-pandas-cell
        # https://stackoverflow.com/questions/51187868/how-do-i-remove-and-re-sort-reindex-columns-after-applying-style-in-python-pan
        # https://stackoverflow.com/questions/36921951/truth-value-of-a-series-is-ambiguous-use-a-empty-a-bool-a-item-a-any-o
        def rank_df(sdf: pd.DataFrame, df: pd.DataFrame, col: str):
            # Later (higher) quantiles overwrite the colour set by earlier ones.
            ranks = [(0.5, "gold"), (0.75, "orange"), (0.9, "orangered")]
            for t, color in ranks:
                sdf.loc[df[col] >= df[col].quantile(q=t), [col]] = f'color: {color}'
            return sdf

        def highlight_df(x: pd.DataFrame, df: pd.DataFrame):
            # x is the visible table handed in by Styler.apply; df is the full
            # table (including hidden columns) that the rules are evaluated on.
            sdf = pd.DataFrame("", index=x.index, columns=x.columns)
            if df.empty: return sdf  # nothing to style
            columns = df.columns
            if "Trending" in columns: sdf = rank_df(sdf, df, "Trending")
            if "Likes" in columns: sdf = rank_df(sdf, df, "Likes")
            if "AllDLs" in columns: sdf = rank_df(sdf, df, "AllDLs")
            if "DLs" in columns: sdf = rank_df(sdf, df, "DLs")
            if "Status" in columns:
                sdf.loc[df["Status"] == "warm", ["Type"]] = 'color: orange'
                sdf.loc[df["Status"] == "cold", ["Type"]] = 'color: dodgerblue'
            if "Gated" in columns:
                sdf.loc[df["Gated"] == "auto", ["Gated"]] = 'color: dodgerblue'
                sdf.loc[df["Gated"] == "manual", ["Gated"]] = 'color: crimson'
            if "Stage" in columns and "Hardware" in columns:
                # NOTE(review): the trailing bare Series appears to act as a
                # truthiness mask on the hardware name - confirm.
                sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] != "zero-a10g") & (df["Hardware"] != "cpu-basic") & (df["Hardware"] != "None") & (df["Hardware"]), ["Hardware", "Type"]] = 'color: lime'
                sdf.loc[(df["Stage"] == "RUNNING") & (df["Hardware"] == "zero-a10g"), ["Hardware", "Type"]] = 'color: green'
                sdf.loc[(df["Type"] == "space") & (df["Stage"] != "RUNNING")] = 'opacity: 0.5'
                sdf.loc[(df["Type"] == "space") & (df["Stage"] != "RUNNING"), ["Type"]] = 'color: crimson'
                sdf.loc[df["Stage"] == "RUNNING", ["Stage"]] = 'color: lime'
            if "NFAA" in columns: sdf.loc[df["NFAA"] == "True", ["Type"]] = 'background-color: hotpink'
            # Assigning through .loc with a column list can add columns that are
            # not visible; remove them so the style frame matches x again.
            drop_columns = [c for c in sdf.columns if c not in x.columns]
            sdf = sdf.drop(drop_columns, axis=1)
            return sdf

        def id_to_md(row):
            # Turn the repo ID into a markdown link to its Hub page.
            if row["Type"] == "dataset": return f'[{row["ID"]}](https://hf.co/datasets/{row["ID"]})'
            elif row["Type"] == "space": return f'[{row["ID"]}](https://hf.co/spaces/{row["ID"]})'
            else: return f'[{row["ID"]}](https://hf.co/{row["ID"]})'

        def format_md_df(df: pd.DataFrame):
            # A row-wise apply on an empty frame does not yield a Series that
            # could be assigned to a single column, so skip it in that case.
            if not df.empty: df["ID"] = df.apply(id_to_md, axis=1)
            return df

        hide_labels = [l for l in labels if l in self.hide_labels or l not in self.show_labels]
        df = format_md_df(pd.DataFrame(dflist, columns=labels))
        ref_df = df.copy()
        df = df.drop(hide_labels, axis=1).style.apply(highlight_df, axis=None, df=ref_df)
        return df

    def set_hide(self, hide_labels: list):
        """Remember which columns should be hidden."""
        self.hide_labels = hide_labels.copy()

    def set_filter(self, filter_item1: str, filter1: str):
        """Set the active row filter; it is cleared when both arguments are empty."""
        if not filter_item1 and not filter1:
            self.filter_items = None
            self.filters = None
        else:
            self.filter_items = [filter_item1]
            self.filters = [filter1]

    def _do_filter(self):
        """Recompute ``item_hide_flags`` from the active filter."""
        if self.filters is None or self.filter_items is None:
            self.item_hide_flags = [False] * len(self.item_list)
            return
        labels, label_types = self.labels.get()
        types = dict(zip(labels, label_types))
        flags = []
        for item in self.item_list:
            flag = False
            for i, f in zip(self.filter_items, self.filters):
                if i not in item: continue  # rows without the field stay visible
                t = types[i]
                if item[i] == self.labels.get_null_value(t):
                    flag = True
                    break
                # Text columns are matched by substring.
                if t in ("str", "markdown") and f not in item[i]:
                    flag = True
                    break
            flags.append(flag)
        self.item_hide_flags = flags

    def sort(self, key="Likes"):
        """Sort the collected rows by *key* in descending order.

        An unregistered *key* falls back to ``"Likes"``. Rows that do not carry
        the key are placed after all rows that do, instead of raising
        ``KeyError``.

        Raises:
            Exception: ``"No item found."`` when there are no rows.
        """
        if len(self.item_list) == 0: raise Exception("No item found.")
        labels, label_types = self.labels.get()
        if key not in labels: key = "Likes"
        types = dict(zip(labels, label_types))
        if key not in types: return  # no row carries the key; keep the current order
        fallback = self.labels.get_null_value(types[key])
        rows = sorted(zip(self.item_list, self.item_hide_flags, self.item_info_list),
                      key=lambda x: (key in x[0], x[0].get(key, fallback)), reverse=True)
        # Keep the containers as lists so that rows can still be appended later.
        self.item_list = [row[0] for row in rows]
        self.item_hide_flags = [row[1] for row in rows]
        self.item_info_list = [row[2] for row in rows]

    def get_gr_df(self):
        """Return the Gradio update that shows the current table."""
        df, labels, label_types = self.get()
        # One width per displayed header, in header order.
        widths = [self.labels.widths[l] for l in labels]
        return gr.update(type="pandas", value=df, headers=labels, datatype=label_types, column_widths=widths, wrap=True)

    def get_gr_hide_labels(self):
        """Return the Gradio update offering every registered label for hiding."""
        return gr.update(choices=self.labels.get()[0], value=[], visible=True)

    def get_gr_filter_item(self, filter_item: str=""):
        """Return the Gradio update listing the text columns that can be filtered on."""
        labels, label_types = self.labels.get()
        choices = [s for s, t in zip(labels, label_types) if t in ("str", "markdown")]
        if len(choices) == 0: choices = [""]
        return gr.update(choices=choices, value=filter_item if filter_item else choices[0], visible=True)

    def get_gr_filter(self, filter_item: str=""):
        """Return the Gradio update listing up to 100 values found in column *filter_item*."""
        labels = self.labels.get()[0]
        if not filter_item or filter_item not in labels: return gr.update(choices=[""], value="", visible=True)
        counts = {}
        for item in self.item_list:
            if filter_item not in item: continue
            v = item[filter_item]
            counts[v] = counts.get(v, 0) + 1
        # NOTE(review): values are ordered by ascending count, so the cut at 100
        # keeps the rarest values - confirm this is intended.
        ranked = [v for v, _ in sorted(counts.items(), key=lambda x: x[1])]
        return gr.update(choices=[""] + ranked[:100], value="", visible=True)
def search(repo_types: list, sort: str, sort_method: str, filter_str: str, search_str: str, author: str, tags: str, infer: str,
           gated: str, appr: list[str], size_categories: list, limit: int, hardware: list, stage: list, fetch_detail: list, show_labels: list, r: HFSearchResult):
    """Run a Hub search on *r* and return the Gradio updates for the result view.

    Returns:
        ``(table update, hide-label update, r)``.

    Raises:
        gr.Error: With the failure text as message when the search fails.
    """
    try:
        r.search(repo_types, sort, sort_method, filter_str, search_str, author, tags, infer, gated, appr, size_categories,
                 limit, hardware, stage, fetch_detail, show_labels)
        return r.get_gr_df(), r.get_gr_hide_labels(), r
    except Exception as e:
        # Pass the text rather than the exception object so the UI shows a
        # plain message, and keep the original exception as the cause.
        raise gr.Error(str(e)) from e
def update_df(hide_labels: list, filter_item1: str, filter1: str, r: HFSearchResult):
    """Apply the hidden columns and the row filter to *r*, then re-render the table.

    Returns:
        ``(table update, r)``.
    """
    r.set_hide(hide_labels)
    r.set_filter(filter_item1, filter1)
    table_update = r.get_gr_df()
    return table_update, r
def update_filter(filter_item1: str, r: HFSearchResult):
    """Refresh the filter widgets for the column *filter_item1*.

    Returns:
        ``(filter-item update, filter-value update, visibility update, r)``.
    """
    item_update = r.get_gr_filter_item(filter_item1)
    value_update = r.get_gr_filter(filter_item1)
    reveal = gr.update(visible=True)
    return item_update, value_update, reveal, r