"""Multi-model LLM service exposing a FastAPI endpoint and a Gradio UI.

Each configured GGUF model is loaded through llama-cpp-python. An incoming
message is sent to every loaded model, and the response that is most similar
to the others (TF-IDF cosine similarity) is returned.
"""

import asyncio
import functools
import os
import re
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock, Thread

import cachetools
import gptcache
import gradio as gr
import httpx
import nltk
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from llama_cpp import Llama
from pydantic import BaseModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Load environment variables
load_dotenv()

HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")

# Cache configuration: at most 100 entries, each kept for 60 seconds.
cache = cachetools.TTLCache(maxsize=100, ttl=60)
# cachetools caches are not thread-safe and this one is used from worker
# threads, so every access goes through this lock.
_cache_lock = Lock()

# Global data used to store the models' configuration
global_data = {
    'models': {},
    'tokens': {
        'eos': 'eos_token',
        'pad': 'pad_token',
        'padding': 'padding_token',
        'unk': 'unk_token',
        'bos': 'bos_token',
        'sep': 'sep_token',
        'cls': 'cls_token',
        'mask': 'mask_token'
    },
    'model_metadata': {},
    'max_tokens': {},
    'tokenizers': {},
    'model_params': {},
    'model_size': {},
    'model_ftype': {},
    'n_ctx_train': {},
    'n_embd': {},
    'n_layer': {},
    'n_head': {},
    'n_head_kv': {},
    'n_rot': {},
    'n_swa': {},
    'n_embd_head_k': {},
    'n_embd_head_v': {},
    'n_gqa': {},
    'n_embd_k_gqa': {},
    'n_embd_v_gqa': {},
    'f_norm_eps': {},
    'f_norm_rms_eps': {},
    'f_clamp_kqv': {},
    'f_max_alibi_bias': {},
    'f_logit_scale': {},
    'n_ff': {},
    'n_expert': {},
    'n_expert_used': {},
    'causal_attn': {},
    'pooling_type': {},
    'rope_type': {},
    'rope_scaling': {},
    'freq_base_train': {},
    'freq_scale_train': {},
    'n_ctx_orig_yarn': {},
    'rope_finetuned': {},
    'ssm_d_conv': {},
    'ssm_d_inner': {},
    'ssm_d_state': {},
    'ssm_dt_rank': {},
    'ssm_dt_b_c_rms': {},
    'vocab_type': {},
    'model_type': {}
}

# Model configuration
model_configs = [
    {
        "repo_id": "Hjgugugjhuhjggg/testing_semifinal-Q2_K-GGUF",
        "filename": "testing_semifinal-q2_k.gguf",
        "name": "testing"
    },
    {
        "repo_id": "bartowski/Llama-3.2-3B-Instruct-uncensored-GGUF",
        "filename": "Llama-3.2-3B-Instruct-uncensored-Q2_K.gguf",
        "name": "llama-3.2-3B"
    },
    {
        "repo_id": "Ffftdtd5dtft/Meta-Llama-3.1-70B-Q2_K-GGUF",
        "filename": "meta-llama-3.1-70b-q2_k.gguf",
        "name": "meta-llama-3.1-70B"
    }
]


# The caching decorator must be defined before it is used below.
def cache_response(func):
    """Memoize ``func`` in the module-level TTL ``cache``.

    The key is the string form of the positional and keyword arguments.
    Only values that ``func`` actually returns are stored; if ``func``
    raises, nothing is cached and the exception propagates.

    Args:
        func: The callable to wrap.

    Returns:
        A wrapper with the same signature as ``func``.
    """
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        cache_key = f"{args}-{kwargs}"
        with _cache_lock:
            # EAFP: an entry may expire between a membership test and the
            # lookup, so read it in a single step.
            try:
                return cache[cache_key]
            except KeyError:
                pass
        # Run the (potentially slow) call outside the lock so that other
        # threads are not serialized behind it.
        response = func(*args, **kwargs)
        with _cache_lock:
            cache[cache_key] = response
        return response
    return wrapper


class ModelManager:
    """Loads the configured models and keeps them indexed by name."""

    def __init__(self):
        # Maps a model's configured "name" to its loaded Llama instance.
        self.models = {}

    def load_model(self, model_config):
        """Load one model unless a model with that name is already loaded.

        Loading errors are reported on stdout and otherwise ignored, so a
        single broken model does not prevent the others from loading.

        Args:
            model_config: Dict with the keys "repo_id", "filename" and
                "name".
        """
        name = model_config['name']
        if name in self.models:
            return
        try:
            self.models[name] = Llama.from_pretrained(
                repo_id=model_config['repo_id'],
                filename=model_config['filename'],
                use_auth_token=HUGGINGFACE_TOKEN,
                n_threads=8,
                use_gpu=False
            )
        except Exception as e:
            print(f"Error loading model {model_config['name']}: {e}")

    def load_all_models(self):
        """Load every entry of ``model_configs`` using a thread pool.

        Returns:
            The dict of successfully loaded models, keyed by name.
        """
        # Leaving the ``with`` block waits for all submitted loads to finish.
        with ThreadPoolExecutor() as executor:
            for config in model_configs:
                executor.submit(self.load_model, config)
        return self.models


model_manager = ModelManager()
global_data['models'] = model_manager.load_all_models()


class ChatRequest(BaseModel):
    """Request body of the ``/generate`` endpoint."""

    message: str


# Normalize input
def normalize_input(input_text):
    """Return ``input_text`` without leading and trailing whitespace."""
    return input_text.strip()


# Remove duplicated responses
def remove_duplicates(text):
    """Drop repeated lines from ``text``, keeping each first occurrence.

    Args:
        text: A possibly multi-line string.

    Returns:
        The text with every line that already appeared earlier removed;
        the order of the remaining lines is unchanged.
    """
    unique_lines = []
    seen_lines = set()
    for line in text.split('\n'):
        if line not in seen_lines:
            unique_lines.append(line)
            seen_lines.add(line)
    return '\n'.join(unique_lines)


# Evaluate the coherence of the responses using cosine similarity
def get_best_response(responses):
    """Pick the response that is most similar to all the others.

    Responses are vectorized with TF-IDF; the one whose summed cosine
    similarity against all candidates is highest wins.

    Args:
        responses: Sequence of response strings. Empty or whitespace-only
            entries are ignored.

    Returns:
        The selected response, or ``""`` when there is no non-blank
        response to choose from.
    """
    # Blank responses carry no terms and could never win; if only blank
    # responses are present the vectorizer would raise instead.
    candidates = [text for text in responses if text and text.strip()]
    if not candidates:
        return ""
    if len(candidates) == 1:
        return candidates[0]

    try:
        # Vectorize the responses using TF-IDF
        tfidf_matrix = TfidfVectorizer().fit_transform(candidates)
    except ValueError:
        # Raised when no candidate contains a usable token (empty
        # vocabulary); similarity is undefined, so fall back to the first.
        return candidates[0]

    # Compute the cosine similarity between the responses
    similarity_matrix = cosine_similarity(tfidf_matrix)
    # Sum the similarities for each response
    total_similarities = similarity_matrix.sum(axis=1)
    # Index of the response with the highest total similarity
    best_response_index = int(total_similarities.argmax())
    return candidates[best_response_index]


@cache_response
def _generate_cached(model, inputs):
    """Run ``model`` on ``inputs`` and return its de-duplicated text.

    Exceptions propagate so that failed generations are never cached.
    """
    response = model(inputs)
    return remove_duplicates(response['choices'][0]['text'])


# Generate a response from a model
def generate_model_response(model, inputs):
    """Return the text ``model`` generates for ``inputs``.

    Successful results are cached (see ``cache_response``). Failures are
    reported on stdout and are not cached, so a transient error does not
    keep being served from the cache.

    Args:
        model: Callable model; it is called as ``model(inputs)`` and must
            return a mapping with ``['choices'][0]['text']``.
        inputs: The prompt text.

    Returns:
        The generated text with duplicated lines removed, or ``""`` if the
        model raised.
    """
    try:
        return _generate_cached(model, inputs)
    except Exception as e:
        print(f"Error generating response: {e}")
        return ""


# Process a message and generate responses
async def process_message(message):
    """Query every loaded model with ``message`` and return the best reply.

    The blocking model calls run in the event loop's default executor so
    the event loop stays responsive while the models are generating.

    Args:
        message: The raw user message.

    Returns:
        The response chosen by ``get_best_response``.

    Raises:
        RuntimeError: If no model produced a non-empty response.
    """
    inputs = normalize_input(message)
    loop = asyncio.get_running_loop()
    pending = [
        loop.run_in_executor(None, generate_model_response, model, inputs)
        for model in list(global_data['models'].values())
    ]
    responses = await asyncio.gather(*pending)

    # Select the best response based on similarity
    best_response = get_best_response(responses)
    if not best_response:
        raise RuntimeError("No model produced a response")
    return best_response


# FastAPI API
app = FastAPI()


@app.post("/generate")
async def generate(request: ChatRequest):
    """Handle ``POST /generate``.

    Returns:
        ``{"response": ...}`` on success, or ``{"error": ...}`` with HTTP
        status 500 when processing fails.
    """
    try:
        response = await process_message(request.message)
    except Exception as e:
        # Report failures with an error status instead of HTTP 200.
        return JSONResponse(status_code=500, content={"error": str(e)})
    return JSONResponse(content={"response": response})


# Start the uvicorn server
def run_uvicorn():
    """Serve the FastAPI app on 0.0.0.0:7860, reporting errors on stdout."""
    try:
        uvicorn.run(app, host="0.0.0.0", port=7860)
    except Exception as e:
        print(f"Error al ejecutar uvicorn: {e}")


# Gradio interface
iface = gr.Interface(
    fn=process_message,
    inputs=gr.Textbox(lines=2, placeholder="Enter your message here..."),
    outputs=gr.Markdown(),
    title="Multi-Model LLM API (CPU Optimized)",
    description=""
)


def run_gradio():
    """Launch the Gradio interface on port 7862 without blocking."""
    iface.launch(server_port=7862, prevent_thread_lock=True)


if __name__ == "__main__":
    Thread(target=run_uvicorn).start()
    Thread(target=run_gradio).start()
    # Keep the main thread alive. Create the loop explicitly because
    # asyncio.get_event_loop() without a current loop is deprecated.
    main_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(main_loop)
    main_loop.run_forever()