Redmind_GPT_API / app.py
Redmind's picture
Update app.py
6191fdc verified
raw
history blame
61.3 kB
import json
import os
import logging
import shutil
import asyncpg
from fastapi import FastAPI, File, Query,Form, Request, HTTPException, UploadFile
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
import mysql.connector
from typing import List
from pydantic import BaseModel
import psycopg2
# Load environment variables
load_dotenv()
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("redmindgen.log"),
logging.StreamHandler() # This ensures logging to console
]
)
logging.info("Application startup")
# Create the FastAPI app
app = FastAPI(title="RedmindGen", description="Chat with your Data", version="1.0.0")
# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
# Jinja2 templates
templates = Jinja2Templates(directory="templates")
# Configure CORS
origins = [
"http://localhost:8000",
"http://127.0.0.1:8000",
"http://167.71.75.10:8003/"
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
DB_USER = 'u852023448_redmindgpt'
DB_PASSWORD = 'redmindGpt@123'
DB_HOST = '217.21.88.10'
DB_NAME = 'u852023448_redmindgpt'
from pydantic import BaseModel
class DatabaseConnection(BaseModel):
database_type: str
server: str
port: str
databaseName: str
username: str
password: str
@app.post("/api/connect")
async def connect_to_database(connection: DatabaseConnection):
try:
print(f"Attempting to connect to database: {connection.database_type}")
if connection.database_type == "Postgres":
print(f"PostgreSQL connection details - Host: {connection.server}, Port: {connection.port}, Database: {connection.databaseName}, User: {connection.username}")
conn = psycopg2.connect(
host=connection.server,
port=connection.port,
database=connection.databaseName,
user=connection.username,
password=connection.password
)
query_schemas = "SELECT schema_name FROM information_schema.schemata"
query_tables = "SELECT table_name FROM information_schema.tables WHERE table_schema = %s"
elif connection.database_type == "mysql":
print(f"inside mysql",connection.server,connection.port,connection.databaseName,connection.username,connection.password)
conn = mysql.connector.connect(
host=connection.server,
port=connection.port,
database=connection.databaseName,
user=connection.username,
password=connection.password
)
query_schemas = "SELECT schema_name FROM information_schema.schemata"
query_tables = "SELECT table_name FROM information_schema.tables WHERE table_schema = %s"
else:
raise HTTPException(status_code=400, detail="Unsupported database type")
cursor = conn.cursor()
# Fetch all schemas
cursor.execute(query_schemas)
schemas = cursor.fetchall()
# Fetch all tables within each schema
schema_tables = {}
for schema in schemas:
cursor.execute(query_tables, (schema[0],))
tables = cursor.fetchall()
schema_tables[schema[0]] = [table[0] for table in tables]
cursor.close()
conn.close()
return {"schemas": [schema[0] for schema in schemas], "schema_tables": schema_tables, "success": True}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# Function to create a new database connection for MySQL (Example)
def get_db_connection():
try:
cnx = mysql.connector.connect(user=DB_USER, password=DB_PASSWORD, host=DB_HOST, database=DB_NAME)
return cnx
except mysql.connector.Error as err:
logging.error(f"Database connection error: {err}")
return None
# Function to create a new database connection for MySQL (Example)
def get_db_connection():
try:
cnx = mysql.connector.connect(user=DB_USER, password=DB_PASSWORD, host=DB_HOST, database=DB_NAME)
return cnx
except mysql.connector.Error as err:
logging.error(f"Database connection error: {err}")
return None
@app.get("/")
async def read_root(request: Request):
return templates.TemplateResponse("index.html", {"request": request})
def verify_user(username: str, password: str):
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = "SELECT role,company_id FROM user_detail WHERE username = %s AND password = %s"
values = (username, password)
cursor.execute(query, values)
result = cursor.fetchone()
cursor.close()
cnx.close()
if result is not None:
logging.info(f"User {username}{result[1]} logged in successfully")
return "success",result[0],result[1]
else:
logging.info(f"User {username} login failed")
return "failure"
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
return "failure"
@app.post("/validate-user")
async def validate_user(request: Request, username: str = Form(...), password: str = Form(...)):
status, role ,company_id= verify_user(username, password)
if status == 'success' and role and company_id:
logging.info(f"user role {role} is returned")
# Set cookies and redirect to the dashboard
response = RedirectResponse(url="/dashboard", status_code=302)
response.set_cookie(key="role", value=role)
response.set_cookie(key="username", value=username)
response.set_cookie(key="company_id",value=company_id)
return response
else:
# If login fails, redirect back to the index page with an error message
return templates.TemplateResponse("index.html", {
"request": request,
"error": "Invalid username or password"
})
@app.post("/submit_company_profile")
async def submit_company_profile(request: Request,
company_name: str = Form(...),
company_code: str = Form(...),
domain: str = Form(...),
llm_tools: List[str] = Form(...),
username:str=Form(...),
password:str=Form(...),
role:str=Form(...)):
logging.info("Received form submission for company profile")
logging.info(f"Form data - company_name: {company_name}, company_code: {company_code}, domain: {domain}, llm_tools: {llm_tools}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = "INSERT INTO company_detail (company_name, company_code, domain, llm_tools) VALUES (%s, %s, %s, %s)"
values = (company_name, company_code, domain, ",".join(llm_tools))
logging.info(f"Executing query: {query} with values: {values}")
cursor.execute(query, values)
# Retrieve the inserted company_id
company_id = cursor.lastrowid
logging.info(f"Company profile for {company_name} inserted successfully with company_id: {company_id}")
# Insert user details with the retrieved company_id
user_query = "INSERT INTO user_detail (company_id, username, password,role) VALUES (%s, %s, %s, %s)"
user_values = (company_id, username, password, role)
logging.info(f"Executing user detail query: {user_query} with values: {user_values}")
cursor.execute(user_query, user_values)
cnx.commit()
logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
cursor.close()
cnx.close()
logging.info(f"Company profile for {company_name} inserted successfully")
RedirectResponse(url="/company_profile?message=Data saved successfully", status_code=302)
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/api/companies")
async def get_companies():
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = "SELECT company_name FROM company_detail "
cursor.execute(query)
companies = cursor.fetchall()
cursor.close()
cnx.close()
return {"companies": [{"name": company[0]} for company in companies]}
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/dashboard")
async def dashboard(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
username = request.cookies.get("username")
company_id=request._cookies.get("company_id")
# Establish database connection
cnx = get_db_connection()
cursor = cnx.cursor()
# Fetch all table names
cursor.execute("SHOW TABLES")
all_tables = cursor.fetchall()
# Dictionary to hold the count of records for each table
table_count_of_each_table = {}
# Fetch count of records for each table
for table in all_tables:
table_name = table[0]
query = f"SELECT COUNT(*) FROM {table_name} WHERE company_id = %s"
cursor.execute(query, (company_id,))
count = cursor.fetchone()[0]
table_count_of_each_table[table_name] = count
query1=f"select company_name from company_detail where company_id = %s"
cursor.execute(query1,(company_id,))
company_name_result = cursor.fetchone()
# Check if company_name_result is not None
if company_name_result:
company_name = company_name_result[0]
else:
company_name = "Unknown" # Default
# Close cursor and connection
cursor.close()
cnx.close()
# Log the counts for debugging purposes
logging.info(table_count_of_each_table)
# Render the template with the data, role, and username
return templates.TemplateResponse("dashboard.html", {
"request": request,
"title": "Dashboard",
"table_count_of_each_table": table_count_of_each_table,
"role": role,
"username": username,
"company_id":company_id,
"company_name":company_name
})
except mysql.connector.Error as err:
# Log the error and raise an HTTPException
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/api/company_record_count/{company_id}")
async def get_company_record_count(company_id: int):
try:
# Establish database connection
cnx = get_db_connection()
cursor = cnx.cursor()
# List of tables to count records in
tables = ["knowledge_base", "data_connectors", "api_connectors", "prompt_templates"]
# Dictionary to hold the count of records for each table
table_counts = {}
# Fetch count of records for the selected company in each table
for table in tables:
query = f"SELECT COUNT(*) FROM {table} WHERE company_id = %s"
cursor.execute(query, (company_id,))
count = cursor.fetchone()[0]
table_counts[table] = count
# Close cursor and connection
cursor.close()
cnx.close()
return {"table_counts": table_counts}
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/company_profile")
async def company_profile(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
company_id = request.cookies.get("company_id")
# Render the template with the role and company_id
return templates.TemplateResponse("company_profile.html", {
"request": request,
"role": role,
"company_id": company_id,
"title":"Company Profile"
})
except Exception as e:
# Handle exceptions
logging.error(f"Error: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#return templates.TemplateResponse("company_profile.html", {"request": request,"title":"Company Profile"})
@app.get("/api/company_id")
async def get_company_id(company_name: str):
print(f"Received company_name: {company_name}") # Debug statement
logging.info(f"Received request for company name: {company_name}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = "SELECT * FROM company_detail WHERE company_name = %s"
cursor.execute(query, (company_name,))
result = cursor.fetchone()
cursor.close()
cnx.close()
if result:
llm_tools = result[4].split(',') if result[4] else []
return {"company_id": result[0],
"company_name":result[1],
"company_code":result[2],
"domain":result[3],
"llm_tools":llm_tools
}
else:
logging.error(f"Company not found for name: {company_name}")
raise HTTPException(status_code=404, detail="Company not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/api/companydetails")
async def get_companies():
print(f"Received company_name") # Debug statement
logging.info(f"Received request for company name")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = "SELECT * FROM company_detail"
cursor.execute(query)
result = cursor.fetchall()
logging.info(f"Query result: {result}")
cursor.close()
cnx.close()
companies = []
for row in result:
llm_tools = row[4].split(',') if row[4] else []
logging.info(row[4])
companies.append({
"company_id": row[0],
"company_name": row[1],
"company_code": row[2],
"domain": row[3],
"llm_tools": row[4]
})
if companies:
return companies
else:
logging.error(f"Company not found for name: {result[1]}")
raise HTTPException(status_code=404, detail="Company not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to view the details
@app.get("/api/getcompanydetails/{company_id}")
async def get_company_details(company_id: int):
company = await get_company_from_db(company_id)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
return company
async def get_company_from_db(company_id: int):
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor(dictionary=True)
query = "SELECT * FROM company_detail WHERE company_id = %s"
cursor.execute(query, (company_id,))
company = cursor.fetchone()
cursor.close()
cnx.close()
return company
except mysql.connector.Error as err:
logging.error(f"Error fetching company: {err}")
raise HTTPException(status_code=500, detail="Failed to fetch company")
# to edit the details
@app.put("/api/putcompanydetails/{company_id}")
async def update_company_details(company_id: int,
company_name: str = Form(...),
company_code: str = Form(...),
domain: str = Form(...),
llm_tools: List[str] = Form(...)):
print(f"Received company_id",company_id) # Debug statement
logging.info(f"Received request for company data")
company_data = {
'company_name': company_name,
'company_code': company_code,
'domain': domain,
'llm_tools': ','.join(llm_tools)
}
updated_company = await update_company_in_db(company_id, company_data)
if not updated_company:
raise HTTPException(status_code=500, detail="Failed to update company")
return updated_company
async def update_company_in_db(company_id: int, company_data: dict):
try:
print(f"Received company_nid inside function",company_id) # Debug statement
logging.info(f"Received request for company name")
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
update_query = """
UPDATE u852023448_redmind.gptcompany_detail cd
SET cd.company_name = %s, cd.company_code = %s, cd.domain = %s, cd.llm_tools = %s
WHERE cd.company_id = %s;
"""
logging.info(f"Executing query: {update_query} with company_id: {company_id}")
params = (company_id,company_data)
logging.info(f"Query parameters: {params}")
print(f"Query parameters: {params}")
cursor.execute(update_query, (
company_data['company_name'],
company_data['company_code'],
company_data['domain'],
company_data['llm_tools'],
company_id
))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
if not success:
return None
return company_data
except mysql.connector.Error as err:
logging.error(f"Error updating company: {err}")
raise HTTPException(status_code=500, detail="Failed to update company")
def delete_company_from_db(company_id: int) -> bool:
print(f"Received company_name: {company_id}") # Debug statement
logging.info(f"Received request for company name: {company_id}")
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
delete_query = "DELETE FROM company_detail WHERE company_id = %s"
cursor.execute(delete_query, (company_id,))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
return success
except mysql.connector.Error as err:
logging.error(f"Error deleting company: {err}")
raise HTTPException(status_code=500, detail="Failed to delete company")
@app.delete("/api/delcompanydetails/{company_id}")
async def delete_company(company_id: int):
deletion_success = delete_company_from_db(company_id)
if not deletion_success:
raise HTTPException(status_code=404, detail="Company not found or failed to delete")
return {"message": "Company deleted successfully"}
@app.get("/knowledgebase")
async def knowledgebase(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
company_id = request.cookies.get("company_id")
# Render the template with the role and company_id
return templates.TemplateResponse("knowledgebase.html", {
"request": request,
"role": role,
"company_id": company_id,
"title":"KnowledgeBase"
})
except Exception as e:
# Handle exceptions
logging.error(f"Error: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to insert into knowledgebase
@app.post("/upload_document")
async def upload_document(
request: Request,
company_id:str=Form(...),
uploadFile: UploadFile = File(...),
documentName: str = Form(...),
documentDescription: str = Form(...),
department: str = Form(...),
vectorDBflag:str=Form(...),
version: str = Form(...),
lastUpdated: str = Form(...)
):
try:
# Save the uploaded file
upload_folder = "uploads/"
os.makedirs(upload_folder, exist_ok=True)
file_path = os.path.join(upload_folder, uploadFile.filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(uploadFile.file, buffer)
# Save the details to the database
cnx = get_db_connection()
cursor = cnx.cursor()
query = """
INSERT INTO knowledge_base (company_id,file_path, document_name, document_desc, department, version,vectorDBflag, last_updated)
VALUES (%s,%s, %s, %s, %s, %s,%s, %s)
"""
values = (company_id,file_path, documentName, documentDescription, department, version,vectorDBflag, lastUpdated)
cursor.execute(query, values)
cnx.commit()
row_id=cursor.lastrowid
cursor.close()
cnx.close()
logging.info(f"Document {documentName} uploaded successfully")
return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
#return RedirectResponse(url="/knowledgebase", status_code=302)
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to retrieve from knowledgebase
@app.get("/api/document_upload")
async def get_document(company_id: str = Query(...)):
print(f"Received companyId and name: {company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = """
SELECT kb.kid,kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
FROM u852023448_redmindgpt.knowledge_base kb
JOIN u852023448_redmindgpt.company_detail cd ON kb.company_id = cd.company_id
WHERE kb.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
params = (company_id,)
logging.info(f"Query parameters: {params}")
print(f"Query parameters: {params}")
cursor.execute(query, params) # Pa
result = cursor.fetchall()
logging.info(f"Query result: {result}")
cursor.close
cnx.close()
companies=[]
for row in result:
companies.append({
"row_id":row[0],
"company_id": row[1],
"file_path":row[2],
"document_name": row[3],
"document_desc": row[4],
"department": row[5],
"version": row[6],
"vectorDBflag":row[7],
"last_updated": row[8]
})
if companies:
return companies
else:
logging.warning(f"No document found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data document not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#on update of modal form the data table is refresh the value in datatable
@app.get("/api/document_update")
async def get_document(company_id: str = Query(...)):
print(f"Received companyId and name: {company_id},{company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id},{company_id}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = """
SELECT kb.kid,kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
FROM u852023448_redmindgpt.knowledge_base kb
JOIN u852023448_redmindgpt.company_detail cd ON kb.company_id = cd.company_id
WHERE kb.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
values= (company_id,)
# logging.info(f"Query parameters: {params}")
print(f"Query parameters: {values}")
cursor.execute(query, values) # Pa
result = cursor.fetchall()
logging.info(f"Query result: {r.esult}")
cursor.close
cnx.close()
companies=[]
for row in result:
companies.append({
"kid":row[0],
"company_id": row[1],
"file_path":row[2],
"document_name": row[3],
"document_desc": row[4],
"department": row[5],
"version": row[6],
"vectorDBflag":row[7],
"last_updated": row[8]
})
if companies:
return companies
else:
logging.warning(f"No document found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data document not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to get data for view in knowledgebase
@app.get("/api/getknowledgebase/{company_id}")
async def get_company_details(company_id: int):
company = await get_knowledge_from_db(company_id)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
return company
async def get_knowledge_from_db(company_id: int):
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor(dictionary=True)
query = "SELECT * FROM knowledge_base WHERE kid = %s"
cursor.execute(query, (company_id,))
company = cursor.fetchone()
cursor.close()
cnx.close()
if company:
logging.debug(f"Extracted filename")
if company.get('file_path'):
company['file_path'] = os.path.basename(company['file_path'])
logging.debug(f"Extracted filename: {company['file_path']}")
return company
else:
raise HTTPException(status_code=404, detail="Company not found or file not found for the company")
except mysql.connector.Error as err:
logging.error(f"Error fetching company: {err}")
raise HTTPException(status_code=500, detail="Failed to fetch company")
# to edit the knowledgebase details
@app.put("/api/putknowledgebase/{companyId}")
async def update_company_details(
request: Request,
companyId: int,
company_id: str = Form(...),
file_path: UploadFile = File(...),
documentName: str = Form(...),
documentDescription: str = Form(...),
department: str = Form(...),
version: str = Form(...),
vectorDBFlag: str = Form(...),
lastUpdated: str = Form(...)
):
logging.info(f"Received request for company data with ID inside edit/update knowledgebase: {companyId}")
print(f"Received request for company data with ID inside edit/update knowledgebase file name: {file_path.filename}")
# Create the upload folder if it doesn't exist
upload_folder = "uploads/"
os.makedirs(upload_folder, exist_ok=True)
# Construct the file path for saving
saved_file_path = os.path.join(upload_folder, file_path.filename)
try:
# Save the uploaded file to the server
with open(saved_file_path, "wb") as buffer:
shutil.copyfileobj(file_path.file, buffer)
except Exception as e:
logging.error(f"Error saving file: {e}")
raise HTTPException(status_code=500, detail="Failed to save file")
# Prepare the company data dictionary
company_data = {
'kid': companyId,
'company_id': company_id,
'file_path': saved_file_path, # Use the path where the file was saved
'document_name': documentName,
'document_desc': documentDescription,
'department': department,
'version': version,
'vectorDBflag': vectorDBFlag,
'last_updated': lastUpdated
}
# Update the knowledge base in the database
updated_company = await update_knowledge_in_db(companyId, company_data)
if not updated_company:
raise HTTPException(status_code=500, detail="Failed to update company")
return updated_company
async def update_knowledge_in_db(kid: int, company_data: dict):
try:
logging.info(f"Updating knowledge base for ID: {kid}")
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
update_query = """
UPDATE u852023448_redmindgpt.knowledge_base kb
SET kb.company_id = %s, kb.document_name = %s, kb.document_desc = %s,
kb.department = %s, kb.version = %s, kb.vectorDBflag = %s, kb.last_updated = %s
WHERE kb.kid = %s;
"""
logging.info(f"Executing update query: {update_query}")
cursor.execute(update_query, (
company_data['company_id'],
company_data['document_name'],
company_data['document_desc'],
company_data['department'],
company_data['version'],
company_data['vectorDBflag'],
company_data['last_updated'],
kid
))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
if not success:
logging.info("No rows updated")
return None
logging.info("Update successful")
return company_data
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Failed to update company")
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail="Unexpected error occurred")
def delete_knowledge_from_db(company_id: int) -> bool:
print(f"Received knowledge base company_id: {company_id}") # Debug statement
logging.info(f"Received request for knowledgebase company id: {company_id}")
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
delete_query = "DELETE FROM knowledge_base WHERE kid = %s"
cursor.execute(delete_query, (company_id,))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
return success
except mysql.connector.Error as err:
logging.error(f"Error deleting company: {err}")
raise HTTPException(status_code=500, detail="Failed to delete company")
#to perform delete operation in knowlegebase
@app.delete("/api/delknowledgebase/{company_id}")
async def delete_company(company_id: int):
deletion_success = delete_knowledge_from_db(company_id)
if not deletion_success:
raise HTTPException(status_code=404, detail="Company not found or failed to delete")
return {"message": "Company deleted successfully"}
@app.get("/data_connectors")
async def data_connectors(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
company_id = request.cookies.get("company_id")
# Render the template with the role and company_id
return templates.TemplateResponse("data_connectors.html", {
"request": request,
"role": role,
"company_id": company_id,
"title": "Data Connectors"
})
except Exception as e:
# Handle exceptions
logging.error(f"Error: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to insert into data_connectors
@app.post("/save_data_connectors")
async def save_data_connectors( request: Request,
company_id: int = Form(...),
database: List[str] = Form(...),
server: str = Form(...),
port: str = Form(...),
databaseName:List[str]= Form(...),
username: str=Form(...),
password: str=Form(...),
selectedTables: List[str] = Form(...)):
logging.info(f"Received form submission for database_connectors")
print(f"Received form submission for database_connectors")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
# Check if the company_id already exists in the data_connectors table
check_query = "SELECT COUNT(*) FROM data_connectors WHERE company_id = %s"
cursor.execute(check_query, (company_id,))
exists = cursor.fetchone()[0] > 0
if exists:
# Update the existing record
query = """
UPDATE data_connectors
SET databasetype = %s, serverip = %s, port = %s, database_name = %s, username = %s, password = %s, dbtablename = %s
WHERE company_id = %s
"""
values = (",".join(database), server, port, ",".join(databaseName), username, password or '', ",".join(selectedTables), company_id)
logging.info(f"Executing update query: {query} with values: {values}")
cursor.execute(query, values)
cnx.commit()
logging.info(f"Query executed successfully, {cursor.rowcount} row(s) updated")
else:
# Insert a new record
query = """
INSERT INTO data_connectors(company_id, databasetype, serverip, port, database_name, username, password, dbtablename)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
values = (company_id, ",".join(database), server, port, ",".join(databaseName), username, password or '', ",".join(selectedTables))
logging.info(f"Executing insert query: {query} with values: {values}")
cursor.execute(query, values)
cnx.commit()
logging.info(f"Query executed successfully, {cursor.rowcount} row(s) inserted")
cursor.close()
cnx.close()
# logging.info(f"Data_connectors for {database} processed successfully")
# return JSONResponse(content={"status": "success", "message": "Data saved successfully"}, status_code=200)
response = {
"msg": "Data saved successfully",
"url": "/save_data_connectors", # The URL you want to redirect to
"created": True
}
return JSONResponse(content=response)
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
return JSONResponse(content={"status": "error", "message": "Internal Server Error"}, status_code=500)
except Exception as e:
logging.error(f"Unexpected error: {e}")
return JSONResponse(content={"status": "error", "message": "Unexpected Server Error"}, status_code=500)
@app.get("/api/check_data_connectors")
async def get_data_connectors(company_id: str = Query(...), company_name: str = Query(...)):
print(f"Received companyId and name: {company_id},{company_name}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = """
SELECT dc.company_id, dc.databasetype, dc.serverip, dc.port,dc.database_name, dc.username, dc.password ,dc.dbtablename
FROM u852023448_redmindgpt.data_connectors dc
JOIN u852023448_redmindgpt.company_detail cd ON dc.company_id = cd.company_id
WHERE dc.company_id = %s and cd.company_name=%s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
params = (company_id,company_name)
logging.info(f"Query parameters: {params}")
print(f"Query parameters: {params}")
cursor.execute(query, params) # Pa
result = cursor.fetchone()
logging.info(f"Query result: {result}")
cursor.close()
cnx.close()
if result:
databasetype = result[1]
dbtablename = result[7].split(',') if result[7] else []
logging.info(f"Data found for company_id: {company_id}")
return {
"company_id": result[0],
"databasetype":databasetype,
"serverip": result[2],
"port": result[3],
"database_name": result[4],
"username": result[5],
"password": result[6],
"dbtablename": dbtablename
}
else:
logging.warning(f"No data found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data connector not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/API_connectors")
async def API_connectors(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
company_id = request.cookies.get("company_id")
# Render the template with the role and company_id
return templates.TemplateResponse("API_connectors.html", {
"request": request,
"role": role,
"company_id": company_id,
"title":"API Connectors"
})
except Exception as e:
# Handle exceptions
logging.error(f"Error: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#save api connectors
@app.post("/api/save_api_details")
async def API_saveconnectors(request: Request,
company_id:int=Form(...),
APIName:str=Form(...),
APIEndpoint:str=Form(...),
Auth_Bearer:str=Form(...),
Inputjson:str=Form(...),
OutputJson:str=Form(...),
Description:str=Form(...)):
logging.info(f"Received form submission for database_connectors")
try:
cnx =get_db_connection()
cursor = cnx.cursor()
#databasetype_json=json.dumps(database)
query = "INSERT INTO api_connectors(company_id,api_name, api_endpoint, auth_token, input_param,output_json,description) VALUES (%s,%s, %s, %s, %s,%s,%s)"
values = (company_id, APIName, APIEndpoint, Auth_Bearer, Inputjson,OutputJson,Description)
logging.info(f"Executing query: {query} with values: {values}")
cursor.execute(query, values)
cnx.commit()
logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
row_id = cursor.lastrowid
cursor.close()
cnx.close()
logging.info(f"Data_connectors for {APIName} inserted successfully")
return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
#return RedirectResponse(url="/data_connectors", status_code=302)
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
# retrieve api connectors
@app.get("/api/get_api_connectors")
async def get_api_connectors(company_id: str = Query(...)):
print(f"Received companyId and name: {company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id}")
try:
cnx =get_db_connection()
cursor = cnx.cursor()
query = """
SELECT ac.id, ac.company_id, ac.api_name, ac.api_endpoint,ac.auth_token,ac.input_param, ac.output_json, ac.description
FROM u852023448_redmindgpt.api_connectors ac
JOIN u852023448_redmindgpt.company_detail cd ON ac.company_id = cd.company_id
WHERE ac.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
params = (company_id,)
logging.info(f"Query parameters: {params}")
print(f"Query parameters: {params}")
cursor.execute(query, params) # Pa
result = cursor.fetchall()
logging.info(f"Query result: {result}")
cursor.close()
cnx.close()
companies=[]
for row in result:
companies.append({
"row_id":row[0],
"company_id": row[1],
"APIName":row[2],
"APIEndpoint": row[3]
# "Auth_Bearer": result[3],
# "Inputjson": result[4],
#"OutputJson": result[5],
#"description": result[6]
})
if companies:
return companies
else:
logging.warning(f"No data found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data connector not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to view the table details in modal
@app.get("/api/viewapiconnectors/{company_id}")
async def get_company_details(company_id: int):
company = await get_api_from_db(company_id)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
return company
async def get_api_from_db(company_id: int):
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor(dictionary=True)
query = "SELECT * FROM api_connectors WHERE id = %s"
cursor.execute(query, (company_id,))
company = cursor.fetchone()
cursor.close()
cnx.close()
if company:
logging.info(f"api details:{company}")
return company
else:
raise HTTPException(status_code=404, detail="Company not found or file not found for the company")
except mysql.connector.Error as err:
logging.error(f"Error fetching company: {err}")
raise HTTPException(status_code=500, detail="Failed to fetch company")
#to edit the api details in modal form
@app.put("/api/editapiconnectors/{companyId}")
async def update_company_details(
request: Request,
companyId: int,
company_id:str=Form(...),
APIName:str=Form(...),
APIEndpoint:str=Form(...),
Auth_Bearer:str=Form(...),
Inputjson:str=Form(...),
OutputJson:str=Form(...),
Description:str=Form(...)):
logging.info(f"Received form submission for database_connectors")
logging.info(f"Received request for company data with ID inside edit/update knowledgebase: {companyId}")
# Prepare the company data dictionary
company_data = {
'kid': companyId,
'company_id': company_id,
'api_name': APIName,
'api_endpoint': APIEndpoint,
'auth_token': Auth_Bearer,
'input_param': Inputjson,
'output_json': OutputJson,
'description': Description
}
# Update the knowledge base in the database
updated_company = await update_api_in_db(companyId, company_data)
if not updated_company:
raise HTTPException(status_code=500, detail="Failed to update company")
return updated_company
async def update_api_in_db(id: int, company_data: dict):
try:
logging.info(f"Updating api for ID: {id}")
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
update_query = """
UPDATE u852023448_redmindgpt.api_connectors ac
SET ac.company_id = %s, ac.api_name = %s, ac.api_endpoint= %s,
ac.auth_token= %s, ac.input_param= %s,ac.output_json = %s, ac.description= %s
WHERE ac.id = %s;
"""
logging.info(f"Executing update query: {update_query}")
cursor.execute(update_query, (
company_data['company_id'],
company_data['api_name'],
company_data['api_endpoint'],
company_data['auth_token'],
company_data['input_param'],
company_data['output_json'],
company_data['description'],
id
))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
if not success:
logging.info("No rows updated")
return None
logging.info("Update successful")
return company_data
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Failed to update company")
except Exception as e:
logging.error(f"Unexpected error: {e}")
raise HTTPException(status_code=500, detail="Unexpected error occurred")
#on update of modal form the data table is refreshed to dispalyupdated value in datatable
@app.get("/api/api_updatetable")
async def get_document(company_id: str = Query(...)):
print(f"Received companyId and name for api datatable update: {company_id},{company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id},{company_id}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query=""" SELECT ac.id,ac.company_id, ac.api_name, ac.api_endpoint,ac.auth_token,ac.input_param, ac.output_json, ac.description
FROM u852023448_redmindgpt.api_connectors ac
JOIN u852023448_redmindgpt.company_detail cd ON ac.company_id = cd.company_id
WHERE ac.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
values= (company_id,)
# logging.info(f"Query parameters: {params}")
print(f"Query parameters: {values}")
cursor.execute(query, values) # Pa
result = cursor.fetchall()
logging.info(f"Query result for update table: {result}")
cursor.close
cnx.close()
companies=[]
for row in result:
companies.append({
"row_id":row[0],
"company_id": row[1],
"api_name":row[2],
"api_endpoint": row[3],
# "Auth_Bearer": row[4],
# "Inputjson": row[5],
# "OutputJson": row[6],
# "description": row[7]
})
if companies:
return companies
else:
logging.warning(f"No document found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data document not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
#to delete api details from db
@app.delete("/api/deleteapi/{company_id}")
async def delete_company(company_id: int):
deletion_success = delete_api_from_db(company_id)
if not deletion_success:
raise HTTPException(status_code=404, detail="Company not found or failed to delete")
return {"message": "Company deleted successfully"}
def delete_api_from_db(company_id: int) -> bool:
print(f"Received api for company_id: {company_id}") # Debug statement
logging.info(f"Received request for api for company id: {company_id}")
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
delete_query = "DELETE FROM api_connectors WHERE id = %s"
cursor.execute(delete_query, (company_id,))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
return success
except mysql.connector.Error as err:
logging.error(f"Error deleting company: {err}")
raise HTTPException(status_code=500, detail="Failed to delete company")
@app.get("/prompt_template")
async def prompt_template(request: Request):
try:
# Retrieve cookies
role = request.cookies.get("role")
company_id = request.cookies.get("company_id")
# Render the template with the role and company_id
return templates.TemplateResponse("prompt_template.html", {
"request": request,
"role": role,
"company_id": company_id,
"title":"Prompt Templates"
})
except Exception as e:
# Handle exceptions
logging.error(f"Error: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
# to insert into prompt templates
@app.post("/api/save_prompt_details")
async def prompt_saveconnectors(request: Request,
company_id:int=Form(...),
scenario:str=Form(...),
sampleprompt:str=Form(...),
comments:str=Form(...),
):
logging.info(f"Received form submission for database_connectors")
try:
cnx =get_db_connection()
cursor = cnx.cursor()
#databasetype_json=json.dumps(database)
query = "INSERT INTO prompt_templates(company_id,scenario, prompts, comments) VALUES (%s,%s, %s, %s)"
values = (company_id, scenario, sampleprompt, comments)
logging.info(f"Executing query: {query} with values: {values}")
cursor.execute(query, values)
cnx.commit()
logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
row_id = cursor.lastrowid # Get the last inserted row_id
cursor.close()
cnx.close()
logging.info(f"Data_connectors for {scenario} inserted successfully")
return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
#return RedirectResponse(url="/prompt_template", status_code=302)
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
# retrieve api connectors
@app.get("/api/get_prompt_templates")
async def get_prompt_connectors(company_id: str = Query(...)):
print(f"Received companyId and name: {company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id}")
try:
cnx =get_db_connection()
cursor = cnx.cursor()
query = """
SELECT pt.id,pt.company_id,pt.scenario,pt.prompts,pt.comments
FROM u852023448_redmindgpt.prompt_templates pt
JOIN u852023448_redmindgpt.company_detail cd ON pt.company_id = cd.company_id
WHERE pt.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
params = (company_id,)
logging.info(f"Query parameters: {params}")
print(f"Query parameters: {params}")
cursor.execute(query, params) # Pa
result = cursor.fetchall()
logging.info(f"Query result: {result}")
cursor.close()
cnx.close()
companies=[]
for row in result:
companies.append({
"row_id":row[0],
"company_id": row[1],
"scenario":row[2],
"prompt": row[3]
# "Auth_Bearer": result[3],
# "Inputjson": result[4],
#"OutputJson": result[5],
#"description": result[6]
})
if companies:
return companies
else:
logging.warning(f"No data found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data connector not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
def delete_prompt_template_from_db(row_id: int) -> bool:
logging.info(f"Received request for prompt_template company id: {row_id}")
logging.info(f"Received request for prompt_template row id: {row_id}")
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
delete_query = "DELETE FROM prompt_templates WHERE id = %s"
logging.info(f"sql delete query for prompt template ===> {delete_query}")
cursor.execute(delete_query, (row_id,))
cnx.commit()
success = cursor.rowcount > 0
logging.info (f"deleted succesfully ! ===> {success}")
cursor.close()
cnx.close()
return success
except mysql.connector.Error as err:
print('python')
logging.error(f"Error deleting company: {err}")
raise HTTPException(status_code=500, detail="Failed to delete company")
@app.delete("/api/prompt_template_for_del/{row_id}")
async def delete_company(row_id: int):
deletion_success = delete_prompt_template_from_db(row_id)
logging.info(f"company row_id +++> {row_id}")
if not deletion_success:
raise HTTPException(status_code=404, detail="Company not found or failed to delete")
return {"message": "Company deleted successfully"}
# promt_template view function ! ............
#to get data for view in promt_templae by id
@app.get("/api/getpromttemplate/{company_id}")
async def get_promt_company_details(company_id: int):
company = await get_promt_from_db(company_id)
if not company:
raise HTTPException(status_code=404, detail="Company not found")
return company
async def get_promt_from_db(company_id: int):
try:
# Establish a connection to the database
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor(dictionary=True)
query = "SELECT * FROM prompt_templates WHERE id = %s"
logging.info(f"row_id in db addresss ========> {company_id}")
cursor.execute(query, (company_id,))
company = cursor.fetchone()
cursor.close()
cnx.close()
if company:
logging.info(f"row_id in db addresss ========> {company}")
return company
else:
raise HTTPException(status_code=404, detail="Company not found or file not found for the company")
except mysql.connector.Error as err:
logging.error(f"Error fetching company: {err}")
raise HTTPException(status_code=500, detail="Failed to fetch company")
# Function to update company details
@app.put("/api/putprompttemplates/{kid}")
async def update_company_details(
kid: int,
scenario: str = Form(...),
prompt: str = Form(...),
comments: str = Form(...)
):
logging.info(f"Received request for company data with ID: {kid}")
company_data = {
'scenario': scenario,
'prompts': prompt,
'comments': comments,
}
updated_company = await update_prompt_in_db(kid, company_data)
if not updated_company:
raise HTTPException(status_code=500, detail="Failed to update company")
return updated_company
# Database query function to update company data
async def update_prompt_in_db(kid: int, company_data: dict):
try:
logging.info(f"Updating prompt for ID: {kid}")
cnx = get_db_connection()
if cnx is None:
raise HTTPException(status_code=500, detail="Failed to connect to the database")
cursor = cnx.cursor()
update_query = """
UPDATE u852023448_redmindgpt.prompt_templates pt
SET pt.scenario=%s, pt.prompts=%s, pt.comments=%s
WHERE pt.id = %s;
"""
logging.info(f"row_id in prompt db address ========> {kid}")
logging.info(f"SQL update query for company ===> {update_query}")
cursor.execute(update_query, (
company_data['scenario'],
company_data['prompts'],
company_data['comments'],
kid
))
cnx.commit()
success = cursor.rowcount > 0
cursor.close()
cnx.close()
if not success:
return None
return company_data
except mysql.connector.Error as err:
logging.error(f"Error updating company: {err}")
raise HTTPException(status_code=500, detail="Failed to update company")
# to refresh prompt data table
@app.get("/api/prompt_update")
async def get_document(company_id: str = Query(...)):
print(f"Received companyId and name: {company_id},{company_id}") # Log rec
#async def get_data_connectors(company_id: str, company_name: str):
logging.info(f"Received request for company_id and company_id: {company_id},{company_id}")
try:
cnx = get_db_connection()
cursor = cnx.cursor()
query = """
SELECT pt.id,pt.company_id,pt.scenario,pt.prompts,pt.comments
FROM u852023448_redmindgpt.prompt_templates pt
JOIN u852023448_redmindgpt.company_detail cd ON pt.company_id = cd.company_id
WHERE pt.company_id = %s
"""
logging.info(f"Executing query: {query} with company_id: {company_id}")
values= (company_id,)
# logging.info(f"Query parameters: {params}")
print(f"Query parameters: {values}")
cursor.execute(query, values) # Pa
result = cursor.fetchall()
logging.info(f"Query result: {result}")
cursor.close
cnx.close()
companies=[]
for row in result:
companies.append({
'id':row[0],
"company_id": row[1],
"scenario":row[2],
"prompts": row[3]
# "Auth_Bearer": result[3],
# "Inputjson": result[4],
#"OutputJson": result[5],
#"description": result[6]
})
if companies:
logging.info(f"the primary key id is {companies}")
return companies
else:
logging.warning(f"No document found for company_id: {company_id}")
raise HTTPException(status_code=404, detail="Data document not found")
except mysql.connector.Error as err:
logging.error(f"Database error: {err}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/chatbot")
async def chatbot(request: Request):
return templates.TemplateResponse("chatbot.html", {"request": request,"title":"Chatbot"})
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8000)