from gevent import pywsgi import dotenv dotenv.load_dotenv(override=True) import sys import time import argparse import uvicorn from typing import Union from pydantic import BaseModel from transformers import AutoTokenizer, AutoModelForSequenceClassification import torch import openedai import numpy as np import asyncio app = openedai.OpenAIStub() moderation = None device = "cuda" if torch.cuda.is_available() else "cpu" #device = "cpu" labels = ['hate', 'hate_threatening', 'harassment', 'harassment_threatening', 'self_harm', 'self_harm_intent', 'self_harm_instructions', 'sexual', 'sexual_minors', 'violence', 'violence_graphic', ] label2id = {l:i for i, l in enumerate(labels)} id2label = {i:l for i, l in enumerate(labels)} model_name = "/root/autodl-tmp/duanyu027/moderation_0628" tokenizer = AutoTokenizer.from_pretrained(model_name) model = AutoModelForSequenceClassification.from_pretrained(model_name, num_labels=len(labels),id2label=id2label, label2id=label2id, problem_type = "multi_label_classification") model.to(device) model.eval() #model = torch.quantization.quantize_dynamic( # model, {torch.nn.Linear}, dtype=torch.qint8 #) torch.set_num_threads(1) class ModerationsRequest(BaseModel): model: str = "text-moderation-latest" # or "text-moderation-stable" input: Union[str, list[str]] @app.post("/v1/moderations") async def moderations(request: ModerationsRequest): results = { "id": f"modr-{int(time.time()*1e9)}", "model": "text-moderation-005", "results": [], } if isinstance(request.input, str): request.input = [request.input] thresholds = { "sexual": 0.5, "hate": 0.5, "harassment": 0.5, "self_harm": 0.5, "sexual_minors": 0.9, "hate_threatening": 0.9, "violence_graphic": 0.9, "self_harm_intent": 0.9, "self_harm_instructions": 0.9, "harassment_threatening": 0.9, "violence": 0.5, } for text in request.input: predictions = await predict(text, model, tokenizer) category_scores = {labels[i]: predictions[0][i].item() for i in range(len(labels))} detect = {key: score > thresholds[key] for key, score in category_scores.items()} detected = any(detect.values()) results['results'].append({ 'flagged': detected, 'categories': detect, 'category_scores': category_scores, }) return results def sigmoid(x): return 1/(1 + np.exp(-x)) def parse_args(argv): parser = argparse.ArgumentParser(description='Moderation API') parser.add_argument('--host', type=str, default='0.0.0.0') parser.add_argument('--port', type=int, default=5002) parser.add_argument('--test-load', action='store_true') return parser.parse_args(argv) async def predict(text, model, tokenizer): encoding = tokenizer.encode_plus( text, return_tensors='pt' ) input_ids = encoding['input_ids'].to(device) attention_mask = encoding['attention_mask'].to(device) # 运行模型预测在独立的线程中 def _predict(): with torch.no_grad(): outputs = model(input_ids, attention_mask=attention_mask) return torch.sigmoid(outputs.logits) loop = asyncio.get_running_loop() predictions = await loop.run_in_executor(None, _predict) # 清理 GPU 内存 del input_ids del attention_mask torch.cuda.empty_cache() return predictions # Main if __name__ == "__main__": args = parse_args(sys.argv[1:]) # start API print(f'Starting moderations[{device}] API on {args.host}:{args.port}', file=sys.stderr) app.register_model('text-moderations-latest', 'text-moderations-stable') app.register_model('text-moderations-005', 'text-moderations-ifmain') if not args.test_load: uvicorn.run(app, host=args.host, port=args.port)