|
from fastapi import FastAPI |
|
import os |
|
import subprocess |
|
import gdown |
|
import h5py |
|
|
|
app = FastAPI() |
|
|
|
@app.get("/") |
|
def greet_json(): |
|
return {"Hello": "World!"} |
|
|
|
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2" |
|
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib" |
|
os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig" |
|
os.environ["HF_HOME"] = "/tmp/huggingface_cache" |
|
|
|
os.makedirs("/tmp/matplotlib", exist_ok=True) |
|
os.makedirs("/tmp/fontconfig", exist_ok=True) |
|
os.makedirs("/tmp/huggingface_cache", exist_ok=True) |
|
|
|
from torchaudio.pipelines import WAV2VEC2_BASE |
|
bundle = WAV2VEC2_BASE |
|
model = bundle.get_model() |
|
print("Model downloaded successfully!") |
|
|
|
def reencode_audio(input_path, output_path): |
|
command = [ |
|
'ffmpeg', '-i', input_path, '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', output_path |
|
] |
|
subprocess.run(command, check=True) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from fastapi import UploadFile |
|
from googleapiclient.http import MediaIoBaseUpload |
|
import io |
|
import PyPDF2 |
|
|
|
Folder_Name = "Document_DB" |
|
file_metadata = { |
|
"name": "Fake", |
|
"mimeType": "application/vnd.google-apps.folder", |
|
} |
|
|
|
def check_folder(service): |
|
try: |
|
resource = service.files() |
|
result = resource.list( |
|
q=f"mimeType = 'application/vnd.google-apps.folder' and 'root' in parents", |
|
fields="nextPageToken, files(id, name)", |
|
).execute() |
|
list_folders = result.get("files") |
|
|
|
folder_id = None |
|
|
|
for folder in list_folders: |
|
if folder["name"] == Folder_Name: |
|
folder_id = folder["id"] |
|
break |
|
|
|
if not folder_id: |
|
folder = service.files().create(body=file_metadata, fields="id").execute() |
|
folder_id = folder["id"] |
|
|
|
return folder_id, "success" |
|
except Exception as e: |
|
print(f"Error occurred while pushing file to DB: {e}") |
|
return None, str(e) |
|
|
|
def extract_text_from_pdf(pdf_file_content): |
|
extracted_text = "" |
|
try: |
|
pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file_content)) |
|
num_pages = len(pdf_reader.pages) |
|
for i in range(num_pages): |
|
page = pdf_reader.pages[i] |
|
page_text = page.extract_text() |
|
if "ABSTRACT" in page_text: |
|
extracted_text += page_text + "\n" |
|
break |
|
return extracted_text |
|
except Exception as e: |
|
print("An error occurred:", e) |
|
return None |
|
|
|
async def extract_text_url(file:UploadFile): |
|
try: |
|
file_content = await file.read() |
|
extract_text = extract_text_from_pdf(file_content) |
|
return extract_text, "success" |
|
except Exception as e: |
|
print(f"Error occurred while pushing file to DB: {e}") |
|
return None, str(e) |
|
|
|
|
|
async def push_file_db(service, file: UploadFile): |
|
try: |
|
folder_id, status = check_folder(service) |
|
|
|
if not folder_id: |
|
return [None, None, status] |
|
|
|
file_content = await file.read() |
|
|
|
file_metadata = {"name": file.filename, "parents": [folder_id]} |
|
media = MediaIoBaseUpload(io.BytesIO(file_content), mimetype="application/pdf") |
|
print("hh1") |
|
new_file = ( |
|
service.files() |
|
.create(body=file_metadata, media_body=media, fields="id") |
|
.execute() |
|
) |
|
print("hh2") |
|
service.permissions().create( |
|
fileId=new_file["id"], |
|
body={"role": "reader", "type": "anyone"}, |
|
fields="id", |
|
).execute() |
|
|
|
extracted_text = extract_text_from_pdf(file_content) |
|
|
|
return new_file.get("id"), extracted_text, "success" |
|
|
|
except Exception as e: |
|
print(f"Error occurred while pushing file to DB: {e}") |
|
return None, None, str(e) |
|
|
|
|
|
|
|
import os |
|
import gdown |
|
|
|
file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa" |
|
output_dir = "./downloads" |
|
output_file = "file.h5" |
|
|
|
if not os.path.exists(output_dir): |
|
os.makedirs(output_dir) |
|
|
|
output_path = os.path.join(output_dir, output_file) |
|
|
|
url = f"https://drive.google.com/uc?id={file_id}" |
|
|
|
try: |
|
gdown.download(url, output_path, quiet=False) |
|
print(f"File downloaded successfully to: {output_path}") |
|
except Exception as e: |
|
print(f"Error downloading file: {e}") |
|
|
|
output_file = "file.h5" |
|
file_path = os.path.join(output_dir, output_file) |
|
|
|
|
|
import os |
|
import gdown |
|
|
|
file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU" |
|
output_dir = "./downloads" |
|
output_file = "file2.h5" |
|
|
|
if not os.path.exists(output_dir): |
|
os.makedirs(output_dir) |
|
|
|
output_path = os.path.join(output_dir, output_file) |
|
|
|
url = f"https://drive.google.com/uc?id={file_id}" |
|
|
|
try: |
|
gdown.download(url, output_path, quiet=False) |
|
print(f"File downloaded successfully to: {output_path}") |
|
except Exception as e: |
|
print(f"Error downloading file: {e}") |
|
|
|
output_file = "file2.h5" |
|
file_path = os.path.join(output_dir, output_file) |
|
|
|
|
|
if os.path.exists(file_path): |
|
print(f"The file '{output_file}' exists at '{file_path}'.") |
|
else: |
|
print(f"The file '{output_file}' does not exist at '{file_path}'.") |
|
|
|
|
|
import os |
|
from dotenv import load_dotenv |
|
from googleapiclient.discovery import build |
|
from google_auth_oauthlib.flow import InstalledAppFlow |
|
from google.auth.transport.requests import Request |
|
from google.oauth2.credentials import Credentials |
|
from google.oauth2 import service_account |
|
|
|
SCOPES = ['https://www.googleapis.com/auth/drive'] |
|
|
|
|
|
details = { |
|
"refresh_token": "1//0gYLCF5OE4fTmCgYIARAAGBASNwF-L9Irp3Ik0q5OtsQClcLwW7sxPZSuMboe7wyjteuSuOD_WvavEHfhuTvkSjkLHitkh76XaD4", |
|
"token": "ya29.a0ARW5m753vyDgN_C7kUnnYTkeCfknSnDDj8tuVCe99dL2ieN3IzvCPVoN5kVg49CAYDz-pS5AgpjH7whiy7dr7QhwX4EiGQreJCzu109nlH6kxultrNup5q-_W2dNepbOa5YV8iH7OwP28RjQVR7fs9IlMO7BfnA9hw-WQqXNaCgYKAXMSARMSFQHGX2MieHrC7CpySZFYpoZWln6vxA0175", |
|
"token_uri": "https://oauth2.googleapis.com/token", |
|
"client_id": "573421158717-a2tulr4s7gg6or7sd76336busnmk22vu.apps.googleusercontent.com", |
|
"client_secret": "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", |
|
"scopes": ["https://www.googleapis.com/auth/drive"], |
|
"universe_domain": "googleapis.com", |
|
"account": "", |
|
} |
|
|
|
|
|
def main(): |
|
try: |
|
print(details) |
|
creds = None |
|
creds = Credentials.from_authorized_user_info(details, SCOPES) |
|
|
|
if not creds or not creds.valid: |
|
if creds and creds.expired and creds.refresh_token: |
|
creds.refresh(Request()) |
|
else: |
|
flow = InstalledAppFlow.from_client_secrets_file( |
|
'credentials.json', SCOPES) |
|
creds = flow.run_local_server(port=0) |
|
|
|
service = build('drive', 'v3', credentials=creds) |
|
return service |
|
|
|
except Exception as error: |
|
print(f'An error occurred: {error}') |
|
|
|
|
|
|
|
from fastapi.responses import JSONResponse |
|
from pydantic import BaseModel |
|
from fastapi import FastAPI, File, UploadFile, HTTPException |
|
from typing import List |
|
|
|
service = main() |
|
|
|
@app.get("/api/check") |
|
def check_working(): |
|
try: |
|
return JSONResponse( |
|
{"status": 200, "message": "Server is working fine", "data": None}, |
|
status_code=200, |
|
) |
|
except Exception as e: |
|
return JSONResponse( |
|
{"status": 500, "message": "Server is not working", "error": e}, |
|
) |
|
|
|
@app.get("/") |
|
def read(): |
|
return JSONResponse( |
|
{"status": 200, "message": "Working Successfully"}, status_code=200 |
|
) |
|
|
|
@app.post("/api/save-docs") |
|
async def save_docs(file: UploadFile = File(...)): |
|
try: |
|
print(file.filename) |
|
file_id, extracted_text, status = await push_file_db(service, file) |
|
if not file_id: |
|
return JSONResponse( |
|
{"status": 500, "message": status, "data": None}, status_code=500 |
|
) |
|
print(file_id) |
|
return JSONResponse( |
|
{ |
|
"status": 200, |
|
"message": "Document saved successfully", |
|
"data": { |
|
"file_id": file_id, |
|
"extracted_text": extracted_text, |
|
"title": file.filename, |
|
}, |
|
}, |
|
status_code=200, |
|
) |
|
except Exception as e: |
|
print(f"Error: {e}") |
|
return JSONResponse({"status": 500, "message": str(e)}, status_code=500) |
|
|
|
|
|
@app.post("/api/get-abstract") |
|
async def get_abstract(file: UploadFile = File(...)): |
|
try: |
|
print(file.filename) |
|
extracted_text, status = await extract_text_url(file) |
|
if not extracted_text: |
|
return JSONResponse( |
|
{"status": 500, "message": status, "data": None}, status_code=500 |
|
) |
|
return JSONResponse( |
|
{ |
|
"status": 200, |
|
"message": "Document saved successfully", |
|
"data": {"extracted_text": extracted_text, "title": file.filename}, |
|
}, |
|
status_code=200, |
|
) |
|
except Exception as e: |
|
print(f"Error: {e}") |
|
return JSONResponse({"status": 500, "message": str(e)}, status_code=500) |
|
|
|
class DeleteRequest(BaseModel): |
|
_id: List[str] |
|
|
|
@app.post("/api/delete-report") |
|
async def delete_report(req: DeleteRequest): |
|
print(req._id) |
|
try: |
|
delete_result = delete_reports_by_ids(req._id) |
|
return {"message": "Deleted successfully", "deleted_ids": req._id} |
|
except Exception as e: |
|
raise HTTPException(status_code=500, detail=f"Error deleting reports: {str(e)}") |
|
|
|
def delete_reports_by_ids(ids: List[str]): |
|
print(f"Deleting reports with IDs: {ids}") |
|
return True |
|
|
|
|
|
|
|
|