Spaces:
Sleeping
Sleeping
import json | |
import os | |
import logging | |
import shutil | |
import asyncpg | |
from fastapi import FastAPI, File, Query,Form, Request, HTTPException, UploadFile | |
from fastapi.responses import JSONResponse, RedirectResponse | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from fastapi.middleware.cors import CORSMiddleware | |
from dotenv import load_dotenv | |
import mysql.connector | |
from typing import List | |
from pydantic import BaseModel | |
import psycopg2 | |
# Load variables from a .env file (if any) before anything reads the environment.
load_dotenv()

# Log records go to two places: the redmindgen.log file and the console.
_log_handlers = [
    logging.FileHandler("redmindgen.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logging.info("Application startup")
# Create the FastAPI app
app = FastAPI(title="RedmindGen", description="Chat with your Data", version="1.0.0")

# Serve the contents of ./static under the /static URL prefix.
app.mount("/static", StaticFiles(directory="static"), name="static")

# Jinja2 templates are looked up in ./templates.
templates = Jinja2Templates(directory="templates")

# Origins allowed to make cross-origin requests.
# Each entry must be a bare scheme://host[:port]: browsers never send a
# trailing slash in the Origin header, so an entry ending in "/" would never
# match and that origin would be silently rejected.
origins = [
    "http://localhost:8000",
    "http://127.0.0.1:8000",
    "http://167.71.75.10:8003",
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Connection settings for the application's own MySQL database.
# Each value can be overridden through the environment (including the .env
# file loaded at startup); the literals remain only as backward-compatible
# fallbacks.
# NOTE(review): credentials should not be committed to source control -- move
# them to the environment and rotate this password.
DB_USER = os.getenv('DB_USER', 'u852023448_redmindgpt')
DB_PASSWORD = os.getenv('DB_PASSWORD', 'redmindGpt@123')
DB_HOST = os.getenv('DB_HOST', '217.21.88.10')
DB_NAME = os.getenv('DB_NAME', 'u852023448_redmindgpt')
from pydantic import BaseModel | |
class DatabaseConnection(BaseModel):
    """Connection details for an external database that the app inspects."""

    # Engine selector; connect_to_database recognises "Postgres" and "mysql".
    database_type: str
    # Host name or address passed to the driver as ``host``.
    server: str
    # Port, carried as a string and passed to the driver unchanged.
    port: str
    # Name of the database to open.
    databaseName: str
    # Login credentials for that database.
    username: str
    password: str
async def connect_to_database(connection: DatabaseConnection):
    """List the schemas and tables reachable through ``connection``.

    Connects with psycopg2 when ``database_type`` is ``"Postgres"`` and with
    mysql.connector when it is ``"mysql"``. Schema names are read from
    ``information_schema.schemata`` and, for each schema, its table names
    from ``information_schema.tables``.

    Returns:
        ``{"schemas": [...], "schema_tables": {schema: [table, ...]},
        "success": True}``.

    Raises:
        HTTPException: 400 for an unsupported ``database_type``; 500 carrying
            the underlying error text for any other failure.
    """
    query_schemas = "SELECT schema_name FROM information_schema.schemata"
    query_tables = "SELECT table_name FROM information_schema.tables WHERE table_schema = %s"
    conn = None
    try:
        logging.info(f"Attempting to connect to database: {connection.database_type}")
        if connection.database_type == "Postgres":
            driver_connect = psycopg2.connect
        elif connection.database_type == "mysql":
            driver_connect = mysql.connector.connect
        else:
            raise HTTPException(status_code=400, detail="Unsupported database type")

        # The password is deliberately kept out of the log line.
        logging.info(
            f"Connection details - Host: {connection.server}, Port: {connection.port}, "
            f"Database: {connection.databaseName}, User: {connection.username}"
        )
        conn = driver_connect(
            host=connection.server,
            port=connection.port,
            database=connection.databaseName,
            user=connection.username,
            password=connection.password,
        )
        cursor = conn.cursor()

        # Fetch all schemas
        cursor.execute(query_schemas)
        schema_names = [row[0] for row in cursor.fetchall()]

        # Fetch all tables within each schema
        schema_tables = {}
        for schema_name in schema_names:
            cursor.execute(query_tables, (schema_name,))
            schema_tables[schema_name] = [row[0] for row in cursor.fetchall()]

        cursor.close()
        return {"schemas": schema_names, "schema_tables": schema_tables, "success": True}
    except HTTPException:
        # Keep the 400 above from being re-wrapped as a 500 below.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    finally:
        # Release the connection on every path, including failures mid-query.
        if conn is not None:
            conn.close()
# Function to create a new connection to the application's MySQL database
def get_db_connection():
    """Open a new MySQL connection using the module-level DB_* settings.

    Returns:
        The connection object, or ``None`` when mysql.connector reports an
        error (the error is logged). Callers must handle the ``None`` case.
    """
    try:
        return mysql.connector.connect(user=DB_USER, password=DB_PASSWORD, host=DB_HOST, database=DB_NAME)
    except mysql.connector.Error as err:
        logging.error(f"Database connection error: {err}")
        return None
async def read_root(request: Request):
    """Render ``index.html`` for the incoming request."""
    context = {"request": request}
    return templates.TemplateResponse("index.html", context)
def verify_user(username: str, password: str):
    """Look up a username/password pair in ``user_detail``.

    Returns:
        ``("success", role, company_id)`` when a matching row exists and
        ``("failure", None, None)`` otherwise (no match, no connection, or a
        database error). The result is always a 3-tuple so that callers can
        unpack it unconditionally.
    """
    failure = ("failure", None, None)
    cnx = get_db_connection()
    if cnx is None:
        # get_db_connection has already logged why the connection failed.
        return failure
    try:
        cursor = cnx.cursor()
        # NOTE(review): the submitted password is compared directly with the
        # stored column, which implies passwords are stored unhashed --
        # consider storing a salted hash instead.
        query = "SELECT role,company_id FROM user_detail WHERE username = %s AND password = %s"
        cursor.execute(query, (username, password))
        result = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        return failure
    finally:
        cnx.close()

    if result is None:
        logging.info(f"User {username} login failed")
        return failure
    logging.info(f"User {username}{result[1]} logged in successfully")
    return "success", result[0], result[1]
async def validate_user(request: Request, username: str = Form(...), password: str = Form(...)):
    """Process the login form.

    On success sets the ``role``, ``username`` and ``company_id`` cookies and
    redirects to ``/dashboard``; otherwise re-renders ``index.html`` with an
    error message.
    """
    outcome = verify_user(username, password)
    # Accept either a (status, role, company_id) tuple or a bare status
    # string; unpacking a bare string into three names would raise ValueError.
    if isinstance(outcome, tuple) and len(outcome) == 3:
        status, role, company_id = outcome
    else:
        status, role, company_id = "failure", None, None

    if status == 'success' and role and company_id:
        logging.info(f"user role {role} is returned")
        # Set cookies and redirect to the dashboard
        # NOTE(review): these cookies are unsigned and are read back as-is by
        # the page handlers, so a client can edit them -- confirm this is
        # acceptable or switch to a signed session.
        response = RedirectResponse(url="/dashboard", status_code=302)
        response.set_cookie(key="role", value=role)
        response.set_cookie(key="username", value=username)
        # Cookie values are text; company_id comes from the database row.
        response.set_cookie(key="company_id", value=str(company_id))
        return response

    # If login fails, show the index page again with an error message
    return templates.TemplateResponse("index.html", {
        "request": request,
        "error": "Invalid username or password"
    })
async def submit_company_profile(request: Request,
                                 company_name: str = Form(...),
                                 company_code: str = Form(...),
                                 domain: str = Form(...),
                                 llm_tools: List[str] = Form(...),
                                 username: str = Form(...),
                                 password: str = Form(...),
                                 role: str = Form(...)):
    """Create a company and its first user, then redirect to the profile page.

    Inserts one row into ``company_detail`` and one into ``user_detail``
    (linked by the new ``company_id``) and commits both together.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    logging.info("Received form submission for company profile")
    logging.info(f"Form data - company_name: {company_name}, company_code: {company_code}, domain: {domain}, llm_tools: {llm_tools}")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        query = "INSERT INTO company_detail (company_name, company_code, domain, llm_tools) VALUES (%s, %s, %s, %s)"
        values = (company_name, company_code, domain, ",".join(llm_tools))
        logging.info(f"Executing query: {query} with values: {values}")
        cursor.execute(query, values)
        # Retrieve the inserted company_id
        company_id = cursor.lastrowid
        logging.info(f"Company profile for {company_name} inserted successfully with company_id: {company_id}")

        # Insert user details with the retrieved company_id
        # NOTE(review): the password is stored exactly as submitted --
        # consider hashing it before insertion.
        user_query = "INSERT INTO user_detail (company_id, username, password,role) VALUES (%s, %s, %s, %s)"
        # The password is deliberately kept out of the log line.
        logging.info(f"Executing user detail query: {user_query} for username: {username}, role: {role}")
        cursor.execute(user_query, (company_id, username, password, role))

        # Nothing is committed until both inserts have succeeded.
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Company profile for {company_name} inserted successfully")
    # The response must be returned; constructing it alone has no effect.
    return RedirectResponse(url="/company_profile?message=Data saved successfully", status_code=302)
async def get_companies():
    """Return the name of every row in ``company_detail``.

    Returns:
        ``{"companies": [{"name": <company_name>}, ...]}``.

    Raises:
        HTTPException: 500 when mysql.connector reports an error.
    """
    try:
        connection = get_db_connection()
        db_cursor = connection.cursor()
        db_cursor.execute("SELECT company_name FROM company_detail ")
        rows = db_cursor.fetchall()
        db_cursor.close()
        connection.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    names = []
    for row in rows:
        names.append({"name": row[0]})
    return {"companies": names}
async def dashboard(request: Request):
    """Render ``dashboard.html`` with per-table record counts for the company.

    The company is identified by the ``company_id`` cookie; ``role`` and
    ``username`` cookies are passed through to the template.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    # Retrieve cookies through the public accessor only.
    role = request.cookies.get("role")
    username = request.cookies.get("username")
    company_id = request.cookies.get("company_id")

    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()

        # Fetch all table names
        cursor.execute("SHOW TABLES")
        table_names = [row[0] for row in cursor.fetchall()]

        # Count the company's records in each table.
        # NOTE(review): every table is assumed to have a company_id column;
        # a table without one makes this query fail -- confirm the schema.
        table_count_of_each_table = {}
        for table_name in table_names:
            # Identifiers cannot be bound as parameters, so the name (which
            # comes from SHOW TABLES, not from the request) is backtick-quoted.
            query = f"SELECT COUNT(*) FROM `{table_name}` WHERE company_id = %s"
            cursor.execute(query, (company_id,))
            table_count_of_each_table[table_name] = cursor.fetchone()[0]

        cursor.execute("select company_name from company_detail where company_id = %s", (company_id,))
        company_name_result = cursor.fetchone()
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    # Fall back to a placeholder when the company row does not exist.
    company_name = company_name_result[0] if company_name_result else "Unknown"

    # Log the counts for debugging purposes
    logging.info(table_count_of_each_table)

    return templates.TemplateResponse("dashboard.html", {
        "request": request,
        "title": "Dashboard",
        "table_count_of_each_table": table_count_of_each_table,
        "role": role,
        "username": username,
        "company_id": company_id,
        "company_name": company_name
    })
async def get_company_record_count(company_id: int):
    """Count one company's rows in a fixed set of tables.

    Returns:
        ``{"table_counts": {table_name: count}}`` for ``knowledge_base``,
        ``data_connectors``, ``api_connectors`` and ``prompt_templates``.

    Raises:
        HTTPException: 500 when mysql.connector reports an error.
    """
    try:
        connection = get_db_connection()
        db_cursor = connection.cursor()

        # Table names are fixed literals, so formatting them into SQL is safe.
        tables = ["knowledge_base", "data_connectors", "api_connectors", "prompt_templates"]
        table_counts = {}
        for name in tables:
            db_cursor.execute(f"SELECT COUNT(*) FROM {name} WHERE company_id = %s", (company_id,))
            first_row = db_cursor.fetchone()
            table_counts[name] = first_row[0]

        db_cursor.close()
        connection.close()
        return {"table_counts": table_counts}
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
async def company_profile(request: Request):
    """Render ``company_profile.html`` with the role and company id cookies.

    Raises:
        HTTPException: 500 when anything goes wrong while rendering.
    """
    try:
        cookies = request.cookies
        context = {
            "request": request,
            "role": cookies.get("role"),
            "company_id": cookies.get("company_id"),
            "title": "Company Profile",
        }
        return templates.TemplateResponse("company_profile.html", context)
    except Exception as e:
        logging.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
#return templates.TemplateResponse("company_profile.html", {"request": request,"title":"Company Profile"}) | |
async def get_company_id(company_name: str):
    """Fetch one ``company_detail`` row by company name.

    Columns are read by position (0..4) and returned as company_id,
    company_name, company_code, domain and llm_tools; the last one is split
    on commas into a list.

    Raises:
        HTTPException: 404 when no row matches; 500 on a database error.
    """
    print(f"Received company_name: {company_name}")  # Debug statement
    logging.info(f"Received request for company name: {company_name}")
    try:
        connection = get_db_connection()
        db_cursor = connection.cursor()
        db_cursor.execute("SELECT * FROM company_detail WHERE company_name = %s", (company_name,))
        row = db_cursor.fetchone()
        db_cursor.close()
        connection.close()

        if not row:
            logging.error(f"Company not found for name: {company_name}")
            raise HTTPException(status_code=404, detail="Company not found")

        # An empty or NULL llm_tools column becomes an empty list.
        tools = row[4].split(',') if row[4] else []
        payload = {}
        payload["company_id"] = row[0]
        payload["company_name"] = row[1]
        payload["company_code"] = row[2]
        payload["domain"] = row[3]
        payload["llm_tools"] = tools
        return payload
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
async def get_companies():
    """Return every row of ``company_detail`` as a list of dicts.

    Columns are read by position (0..4) and returned as company_id,
    company_name, company_code, domain and llm_tools. ``llm_tools`` is
    returned exactly as stored (a comma-separated string).

    Raises:
        HTTPException: 404 when the table is empty; 500 when no connection is
            available or a database error occurs.
    """
    logging.info("Received request for company name")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        cursor.execute("SELECT * FROM company_detail")
        result = cursor.fetchall()
        logging.info(f"Query result: {result}")
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    companies = [
        {
            "company_id": row[0],
            "company_name": row[1],
            "company_code": row[2],
            "domain": row[3],
            "llm_tools": row[4],
        }
        for row in result
    ]
    if not companies:
        # The result list is empty here, so it must not be indexed.
        logging.error("No companies found")
        raise HTTPException(status_code=404, detail="Company not found")
    return companies
# View the details of a single company
async def get_company_details(company_id: int):
    """Return the company row for ``company_id``.

    Raises:
        HTTPException: 404 when ``get_company_from_db`` yields nothing.
    """
    record = await get_company_from_db(company_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Company not found")
async def get_company_from_db(company_id: int):
    """Fetch one ``company_detail`` row by id as a column-name dict.

    Returns:
        The row as a dict, or ``None`` when no row matches.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    try:
        connection = get_db_connection()
        if connection is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        # dictionary=True makes fetchone() return a dict keyed by column name.
        dict_cursor = connection.cursor(dictionary=True)
        dict_cursor.execute("SELECT * FROM company_detail WHERE company_id = %s", (company_id,))
        row = dict_cursor.fetchone()
        dict_cursor.close()
        connection.close()
        return row
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")
# Edit the details of a company
async def update_company_details(company_id: int,
                                 company_name: str = Form(...),
                                 company_code: str = Form(...),
                                 domain: str = Form(...),
                                 llm_tools: List[str] = Form(...)):
    """Update one company from form fields and return the stored values.

    ``llm_tools`` is joined with commas before being handed to
    ``update_company_in_db``.

    Raises:
        HTTPException: 500 when the update reports no result.
    """
    print("Received company_id", company_id)  # Debug statement
    logging.info("Received request for company data")
    fields = dict(
        company_name=company_name,
        company_code=company_code,
        domain=domain,
        llm_tools=','.join(llm_tools),
    )
    outcome = await update_company_in_db(company_id, fields)
    if outcome:
        return outcome
    raise HTTPException(status_code=500, detail="Failed to update company")
async def update_company_in_db(company_id: int, company_data: dict):
    """Write ``company_data`` to the ``company_detail`` row with ``company_id``.

    ``company_data`` must contain ``company_name``, ``company_code``,
    ``domain`` and ``llm_tools``.

    Returns:
        ``company_data`` when at least one row was changed, otherwise ``None``.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    logging.info(f"Received request to update company_id: {company_id}")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        # The table is company_detail in the u852023448_redmindgpt schema,
        # matching the qualified names used by the other queries in this file.
        update_query = """
        UPDATE u852023448_redmindgpt.company_detail cd
        SET cd.company_name = %s, cd.company_code = %s, cd.domain = %s, cd.llm_tools = %s
        WHERE cd.company_id = %s;
        """
        params = (
            company_data['company_name'],
            company_data['company_code'],
            company_data['domain'],
            company_data['llm_tools'],
            company_id,
        )
        logging.info(f"Executing query: {update_query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(update_query, params)
        cnx.commit()
        success = cursor.rowcount > 0
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Error updating company: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")
    finally:
        cnx.close()

    return company_data if success else None
def delete_company_from_db(company_id: int) -> bool:
    """Delete the ``company_detail`` row with the given id.

    Returns:
        ``True`` when at least one row was deleted, ``False`` otherwise.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    print(f"Received company_name: {company_id}")  # Debug statement
    logging.info(f"Received request for company name: {company_id}")
    try:
        connection = get_db_connection()
        if connection is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        db_cursor = connection.cursor()
        db_cursor.execute("DELETE FROM company_detail WHERE company_id = %s", (company_id,))
        connection.commit()
        deleted_any = db_cursor.rowcount > 0
        db_cursor.close()
        connection.close()
        return deleted_any
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
async def delete_company(company_id: int):
    """Delete a company and confirm with a message.

    Raises:
        HTTPException: 404 when ``delete_company_from_db`` reports that
            nothing was deleted.
    """
    if delete_company_from_db(company_id):
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
async def knowledgebase(request: Request):
    """Render ``knowledgebase.html`` with the role and company id cookies.

    Raises:
        HTTPException: 500 when anything goes wrong while rendering.
    """
    try:
        cookies = request.cookies
        context = {
            "request": request,
            "role": cookies.get("role"),
            "company_id": cookies.get("company_id"),
            "title": "KnowledgeBase",
        }
        return templates.TemplateResponse("knowledgebase.html", context)
    except Exception as e:
        logging.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
# Insert a document into the knowledge base
async def upload_document(
    request: Request,
    company_id: str = Form(...),
    uploadFile: UploadFile = File(...),
    documentName: str = Form(...),
    documentDescription: str = Form(...),
    department: str = Form(...),
    vectorDBflag: str = Form(...),
    version: str = Form(...),
    lastUpdated: str = Form(...)
):
    """Save an uploaded file under ``uploads/`` and record it in ``knowledge_base``.

    Returns:
        A JSON response with a confirmation message and the new row's id.

    Raises:
        HTTPException: 400 when the upload has no usable file name; 500 when
            no connection is available or a database error occurs.
    """
    # The file name comes from the client. Keep only its final component so a
    # name such as "../../app.py" or an absolute path cannot escape uploads/.
    safe_name = os.path.basename((uploadFile.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")

    # Save the uploaded file
    upload_folder = "uploads/"
    os.makedirs(upload_folder, exist_ok=True)
    file_path = os.path.join(upload_folder, safe_name)
    with open(file_path, "wb") as buffer:
        shutil.copyfileobj(uploadFile.file, buffer)

    # Save the details to the database
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        query = """
        INSERT INTO knowledge_base (company_id,file_path, document_name, document_desc, department, version,vectorDBflag, last_updated)
        VALUES (%s,%s, %s, %s, %s, %s,%s, %s)
        """
        values = (company_id, file_path, documentName, documentDescription, department, version, vectorDBflag, lastUpdated)
        cursor.execute(query, values)
        cnx.commit()
        row_id = cursor.lastrowid
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    logging.info(f"Document {documentName} uploaded successfully")
    return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
# Retrieve a company's documents from the knowledge base
async def get_document(company_id: str = Query(...)):
    """Return all ``knowledge_base`` rows that belong to ``company_id``.

    Each row is returned as a dict with the keys row_id, company_id,
    file_path, document_name, document_desc, department, version,
    vectorDBflag and last_updated.

    Raises:
        HTTPException: 404 when the company has no documents; 500 when no
            connection is available or a database error occurs.
    """
    logging.info(f"Received request for company_id and company_id: {company_id}")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        query = """
        SELECT kb.kid,kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
        FROM u852023448_redmindgpt.knowledge_base kb
        JOIN u852023448_redmindgpt.company_detail cd ON kb.company_id = cd.company_id
        WHERE kb.company_id = %s
        """
        params = (company_id,)
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        logging.info(f"Query parameters: {params}")
        cursor.execute(query, params)
        result = cursor.fetchall()
        logging.info(f"Query result: {result}")
        # close must be called; a bare attribute reference does nothing.
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    keys = ("row_id", "company_id", "file_path", "document_name", "document_desc",
            "department", "version", "vectorDBflag", "last_updated")
    # Columns are selected in the same order as the keys above.
    companies = [dict(zip(keys, row)) for row in result]
    if not companies:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    return companies
# Reload the data-table rows after the modal form has been updated
async def get_document(company_id: str = Query(...)):
    """Return all ``knowledge_base`` rows that belong to ``company_id``.

    Each row is returned as a dict with the keys kid, company_id, file_path,
    document_name, document_desc, department, version, vectorDBflag and
    last_updated.

    Raises:
        HTTPException: 404 when the company has no documents; 500 when no
            connection is available or a database error occurs.
    """
    logging.info(f"Received request for company_id and company_id: {company_id},{company_id}")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        cursor = cnx.cursor()
        query = """
        SELECT kb.kid,kb.company_id, kb.file_path, kb.document_name, kb.document_desc,kb.department,kb.version,kb.vectorDBflag,kb.last_updated
        FROM u852023448_redmindgpt.knowledge_base kb
        JOIN u852023448_redmindgpt.company_detail cd ON kb.company_id = cd.company_id
        WHERE kb.company_id = %s
        """
        values = (company_id,)
        logging.info(f"Executing query: {query} with company_id: {company_id}")
        cursor.execute(query, values)
        result = cursor.fetchall()
        # Log the fetched rows; the name must be spelled exactly `result`.
        logging.info(f"Query result: {result}")
        # close must be called; a bare attribute reference does nothing.
        cursor.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
    finally:
        cnx.close()

    keys = ("kid", "company_id", "file_path", "document_name", "document_desc",
            "department", "version", "vectorDBflag", "last_updated")
    # Columns are selected in the same order as the keys above.
    companies = [dict(zip(keys, row)) for row in result]
    if not companies:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    return companies
# Fetch one knowledge-base entry for viewing
async def get_company_details(company_id: int):
    """Return the knowledge-base row looked up via ``get_knowledge_from_db``.

    Raises:
        HTTPException: 404 when the lookup yields nothing.
    """
    entry = await get_knowledge_from_db(company_id)
    if entry:
        return entry
    raise HTTPException(status_code=404, detail="Company not found")
async def get_knowledge_from_db(company_id: int):
    """Fetch one ``knowledge_base`` row whose ``kid`` equals ``company_id``.

    When the row has a ``file_path``, it is reduced to its final path
    component before the row is returned.

    Raises:
        HTTPException: 404 when no row matches; 500 when no connection is
            available or a database error occurs.
    """
    try:
        connection = get_db_connection()
        if connection is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        dict_cursor = connection.cursor(dictionary=True)
        dict_cursor.execute("SELECT * FROM knowledge_base WHERE kid = %s", (company_id,))
        entry = dict_cursor.fetchone()
        dict_cursor.close()
        connection.close()

        if not entry:
            raise HTTPException(status_code=404, detail="Company not found or file not found for the company")

        logging.debug("Extracted filename")
        stored_path = entry.get('file_path')
        if stored_path:
            # Expose only the file name, not the directory it is stored in.
            entry['file_path'] = os.path.basename(entry['file_path'])
            logging.debug(f"Extracted filename: {entry['file_path']}")
        return entry
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")
# Edit the details of a knowledge-base entry
async def update_company_details(
    request: Request,
    companyId: int,
    company_id: str = Form(...),
    file_path: UploadFile = File(...),
    documentName: str = Form(...),
    documentDescription: str = Form(...),
    department: str = Form(...),
    version: str = Form(...),
    vectorDBFlag: str = Form(...),
    lastUpdated: str = Form(...)
):
    """Replace a knowledge-base entry's file and metadata.

    ``companyId`` is used as the entry's ``kid``; ``company_id`` is the owning
    company. The uploaded file is saved under ``uploads/`` and the field
    values are handed to ``update_knowledge_in_db``.

    Raises:
        HTTPException: 400 when the upload has no usable file name; 500 when
            the file cannot be saved or the update reports no result.
    """
    logging.info(f"Received request for company data with ID inside edit/update knowledgebase: {companyId}")

    # The file name comes from the client. Keep only its final component so a
    # name such as "../../app.py" or an absolute path cannot escape uploads/.
    safe_name = os.path.basename((file_path.filename or "").replace("\\", "/"))
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Invalid file name")
    logging.info(f"Received request for company data with ID inside edit/update knowledgebase file name: {safe_name}")

    # Create the upload folder if it doesn't exist
    upload_folder = "uploads/"
    os.makedirs(upload_folder, exist_ok=True)
    saved_file_path = os.path.join(upload_folder, safe_name)
    try:
        # Save the uploaded file to the server
        with open(saved_file_path, "wb") as buffer:
            shutil.copyfileobj(file_path.file, buffer)
    except Exception as e:
        logging.error(f"Error saving file: {e}")
        raise HTTPException(status_code=500, detail="Failed to save file")

    # Keys mirror the knowledge_base column names.
    company_data = {
        'kid': companyId,
        'company_id': company_id,
        'file_path': saved_file_path,  # Use the path where the file was saved
        'document_name': documentName,
        'document_desc': documentDescription,
        'department': department,
        'version': version,
        'vectorDBflag': vectorDBFlag,
        'last_updated': lastUpdated
    }

    # Update the knowledge base in the database
    updated_company = await update_knowledge_in_db(companyId, company_data)
    if not updated_company:
        raise HTTPException(status_code=500, detail="Failed to update company")
    return updated_company
async def update_knowledge_in_db(kid: int, company_data: dict):
    """Write ``company_data`` to the ``knowledge_base`` row identified by ``kid``.

    ``company_data`` must contain ``company_id``, ``document_name``,
    ``document_desc``, ``department``, ``version``, ``vectorDBflag`` and
    ``last_updated``. When it also carries a non-empty ``file_path``, that
    column is updated too, so the row points at the newly saved file.

    Returns:
        ``company_data`` when at least one row was changed, otherwise ``None``.

    Raises:
        HTTPException: 500 when no connection is available, on a database
            error, or on any other unexpected error.
    """
    logging.info(f"Updating knowledge base for ID: {kid}")
    cnx = get_db_connection()
    if cnx is None:
        raise HTTPException(status_code=500, detail="Failed to connect to the database")
    try:
        # Column names are fixed literals (never request data), so building
        # the SET clause with string formatting is safe.
        columns = ['company_id', 'document_name', 'document_desc',
                   'department', 'version', 'vectorDBflag', 'last_updated']
        if company_data.get('file_path'):
            columns.insert(1, 'file_path')
        set_clause = ", ".join(f"kb.{column} = %s" for column in columns)
        update_query = (
            "UPDATE u852023448_redmindgpt.knowledge_base kb "
            f"SET {set_clause} WHERE kb.kid = %s"
        )
        params = tuple(company_data[column] for column in columns) + (kid,)

        cursor = cnx.cursor()
        logging.info(f"Executing update query: {update_query}")
        cursor.execute(update_query, params)
        cnx.commit()
        success = cursor.rowcount > 0
        cursor.close()
    except HTTPException:
        # Pass HTTP errors through instead of re-wrapping them below.
        raise
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Unexpected error occurred")
    finally:
        cnx.close()

    if not success:
        logging.info("No rows updated")
        return None
    logging.info("Update successful")
    return company_data
def delete_knowledge_from_db(company_id: int) -> bool:
    """Delete the ``knowledge_base`` row whose ``kid`` equals ``company_id``.

    Returns:
        ``True`` when at least one row was deleted, ``False`` otherwise.

    Raises:
        HTTPException: 500 when no connection is available or a database
            error occurs.
    """
    print(f"Received knowledge base company_id: {company_id}")  # Debug statement
    logging.info(f"Received request for knowledgebase company id: {company_id}")
    try:
        connection = get_db_connection()
        if connection is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        db_cursor = connection.cursor()
        db_cursor.execute("DELETE FROM knowledge_base WHERE kid = %s", (company_id,))
        connection.commit()
        deleted_any = db_cursor.rowcount > 0
        db_cursor.close()
        connection.close()
        return deleted_any
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
# Delete an entry from the knowledge base
async def delete_company(company_id: int):
    """Delete a knowledge-base entry and confirm with a message.

    Raises:
        HTTPException: 404 when ``delete_knowledge_from_db`` reports that
            nothing was deleted.
    """
    if delete_knowledge_from_db(company_id):
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
async def data_connectors(request: Request):
    """Render ``data_connectors.html`` with the role and company id cookies.

    Raises:
        HTTPException: 500 when anything goes wrong while rendering.
    """
    try:
        cookies = request.cookies
        context = {
            "request": request,
            "role": cookies.get("role"),
            "company_id": cookies.get("company_id"),
            "title": "Data Connectors",
        }
        return templates.TemplateResponse("data_connectors.html", context)
    except Exception as e:
        logging.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
# Insert or update a company's row in data_connectors
async def save_data_connectors(request: Request,
                               company_id: int = Form(...),
                               database: List[str] = Form(...),
                               server: str = Form(...),
                               port: str = Form(...),
                               databaseName: List[str] = Form(...),
                               username: str = Form(...),
                               password: str = Form(...),
                               selectedTables: List[str] = Form(...)):
    """Store the data-connector settings for a company.

    Updates the existing ``data_connectors`` row for ``company_id`` when one
    exists, otherwise inserts a new one. List-valued fields are stored as
    comma-separated strings.

    Returns:
        A JSON response: a success payload, or a 500 error payload when the
        database or anything else fails.
    """
    logging.info("Received form submission for database_connectors")
    cnx = None
    try:
        cnx = get_db_connection()
        if cnx is None:
            # Handled by the generic handler below, which returns a 500 payload.
            raise RuntimeError("Failed to connect to the database")
        cursor = cnx.cursor()

        # Check if the company_id already exists in the data_connectors table
        cursor.execute("SELECT COUNT(*) FROM data_connectors WHERE company_id = %s", (company_id,))
        exists = cursor.fetchone()[0] > 0

        # NOTE(review): the connector password is stored as submitted --
        # consider encrypting it at rest.
        settings = (",".join(database), server, port, ",".join(databaseName),
                    username, password or '', ",".join(selectedTables))
        if exists:
            action = "updated"
            query = """
            UPDATE data_connectors
            SET databasetype = %s, serverip = %s, port = %s, database_name = %s, username = %s, password = %s, dbtablename = %s
            WHERE company_id = %s
            """
            values = settings + (company_id,)
        else:
            action = "inserted"
            query = """
            INSERT INTO data_connectors(company_id, databasetype, serverip, port, database_name, username, password, dbtablename)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            values = (company_id,) + settings

        # Log the statement only: the bound values include the password.
        logging.info(f"Executing query for company_id {company_id}: {query}")
        cursor.execute(query, values)
        cnx.commit()
        logging.info(f"Query executed successfully, {cursor.rowcount} row(s) {action}")
        cursor.close()

        response = {
            "msg": "Data saved successfully",
            "url": "/save_data_connectors",  # The URL you want to redirect to
            "created": True
        }
        return JSONResponse(content=response)
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        return JSONResponse(content={"status": "error", "message": "Internal Server Error"}, status_code=500)
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        return JSONResponse(content={"status": "error", "message": "Unexpected Server Error"}, status_code=500)
    finally:
        # Release the connection on every path, including failures.
        if cnx is not None:
            cnx.close()
async def get_data_connectors(company_id: str = Query(...), company_name: str = Query(...)):
    """Return the stored data-connector settings for one company.

    Looks up the ``data_connectors`` row, joined to ``company_detail``, that
    matches both ``company_id`` and ``company_name``.

    Args:
        company_id: Company identifier (required query parameter).
        company_name: Company name that must belong to the same company row.

    Returns:
        dict with keys ``company_id``, ``databasetype``, ``serverip``,
        ``port``, ``database_name``, ``username``, ``password`` and
        ``dbtablename`` (the stored comma-separated value split into a list,
        or an empty list when the column is empty).

    Raises:
        HTTPException: 404 when no row matches; 500 on a database error or
            when no connection could be obtained.
    """
    print(f"Received companyId and name: {company_id},{company_name}")
    logging.info(f"Received request for company_id and company_name: {company_id},{company_name}")
    query = """
        SELECT dc.company_id, dc.databasetype, dc.serverip, dc.port, dc.database_name,
               dc.username, dc.password, dc.dbtablename
        FROM u852023448_redmindgpt.data_connectors dc
        JOIN u852023448_redmindgpt.company_detail cd ON dc.company_id = cd.company_id
        WHERE dc.company_id = %s AND cd.company_name = %s
    """
    params = (company_id, company_name)
    try:
        cnx = get_db_connection()
        # Same guard the other DB helpers in this file use.
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with parameters: {params}")
            print(f"Query parameters: {params}")
            cursor.execute(query, params)
            result = cursor.fetchone()
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    if not result:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")

    # The row contains the connector password, so it is deliberately not logged.
    logging.info(f"Data found for company_id: {company_id}")
    # NOTE(review): the stored password is returned to the caller as-is;
    # confirm the consumer really needs it.
    return {
        "company_id": result[0],
        "databasetype": result[1],
        "serverip": result[2],
        "port": result[3],
        "database_name": result[4],
        "username": result[5],
        "password": result[6],
        "dbtablename": result[7].split(',') if result[7] else [],
    }
async def API_connectors(request: Request):
    """Render ``API_connectors.html`` for the current request.

    The ``role`` and ``company_id`` cookies are read from the request and
    handed to the template together with the page title.

    Raises:
        HTTPException: 500 if anything fails while building the response.
    """
    try:
        cookies = request.cookies
        # Missing cookies come through as None.
        context = {
            "request": request,
            "role": cookies.get("role"),
            "company_id": cookies.get("company_id"),
            "title":"API Connectors",
        }
        return templates.TemplateResponse("API_connectors.html", context)
    except Exception as e:
        logging.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
#save api connectors | |
async def API_saveconnectors(request: Request,
                             company_id: int = Form(...),
                             APIName: str = Form(...),
                             APIEndpoint: str = Form(...),
                             Auth_Bearer: str = Form(...),
                             Inputjson: str = Form(...),
                             OutputJson: str = Form(...),
                             Description: str = Form(...)):
    """Insert one row into ``api_connectors`` from the submitted form fields.

    Args:
        request: Incoming request (unused here, kept for the handler signature).
        company_id: Owning company id.
        APIName: Stored as ``api_name``.
        APIEndpoint: Stored as ``api_endpoint``.
        Auth_Bearer: Stored as ``auth_token``.
        Inputjson: Stored as ``input_param``.
        OutputJson: Stored as ``output_json``.
        Description: Stored as ``description``.

    Returns:
        JSONResponse (200) with ``message`` and the new ``row_id``.

    Raises:
        HTTPException: 500 on a database error or when no connection could
            be obtained.
    """
    logging.info("Received form submission for api_connectors")
    query = (
        "INSERT INTO api_connectors(company_id, api_name, api_endpoint, auth_token, "
        "input_param, output_json, description) VALUES (%s, %s, %s, %s, %s, %s, %s)"
    )
    values = (company_id, APIName, APIEndpoint, Auth_Bearer, Inputjson, OutputJson, Description)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            # `values` contains the auth token, so only non-secret fields are logged.
            logging.info(f"Executing query: {query} for company_id: {company_id}, api_name: {APIName}")
            cursor.execute(query, values)
            cnx.commit()
            logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
            row_id = cursor.lastrowid
            cursor.close()
        finally:
            # Release the connection even when the insert raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    logging.info(f"API connector {APIName} inserted successfully")
    return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
# retrieve api connectors | |
async def get_api_connectors(company_id: str = Query(...)):
    """List the API connectors stored for a company.

    Args:
        company_id: Company identifier (required query parameter).

    Returns:
        list of dicts with keys ``row_id``, ``company_id``, ``APIName`` and
        ``APIEndpoint``, one per matching ``api_connectors`` row.

    Raises:
        HTTPException: 404 when the company has no connectors; 500 on a
            database error or when no connection could be obtained.
    """
    print(f"Received companyId and name: {company_id}")
    logging.info(f"Received request for company_id: {company_id}")
    # Only the columns that are returned are selected, so auth tokens never
    # leave the database for this listing.
    query = """
        SELECT ac.id, ac.company_id, ac.api_name, ac.api_endpoint
        FROM u852023448_redmindgpt.api_connectors ac
        JOIN u852023448_redmindgpt.company_detail cd ON ac.company_id = cd.company_id
        WHERE ac.company_id = %s
    """
    params = (company_id,)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            print(f"Query parameters: {params}")
            cursor.execute(query, params)
            rows = cursor.fetchall()
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    logging.info(f"Query returned {len(rows)} row(s)")
    companies = [
        {
            "row_id": row[0],
            "company_id": row[1],
            "APIName": row[2],
            "APIEndpoint": row[3],
        }
        for row in rows
    ]
    if not companies:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return companies
#to view the table details in modal | |
async def get_company_details(company_id: int):
    """Return the ``api_connectors`` record fetched by ``get_api_from_db``.

    Args:
        company_id: Value passed straight through to ``get_api_from_db``.

    Raises:
        HTTPException: 404 when the helper yields a falsy result.
    """
    record = await get_api_from_db(company_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Company not found")
async def get_api_from_db(company_id: int):
    """Fetch one ``api_connectors`` row by its primary key.

    Args:
        company_id: Despite the name, this is matched against
            ``api_connectors.id`` (the row id), not the company id column.

    Returns:
        dict mapping column names to values (dictionary cursor).

    Raises:
        HTTPException: 404 when no row has that id; 500 on a database error
            or when no connection could be obtained.
    """
    query = "SELECT * FROM api_connectors WHERE id = %s"
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor(dictionary=True)
            cursor.execute(query, (company_id,))
            company = cursor.fetchone()
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")

    if not company:
        raise HTTPException(status_code=404, detail="Company not found or file not found for the company")
    # The record contains the auth token, so only the id is logged.
    logging.info(f"api details found for id: {company_id}")
    return company
#to edit the api details in modal form | |
async def update_company_details(
        request: Request,
        companyId: int,
        company_id: str = Form(...),
        APIName: str = Form(...),
        APIEndpoint: str = Form(...),
        Auth_Bearer: str = Form(...),
        Inputjson: str = Form(...),
        OutputJson: str = Form(...),
        Description: str = Form(...)):
    """Update an API connector row from the edit form.

    The form fields are collected into a dict keyed by column name (plus
    ``kid`` holding ``companyId``) and passed to ``update_api_in_db``.

    Returns:
        Whatever ``update_api_in_db`` returns when it is truthy.

    Raises:
        HTTPException: 500 when ``update_api_in_db`` returns a falsy value.
    """
    logging.info(f"Received form submission for database_connectors")
    logging.info(f"Received request for company data with ID inside edit/update knowledgebase: {companyId}")

    # Keys other than 'kid' mirror the api_connectors column names.
    company_data = dict(
        kid=companyId,
        company_id=company_id,
        api_name=APIName,
        api_endpoint=APIEndpoint,
        auth_token=Auth_Bearer,
        input_param=Inputjson,
        output_json=OutputJson,
        description=Description,
    )

    outcome = await update_api_in_db(companyId, company_data)
    if outcome:
        return outcome
    raise HTTPException(status_code=500, detail="Failed to update company")
async def update_api_in_db(id: int, company_data: dict):
    """Update one ``api_connectors`` row.

    Args:
        id: Primary key of the row to update.
        company_data: Must provide ``company_id``, ``api_name``,
            ``api_endpoint``, ``auth_token``, ``input_param``,
            ``output_json`` and ``description``.

    Returns:
        ``company_data`` when at least one row was reported as updated,
        otherwise ``None``.

    Raises:
        HTTPException: 500 on a database error, when no connection could be
            obtained, or on any other unexpected error (e.g. a missing key).
    """
    update_query = """
        UPDATE u852023448_redmindgpt.api_connectors ac
        SET ac.company_id = %s, ac.api_name = %s, ac.api_endpoint = %s,
            ac.auth_token = %s, ac.input_param = %s, ac.output_json = %s, ac.description = %s
        WHERE ac.id = %s;
    """
    try:
        logging.info(f"Updating api for ID: {id}")
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing update query: {update_query}")
            cursor.execute(update_query, (
                company_data['company_id'],
                company_data['api_name'],
                company_data['api_endpoint'],
                company_data['auth_token'],
                company_data['input_param'],
                company_data['output_json'],
                company_data['description'],
                id,
            ))
            cnx.commit()
            success = cursor.rowcount > 0
            cursor.close()
        finally:
            # Release the connection even when the update raises.
            cnx.close()
    except HTTPException:
        # Keep our own HTTP errors intact; the generic handler below would
        # otherwise replace their detail message.
        raise
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")
    except Exception as e:
        logging.error(f"Unexpected error: {e}")
        raise HTTPException(status_code=500, detail="Unexpected error occurred")

    if not success:
        logging.info("No rows updated")
        return None
    logging.info("Update successful")
    return company_data
# After the modal form is updated, the data table is refreshed to display the updated values
async def get_document(company_id: str = Query(...)):
    """Return the API connectors of a company for refreshing the data table.

    Args:
        company_id: Company identifier (required query parameter).

    Returns:
        list of dicts with keys ``row_id``, ``company_id``, ``api_name`` and
        ``api_endpoint``.

    Raises:
        HTTPException: 404 when no rows match; 500 on a database error or
            when no connection could be obtained.
    """
    print(f"Received companyId and name for api datatable update: {company_id},{company_id}")
    logging.info(f"Received request for company_id: {company_id}")
    # Only the returned columns are selected, so auth tokens are neither
    # fetched nor logged.
    query = """
        SELECT ac.id, ac.company_id, ac.api_name, ac.api_endpoint
        FROM u852023448_redmindgpt.api_connectors ac
        JOIN u852023448_redmindgpt.company_detail cd ON ac.company_id = cd.company_id
        WHERE ac.company_id = %s
    """
    values = (company_id,)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            print(f"Query parameters: {values}")
            cursor.execute(query, values)
            rows = cursor.fetchall()
            # The original referenced `cursor.close` without calling it.
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    logging.info(f"Query for update table returned {len(rows)} row(s)")
    companies = [
        {
            "row_id": row[0],
            "company_id": row[1],
            "api_name": row[2],
            "api_endpoint": row[3],
        }
        for row in rows
    ]
    if not companies:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    return companies
#to delete api details from db | |
async def delete_company(company_id: int):
    """Delete an API connector via ``delete_api_from_db``.

    Args:
        company_id: Value handed to ``delete_api_from_db``.

    Returns:
        A confirmation message dict when the helper reports success.

    Raises:
        HTTPException: 404 when the helper reports that nothing was deleted.
    """
    if delete_api_from_db(company_id):
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
def delete_api_from_db(company_id: int) -> bool:
    """Delete one ``api_connectors`` row by primary key.

    Args:
        company_id: Despite the name, this is matched against
            ``api_connectors.id`` (the row id).

    Returns:
        True when a row was deleted, False when no row had that id.

    Raises:
        HTTPException: 500 on a database error or when no connection could
            be obtained.
    """
    print(f"Received api for company_id: {company_id}")
    logging.info(f"Received request for api for company id: {company_id}")
    delete_query = "DELETE FROM api_connectors WHERE id = %s"
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            cursor.execute(delete_query, (company_id,))
            cnx.commit()
            success = cursor.rowcount > 0
            cursor.close()
        finally:
            # Release the connection even when the delete raises.
            cnx.close()
        return success
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
async def prompt_template(request: Request):
    """Render ``prompt_template.html`` for the current request.

    The ``role`` and ``company_id`` cookies are read from the request and
    handed to the template together with the page title.

    Raises:
        HTTPException: 500 if anything fails while building the response.
    """
    try:
        cookies = request.cookies
        # Missing cookies come through as None.
        context = {
            "request": request,
            "role": cookies.get("role"),
            "company_id": cookies.get("company_id"),
            "title":"Prompt Templates",
        }
        return templates.TemplateResponse("prompt_template.html", context)
    except Exception as e:
        logging.error(f"Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")
# to insert into prompt templates | |
async def prompt_saveconnectors(request: Request,
                                company_id: int = Form(...),
                                scenario: str = Form(...),
                                sampleprompt: str = Form(...),
                                comments: str = Form(...),
                                ):
    """Insert one row into ``prompt_templates`` from the submitted form fields.

    Args:
        request: Incoming request (unused here, kept for the handler signature).
        company_id: Owning company id.
        scenario: Stored as ``scenario``.
        sampleprompt: Stored as ``prompts``.
        comments: Stored as ``comments``.

    Returns:
        JSONResponse (200) with ``message`` and the new ``row_id``.

    Raises:
        HTTPException: 500 on a database error or when no connection could
            be obtained.
    """
    logging.info("Received form submission for prompt_templates")
    query = "INSERT INTO prompt_templates(company_id, scenario, prompts, comments) VALUES (%s, %s, %s, %s)"
    values = (company_id, scenario, sampleprompt, comments)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with values: {values}")
            cursor.execute(query, values)
            cnx.commit()
            logging.info(f"Query executed successfully, {cursor.rowcount} row(s) affected")
            row_id = cursor.lastrowid  # id generated for the inserted row
            cursor.close()
        finally:
            # Release the connection even when the insert raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    logging.info(f"Prompt template for {scenario} inserted successfully")
    return JSONResponse(status_code=200, content={"message": "Data saved successfully", "row_id": row_id})
# retrieve prompt templates
async def get_prompt_connectors(company_id: str = Query(...)):
    """List the prompt templates stored for a company.

    Args:
        company_id: Company identifier (required query parameter).

    Returns:
        list of dicts with keys ``row_id``, ``company_id``, ``scenario`` and
        ``prompt``, one per matching ``prompt_templates`` row.

    Raises:
        HTTPException: 404 when the company has no templates; 500 on a
            database error or when no connection could be obtained.
    """
    print(f"Received companyId and name: {company_id}")
    logging.info(f"Received request for company_id: {company_id}")
    query = """
        SELECT pt.id, pt.company_id, pt.scenario, pt.prompts, pt.comments
        FROM u852023448_redmindgpt.prompt_templates pt
        JOIN u852023448_redmindgpt.company_detail cd ON pt.company_id = cd.company_id
        WHERE pt.company_id = %s
    """
    params = (company_id,)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            print(f"Query parameters: {params}")
            cursor.execute(query, params)
            rows = cursor.fetchall()
            logging.info(f"Query result: {rows}")
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    companies = [
        {
            "row_id": row[0],
            "company_id": row[1],
            "scenario": row[2],
            "prompt": row[3],
        }
        for row in rows
    ]
    if not companies:
        logging.warning(f"No data found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data connector not found")
    return companies
def delete_prompt_template_from_db(row_id: int) -> bool:
    """Delete one ``prompt_templates`` row by primary key.

    Args:
        row_id: Value matched against ``prompt_templates.id``.

    Returns:
        True when a row was deleted, False when no row had that id.

    Raises:
        HTTPException: 500 on a database error or when no connection could
            be obtained.
    """
    logging.info(f"Received request for prompt_template row id: {row_id}")
    delete_query = "DELETE FROM prompt_templates WHERE id = %s"
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"sql delete query for prompt template ===> {delete_query}")
            cursor.execute(delete_query, (row_id,))
            cnx.commit()
            success = cursor.rowcount > 0
            logging.info(f"prompt template deleted ===> {success}")
            cursor.close()
        finally:
            # Release the connection even when the delete raises.
            cnx.close()
        return success
    except mysql.connector.Error as err:
        logging.error(f"Error deleting company: {err}")
        raise HTTPException(status_code=500, detail="Failed to delete company")
async def delete_company(row_id: int):
    """Delete a prompt template via ``delete_prompt_template_from_db``.

    Args:
        row_id: Value handed to ``delete_prompt_template_from_db``.

    Returns:
        A confirmation message dict when the helper reports success.

    Raises:
        HTTPException: 404 when the helper reports that nothing was deleted.
    """
    removed = delete_prompt_template_from_db(row_id)
    logging.info(f"company row_id +++> {row_id}")
    if removed:
        return {"message": "Company deleted successfully"}
    raise HTTPException(status_code=404, detail="Company not found or failed to delete")
# prompt_template view functions
# Fetch the data shown in the prompt_template view, looked up by id
async def get_promt_company_details(company_id: int):
    """Return the ``prompt_templates`` record fetched by ``get_promt_from_db``.

    Args:
        company_id: Value passed straight through to ``get_promt_from_db``.

    Raises:
        HTTPException: 404 when the helper yields a falsy result.
    """
    record = await get_promt_from_db(company_id)
    if record:
        return record
    raise HTTPException(status_code=404, detail="Company not found")
async def get_promt_from_db(company_id: int):
    """Fetch one ``prompt_templates`` row by its primary key.

    Args:
        company_id: Despite the name, this is matched against
            ``prompt_templates.id`` (the row id).

    Returns:
        dict mapping column names to values (dictionary cursor).

    Raises:
        HTTPException: 404 when no row has that id; 500 on a database error
            or when no connection could be obtained.
    """
    query = "SELECT * FROM prompt_templates WHERE id = %s"
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor(dictionary=True)
            logging.info(f"row_id in db addresss ========> {company_id}")
            cursor.execute(query, (company_id,))
            company = cursor.fetchone()
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Error fetching company: {err}")
        raise HTTPException(status_code=500, detail="Failed to fetch company")

    if not company:
        raise HTTPException(status_code=404, detail="Company not found or file not found for the company")
    logging.info(f"row_id in db addresss ========> {company}")
    return company
# Function to update prompt template details
async def update_company_details(
        kid: int,
        scenario: str = Form(...),
        prompt: str = Form(...),
        comments: str = Form(...)
):
    """Update a prompt template row from the edit form.

    The form fields are collected into a dict keyed by column name and
    passed to ``update_prompt_in_db`` together with ``kid``.

    Returns:
        Whatever ``update_prompt_in_db`` returns when it is truthy.

    Raises:
        HTTPException: 500 when ``update_prompt_in_db`` returns a falsy value.
    """
    logging.info(f"Received request for company data with ID: {kid}")

    # The form field `prompt` is stored under the `prompts` column.
    company_data = dict(scenario=scenario, prompts=prompt, comments=comments)

    outcome = await update_prompt_in_db(kid, company_data)
    if outcome:
        return outcome
    raise HTTPException(status_code=500, detail="Failed to update company")
# Database query function to update prompt template data
async def update_prompt_in_db(kid: int, company_data: dict):
    """Update one ``prompt_templates`` row.

    Args:
        kid: Primary key of the row to update.
        company_data: Must provide ``scenario``, ``prompts`` and ``comments``.

    Returns:
        ``company_data`` when at least one row was reported as updated,
        otherwise ``None``.

    Raises:
        HTTPException: 500 on a database error or when no connection could
            be obtained.
    """
    update_query = """
        UPDATE u852023448_redmindgpt.prompt_templates pt
        SET pt.scenario=%s, pt.prompts=%s, pt.comments=%s
        WHERE pt.id = %s;
    """
    try:
        logging.info(f"Updating prompt for ID: {kid}")
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"row_id in prompt db address ========> {kid}")
            logging.info(f"SQL update query for company ===> {update_query}")
            cursor.execute(update_query, (
                company_data['scenario'],
                company_data['prompts'],
                company_data['comments'],
                kid,
            ))
            cnx.commit()
            success = cursor.rowcount > 0
            cursor.close()
        finally:
            # Release the connection even when the update raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Error updating company: {err}")
        raise HTTPException(status_code=500, detail="Failed to update company")

    return company_data if success else None
# to refresh prompt data table | |
async def get_document(company_id: str = Query(...)):
    """Return the prompt templates of a company for refreshing the data table.

    Args:
        company_id: Company identifier (required query parameter).

    Returns:
        list of dicts with keys ``id``, ``company_id``, ``scenario`` and
        ``prompts``.

    Raises:
        HTTPException: 404 when no rows match; 500 on a database error or
            when no connection could be obtained.
    """
    print(f"Received companyId and name: {company_id},{company_id}")
    logging.info(f"Received request for company_id: {company_id}")
    query = """
        SELECT pt.id, pt.company_id, pt.scenario, pt.prompts, pt.comments
        FROM u852023448_redmindgpt.prompt_templates pt
        JOIN u852023448_redmindgpt.company_detail cd ON pt.company_id = cd.company_id
        WHERE pt.company_id = %s
    """
    values = (company_id,)
    try:
        cnx = get_db_connection()
        if cnx is None:
            raise HTTPException(status_code=500, detail="Failed to connect to the database")
        try:
            cursor = cnx.cursor()
            logging.info(f"Executing query: {query} with company_id: {company_id}")
            print(f"Query parameters: {values}")
            cursor.execute(query, values)
            rows = cursor.fetchall()
            logging.info(f"Query result: {rows}")
            # The original referenced `cursor.close` without calling it.
            cursor.close()
        finally:
            # Release the connection even when the query raises.
            cnx.close()
    except mysql.connector.Error as err:
        logging.error(f"Database error: {err}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

    companies = [
        {
            'id': row[0],
            "company_id": row[1],
            "scenario": row[2],
            "prompts": row[3],
        }
        for row in rows
    ]
    if not companies:
        logging.warning(f"No document found for company_id: {company_id}")
        raise HTTPException(status_code=404, detail="Data document not found")
    logging.info(f"the primary key id is {companies}")
    return companies
async def chatbot(request: Request):
    """Render ``chatbot.html`` with the request and the page title."""
    context = {"request": request,"title":"Chatbot"}
    return templates.TemplateResponse("chatbot.html", context)
# Start the server on 127.0.0.1:8000 only when this file is executed directly.
if __name__ == "__main__":
    import uvicorn

    server_options = {"host": "127.0.0.1", "port": 8000}
    uvicorn.run(app, **server_options)