GVAmaresh
dev: check working
c68e3bc
from fastapi import FastAPI
import os
import subprocess
import gdown
import h5py
app = FastAPI()
@app.get("/")
def greet_json():
return {"Hello": "World!"}
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "2"
os.environ["MPLCONFIGDIR"] = "/tmp/matplotlib"
os.environ["FONTCONFIG_PATH"] = "/tmp/fontconfig"
os.environ["HF_HOME"] = "/tmp/huggingface_cache"
os.makedirs("/tmp/matplotlib", exist_ok=True)
os.makedirs("/tmp/fontconfig", exist_ok=True)
os.makedirs("/tmp/huggingface_cache", exist_ok=True)
from torchaudio.pipelines import WAV2VEC2_BASE
bundle = WAV2VEC2_BASE
model = bundle.get_model()
print("Model downloaded successfully!")
def reencode_audio(input_path, output_path):
command = [
'ffmpeg', '-i', input_path, '-acodec', 'pcm_s16le', '-ar', '16000', '-ac', '1', output_path
]
subprocess.run(command, check=True)
#-----------------------------------------------------------------------------------------
# import os
# from dotenv import load_dotenv
# from googleapiclient.discovery import build
# from google.auth.transport.requests import Request
# from google.oauth2.credentials import Credentials
# from google.oauth2 import service_account
# SCOPES = ['https://www.googleapis.com/auth/drive']
# details = {
# "refresh_token": "1//0gYLCF5OE4fTmCgYIARAAGBASNwF-L9Irp3Ik0q5OtsQClcLwW7sxPZSuMboe7wyjteuSuOD_WvavEHfhuTvkSjkLHitkh76XaD4",
# "token": "ya29.a0ARW5m753vyDgN_C7kUnnYTkeCfknSnDDj8tuVCe99dL2ieN3IzvCPVoN5kVg49CAYDz-pS5AgpjH7whiy7dr7QhwX4EiGQreJCzu109nlH6kxultrNup5q-_W2dNepbOa5YV8iH7OwP28RjQVR7fs9IlMO7BfnA9hw-WQqXNaCgYKAXMSARMSFQHGX2MieHrC7CpySZFYpoZWln6vxA0175",
# "token_uri": "https://oauth2.googleapis.com/token",
# "client_id": "573421158717-a2tulr4s7gg6or7sd76336busnmk22vu.apps.googleusercontent.com",
# "client_secret": "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",
# "scopes": ["https://www.googleapis.com/auth/drive"],
# "universe_domain": "googleapis.com",
# "account": "",
# }
# def authenticate_with_env_vars(details):
# creds = Credentials.from_authorized_user_info(details, SCOPES)
# if not creds or not creds.valid:
# if creds and creds.expired and creds.refresh_token:
# creds.refresh(Request())
# else:
# raise ValueError("Credentials are invalid and cannot be refreshed.")
# return creds
#-----------------------------------------------------------------------------------------
from fastapi import UploadFile
from googleapiclient.http import MediaIoBaseUpload
import io
import PyPDF2
Folder_Name = "Document_DB"
file_metadata = {
"name": "Fake",
"mimeType": "application/vnd.google-apps.folder",
}
def check_folder(service):
try:
resource = service.files()
result = resource.list(
q=f"mimeType = 'application/vnd.google-apps.folder' and 'root' in parents",
fields="nextPageToken, files(id, name)",
).execute()
list_folders = result.get("files")
folder_id = None
for folder in list_folders:
if folder["name"] == Folder_Name:
folder_id = folder["id"]
break
if not folder_id:
folder = service.files().create(body=file_metadata, fields="id").execute()
folder_id = folder["id"]
return folder_id, "success"
except Exception as e:
print(f"Error occurred while pushing file to DB: {e}")
return None, str(e)
def extract_text_from_pdf(pdf_file_content):
extracted_text = ""
try:
pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file_content))
num_pages = len(pdf_reader.pages)
for i in range(num_pages):
page = pdf_reader.pages[i]
page_text = page.extract_text()
if "ABSTRACT" in page_text:
extracted_text += page_text + "\n"
break
return extracted_text
except Exception as e:
print("An error occurred:", e)
return None
async def extract_text_url(file:UploadFile):
try:
file_content = await file.read()
extract_text = extract_text_from_pdf(file_content)
return extract_text, "success"
except Exception as e:
print(f"Error occurred while pushing file to DB: {e}")
return None, str(e)
async def push_file_db(service, file: UploadFile):
try:
folder_id, status = check_folder(service)
if not folder_id:
return [None, None, status]
file_content = await file.read()
file_metadata = {"name": file.filename, "parents": [folder_id]}
media = MediaIoBaseUpload(io.BytesIO(file_content), mimetype="application/pdf")
print("hh1")
new_file = (
service.files()
.create(body=file_metadata, media_body=media, fields="id")
.execute()
)
print("hh2")
service.permissions().create(
fileId=new_file["id"],
body={"role": "reader", "type": "anyone"},
fields="id",
).execute()
extracted_text = extract_text_from_pdf(file_content)
return new_file.get("id"), extracted_text, "success"
except Exception as e:
print(f"Error occurred while pushing file to DB: {e}")
return None, None, str(e)
#-----------------------------------------------------------------------------------------
import os
import gdown
file_id = "1zhisRgRi2qBFX73VFhzh-Ho93MORQqVa"
output_dir = "./downloads"
output_file = "file.h5"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_path = os.path.join(output_dir, output_file)
url = f"https://drive.google.com/uc?id={file_id}"
try:
gdown.download(url, output_path, quiet=False)
print(f"File downloaded successfully to: {output_path}")
except Exception as e:
print(f"Error downloading file: {e}")
output_file = "file.h5"
file_path = os.path.join(output_dir, output_file)
#-----------------------------------------------------------------------------------------
import os
import gdown
file_id = "1wIaycDFGTF3e0PpAHKk-GLnxk4cMehOU"
output_dir = "./downloads"
output_file = "file2.h5"
if not os.path.exists(output_dir):
os.makedirs(output_dir)
output_path = os.path.join(output_dir, output_file)
url = f"https://drive.google.com/uc?id={file_id}"
try:
gdown.download(url, output_path, quiet=False)
print(f"File downloaded successfully to: {output_path}")
except Exception as e:
print(f"Error downloading file: {e}")
output_file = "file2.h5"
file_path = os.path.join(output_dir, output_file)
if os.path.exists(file_path):
print(f"The file '{output_file}' exists at '{file_path}'.")
else:
print(f"The file '{output_file}' does not exist at '{file_path}'.")
#-----------------------------------------------------------------------------------------
import os
from dotenv import load_dotenv
from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google.oauth2 import service_account
SCOPES = ['https://www.googleapis.com/auth/drive']
details = {
"refresh_token": "1//0gYLCF5OE4fTmCgYIARAAGBASNwF-L9Irp3Ik0q5OtsQClcLwW7sxPZSuMboe7wyjteuSuOD_WvavEHfhuTvkSjkLHitkh76XaD4",
"token": "ya29.a0ARW5m753vyDgN_C7kUnnYTkeCfknSnDDj8tuVCe99dL2ieN3IzvCPVoN5kVg49CAYDz-pS5AgpjH7whiy7dr7QhwX4EiGQreJCzu109nlH6kxultrNup5q-_W2dNepbOa5YV8iH7OwP28RjQVR7fs9IlMO7BfnA9hw-WQqXNaCgYKAXMSARMSFQHGX2MieHrC7CpySZFYpoZWln6vxA0175",
"token_uri": "https://oauth2.googleapis.com/token",
"client_id": "573421158717-a2tulr4s7gg6or7sd76336busnmk22vu.apps.googleusercontent.com",
"client_secret": "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",
"scopes": ["https://www.googleapis.com/auth/drive"],
"universe_domain": "googleapis.com",
"account": "",
}
def main():
try:
print(details)
creds = None
creds = Credentials.from_authorized_user_info(details, SCOPES)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(
'credentials.json', SCOPES)
creds = flow.run_local_server(port=0)
service = build('drive', 'v3', credentials=creds)
return service
except Exception as error:
print(f'An error occurred: {error}')
#-----------------------------------------------------------------------------------------
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from fastapi import FastAPI, File, UploadFile, HTTPException
from typing import List
service = main()
@app.get("/api/check")
def check_working():
try:
return JSONResponse(
{"status": 200, "message": "Server is working fine", "data": None},
status_code=200,
)
except Exception as e:
return JSONResponse(
{"status": 500, "message": "Server is not working", "error": e},
)
@app.get("/")
def read():
return JSONResponse(
{"status": 200, "message": "Working Successfully"}, status_code=200
)
@app.post("/api/save-docs")
async def save_docs(file: UploadFile = File(...)):
try:
print(file.filename)
file_id, extracted_text, status = await push_file_db(service, file)
if not file_id:
return JSONResponse(
{"status": 500, "message": status, "data": None}, status_code=500
)
print(file_id)
return JSONResponse(
{
"status": 200,
"message": "Document saved successfully",
"data": {
"file_id": file_id,
"extracted_text": extracted_text,
"title": file.filename,
},
},
status_code=200,
)
except Exception as e:
print(f"Error: {e}")
return JSONResponse({"status": 500, "message": str(e)}, status_code=500)
@app.post("/api/get-abstract")
async def get_abstract(file: UploadFile = File(...)):
try:
print(file.filename)
extracted_text, status = await extract_text_url(file)
if not extracted_text:
return JSONResponse(
{"status": 500, "message": status, "data": None}, status_code=500
)
return JSONResponse(
{
"status": 200,
"message": "Document saved successfully",
"data": {"extracted_text": extracted_text, "title": file.filename},
},
status_code=200,
)
except Exception as e:
print(f"Error: {e}")
return JSONResponse({"status": 500, "message": str(e)}, status_code=500)
class DeleteRequest(BaseModel):
_id: List[str]
@app.post("/api/delete-report")
async def delete_report(req: DeleteRequest):
print(req._id)
try:
delete_result = delete_reports_by_ids(req._id)
return {"message": "Deleted successfully", "deleted_ids": req._id}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error deleting reports: {str(e)}")
def delete_reports_by_ids(ids: List[str]):
print(f"Deleting reports with IDs: {ids}")
return True