File size: 3,713 Bytes
97c4064
c0f8926
 
 
 
 
 
 
 
97c4064
c0f8926
f8e2041
c0f8926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
97c4064
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0f8926
97c4064
 
 
 
 
c0f8926
 
 
 
 
97c4064
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c0f8926
 
 
 
 
97c4064
c0f8926
97c4064
c0f8926
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from typing import Optional
from transformers import pipeline
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from io import BytesIO
import PyPDF2
from newspaper import Article

# Identifier of the model loaded into the question-answering pipeline.
model_name = "roaltopo/scan-u-doc_question-answer"
qa_pipeline = pipeline("question-answering", model=model_name)

app = FastAPI()

# In-memory dictionary used to store information; the handlers in this file
# key it by the ``uuid`` path parameter.
text_storage = {}

class TextInfo(BaseModel):
    """Request body for the /store_text endpoint; every field is optional."""
    # Plain text to store; store_text uses it only when no html_url is given.
    text: Optional[str] = None
    # NOTE(review): not read by any handler visible in this file — confirm
    # whether this field is still needed.
    pdf: Optional[bytes] = None
    # URL of an article to download and parse; store_text checks it before text.
    html_url: Optional[str] = None

class QuestionInfo(BaseModel):
    """Request body for the /answer_question endpoint."""
    # Question handed to the question-answering pipeline.
    question: str

@app.post("/store_text/{uuid}")
async def store_text(uuid: str, text_info: TextInfo):
    """Store the text that later questions for ``uuid`` are answered from.

    When ``text_info.html_url`` is non-blank, the article at that URL is
    downloaded and parsed and its title plus body text are stored. Otherwise
    ``text_info.text`` is stored as given.

    Args:
        uuid: Key under which the entry is kept in ``text_storage``.
        text_info: Request body carrying either ``html_url`` or ``text``.

    Returns:
        ``{'success': True}`` once the entry has been stored.

    Raises:
        HTTPException: 400 when neither ``html_url`` nor ``text`` is supplied;
            500 when anything else fails (for example fetching the article).
    """
    try:
        url = text_info.html_url.strip() if text_info.html_url else None
        if url:
            print('url:', url)
            article = Article(url)
            article.download()
            article.parse()
            text = f'{article.title}\n{article.text}'
        elif text_info.text:
            text = text_info.text
        else:
            # The message names the actual request field (``html_url``) so the
            # caller knows what to send.
            raise HTTPException(status_code=400, detail="Invalid Option: 'html_url' or 'text' required in text_info.")

        # Store information in the in-memory dictionary
        text_storage[uuid] = {
            'text': text,
            'url': text_info.html_url
        }

        return {'success': True}
    except HTTPException:
        # Deliberate HTTP errors (the 400 above) must reach the client as-is;
        # without this clause the generic handler below turns them into 500s.
        raise
    except Exception as e:
        error_message = f"Error: {str(e)}"
        print(error_message)
        raise HTTPException(status_code=500, detail="Internal Server Error: An unexpected error occurred.") from e

# Route for uploading a file
@app.post("/upload_file/{uuid}")
async def upload_file(uuid: str, file: UploadFile = File(...)):
    """Extract text from an uploaded PDF or TXT file and store it under ``uuid``.

    PDF text is collected page by page and cut off at 4000 characters; TXT
    content is decoded as UTF-8 and stored whole.

    Args:
        uuid: Key under which the extracted text is kept in ``text_storage``.
        file: The uploaded file; its type is decided from the filename extension.

    Returns:
        A ``JSONResponse``: ``{'success': True}`` on success, a 400 with a
        ``message`` when the file format is unsupported, or a 500 with a
        ``message`` when reading or parsing the file fails.
    """
    max_chars = 4000  # upper bound on the text kept from a PDF

    try:
        # rpartition keeps a dot-less name (e.g. "pdf") from being read as its
        # own extension, and ``or ''`` tolerates an upload without a filename.
        _, dot, suffix = (file.filename or '').rpartition('.')
        file_extension = suffix.lower() if dot else ''

        if file_extension == 'pdf':
            content = await file.read()
            reader = PyPDF2.PdfReader(BytesIO(content))

            extracted_text = ''

            for page in reader.pages:
                # Guard against a page that yields no text at all.
                tmp = page.extract_text() or ''
                tmp = tmp.replace('\n', ' ')
                tmp = tmp.replace('  ', ' ')
                # NOTE(review): after the double-space collapse above, '.  '
                # only survives when three or more spaces followed the period,
                # so this rarely fires — confirm the intended order.
                tmp = tmp.replace('.  ', '.\n')
                extracted_text += tmp
                if len(extracted_text) > max_chars:
                    extracted_text = extracted_text[:max_chars]
                    break

        elif file_extension == 'txt':
            content = await file.read()
            extracted_text = content.decode('utf-8')

        else:
            # An unsupported format is the client's mistake, so report 400
            # instead of letting it surface as a server error.
            return JSONResponse(
                content={"message": "Error while uploading the file: Unsupported file format."},
                status_code=400,
            )

        text_storage[uuid] = {
            'text': extracted_text,
        }

        return JSONResponse(content={'success': True})

    except Exception as e:
        return JSONResponse(content={"message": f"Error while uploading the file: {e}"}, status_code=500)

@app.post("/answer_question/{uuid}")
async def answer_question(uuid: str, question_info: QuestionInfo):
    """Answer a question against the text previously stored under ``uuid``.

    Args:
        uuid: Key of the stored entry in ``text_storage``.
        question_info: Request body carrying the question.

    Returns:
        The first answer produced by ``qa_pipeline``, or
        ``{'error': 'Text not found'}`` when nothing is stored for ``uuid``.
    """
    question = question_info.question

    # Check whether text with this ID exists in the dictionary
    if uuid not in text_storage:
        return {'error': 'Text not found'}

    r = qa_pipeline(question=question, context=text_storage[uuid]['text'], top_k=10)

    # The pipeline may hand back a bare dict instead of a list when it produces
    # only one answer; indexing that dict with 0 would raise KeyError.
    if isinstance(r, list):
        return r[0]
    return r


# Serve the "static" directory at the root path, with StaticFiles' html mode on.
# NOTE(review): this mount at "/" is registered before the "/" route defined
# below, which probably keeps that route from ever being reached — confirm.
app.mount("/", StaticFiles(directory="static", html=True), name="static")

@app.get("/")
def index() -> FileResponse:
    """Return the static index page as an HTML file response."""
    index_page = "/app/static/index.html"
    return FileResponse(path=index_page, media_type="text/html")