from fastapi import FastAPI |
import os |
import subprocess |
import gdown |
import h5py |
# Application instance on which every route in this module is registered.
app = FastAPI()


@app.get("/")
def greet_json():
    """Return the fixed greeting payload served at the root path."""
    greeting = {"Hello": "World!"}
    return greeting
# Set TensorFlow's C++ minimum log level before any heavy import happens.
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"

# Point matplotlib, fontconfig and the Hugging Face cache at directories
# under /tmp, then make sure each of those directories exists.
_TMP_DIR_ENV = {
    "MPLCONFIGDIR": "/tmp/matplotlib",
    "FONTCONFIG_PATH": "/tmp/fontconfig",
    "HF_HOME": "/tmp/huggingface_cache",
}
os.environ.update(_TMP_DIR_ENV)
for _tmp_dir in _TMP_DIR_ENV.values():
    os.makedirs(_tmp_dir, exist_ok=True)
from torchaudio.pipelines import WAV2VEC2_BASE |
# Build the model described by the WAV2VEC2_BASE bundle once, at import time,
# and keep both the bundle and the model as module-level names.
bundle = WAV2VEC2_BASE
model = bundle.get_model()
print("Model downloaded successfully!")
def reencode_audio(input_path, output_path):
    """Re-encode an audio file with ffmpeg as 16 kHz, mono, signed 16-bit PCM.

    Args:
        input_path: Path handed to ffmpeg's ``-i`` option.
        output_path: Path handed to ffmpeg as the output file.

    Raises:
        subprocess.CalledProcessError: If ffmpeg exits with a non-zero status.
    """
    codec_args = ['-acodec', 'pcm_s16le']
    rate_args = ['-ar', '16000']
    channel_args = ['-ac', '1']
    subprocess.run(
        ['ffmpeg', '-i', input_path, *codec_args, *rate_args, *channel_args, output_path],
        check=True,
    )
from fastapi import UploadFile |
from googleapiclient.http import MediaIoBaseUpload |
import io |
import PyPDF2 |
# Name of the Drive folder (directly under the root) that stores uploaded documents.
Folder_Name = "Document_DB"

# Metadata used when the folder has to be created. The name must match
# Folder_Name: with a different name the lookup in check_folder never finds
# the folder it created and a brand-new folder is made on every call.
file_metadata = {
    "name": Folder_Name,
    "mimeType": "application/vnd.google-apps.folder",
}


def check_folder(service):
    """Return the id of the ``Folder_Name`` folder in the Drive root, creating it if absent.

    Args:
        service: Google Drive v3 service object exposing ``files()``.

    Returns:
        ``(folder_id, "success")`` when the folder was found or created, or
        ``(None, error_message)`` if any Drive call raised.
    """
    try:
        # Filter by name on the server so the folder cannot be missed because
        # it falls outside the first result page, and skip trashed folders.
        query = (
            "mimeType = 'application/vnd.google-apps.folder' "
            "and 'root' in parents "
            f"and name = '{Folder_Name}' "
            "and trashed = false"
        )
        result = (
            service.files()
            .list(q=query, fields="nextPageToken, files(id, name)")
            .execute()
        )
        # "files" can be missing from the response; treat that as "no folders".
        folders = result.get("files") or []
        folder_id = next(
            (entry["id"] for entry in folders if entry["name"] == Folder_Name),
            None,
        )
        if not folder_id:
            created = service.files().create(body=file_metadata, fields="id").execute()
            folder_id = created["id"]
        return folder_id, "success"
    except Exception as e:
        print(f"Error occurred while looking up Drive folder: {e}")
        return None, str(e)
def extract_text_from_pdf(pdf_file_content):
    """Return the text of the first PDF page that contains the word "ABSTRACT".

    Args:
        pdf_file_content: Raw bytes of a PDF document.

    Returns:
        The matching page's text followed by a newline, an empty string when
        no page contains "ABSTRACT", or ``None`` if reading the PDF raised.
    """
    try:
        reader = PyPDF2.PdfReader(io.BytesIO(pdf_file_content))
        for page in reader.pages:
            text = page.extract_text()
            # Only the first page mentioning the marker is returned.
            if "ABSTRACT" in text:
                return text + "\n"
        return ""
    except Exception as e:
        print("An error occurred:", e)
        return None
async def extract_text_url(file: UploadFile):
    """Read an uploaded PDF and extract the page containing its abstract.

    Args:
        file: Uploaded PDF whose bytes are read in full.

    Returns:
        ``(text, "success")`` when abstract text was extracted, otherwise
        ``(None, reason)`` where ``reason`` describes why nothing is returned.
    """
    try:
        file_content = await file.read()
        extract_text = extract_text_from_pdf(file_content)
        # extract_text_from_pdf yields None on a read error and "" when no
        # page matches; neither is a success, so do not report it as one.
        if not extract_text:
            return None, "No abstract could be extracted from the uploaded PDF"
        return extract_text, "success"
    except Exception as e:
        print(f"Error occurred while extracting text from file: {e}")
        return None, str(e)
async def push_file_db(service, file: UploadFile):
    """Upload a PDF into the Drive document folder and extract its abstract page.

    Args:
        service: Google Drive v3 service object.
        file: Uploaded PDF; its filename becomes the Drive file name.

    Returns:
        ``(file_id, extracted_text, "success")`` after a successful upload
        (``extracted_text`` is whatever ``extract_text_from_pdf`` returned), or
        ``(None, None, error_message)`` if the folder lookup or any Drive call
        failed.
    """
    try:
        folder_id, status = check_folder(service)
        if not folder_id:
            # Same tuple shape as every other return path.
            return None, None, status

        file_content = await file.read()
        upload_metadata = {"name": file.filename, "parents": [folder_id]}
        media = MediaIoBaseUpload(io.BytesIO(file_content), mimetype="application/pdf")
        new_file = (
            service.files()
            .create(body=upload_metadata, media_body=media, fields="id")
            .execute()
        )

        # Grant the "reader" role to type "anyone" on the uploaded file.
        service.permissions().create(
            fileId=new_file["id"],
            body={"role": "reader", "type": "anyone"},
            fields="id",
        ).execute()

        extracted_text = extract_text_from_pdf(file_content)
        return new_file.get("id"), extracted_text, "success"
    except Exception as e:
        print(f"Error occurred while pushing file to DB: {e}")
        return None, None, str(e)
def _download_drive_file(file_id, output_file, output_dir="./downloads"):
    """Download one Google Drive file by id into ``output_dir`` and return its path.

    Failures are reported on stdout and never raised, so a failed download
    does not stop the module from importing.

    Args:
        file_id: Google Drive file id.
        output_file: File name to store the download under.
        output_dir: Directory that receives the file; created if missing.

    Returns:
        The target path, whether or not the download succeeded.
    """
    os.makedirs(output_dir, exist_ok=True)
    output_path = os.path.join(output_dir, output_file)
    url = f"https://drive.google.com/uc?id={file_id}"
    try:
        downloaded = gdown.download(url, output_path, quiet=False)
    except Exception as e:
        print(f"Error downloading file: {e}")
    else:
        # gdown can signal failure by returning None instead of raising.
        if downloaded is None:
            print(f"Error downloading file: no output produced for {url}")
        else:
            print(f"File downloaded successfully to: {output_path}")
    return output_path


output_dir = "./downloads"

# First file.
_download_drive_file("1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa", "file.h5", output_dir)

# Second file. The module-level names below keep the values they had after
# the original, duplicated download stanza for this file.
file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU"
output_file = "file2.h5"
url = f"https://drive.google.com/uc?id={file_id}"
output_path = _download_drive_file(file_id, output_file, output_dir)
file_path = os.path.join(output_dir, output_file)

if os.path.exists(file_path):
    print(f"The file '{output_file}' exists at '{file_path}'.")
else:
    print(f"The file '{output_file}' does not exist at '{file_path}'.")
import os |
from dotenv import load_dotenv |
from googleapiclient.discovery import build |
from google_auth_oauthlib.flow import InstalledAppFlow |
from google.auth.transport.requests import Request |
from google.oauth2.credentials import Credentials |
from google.oauth2 import service_account |
SCOPES = ['https://www.googleapis.com/auth/drive']

# OAuth user credentials for the Drive API. Secrets are read from the
# environment so that no token or client secret lives in the source tree:
#   GOOGLE_REFRESH_TOKEN, GOOGLE_ACCESS_TOKEN (optional),
#   GOOGLE_CLIENT_ID, GOOGLE_CLIENT_SECRET
# NOTE(review): the values previously committed in this file must be treated
# as leaked and revoked/rotated in the Google Cloud console.
details = {
    "refresh_token": os.environ.get("GOOGLE_REFRESH_TOKEN"),
    "token": os.environ.get("GOOGLE_ACCESS_TOKEN"),
    "token_uri": "https://oauth2.googleapis.com/token",
    "client_id": os.environ.get("GOOGLE_CLIENT_ID"),
    "client_secret": os.environ.get("GOOGLE_CLIENT_SECRET"),
    "scopes": list(SCOPES),
    "universe_domain": "googleapis.com",
    "account": "",
}


def main():
    """Build an authenticated Google Drive v3 service from ``details``.

    Returns:
        The Drive service object, or ``None`` if authentication or service
        construction raised (the error is printed).
    """
    try:
        # Never print ``details``: it carries the refresh token and client secret.
        creds = Credentials.from_authorized_user_info(details, SCOPES)
        if not creds.valid:
            if creds.refresh_token:
                # A refresh token is enough to obtain a new access token, even
                # when no access token or expiry was supplied.
                creds.refresh(Request())
            else:
                # Fall back to the interactive flow using a local client file.
                flow = InstalledAppFlow.from_client_secrets_file(
                    'credentials.json', SCOPES)
                creds = flow.run_local_server(port=0)
        return build('drive', 'v3', credentials=creds)
    except Exception as error:
        print(f'An error occurred: {error}')
        return None
from fastapi.responses import JSONResponse |
from pydantic import BaseModel |
from fastapi import FastAPI, File, UploadFile, HTTPException |
from typing import List |
# Drive service shared by the upload endpoint; main() yields None when
# authentication fails, in which case Drive calls will report an error.
service = main()


@app.get("/api/check")
def check_working():
    """Health-check endpoint: report that the server is up.

    Returns:
        A 200 JSON response on success; a 500 JSON response carrying the
        error text if building the response raised.
    """
    try:
        return JSONResponse(
            {"status": 200, "message": "Server is working fine", "data": None},
            status_code=200,
        )
    except Exception as e:
        # The exception object itself is not JSON-serializable, so send its
        # text, and make the HTTP status agree with the body.
        return JSONResponse(
            {"status": 500, "message": "Server is not working", "error": str(e)},
            status_code=500,
        )
# NOTE(review): "/" is already registered by greet_json earlier in this file;
# presumably the earlier registration wins and this handler is never reached.
# Confirm and drop one of the two.
@app.get("/")
def read():
    """Return a 200 JSON response stating that the service works."""
    payload = {"status": 200, "message": "Working Successfully"}
    return JSONResponse(payload, status_code=200)
@app.post("/api/save-docs")
async def save_docs(file: UploadFile = File(...)):
    """Store an uploaded document via ``push_file_db`` and return its id and extracted text.

    Responds with status 200 and the file id, extracted text and title when
    the upload yields a file id; otherwise with status 500 and the reported
    status message. Any unexpected exception also produces a 500 response.
    """
    try:
        print(file.filename)
        file_id, extracted_text, status = await push_file_db(service, file)

        if file_id:
            print(file_id)
            document = {
                "file_id": file_id,
                "extracted_text": extracted_text,
                "title": file.filename,
            }
            success_body = {
                "status": 200,
                "message": "Document saved successfully",
                "data": document,
            }
            return JSONResponse(success_body, status_code=200)

        failure_body = {"status": 500, "message": status, "data": None}
        return JSONResponse(failure_body, status_code=500)
    except Exception as e:
        print(f"Error: {e}")
        return JSONResponse({"status": 500, "message": str(e)}, status_code=500)
@app.post("/api/get-abstract")
async def get_abstract(file: UploadFile = File(...)):
    """Extract the abstract page from an uploaded PDF without storing the file.

    Responds with status 200 and the extracted text and title when text was
    found; otherwise with status 500 and a message describing the failure.
    """
    try:
        print(file.filename)
        extracted_text, status = await extract_text_url(file)
        if not extracted_text:
            # Never answer a 500 with the message "success": substitute a
            # meaningful reason when the status does not describe a failure.
            message = (
                status
                if status != "success"
                else "No abstract could be extracted from the document"
            )
            return JSONResponse(
                {"status": 500, "message": message, "data": None}, status_code=500
            )
        return JSONResponse(
            {
                "status": 200,
                # Nothing is saved by this endpoint, so do not claim it was.
                "message": "Abstract extracted successfully",
                "data": {"extracted_text": extracted_text, "title": file.filename},
            },
            status_code=200,
        )
    except Exception as e:
        print(f"Error: {e}")
        return JSONResponse({"status": 500, "message": str(e)}, status_code=500)
from pydantic import Field


class DeleteRequest(BaseModel):
    """Request body for ``/api/delete-report``: ``{"_id": ["<id>", ...]}``."""

    # Pydantic treats names with a leading underscore as private attributes,
    # not fields, so a field literally named "_id" is never populated from the
    # request body. Expose a public field and map the JSON key "_id" by alias.
    ids: List[str] = Field(..., alias="_id")


@app.post("/api/delete-report")
async def delete_report(req: DeleteRequest):
    """Delete the reports whose ids are listed under ``_id`` in the request body.

    Returns:
        A confirmation message together with the ids that were requested.

    Raises:
        HTTPException: With status 500 if the deletion raised.
    """
    print(req.ids)
    try:
        delete_reports_by_ids(req.ids)
        return {"message": "Deleted successfully", "deleted_ids": req.ids}
    except Exception as e:
        raise HTTPException(
            status_code=500, detail=f"Error deleting reports: {str(e)}"
        ) from e
def delete_reports_by_ids(ids: List[str]):
    """Placeholder deletion hook: print the requested ids and report success.

    No storage is touched here; the function only prints and returns ``True``.
    """
    message = "Deleting reports with IDs: {}".format(ids)
    print(message)
    return True