from gevent import pywsgi
import dotenv

# Load .env into the environment before the imports below run.
dotenv.load_dotenv(override=True)

import argparse
import asyncio
import os
import sys
import time
from typing import Union

# Third-party imports keep their original relative order (torch is imported
# before onnxruntime).
import uvicorn
from pydantic import BaseModel
from transformers import AutoTokenizer, AutoModelForSequenceClassification
import torch
import numpy as np
import onnxruntime as ort

import openedai

# Application object that the moderation route below is registered on.
app = openedai.OpenAIStub()

# Placeholder; nothing in this module assigns or reads it afterwards.
moderation = None

# Device label shown in the startup banner. Both branches of the former
# CUDA check produced "cpu", so the value is stated directly.
device = "cpu"

# Moderation categories. Position matters: moderations() pairs labels[i]
# with the i-th score of each prediction row.
labels = [
    'hate',
    'hate_threatening',
    'harassment',
    'harassment_threatening',
    'self_harm',
    'self_harm_intent',
    'self_harm_instructions',
    'sexual',
    'sexual_minors',
    'violence',
    'violence_graphic',
]

# Lookup tables between a category name and its index in ``labels``.
label2id = {label: index for index, label in enumerate(labels)}
id2label = dict(enumerate(labels))

# Directory holding the tokenizer files and ``model.onnx``. The
# MODERATION_MODEL_PATH environment variable (which may be supplied through
# .env) overrides the built-in default location.
model_name = os.environ.get(
    "MODERATION_MODEL_PATH",
    "/root/autodl-tmp/moderation_0703_deberta_v3_small_onnx",
)
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = ort.InferenceSession(os.path.join(model_name, "model.onnx"))

# Limit torch to one intra-op CPU thread; in this module torch is only used
# to apply the final sigmoid in predict().
torch.set_num_threads(1)

class ModerationsRequest(BaseModel):
    """Request body accepted by the ``/v1/moderations`` route."""

    # Model name sent by the client; the handler in this module does not
    # read it.
    model: str = "text-moderation-latest"
    # A single text, or a list of texts, to classify.
    input: Union[str, list[str]]


# Per-category cut-offs: a category counts as detected when its score is
# strictly greater than the value listed here. Built once at import time
# instead of on every request.
_CATEGORY_THRESHOLDS = {
    "sexual": 0.1,
    "hate": 0.25,
    "harassment": 0.5,
    "self_harm": 0.25,
    "sexual_minors": 0.5,
    "hate_threatening": 0.2,
    "violence_graphic": 0.25,
    "self_harm_intent": 0.2,
    "self_harm_instructions": 0.25,
    "harassment_threatening": 0.1,
    "violence": 0.25,
}


@app.post("/v1/moderations")
async def moderations(request: ModerationsRequest):
    """Score every input text and report which categories exceed their threshold.

    Args:
        request: Parsed request body. ``input`` may be one string or a list
            of strings; a single string is handled as a one-element list.

    Returns:
        A dict with the keys ``id``, ``model`` and ``results``. ``results``
        holds one entry per input text, in input order, each containing
        ``flagged`` (True when any category was detected), ``categories``
        (label -> bool) and ``category_scores`` (label -> float).
    """
    # Normalise to a list without mutating the incoming request object.
    texts = [request.input] if isinstance(request.input, str) else request.input

    results = {
        # time_ns() yields an integer nanosecond timestamp directly, avoiding
        # the precision lost by scaling the float from time.time().
        "id": f"modr-{time.time_ns()}",
        "model": "text-moderation-005",
        "results": [],
    }

    for text in texts:
        predictions = await predict(text, model, tokenizer)
        # predict() is called with one text at a time; row 0 is indexed by
        # label position.
        row = predictions[0]
        category_scores = {label: row[i].item() for i, label in enumerate(labels)}
        detect = {
            label: score > _CATEGORY_THRESHOLDS[label]
            for label, score in category_scores.items()
        }

        results['results'].append({
            'flagged': any(detect.values()),
            'categories': detect,
            'category_scores': category_scores,
        })

    return results


def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of *x*, element-wise.

    The result is computed from ``exp(-|x|)``, which never exceeds 1, so
    large-magnitude inputs do not overflow.

    Args:
        x: A real-valued scalar or array-like.

    Returns:
        A NumPy scalar for scalar input, otherwise an ndarray with the same
        shape as *x*.
    """
    values = np.asarray(x)
    # exp(-|x|) lies in [0, 1]; multiplying by -1.0 also promotes integer
    # and boolean input to floating point.
    decay = np.exp(-1.0 * np.abs(values))
    # x >= 0: 1 / (1 + e^-x);  x < 0: e^x / (1 + e^x). Both are the same
    # function, written so only the non-overflowing exponential is needed.
    result = np.where(values >= 0, 1 / (1 + decay), decay / (1 + decay))
    # Indexing with () turns a 0-d result into a scalar and leaves arrays as is.
    return result[()]


def parse_args(argv=None):
    """Parse the command-line options of the moderation server.

    Args:
        argv: Argument strings without the program name. ``None`` (the
            default) lets argparse fall back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace with ``host`` (str, default ``'0.0.0.0'``),
        ``port`` (int, default ``5002``) and ``test_load`` (bool, True when
        ``--test-load`` is given).
    """
    parser = argparse.ArgumentParser(description='Moderation API')
    parser.add_argument('--host', type=str, default='0.0.0.0')
    parser.add_argument('--port', type=int, default=5002)
    parser.add_argument('--test-load', action='store_true')
    return parser.parse_args(argv)


async def predict(text, ort_session, tokenizer):
    """Tokenize *text*, run the ONNX session on it and return sigmoid scores.

    Args:
        text: The string to classify.
        ort_session: Session object exposing ``get_inputs()`` and ``run()``.
            Its first declared input is fed the token ids and its second the
            attention mask.
        tokenizer: Callable tokenizer that returns a mapping containing
            ``input_ids`` and ``attention_mask`` when called with
            ``return_tensors='np'``.

    Returns:
        torch.Tensor holding the element-wise sigmoid of the session's first
        output.
    """
    # Calling the tokenizer directly replaces the deprecated encode_plus().
    encoding = tokenizer(text, return_tensors='np')

    # Query the input metadata once; feeds are matched to inputs by position.
    input_meta = ort_session.get_inputs()
    ort_inputs = {
        input_meta[0].name: encoding['input_ids'],
        input_meta[1].name: encoding['attention_mask'],
    }

    def _predict():
        ort_outs = ort_session.run(None, ort_inputs)
        return torch.sigmoid(torch.from_numpy(ort_outs[0]))

    # run() is a blocking call, so it is handed to the loop's default
    # executor instead of being executed inside the coroutine.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _predict)


if __name__ == "__main__":
    args = parse_args(sys.argv[1:])

    print(f'Starting moderations[{device}] API on {args.host}:{args.port}', file=sys.stderr)

    # NOTE(review): the ids registered here are spelled 'text-moderations-…'
    # while the request default and the response body in this module use
    # 'text-moderation-…' — confirm which spelling is intended.
    app.register_model('text-moderations-latest', 'text-moderations-stable')
    app.register_model('text-moderations-005', 'text-moderations-ifmain')

    # With --test-load the process stops here without starting the server.
    if not args.test_load:
        uvicorn.run(app, host=args.host, port=args.port)