# Redmind_GPT_API / app.py
# Redmind's picture
# Update app.py
# 54a3f35 verified
# raw
# history blame
# 37.3 kB
import json
import os
import logging
import shutil
import asyncpg
from fastapi import FastAPI, File, Query,Form, Request, HTTPException, UploadFile
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from dotenv import load_dotenv
import mysql.connector
from typing import List
from pydantic import BaseModel
import psycopg2
# Read key=value pairs from a local .env file into the process environment.
load_dotenv()

# Root logger: INFO and above, written to redmindgen.log and echoed to the
# console so the same records are visible when running interactively.
logging.basicConfig(
    handlers=[
        logging.FileHandler("redmindgen.log"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logging.info("Application startup")
# Create the FastAPI app
app = FastAPI(title="RedmindGen", description="Chat with your Data", version="1.0.0")

# Serve everything under ./static at the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# HTML pages are rendered from ./templates.
templates = Jinja2Templates(directory="templates")

# Origins allowed to call the API from a browser. An Origin header is
# scheme://host[:port] with no trailing slash and is compared as an exact
# string, so entries here must not end in "/" or they can never match.
origins = [
    "http://localhost:8000",
    "http://127.0.0.1:8000",
    "http://167.71.75.10:8003",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# MySQL settings used by get_db_connection(). Each value can be overridden
# through the environment (load_dotenv() above also reads a .env file); the
# literals are kept only as backward-compatible fallbacks.
# TODO(security): move these fallbacks out of source control and rotate the
# password, since it has been committed in plain text.
DB_USER = os.getenv("DB_USER", 'u852023448_redmindgpt')
DB_PASSWORD = os.getenv("DB_PASSWORD", 'redmindGpt@123')
DB_HOST = os.getenv("DB_HOST", '217.21.88.10')
DB_NAME = os.getenv("DB_NAME", 'u852023448_redmindgpt')
from pydantic import BaseModel
# Request body for POST /api/connect: the details needed to open a connection
# to a user-supplied database server.
class DatabaseConnection(BaseModel):
    # Engine selector; connect_to_database accepts "postgres" or "mysql".
    database_type: str
    # Passed as the driver's `host` argument.
    server: str
    # Passed as the driver's `port` argument (kept as a string here).
    port: str
    # Passed as the driver's `database` argument.
    databaseName: str
    # Passed as the driver's `user` argument.
    username: str
    # Passed as the driver's `password` argument.
    password: str
@app.post("/api/connect")
async def connect_to_database(connection: DatabaseConnection):
    """Connect to a user-supplied database and list its schemas and tables.

    Args:
        connection: Engine type ("postgres" or "mysql") plus host, port,
            database name and credentials.

    Returns:
        A dict with "schemas" (list of schema names), "schema_tables"
        (schema name -> list of table names) and "success": True.

    Raises:
        HTTPException: 400 for an unsupported ``database_type``; 500 with the
            driver's error text for any connection or query failure.
    """
    # Pick the driver before entering the try block so the 400 below is not
    # swallowed by the generic handler and reported as a 500.
    if connection.database_type == "postgres":
        connect = psycopg2.connect
    elif connection.database_type == "mysql":
        connect = mysql.connector.connect
    else:
        raise HTTPException(status_code=400, detail="Unsupported database type")

    # Both engines expose information_schema and use %s placeholders, so the
    # same two statements serve either driver.
    query_schemas = "SELECT schema_name FROM information_schema.schemata"
    query_tables = "SELECT table_name FROM information_schema.tables WHERE table_schema = %s"

    conn = None
    try:
        # Credentials are deliberately not printed or logged.
        conn = connect(
            host=connection.server,
            port=connection.port,
            database=connection.databaseName,
            user=connection.username,
            password=connection.password,
        )
        cursor = conn.cursor()
        cursor.execute(query_schemas)
        schema_names = [row[0] for row in cursor.fetchall()]

        schema_tables = {}
        for schema_name in schema_names:
            cursor.execute(query_tables, (schema_name,))
            schema_tables[schema_name] = [row[0] for row in cursor.fetchall()]
        cursor.close()
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Release the connection on the error path as well.
        if conn is not None:
            conn.close()

    return {"schemas": schema_names, "schema_tables": schema_tables, "success": True}
# Function to create a new database connection for MySQL (Example)
def get_db_connection():
    """Open a new MySQL connection from the module-level DB_* settings.

    Returns:
        The open connection, or None when mysql.connector raises an error
        (the error is logged). Callers must handle the None case.
    """
    try:
        return mysql.connector.connect(
            user=DB_USER,
            password=DB_PASSWORD,
            host=DB_HOST,
            database=DB_NAME,
        )
    except mysql.connector.Error as err:
        logging.error(f"Database connection error: {err}")
    return None
@app.get("/")
async def read_root(request: Request):
    """Render the landing page template (index.html)."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def verify_user(username: str, password: str):
    """Look up a user's role by username and password.

    Args:
        username: Login name to match against user_detail.username.
        password: Value compared directly against user_detail.password.
            NOTE(review): this implies passwords are stored unhashed; confirm
            and migrate to hashed storage.

    Returns:
        A 2-tuple in every case, so callers can always unpack it:
        ("success", role) when a matching row exists, otherwise
        ("failure", None), including when the database is unreachable or the
        query fails.
    """
    cnx = get_db_connection()
    if cnx is None:
        # get_db_connection already logged the cause.
        return "failure", None

    try:
        cursor = cnx.cursor()
        query = "SELECT role FROM user_detail WHERE username = %s AND password = %s"
        cursor.execute(query, (username, password))
        result = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        return "failure", None
    finally:
        cnx.close()

    if result is None:
        logging.info(f"User {username} login failed")
        return "failure", None

    logging.info(f"User {username} logged in successfully")
    return "success", result[0]
@app.post("/validate-user")
async def validate_user(request: Request, username: str = Form(...), password: str = Form(...)):
    """Check submitted credentials and render the dashboard or the login page.

    On success dashboard.html is rendered with the username and role; on any
    failure index.html is rendered again.
    """
    outcome = verify_user(username, password)
    # verify_user may hand back a bare "failure" string rather than a
    # (status, role) pair; unpacking that string would raise ValueError, so
    # only a tuple whose first item is 'success' counts as a login.
    if isinstance(outcome, tuple) and len(outcome) == 2 and outcome[0] == 'success':
        role = outcome[1]
        logging.info(f"user role {role}is rerturned")
        return templates.TemplateResponse("dashboard.html", {"request": request, "username": username,"role": role})
    return templates.TemplateResponse("index.html", {"request": request})
@app.post("/submit_company_profile")
async def submit_company_profile(request: Request,
                                 company_name: str = Form(...),
                                 company_code: str = Form(...),
                                 domain: str = Form(...),
                                 llm_tools: List[str] = Form(...)):
    """Insert a company_detail row from the submitted form, then redirect.

    ``llm_tools`` is stored as a single comma-joined string.

    Returns:
        A 302 redirect to /company_profile carrying a success message.

    Raises:
        HTTPException: 500 when the database is unreachable or the insert fails.
    """
    logging.info("Received form submission for company profile")
    logging.info(f"Form data - company_name: {company_name}, company_code: {company_code}, domain: {domain}, llm_tools: {llm_tools}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        query = "INSERT INTO company_detail (company_name, company_code, domain, llm_tools) VALUES (%s, %s, %s, %s)"
        values = (company_name, company_code, domain, ",".join(llm_tools))
        logging.info(f"Executing query: {query} with values: {values}")
        cursor.execute(query, values)
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Company profile for {company_name} inserted successfully")
    # The response must be returned; constructing it alone sends nothing.
    return RedirectResponse(url="/company_profile?message=Data saved successfully", status_code=302)
@app.get("/api/companies")
async def get_companies():
    """Return the names of all companies as {"companies": [{"name": ...}, ...]}.

    Raises:
        HTTPException: 500 when the database is unreachable or the query fails.
    """
    cnx = get_db_connection()
    if cnx is None:
        # Without this check the None would surface as an AttributeError.
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        cursor.execute("SELECT company_name FROM company_detail ")
        rows = cursor.fetchall()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        # Close the connection on the error path too.
        cnx.close()

    return {"companies": [{"name": row[0]} for row in rows]}
@app.get("/dashboard")
async def dashboard(request: Request):
    """Render dashboard.html with the page title "Dashboard"."""
    context = {"request": request, "title": "Dashboard"}
    return templates.TemplateResponse("dashboard.html", context)
@app.get("/company_profile")
async def company_profile(request: Request):
    """Render company_profile.html with the page title "Company Profile"."""
    context = {"request": request, "title": "Company Profile"}
    return templates.TemplateResponse("company_profile.html", context)
@app.get("/api/company_id")
async def get_company_id(company_name: str):
    """Return the company_detail record whose company_name matches.

    Returns:
        A dict with company_id, company_name, company_code, domain and
        llm_tools (the stored comma-joined string split into a list).

    Raises:
        HTTPException: 404 when no company has that name; 500 when the
            database is unreachable or the query fails.
    """
    logging.info(f"Received request for company name: {company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        # Name the columns so the positional unpacking below does not depend
        # on the table's physical column order.
        query = (
            "SELECT company_id, company_name, company_code, domain, llm_tools "
            "FROM company_detail WHERE company_name = %s"
        )
        cursor.execute(query, (company_name,))
        result = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    if not result:
        logging.error(f"Company not found for name: {company_name}")
        raise HTTPException(status_code=404, detail="Company not found")

    company_id, name, company_code, domain, tools_csv = result
    return {
        "company_id": company_id,
        "company_name": name,
        "company_code": company_code,
        "domain": domain,
        "llm_tools": tools_csv.split(',') if tools_csv else [],
    }
@app.get("/api/companydetails")
async def get_companies():
    """Return every company_detail record as a list of dicts.

    ``llm_tools`` is returned as the stored comma-joined string, unchanged.
    NOTE(review): /api/company_id returns llm_tools as a list instead;
    confirm which shape the client expects before unifying them.

    Raises:
        HTTPException: 404 when the table is empty; 500 when the database is
            unreachable or the query fails.
    """
    logging.info(f"Received request for company name")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        # Explicit columns keep the positional mapping below stable.
        query = "SELECT company_id, company_name, company_code, domain, llm_tools FROM company_detail"
        cursor.execute(query)
        rows = cursor.fetchall()
        logging.info(f"Query result: {rows}")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    companies = [
        {
            "company_id": row[0],
            "company_name": row[1],
            "company_code": row[2],
            "domain": row[3],
            "llm_tools": row[4],
        }
        for row in rows
    ]
    if not companies:
        # Do not index into the (empty) result here: that raised IndexError
        # and turned the intended 404 into a 500.
        logging.error("No companies found in company_detail")
        raise HTTPException(status_code=404, detail="Company not found")
    return companies
# View a single company's details.
@app.get("/api/getcompanydetails/{company_id}")
async def get_company_details(company_id: int):
    """Return the company record for ``company_id``, or 404 if there is none."""
    record = await get_company_from_db(company_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Company not found")
async def get_company_from_db(company_id: int):
    """Fetch one company_detail row by primary id.

    Returns:
        The row as a dict keyed by column name, or None when no row matches.

    Raises:
        HTTPException: 500 when the database is unreachable or the query fails.
    """
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    try:
        # dictionary=True makes fetchone() return {column: value}.
        cursor = cnx.cursor(dictionary=True)
        cursor.execute("SELECT * FROM company_detail WHERE company_id = %s", (company_id,))
        company = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")
    finally:
        # Close the connection even when the query raised.
        cnx.close()

    return company
# Edit a company's details.
@app.put("/api/putcompanydetails/{company_id}")
async def update_company_details(company_id: int,
                                 company_name: str = Form(...),
                                 company_code: str = Form(...),
                                 domain: str = Form(...),
                                 llm_tools: List[str] = Form(...)):
    """Update one company from form fields and return the stored values.

    ``llm_tools`` is joined into a comma-separated string before saving.
    A falsy result from update_company_in_db is reported as a 500.
    """
    print(f"Received company_id",company_id) # Debug statement
    logging.info(f"Received request for company data")

    payload = dict(
        company_name=company_name,
        company_code=company_code,
        domain=domain,
        llm_tools=','.join(llm_tools),
    )
    saved = await update_company_in_db(company_id, payload)
    if saved:
        return saved
    raise HTTPException(status_code=500, detail="Failed to update company")
async def update_company_in_db(company_id: int, company_data: dict):
    """Write new field values to one company_detail row.

    Args:
        company_id: Id of the row to update.
        company_data: Dict with the keys 'company_name', 'company_code',
            'domain' and 'llm_tools'.

    Returns:
        ``company_data`` when the driver reports at least one affected row,
        otherwise None.

    Raises:
        HTTPException: 500 when the database is unreachable or the update fails.
    """
    logging.info(f"Received request for company name")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    # The table is referenced unqualified, like every other company_detail
    # statement in this file; the previous name had the schema and table
    # fused together without the separating dot, so it named no real table.
    update_query = """
        UPDATE company_detail
        SET company_name = %s, company_code = %s, domain = %s, llm_tools = %s
        WHERE company_id = %s
    """
    params = (
        company_data['company_name'],
        company_data['company_code'],
        company_data['domain'],
        company_data['llm_tools'],
        company_id,
    )

    try:
        cursor = cnx.cursor()
        logging.info(f"Executing query: {update_query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(update_query, params)
        cnx.commit()
        success = cursor.rowcount > 0
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error updating company: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")
    finally:
        cnx.close()

    return company_data if success else None
def delete_company_from_db(company_id: int) -> bool:
    """Delete one company_detail row.

    Returns:
        True when a row was deleted, False when no row had that id.

    Raises:
        HTTPException: 500 when the database is unreachable or the delete fails.
    """
    logging.info(f"Received request for company name: {company_id}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    try:
        cursor = cnx.cursor()
        cursor.execute("DELETE FROM company_detail WHERE company_id = %s", (company_id,))
        cnx.commit()
        deleted = cursor.rowcount > 0
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
    finally:
        # Close the connection even when the delete raised.
        cnx.close()

    return deleted
@app.delete("/api/delcompanydetails/{company_id}")
async def delete_company(company_id: int):
    """Delete a company; respond 404 when nothing was deleted."""
    if delete_company_from_db(company_id):
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
@app.get("/knowledgebase")
async def knowledgebase(request: Request):
    """Render knowledgebase.html with the page title "KnowledgeBase"."""
    context = {"request": request, "title": "KnowledgeBase"}
    return templates.TemplateResponse("knowledgebase.html", context)
# Insert a document into the knowledge base.
@app.post("/upload_document")
async def upload_document(
    request: Request,
    company_id:str=Form(...),
    uploadFile: UploadFile = File(...),
    documentName: str = Form(...),
    documentDescription: str = Form(...),
    department: str = Form(...),
    vectorDBflag:str=Form(...),
    version: str = Form(...),
    lastUpdated: str = Form(...)
):
    """Save an uploaded file under uploads/ and record it in knowledge_base.

    Returns:
        A 302 redirect to /knowledgebase.

    Raises:
        HTTPException: 400 when the upload has no usable file name; 500 when
            the database is unreachable or the insert fails.
    """
    # The file name comes from the client, so reduce it to its final path
    # component. Otherwise a name such as "../../app.py" would be written
    # outside the upload folder. Backslashes are normalised first so
    # Windows-style paths are reduced as well.
    safe_name = os.path.basename((uploadFile.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")

    upload_folder = "uploads/"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, safe_name)
    # An existing file with the same name is overwritten.
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(uploadFile.file, buffer)

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        query = """
            INSERT INTO knowledge_base (company_id,file_path, document_name, document_desc, department, version,vectorDBflag, last_updated)
            VALUES (%s,%s, %s, %s, %s, %s,%s, %s)
        """
        values = (company_id,file_path, documentName, documentDescription, department, version,vectorDBflag, lastUpdated)
        cursor.execute(query, values)
        cnx.commit()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Document {documentName} uploaded successfully")
    return RedirectResponse(url="/knowledgebase", status_code=302)
# Retrieve documents from the knowledge base.
@app.get("/api/document_upload")
async def get_document(company_id: str = Query(...), company_name: str = Query(...)):
    """List knowledge-base documents for a company id/name pair.

    Returns:
        A list of dicts with row_id, company_id, document_name,
        document_desc, department, version, vectorDBflag and last_updated.
        The stored file path is intentionally not returned.

    Raises:
        HTTPException: 404 when no document matches; 500 when the database is
            unreachable or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    # Tables are referenced unqualified (the connection already selects the
    # schema); the previous names had schema and table fused without a dot.
    query = """
        SELECT kb.kid,kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
        FROM knowledge_base kb
        JOIN company_detail cd ON kb.company_id = cd.company_id
        WHERE kb.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)

    try:
        cursor = cnx.cursor()
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(query, params)
        rows = cursor.fetchall()
        logging.info(f"Query result: {rows}")
        # This was previously a bare attribute reference, which never closed
        # the cursor.
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    documents = [
        {
            "row_id": row[0],
            "company_id": row[1],
            # row[2] is file_path and is deliberately left out of the response.
            "document_name": row[3],
            "document_desc": row[4],
            "department": row[5],
            "version": row[6],
            "vectorDBflag": row[7],
            "last_updated": row[8],
        }
        for row in rows
    ]
    if not documents:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    return documents
# Fetch one knowledge-base entry for the view dialog.
@app.get("/api/getknowledgebase/{company_id}")
async def get_company_details(company_id: int):
    """Return the knowledge-base row looked up via get_knowledge_from_db.

    The path value is passed straight to get_knowledge_from_db; a falsy
    result is reported as 404.
    """
    entry = await get_knowledge_from_db(company_id)
    if entry:
        return entry
    raise HTTPException(status_code=404, detail="Company not found")
async def get_knowledge_from_db(company_id: int):
    """Fetch one knowledge_base row, matching the argument against ``kid``.

    Despite its name, ``company_id`` is compared with the ``kid`` column.

    Returns:
        The row as a dict keyed by column name, with 'file_path' reduced to
        its base file name when present.

    Raises:
        HTTPException: 404 when no row matches; 500 when the database is
            unreachable or the query fails.
    """
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    try:
        cursor = cnx.cursor(dictionary=True)
        cursor.execute("SELECT * FROM knowledge_base WHERE kid = %s", (company_id,))
        company = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")
    finally:
        # Close the connection even when the query raised.
        cnx.close()

    if not company:
        raise HTTPException(status_code=404, detail="Company not found or file not found for the company")

    if company.get('file_path'):
        # Expose only the file name, not the server-side directory.
        company['file_path'] = os.path.basename(company['file_path'])
        logging.debug(f"Extracted filename: {company['file_path']}")
    return company
# Edit a knowledge-base entry's details.
@app.put("/api/putknowledgebase/{kid}")
async def update_company_details(
        kid: int,
        company_id: str = Form(...),
        documentName: str = Form(...),
        documentDescription: str = Form(...),
        department: str = Form(...),
        version: str = Form(...),
        vectorDBFlag: str = Form(...),
        lastUpdated: str = Form(...)):
    """Update the metadata of knowledge-base entry ``kid`` from form fields.

    Only metadata is handled here: no file is accepted and the stored file
    path is not part of the update. A falsy result from
    update_knowledge_in_db is reported as a 500.
    """
    logging.info(f"Received request for company data with ID: {kid}")

    payload = dict(
        company_id=company_id,
        document_name=documentName,
        document_desc=documentDescription,
        department=department,
        version=version,
        vectorDBflag=vectorDBFlag,
        last_updated=lastUpdated,
    )
    saved = await update_knowledge_in_db(kid, payload)
    if saved:
        return saved
    raise HTTPException(status_code=500, detail="Failed to update company")
async def update_knowledge_in_db(kid: int, company_data: dict):
    """Write new metadata to one knowledge_base row.

    Args:
        kid: Id of the row to update.
        company_data: Dict with the keys 'company_id', 'document_name',
            'document_desc', 'department', 'version', 'vectorDBflag' and
            'last_updated'.

    Returns:
        ``company_data`` when the driver reports at least one affected row,
        otherwise None.

    Raises:
        HTTPException: 500 when the database is unreachable or the update fails.
    """
    logging.info(f"Updating knowledge base for ID: {kid}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    # One placeholder per value below. The earlier statement omitted "= %s"
    # after company_id (invalid SQL, and one placeholder short of the eight
    # parameters) and fused the schema and table names without a dot.
    update_query = """
        UPDATE knowledge_base
        SET company_id = %s, document_name = %s, document_desc = %s, department = %s,
            version = %s, vectorDBflag = %s, last_updated = %s
        WHERE kid = %s
    """
    params = (
        company_data['company_id'],
        company_data['document_name'],
        company_data['document_desc'],
        company_data['department'],
        company_data['version'],
        company_data['vectorDBflag'],
        company_data['last_updated'],
        kid,
    )

    try:
        cursor = cnx.cursor()
        cursor.execute(update_query, params)
        cnx.commit()
        success = cursor.rowcount > 0
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error updating company: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")
    finally:
        cnx.close()

    return company_data if success else None
def delete_knowledge_from_db(company_id: int) -> bool:
    """Delete one knowledge_base row, matching the argument against ``kid``.

    Despite its name, ``company_id`` is compared with the ``kid`` column.

    Returns:
        True when a row was deleted, False when no row matched.

    Raises:
        HTTPException: 500 when the database is unreachable or the delete fails.
    """
    logging.info(f"Received request for knowledgebase company id: {company_id}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")

    try:
        cursor = cnx.cursor()
        cursor.execute("DELETE FROM knowledge_base WHERE kid = %s", (company_id,))
        cnx.commit()
        deleted = cursor.rowcount > 0
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
    finally:
        # Close the connection even when the delete raised.
        cnx.close()

    return deleted
@app.delete("/api/delknowledgebase/{company_id}")
async def delete_company(company_id: int):
    """Delete a knowledge-base entry; respond 404 when nothing was deleted."""
    if delete_knowledge_from_db(company_id):
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
@app.get("/data_connectors")
async def data_connectors(request: Request):
    """Render data_connectors.html with the page title "Data Connectors"."""
    context = {"request": request, "title": "Data Connectors"}
    return templates.TemplateResponse("data_connectors.html", context)
# Insert or update a company's data connector.
@app.post("/save_data_connectors")
async def save_data_connectors( request: Request,
                                company_id: int = Form(...),
                                database: List[str] = Form(...),
                                server: str = Form(...),
                                port: str = Form(...),
                                databaseName:List[str]= Form(...),
                                username: str=Form(...),
                                password: str=Form(...),
                                selectedTables: List[str] = Form(...)):
    """Create or replace the data_connectors row for ``company_id``.

    The list-valued fields (database, databaseName, selectedTables) are
    stored as comma-joined strings. If a row already exists for the company
    it is updated, otherwise a new row is inserted.
    NOTE(review): the connector password is stored as submitted; consider
    encrypting it at rest.

    Returns:
        A 302 redirect to /data_connectors.

    Raises:
        HTTPException: 500 when the database is unreachable or a statement fails.
    """
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    db_types = ",".join(database)
    db_names = ",".join(databaseName)
    table_names = ",".join(selectedTables)

    try:
        cursor = cnx.cursor()
        # Decide between UPDATE and INSERT based on an existing row.
        cursor.execute("SELECT COUNT(*) FROM data_connectors WHERE company_id = %s", (company_id,))
        exists = cursor.fetchone()[0] > 0

        if exists:
            query = """
                UPDATE data_connectors
                SET databasetype = %s, serverip = %s, port = %s, database_name = %s, username = %s, password = %s, dbtablename = %s
                WHERE company_id = %s
            """
            values = (db_types, server, port, db_names, username, password, table_names, company_id)
            action = "updated"
        else:
            query = """
                INSERT INTO data_connectors(company_id, databasetype, serverip, port, database_name, username, password, dbtablename)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            values = (company_id, db_types, server, port, db_names, username, password, table_names)
            action = "inserted"

        # Log only the target company: the value tuple holds the plaintext
        # password and must not be written to the log.
        logging.info(f"Saving data connector for company_id: {company_id}")
        cursor.execute(query, values)
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) {action}")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Data_connectors for {database} processed successfully")
    return RedirectResponse(url="/data_connectors", status_code=302)
@app.get("/api/check_data_connectors")
async def get_data_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """Return the stored data connector for a company id/name pair.

    Returns:
        A dict with company_id, databasetype, serverip, port, database_name,
        username, password and dbtablename (split into a list).
        NOTE(review): the stored password is returned to the client as
        before; confirm the UI needs it, otherwise drop it from the response.

    Raises:
        HTTPException: 404 when no connector matches; 500 when the database
            is unreachable or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT dc.company_id, dc.databasetype, dc.serverip, dc.port,dc.database_name, dc.username, dc.password ,dc.dbtablename
        FROM u852023448_redmindgpt.data_connectors dc
        JOIN u852023448_redmindgpt.company_detail cd ON dc.company_id = cd.company_id
        WHERE dc.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)

    try:
        cursor = cnx.cursor()
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(query, params)
        result = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    if not result:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")

    # The row itself is not logged because it contains the stored password.
    logging.info(f"Data found for company_id: {company_id}")
    return {
        "company_id": result[0],
        "databasetype": result[1],
        "serverip": result[2],
        "port": result[3],
        "database_name": result[4],
        "username": result[5],
        "password": result[6],
        "dbtablename": result[7].split(',') if result[7] else [],
    }
@app.get("/API_connectors")
async def API_connectors(request: Request):
    """Render API_connectors.html with the page title "API Connectors"."""
    context = {"request": request, "title": "API Connectors"}
    return templates.TemplateResponse("API_connectors.html", context)
# Save an API connector.
@app.post("/api/save_api_details")
async def API_saveconnectors(request: Request,
                             company_id:int=Form(...),
                             APIName:str=Form(...),
                             APIEndpoint:str=Form(...),
                             Auth_Bearer:str=Form(...),
                             Inputjson:str=Form(...),
                             OutputJson:str=Form(...),
                             Description:str=Form(...)):
    """Insert an api_connectors row from the submitted form.

    Returns:
        A 302 redirect to /data_connectors.
        NOTE(review): the other save endpoints redirect to their own page;
        confirm whether this should go to /API_connectors instead.

    Raises:
        HTTPException: 500 when the database is unreachable or the insert fails.
    """
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        query = "INSERT INTO api_connectors(company_id,api_name, api_endpoint, auth_token, input_param,output_json,description) VALUES (%s,%s, %s, %s, %s,%s,%s)"
        values = (company_id, APIName, APIEndpoint, Auth_Bearer, Inputjson,OutputJson,Description)
        # The value tuple contains the bearer token, so log identifying
        # fields only.
        logging.info(f"Saving API connector {APIName} for company_id: {company_id}")
        cursor.execute(query, values)
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Data_connectors for {APIName} inserted successfully")
    return RedirectResponse(url="/data_connectors", status_code=302)
# Retrieve API connectors.
@app.get("/api/get_api_connectors")
async def get_api_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """List API connectors for a company id/name pair.

    Returns:
        A list of dicts with company_id, APIName and APIEndpoint.

    Raises:
        HTTPException: 404 when no connector matches; 500 when the database
            is unreachable or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    # Only the columns that are returned are selected, so auth tokens are
    # neither fetched nor written to the log with the result rows.
    query = """
        SELECT ac.company_id, ac.api_name, ac.api_endpoint
        FROM u852023448_redmindgpt.api_connectors ac
        JOIN u852023448_redmindgpt.company_detail cd ON ac.company_id = cd.company_id
        WHERE ac.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)

    try:
        cursor = cnx.cursor()
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(query, params)
        rows = cursor.fetchall()
        logging.info(f"Query result: {rows}")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    connectors = [
        {"company_id": row[0], "APIName": row[1], "APIEndpoint": row[2]}
        for row in rows
    ]
    if not connectors:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return connectors
@app.get("/prompt_template")
async def prompt_template(request: Request):
    """Render prompt_template.html with the page title "Prompt Templates"."""
    context = {"request": request, "title": "Prompt Templates"}
    return templates.TemplateResponse("prompt_template.html", context)
# Insert a prompt template.
@app.post("/api/save_prompt_details")
async def prompt_saveconnectors(request: Request,
                                company_id:int=Form(...),
                                scenario:str=Form(...),
                                sampleprompt:str=Form(...),
                                comments:str=Form(...),
                                ):
    """Insert a prompt_templates row from the submitted form.

    Returns:
        A 302 redirect to /prompt_template.

    Raises:
        HTTPException: 500 when the database is unreachable or the insert fails.
    """
    logging.info(f"Received form submission for database_connectors")

    cnx = get_db_connection()
    if cnx is None:
        # Without this check the None would surface as an AttributeError.
        raise HTTPException(status_code=500, detail="Internal Server Error")

    try:
        cursor = cnx.cursor()
        query = "INSERT INTO prompt_templates(company_id,scenario, prompts, comments) VALUES (%s,%s, %s, %s)"
        values = (company_id, scenario, sampleprompt, comments)
        logging.info(f"Executing query: {query} with values: {values}")
        cursor.execute(query, values)
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        # Close the connection on the error path too.
        cnx.close()

    logging.info(f"Data_connectors for {scenario} inserted successfully")
    return RedirectResponse(url="/prompt_template", status_code=302)
# Retrieve prompt templates.
@app.get("/api/get_prompt_templates")
async def get_prompt_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """List prompt templates for a company id/name pair.

    Returns:
        A list of dicts with company_id, scenario and prompt.

    Raises:
        HTTPException: 404 when no template matches; 500 when the database is
            unreachable or the query fails.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_name}")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Internal Server Error")

    query = """
        SELECT pt.company_id,pt.scenario,pt.prompts,pt.comments
        FROM u852023448_redmindgpt.prompt_templates pt
        JOIN u852023448_redmindgpt.company_detail cd ON pt.company_id = cd.company_id
        WHERE pt.company_id = %s and cd.company_name=%s
    """
    params = (company_id, company_name)

    try:
        cursor = cnx.cursor()
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(query, params)
        rows = cursor.fetchall()
        logging.info(f"Query result: {rows}")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        # Close the connection even when the query raised.
        cnx.close()

    # row[3] (comments) is selected but not part of the response.
    prompts = [
        {"company_id": row[0], "scenario": row[1], "prompt": row[2]}
        for row in rows
    ]
    if not prompts:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return prompts
@app.get("/chatbot")
async def chatbot(request: Request):
    """Render chatbot.html with the page title "Chatbot"."""
    context = {"request": request, "title": "Chatbot"}
    return templates.TemplateResponse("chatbot.html", context)
if __name__ == "__main__":
    # Running the file directly starts a local server on 127.0.0.1:8000;
    # uvicorn is imported here so importing the module does not need it.
    import uvicorn

    uvicorn.run(
        app,
        host="127.0.0.1",
        port=8000,
    )