Spaces:
Sleeping
Sleeping
import tensorflow as tf | |
import coremltools as ct | |
import numpy as np | |
import PIL | |
from huggingface_hub import hf_hub_download | |
from huggingface_hub import snapshot_download | |
import os | |
import math | |
# Helper class to extract features from one model, and then feed those features into a classification head | |
# Because coremltools will only perform inference on OSX, an alternative tensorflow inference pipeline uses | |
# a tensorflow feature extractor and feeds the features into a dynamically created keras model based on the coreml classification head. | |
class CoreMLPipeline: | |
def __init__(self, config, auth_key, use_tf): | |
self.config = config | |
self.use_tf = use_tf | |
if use_tf: | |
extractor_path = snapshot_download(repo_id=config["tf_extractor_repoid"], use_auth_token = auth_key) | |
else: | |
extractor_path = hf_hub_download(repo_id=config["coreml_extractor_repoid"], | |
filename=config["coreml_extractor_path"], use_auth_token = auth_key) | |
classifier_path = hf_hub_download(repo_id=config["coreml_classifier_repoid"], filename=config["coreml_classifier_path"], | |
use_auth_token = auth_key) | |
print(f"Loading extractor...{extractor_path}") | |
if use_tf: | |
self.extractor = tf.saved_model.load(os.path.join(extractor_path, config["tf_extractor_path"])) | |
else: | |
self.extractor = ct.models.MLModel(extractor_path) | |
print(f"Loading classifier...{classifier_path}") | |
self.classifier = ct.models.MLModel(classifier_path) | |
if use_tf: | |
self.make_keras_model() | |
#unquantizes values if quantized | |
def realize_weights(self, nnWeights, width): | |
if nnWeights.quantization.numberOfBits == 0: | |
if len(nnWeights.float16Value) > 0: | |
weights = np.frombuffer(nnWeights.float16Value, dtype=np.float16) | |
print(f"found 16 bit {len(nnWeights.float16Value)/2} values") | |
else: | |
weights = np.array(nnWeights.floatValue) | |
elif nnWeights.quantization.numberOfBits == 8: | |
scales = np.array(nnWeights.quantization.linearQuantization.scale) | |
biases = np.array(nnWeights.quantization.linearQuantization.bias) | |
quantized = nnWeights.rawValue | |
classes = len(scales) | |
weights = [] | |
for i in range(0,classes): | |
scale = scales[i] | |
bias = biases[i] | |
for j in range(0,width): | |
weights.append(quantized[i*width + j] * scale + bias) | |
weights = np.array(weights) | |
else: | |
print(f"Unsupported quantization: {nnWeights.quantization.numberOfBits}") | |
weights = None | |
return weights | |
#Only MacOS can run inference on CoreML models. Convert it to tensorflow to match the tf feature extractor | |
def make_keras_model(self): | |
spec = self.classifier.get_spec() | |
nnClassifier = spec.neuralNetworkClassifier | |
labels = nnClassifier.stringClassLabels.vector | |
input = tf.keras.Input(shape = (1280)) | |
if "activation" in self.config: | |
activation = self.config['activation'] | |
else: | |
activation = "sigmoid" if len(labels) == 1 else "softmax" | |
x = tf.keras.layers.Dense(len(labels), activation = activation)(input) | |
model = tf.keras.Model(input,x, trainable = False) | |
weights = self.realize_weights(nnClassifier.layers[0].innerProduct.weights,1280) | |
weights = weights.reshape((len(labels),1280)) | |
weights = weights.T | |
bias = self.realize_weights(nnClassifier.layers[0].innerProduct.bias, len(labels)) | |
bias.reshape(1,len(labels)) | |
model.set_weights([weights,bias]) | |
self.tf_model = model | |
self.labels = labels | |
import math | |
def softmax_dict(self, input_dict): | |
""" | |
Compute the softmax of a dictionary of values. | |
Args: | |
input_dict (dict): A dictionary with numerical values. | |
Returns: | |
dict: A dictionary with the same keys where the values are the softmax of the input values. | |
""" | |
# Compute the exponential of all the values | |
exp_values = {k: math.exp(v) for k, v in input_dict.items()} | |
# Compute the sum of all exponential values | |
sum_exp_values = sum(exp_values.values()) | |
# Compute the softmax by dividing each exponential value by the sum of all exponential values | |
softmax_values = {k: v / sum_exp_values for k, v in exp_values.items()} | |
return softmax_values | |
def classify(self,resized): | |
if self.use_tf: | |
image = tf.image.convert_image_dtype(resized, tf.float32) | |
image = tf.expand_dims(image, 0) | |
features = self.extractor.signatures['serving_default'](image) | |
input = {"input_1":features["output_1"]} | |
output = self.tf_model.predict(input) | |
results = {} | |
for i,label in enumerate(self.labels): | |
results[label] = output[0][i] | |
else: | |
features = self.extractor.predict({"image":resized}) | |
features = features["Identity"] | |
output = self.classifier.predict({"features":features[0]}) | |
results = output["Identity"] | |
if "activation" in self.config and self.config["activation"] == "softmax": | |
results = self.softmax_dict(results) | |
return results | |