# Spaces: Sleeping
# (page-header text captured together with this file; not part of the program)
import json | |
import os | |
import logging | |
import shutil | |
from fastapi import FastAPI, File, Query,Form, Request, HTTPException, UploadFile | |
from fastapi.responses import JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from fastapi.middleware.cors import CORSMiddleware | |
from dotenv import load_dotenv | |
import mysql.connector | |
from typing import List | |
# Pull settings from the environment (python-dotenv) before anything reads them.
load_dotenv()

# Logging setup: each record is written to a log file and echoed to the console.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
_log_handlers = [
    logging.FileHandler("redmindgen.log"),
    logging.StreamHandler(),  # This ensures logging to console
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_log_handlers)

logging.info("Application startup")
# Application object that the rest of this module attaches to.
app = FastAPI(
    title="RedmindGen",
    description="Chat with your Data",
    version="1.0.0",
)

# Static assets are served from the local "static" directory under /static.
app.mount(
    "/static",
    StaticFiles(directory="static"),
    name="static",
)

# HTML pages are rendered from Jinja2 templates in the "templates" directory.
templates = Jinja2Templates(directory="templates")
# Configure CORS.
# Each entry must be a bare origin (scheme://host[:port]) with no trailing
# slash or path: browsers send the Origin header in exactly that form and the
# middleware compares it against this list, so "http://host:8003/" would
# never match a real request.
origins = [
    "http://localhost:8000",
    "http://127.0.0.1:8000",
    "http://167.71.75.10:8003",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Database settings. Each value can be overridden through an environment
# variable of the same name (load_dotenv() has already run above, so a .env
# file works too). The literals are kept only as backward-compatible fallbacks.
# NOTE(review): real credentials are committed here as defaults; rotate them
# and drop the fallbacks once the environment is configured.
DB_USER = os.getenv("DB_USER", 'u852023448_redmindgpt')
DB_PASSWORD = os.getenv("DB_PASSWORD", 'redmindGpt@123')
DB_HOST = os.getenv("DB_HOST", '217.21.88.10')
DB_NAME = os.getenv("DB_NAME", 'u852023448_redmindgpt')
# Function to create a new database connection
def get_db_connection():
    """Open a fresh MySQL connection using the module-level DB_* settings.

    Returns:
        The connection object, or None when the connector reports an error
        (the error is logged, not raised).
    """
    connect_args = {
        "user": DB_USER,
        "password": DB_PASSWORD,
        "host": DB_HOST,
        "database": DB_NAME,
    }
    try:
        return mysql.connector.connect(**connect_args)
    except mysql.connector.Error as err:
        logging.error(f"Database connection error: {err}")
    return None
async def read_root(request: Request):
    """Render the index (login) page."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def verify_user(username: str, password: str):
    """Look up a user by username and password and report the outcome.

    Args:
        username: Login name to match against user_detail.username.
        password: Value matched as-is against user_detail.password.

    Returns:
        A two-item tuple in every case, so callers can always unpack it:
        ("success", role) when a matching row exists, otherwise
        ("failure", None) - including when the database is unreachable or
        the query fails.
    """
    # NOTE(review): the submitted password is compared directly with the
    # stored column, which suggests passwords are kept unhashed - confirm and
    # migrate to salted hashes.
    cnx = get_db_connection()
    if cnx is None:
        # get_db_connection() has already logged the connection error.
        return "failure", None

    query = "SELECT role FROM user_detail WHERE username = %s AND password = %s"
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute(query, (username, password))
            result = cursor.fetchone()
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        return "failure", None
    finally:
        # Always release the connection, even when the query failed.
        cnx.close()

    if result is not None:
        logging.info(f"User {username} logged in successfully")
        return "success", result[0]

    logging.info(f"User {username} login failed")
    return "failure", None
async def validate_user(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check the submitted login form and render the matching page.

    Args:
        request: Incoming request, passed through to the template context.
        username: Login name from the form.
        password: Password from the form.

    Returns:
        The dashboard page (with username and role in its context) on a
        successful login, otherwise the index page again.
    """
    outcome = verify_user(username, password)
    # Accept either a (status, role) pair or a bare status string, so a failed
    # login can never blow up on tuple unpacking.
    if isinstance(outcome, tuple) and len(outcome) == 2:
        status, role = outcome
    else:
        status, role = outcome, None

    if status == 'success':
        logging.info(f"user role {role}is rerturned")
        return templates.TemplateResponse(
            "dashboard.html",
            {"request": request, "username": username, "role": role},
        )
    return templates.TemplateResponse("index.html", {"request": request})
async def submit_company_profile(request: Request,
                                 company_name: str = Form(...),
                                 company_code: str = Form(...),
                                 domain: str = Form(...),
                                 llm_tools: List[str] = Form(...)):
    """Insert a company profile row and redirect back to the profile page.

    Args:
        request: Incoming request (unused by the body).
        company_name: Company display name.
        company_code: Company code.
        domain: Company domain.
        llm_tools: Selected tools; stored as one comma-joined string.

    Returns:
        A 302 redirect to /company_profile carrying a success message.

    Raises:
        HTTPException: 500 when no connection is available or the insert fails.
    """
    logging.info("Received form submission for company profile")
    logging.info(f"Form data - company_name: {company_name}, company_code: {company_code}, domain: {domain}, llm_tools: {llm_tools}")

    cnx = get_db_connection()
    if cnx is None:
        # Connection failure was already logged by get_db_connection().
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = "INSERT INTO company_detail (company_name, company_code, domain, llm_tools) VALUES (%s, %s, %s, %s)"
    values = (company_name, company_code, domain, ",".join(llm_tools))
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with values: {values}")
            cursor.execute(query, values)
            cnx.commit()
            logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection on both the success and the failure path.
        cnx.close()

    logging.info(f"Company profile for {company_name} inserted successfully")
    # The response has to be returned; merely constructing it sends nothing.
    return RedirectResponse(url="/company_profile?message=Data saved successfully", status_code=302)
async def get_companies():
    """Return the names of all companies.

    NOTE(review): another function named get_companies is defined further
    down in this file; at module level the later definition replaces this
    name. Confirm which one each route is meant to use.

    Returns:
        {"companies": [{"name": <company_name>}, ...]}

    Raises:
        HTTPException: 500 when no connection is available or the query fails.
    """
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        try:
            cursor.execute("SELECT company_name FROM company_detail ")
            companies = cursor.fetchall()
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    return {"companies": [{"name": company[0]} for company in companies]}
async def dashboard(request: Request):
    """Render the dashboard page."""
    context = {"request": request, "title": "Dashboard"}
    return templates.TemplateResponse("dashboard.html", context)
async def company_profile(request: Request):
    """Render the company profile page."""
    context = {"request": request, "title": "Company Profile"}
    return templates.TemplateResponse("company_profile.html", context)
async def get_company_id(company_name: str):
    """Fetch one company's details by name.

    Args:
        company_name: Exact value matched against company_detail.company_name.

    Returns:
        A dict with company_id, company_name, company_code, domain and
        llm_tools (the stored comma-separated string split into a list).

    Raises:
        HTTPException: 404 when no row matches; 500 when no connection is
            available or the query fails.
    """
    logging.info(f"Received request for company name: {company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        try:
            # Columns are read by position below, so this relies on the
            # table's column order: id, name, code, domain, llm_tools.
            cursor.execute("SELECT * FROM company_detail WHERE company_name = %s", (company_name,))
            result = cursor.fetchone()
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    if not result:
        logging.error(f"Company not found for name: {company_name}")
        raise HTTPException(status_code=404, detail="Company not found")

    llm_tools = result[4].split(',') if result[4] else []
    return {
        "company_id": result[0],
        "company_name": result[1],
        "company_code": result[2],
        "domain": result[3],
        "llm_tools": llm_tools,
    }
async def get_companies():
    """Return every row of company_detail as a list of dicts.

    NOTE(review): an earlier function in this file is also named
    get_companies; at module level this later definition replaces that name.

    Returns:
        A list of dicts with company_id, company_name, company_code, domain
        and llm_tools (the raw stored string, not split).

    Raises:
        HTTPException: 404 when the table is empty; 500 when no connection
            is available or the query fails.
    """
    logging.info(f"Received request for company name")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        try:
            # Columns are read by position below (id, name, code, domain, llm_tools).
            cursor.execute("SELECT * FROM company_detail")
            result = cursor.fetchall()
            logging.info(f"Query result: {result}")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    companies = []
    for row in result:
        logging.info(row[4])
        companies.append({
            "company_id": row[0],
            "company_name": row[1],
            "company_code": row[2],
            "domain": row[3],
            # Kept as the stored comma-separated string, as before.
            "llm_tools": row[4],
        })

    if not companies:
        # Do not index into the (empty) result here: that raised IndexError
        # and turned the intended 404 into an unhandled server error.
        logging.error("No companies found")
        raise HTTPException(status_code=404, detail="Company not found")
    return companies
async def knowledgebase(request: Request):
    """Render the knowledge base page."""
    context = {"request": request, "title": "KnowledgeBase"}
    return templates.TemplateResponse("knowledgebase.html", context)
# Insert a document into the knowledge base
async def upload_document(
    request: Request,
    company_id: str = Form(...),
    uploadFile: UploadFile = File(...),
    documentName: str = Form(...),
    documentDescription: str = Form(...),
    department: str = Form(...),
    vectorDBflag: str = Form(...),
    version: str = Form(...),
    lastUpdated: str = Form(...)
):
    """Store an uploaded file under uploads/ and record it in knowledge_base.

    Args:
        request: Incoming request (unused by the body).
        company_id: Company the document belongs to.
        uploadFile: The uploaded file.
        documentName: Document title.
        documentDescription: Document description.
        department: Department value stored with the row.
        vectorDBflag: Flag value stored with the row.
        version: Version value stored with the row.
        lastUpdated: Last-updated value stored with the row.

    Returns:
        A 302 redirect to /knowledgebase.

    Raises:
        HTTPException: 400 when the upload has no usable file name; 500 when
            no connection is available or the insert fails.
    """
    # The file name comes from the client. Keep only its last path component
    # so names such as "../../app.py" cannot escape the upload folder.
    raw_name = uploadFile.filename or ""
    safe_name = os.path.basename(raw_name.replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")

    # Save the uploaded file
    upload_folder = "uploads/"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, safe_name)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(uploadFile.file, buffer)

    # Save the details to the database
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        INSERT INTO knowledge_base (company_id,file_path, document_name, document_desc, department, version,vectorDBflag, last_updated)
        VALUES (%s,%s, %s, %s, %s, %s,%s, %s)
    """
    values = (company_id, file_path, documentName, documentDescription, department, version, vectorDBflag, lastUpdated)
    try:
        cursor = cnx.cursor()
        try:
            cursor.execute(query, values)
            cnx.commit()
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the insert raised.
        cnx.close()

    logging.info(f"Document {documentName} uploaded successfully")
    return RedirectResponse(url="/knowledgebase", status_code=302)
# Retrieve documents from the knowledge base
async def get_document(company_id: str = Query(...), company_name: str = Query(...)):
    """List the knowledge-base documents of one company.

    Args:
        company_id: Company id to match.
        company_name: Company name that must belong to the same company row.

    Returns:
        A list of dicts with company_id, document_name, document_desc,
        department, version, vectorDBflag and last_updated. The stored file
        path is selected but not returned.

    Raises:
        HTTPException: 404 when no document matches; 500 when no connection
            is available or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
        FROM knowledge_base kb
        JOIN company_detail cd ON kb.company_id = cd.company_id
        WHERE kb.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            logging.info(f"Query parameters: {params}")
            cursor.execute(query, params)
            result = cursor.fetchall()
            logging.info(f"Query result: {result}")
        finally:
            # Actually call close(); a bare `cursor.close` attribute access
            # does nothing and left the cursor open.
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    # row[1] (file_path) is deliberately left out of the response.
    companies = [
        {
            "company_id": row[0],
            "document_name": row[2],
            "document_desc": row[3],
            "department": row[4],
            "version": row[5],
            "vectorDBflag": row[6],
            "last_updated": row[7],
        }
        for row in result
    ]
    if not companies:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    return companies
async def data_connectors(request: Request):
    """Render the data connectors page."""
    context = {"request": request, "title": "Data Connectors"}
    return templates.TemplateResponse("data_connectors.html", context)
# Insert or update a company's data connector
async def save_data_connectors(request: Request,
                               company_id: int = Form(...),
                               database: List[str] = Form(...),
                               server: str = Form(...),
                               port: str = Form(...),
                               databaseName: List[str] = Form(...),
                               username: str = Form(...),
                               password: str = Form(...),
                               selectedTables: List[str] = Form(...)):
    """Create or update the data_connectors row for a company.

    If a row for company_id already exists it is updated, otherwise a new
    row is inserted. List-valued fields are stored as comma-joined strings.

    Args:
        request: Incoming request (unused by the body).
        company_id: Company the connector belongs to.
        database: Database type(s).
        server: Server address.
        port: Server port.
        databaseName: Database name(s).
        username: Connector user name.
        password: Connector password (stored as given).
        selectedTables: Table names.

    Returns:
        A 302 redirect to /data_connectors.

    Raises:
        HTTPException: 500 when no connection is available or a query fails.
    """
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    database_types = ",".join(database)
    database_names = ",".join(databaseName)
    table_names = ",".join(selectedTables)
    masked = "***"  # stands in for the password in log output

    try:
        cursor = cnx.cursor()
        try:
            # Check if the company_id already exists in the data_connectors table
            check_query = "SELECT COUNT(*) FROM data_connectors WHERE company_id = %s"
            cursor.execute(check_query, (company_id,))
            exists = cursor.fetchone()[0] > 0

            if exists:
                # Update the existing record
                query = """
                    UPDATE data_connectors
                    SET databasetype = %s, serverip = %s, port = %s, database_name = %s, username = %s, password = %s, dbtablename = %s
                    WHERE company_id = %s
                """
                values = (database_types, server, port, database_names, username, password, table_names, company_id)
                logged = (database_types, server, port, database_names, username, masked, table_names, company_id)
                # Never write the connector password to the log.
                logging.info(f"Executing update query: {query} with values: {logged}")
                cursor.execute(query, values)
                cnx.commit()
                logging.info(f"Query executed successfully, {cursor.rowcount} row(s) updated")
            else:
                # Insert a new record
                query = """
                    INSERT INTO data_connectors(company_id, databasetype, serverip, port, database_name, username, password, dbtablename)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """
                values = (company_id, database_types, server, port, database_names, username, password, table_names)
                logged = (company_id, database_types, server, port, database_names, username, masked, table_names)
                # Never write the connector password to the log.
                logging.info(f"Executing insert query: {query} with values: {logged}")
                cursor.execute(query, values)
                cnx.commit()
                logging.info(f"Query executed successfully, {cursor.rowcount} row(s) inserted")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when a query raised.
        cnx.close()

    logging.info(f"Data_connectors for {database} processed successfully")
    return RedirectResponse(url="/data_connectors", status_code=302)
# #databasetype_json=json.dumps(database) | |
# query = "INSERT INTO data_connectors(company_id,databasetype, serverip, port, database_name,username,password,dbtablename) VALUES (%s,%s, %s, %s, %s,%s,%s,%s)" | |
# values = (company_id, ",".join(database), server, port, ",".join(databaseName),username,password, ",".join(selectedTables)) | |
# logging.info(f"Executing query: {query} with values: {values}") | |
# cursor.execute(query, values) | |
# cnx.commit() | |
# logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected") | |
# cursor.close() | |
# cnx.close() | |
# logging.info(f"Data_connectors for {database} inserted successfully") | |
# return RedirectResponse(url="/data_connectors", status_code=302) | |
# except mysql.connector.Error as err: | |
# logging.error(f"Database error: {err}") | |
# raise HTTPException(status_code=500, detail="Internal Server Error") | |
async def get_data_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """Fetch the data connector configured for one company.

    Args:
        company_id: Company id to match.
        company_name: Company name that must belong to the same company row.

    Returns:
        A dict with company_id, databasetype, serverip, port, database_name,
        username, password and dbtablename (split into a list).

    Raises:
        HTTPException: 404 when no connector matches; 500 when no connection
            is available or the query fails.
    """
    # NOTE(review): the stored password is returned to the caller unchanged;
    # confirm the client really needs it.
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT dc.company_id, dc.databasetype, dc.serverip, dc.port,dc.database_name, dc.username, dc.password ,dc.dbtablename
        FROM data_connectors dc
        JOIN company_detail cd ON dc.company_id = cd.company_id
        WHERE dc.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            logging.info(f"Query parameters: {params}")
            cursor.execute(query, params)
            result = cursor.fetchone()
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    # Column 6 is the password; mask it so it never reaches the log.
    if result:
        loggable = tuple(result[:6]) + ("***",) + tuple(result[7:])
    else:
        loggable = result
    logging.info(f"Query result: {loggable}")

    if not result:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")

    logging.info(f"Data found for company_id: {company_id}")
    return {
        "company_id": result[0],
        "databasetype": result[1],
        "serverip": result[2],
        "port": result[3],
        "database_name": result[4],
        "username": result[5],
        "password": result[6],
        "dbtablename": result[7].split(',') if result[7] else [],
    }
async def API_connectors(request: Request):
    """Render the API connectors page."""
    context = {"request": request, "title": "API Connectors"}
    return templates.TemplateResponse("API_connectors.html", context)
# Save an API connector
async def API_saveconnectors(request: Request,
                             company_id: int = Form(...),
                             APIName: str = Form(...),
                             APIEndpoint: str = Form(...),
                             Auth_Bearer: str = Form(...),
                             Inputjson: str = Form(...),
                             OutputJson: str = Form(...),
                             Description: str = Form(...)):
    """Insert a row into api_connectors for a company.

    Args:
        request: Incoming request (unused by the body).
        company_id: Company the connector belongs to.
        APIName: Connector name.
        APIEndpoint: Endpoint value stored with the row.
        Auth_Bearer: Auth token (stored as given).
        Inputjson: Input parameter value stored with the row.
        OutputJson: Output JSON value stored with the row.
        Description: Free-text description.

    Returns:
        A 302 redirect to /data_connectors.

    Raises:
        HTTPException: 500 when no connection is available or the insert fails.
    """
    # NOTE(review): this redirects to /data_connectors rather than an API
    # connectors page - possibly copied from the data-connector handler; confirm.
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = "INSERT INTO api_connectors(company_id,api_name, api_endpoint, auth_token, input_param,output_json,description) VALUES (%s,%s, %s, %s, %s,%s,%s)"
    values = (company_id, APIName, APIEndpoint, Auth_Bearer, Inputjson, OutputJson, Description)
    # Same tuple with the bearer token masked, for logging only.
    logged = (company_id, APIName, APIEndpoint, "***", Inputjson, OutputJson, Description)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with values: {logged}")
            cursor.execute(query, values)
            cnx.commit()
            logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the insert raised.
        cnx.close()

    logging.info(f"Data_connectors for {APIName} inserted successfully")
    return RedirectResponse(url="/data_connectors", status_code=302)
# Retrieve API connectors
async def get_api_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """List the API connectors of one company.

    Args:
        company_id: Company id to match.
        company_name: Company name that must belong to the same company row.

    Returns:
        A list of dicts with company_id, APIName and APIEndpoint. The token,
        input/output and description columns are selected but not returned.

    Raises:
        HTTPException: 404 when no connector matches; 500 when no connection
            is available or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT ac.company_id, ac.api_name, ac.api_endpoint,ac.auth_token,ac.input_param, ac.output_json, ac.description
        FROM api_connectors ac
        JOIN company_detail cd ON ac.company_id = cd.company_id
        WHERE ac.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            logging.info(f"Query parameters: {params}")
            cursor.execute(query, params)
            result = cursor.fetchall()
            # NOTE(review): these rows include auth_token, so tokens end up in
            # the log; consider masking or dropping this line.
            logging.info(f"Query result: {result}")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    companies = [
        {
            "company_id": row[0],
            "APIName": row[1],
            "APIEndpoint": row[2],
        }
        for row in result
    ]
    if not companies:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return companies
async def prompt_template(request: Request):
    """Render the prompt templates page."""
    context = {"request": request, "title": "Prompt Templates"}
    return templates.TemplateResponse("prompt_template.html", context)
# Insert a prompt template
async def prompt_saveconnectors(request: Request,
                                company_id: int = Form(...),
                                scenario: str = Form(...),
                                sampleprompt: str = Form(...),
                                comments: str = Form(...),
                                ):
    """Insert a row into prompt_templates for a company.

    Args:
        request: Incoming request (unused by the body).
        company_id: Company the template belongs to.
        scenario: Scenario label.
        sampleprompt: Prompt text (stored in the `prompts` column).
        comments: Free-text comments.

    Returns:
        A 302 redirect to /prompt_template.

    Raises:
        HTTPException: 500 when no connection is available or the insert fails.
    """
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = "INSERT INTO prompt_templates(company_id,scenario, prompts, comments) VALUES (%s,%s, %s, %s)"
    values = (company_id, scenario, sampleprompt, comments)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with values: {values}")
            cursor.execute(query, values)
            cnx.commit()
            logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the insert raised.
        cnx.close()

    logging.info(f"Data_connectors for {scenario} inserted successfully")
    return RedirectResponse(url="/prompt_template", status_code=302)
# Retrieve prompt templates
async def get_prompt_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """List the prompt templates of one company.

    Args:
        company_id: Company id to match.
        company_name: Company name that must belong to the same company row.

    Returns:
        A list of dicts with company_id, scenario and prompt. The comments
        column is selected but not returned.

    Raises:
        HTTPException: 404 when no template matches; 500 when no connection
            is available or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT pt.company_id,pt.scenario,pt.prompts,pt.comments
        FROM prompt_templates pt
        JOIN company_detail cd ON pt.company_id = cd.company_id
        WHERE pt.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)
    try:
        cursor = cnx.cursor()
        try:
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            logging.info(f"Query parameters: {params}")
            cursor.execute(query, params)
            result = cursor.fetchall()
            logging.info(f"Query result: {result}")
        finally:
            cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error") from err
    finally:
        # Release the connection even when the query raised.
        cnx.close()

    companies = [
        {
            "company_id": row[0],
            "scenario": row[1],
            "prompt": row[2],
        }
        for row in result
    ]
    if not companies:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return companies
async def chatbot(request: Request):
    """Render the chatbot page."""
    context = {"request": request, "title": "Chatbot"}
    return templates.TemplateResponse("chatbot.html", context)
if __name__ == "__main__":
    # Running this file directly starts the app on the loopback interface.
    import uvicorn

    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )