Spaces:
Sleeping
Sleeping
from __future__ import annotations as _annotations | |
import os | |
import asyncio | |
import json | |
import sqlite3 | |
import datetime | |
import fastapi | |
import logfire | |
import time | |
from collections.abc import AsyncIterator | |
from concurrent.futures.thread import ThreadPoolExecutor | |
from contextlib import asynccontextmanager | |
from dataclasses import dataclass | |
from datetime import datetime, timezone, date | |
from functools import partial | |
from pathlib import Path | |
from typing import Annotated, Any, Callable, Literal, TypeVar | |
from pydantic import BaseModel, Field, ValidationError, model_validator | |
from typing import List, Optional, Dict | |
from fastapi import Depends, Request | |
from fastapi.responses import FileResponse, Response, StreamingResponse | |
from typing_extensions import LiteralString, ParamSpec, TypedDict | |
from pydantic_ai import Agent | |
from pydantic_ai.exceptions import UnexpectedModelBehavior | |
from pydantic_ai.messages import ( | |
ModelMessage, | |
ModelMessagesTypeAdapter, | |
ModelRequest, | |
ModelResponse, | |
TextPart, | |
UserPromptPart, | |
) | |
from pydantic_ai.models.openai import OpenAIModel | |
model = OpenAIModel( | |
'gemma-2-2b-it', | |
base_url='http://localhost:1234/v1', | |
api_key='your-local-api-key', | |
) | |
model11 = OpenAIModel( | |
'mistral-7b-instruct-v0.3', | |
base_url='http://localhost:1234/v1', | |
api_key='your-local-api-key', | |
) | |
#DeepSeek Key: sk-694660c67c3947e4853019473f30240d https://api.deepseek.com | |
# Model Pydantic untuk Sales Order | |
class product1(BaseModel): | |
ds_productCode: Optional[str] = "" #str = Field(..., min_length=1, max_length=20) | |
ds_productName: str = Field(..., min_length=1, max_length=100) | |
ds_quantity: int = Field(..., ge=1) | |
ds_unitPrice: float = Field(..., ge=0) | |
ds_itemAmt: Optional[float] = 0 | |
# 'if-token-present' means nothing will be sent (and the example will work) if you don't have logfire configured | |
logfire.configure(send_to_logfire='if-token-present') | |
class product(BaseModel): | |
ds_productCode: Optional[str] = "" #str = Field(..., min_length=1, max_length=20) | |
ds_productName: Optional[str] = "" | |
ds_quantity: Optional[int] = 0 | |
ds_unitPrice: Optional[float] = 0 | |
ds_itemAmt: Optional[float] = 0 | |
class SalesOrder(BaseModel): | |
ds_salesOrderId: Optional[str] = "" | |
ds_salesDate: Optional[str] = "" | |
ds_customerName: str = Field(..., min_length=1, max_length=100) | |
ds_customerAddress: Optional[str] = "" | |
ds_items: List[product] = Field(..., min_items=1) | |
ds_shippingCost:Optional[float] = 0 | |
ds_shippingAddress: Optional[str] = "" | |
ds_totalAmount:Optional[float] = 0 | |
#ds_paymentTerms: Optional[str] = "" | |
os.environ['GEMINI_API_KEY'] = 'AIzaSyAsVIHsPIIfDBTb2K6VNdNlMt05t8x3mtE' | |
#agent= Agent('gemini-1.5-flash', result_type=SalesOrder) | |
# # Create a system prompt to guide the model | |
SYSTEM_PROMPT = """ | |
Anda adalah asisten yang ahli melakukan ekstrak data SalesOrder dari teks user. Format output yang diinginkan hanya berupa JSON saja sesuai class SalesOrder. Tidak usah ada penjelasan lain. Sekali lagi: Output hanya JSON saja. Untuk Nomor Sales Order pasangkan dengan key ds_salesOrderId, untuk Tanggal Sales pasangkan dengan key ds_salesDate, untuk Nama Customer pasangkan dengan Key ds_customerName, untuk Alamat Customer pasangkan dengan key ds_customerAddress, untuk Daftar Item Barang pasangkan dengan key ds_items, untuk Kode Barang pasangkan dengan key ds_productCode, untuk Nama Barang pasangkan dengan key ds_productName, untuk Quantity pasangkan dengan key ds_quantity, untuk Unit Price pasangkan dengan key ds_unitPrice, untuk Total Nilai Per Baris Barang pasangkan dengan key ds_itemAmt, untuk Ongkos Kirim pasangkan dengan key ds_shippingCost, untuk Alamat Pengiriman pasangkan dengan key ds_shippingAddress, untuk Total Nilai Sales Order pasangkan dengan key ds_totalAmount | |
""" | |
#agent3 = Agent(model=ollama_model, result_type=PetList, retries=3, system_prompt=SYSTEM_PROMPT) | |
#agent3 = Agent(model=ollama_model, retries=3, system_prompt=SYSTEM_PROMPT) | |
#INI SAJA. SALAH SATU | |
agent = Agent('gemini-1.5-flash', system_prompt=SYSTEM_PROMPT) # OK-Gemini | |
#agent = Agent(model) # OK-Lokal | |
#agent = Agent(model, result_type=SalesOrder) # belum bisa untuk local | |
#agent = Agent(model, system_prompt=SYSTEM_PROMPT) # belum bisa untuk local | |
#agent= Agent('gemini-1.5-flash', system_prompt=SYSTEM_PROMPT) # ERR | |
THIS_DIR = Path(__file__).parent | |
async def lifespan(_app: fastapi.FastAPI): | |
async with Database.connect() as db: | |
yield {'db': db} | |
app = fastapi.FastAPI(lifespan=lifespan) | |
logfire.instrument_fastapi(app) | |
async def index() -> FileResponse: | |
return FileResponse((THIS_DIR / 'chat_app.html'), media_type='text/html') | |
async def main_ts() -> FileResponse: | |
"""Get the raw typescript code, it's compiled in the browser, forgive me.""" | |
return FileResponse((THIS_DIR / 'chat_app.ts'), media_type='text/plain') | |
async def get_db(request: Request) -> Database: | |
return request.state.db | |
async def get_chat(database: Database = Depends(get_db)) -> Response: | |
msgs = await database.get_messages() | |
return Response( | |
b'\n'.join(json.dumps(to_ds_message(m)).encode('utf-8') for m in msgs), | |
media_type='text/plain', | |
) | |
class ChatMessage(TypedDict): | |
"""Format of messages sent to the browser.""" | |
role: Literal['user', 'model'] | |
timestamp: str | |
content: str | |
def to_chat_message(m: ModelMessage) -> ChatMessage: | |
first_part = m.parts[0] | |
if isinstance(m, ModelRequest): | |
if isinstance(first_part, UserPromptPart): | |
return { | |
'role': 'user', | |
'timestamp': first_part.timestamp.isoformat(), | |
'content': first_part.content, | |
} | |
elif isinstance(m, ModelResponse): | |
if isinstance(first_part, TextPart): | |
return { | |
'role': 'model', | |
'timestamp': m.timestamp.isoformat(), | |
'content': first_part.content, | |
} | |
raise UnexpectedModelBehavior(f'Unexpected message type for chat app: {m}') | |
def to_ds_message(m: ModelMessage) -> ChatMessage: | |
if isinstance(m, ModelRequest): | |
first_part = m.parts[1] | |
if isinstance(first_part, UserPromptPart): | |
return { | |
'role': 'user', | |
'timestamp': first_part.timestamp.isoformat(), | |
'content': first_part.content, | |
} | |
elif isinstance(m, ModelResponse): | |
first_part = m.parts[0] | |
if isinstance(first_part, TextPart): | |
return { | |
'role': 'model', | |
'timestamp': m.timestamp.isoformat(), | |
'content': first_part.content, | |
} | |
raise UnexpectedModelBehavior(f'Unexpected ds-message type for chat app: {m}') | |
def to_form_message(m: ModelMessage) -> ChatMessage: | |
first_part = m.parts[0] | |
if isinstance(m, ModelResponse): | |
if isinstance(first_part, TextPart): | |
return { | |
'role': 'form', | |
'timestamp': m.timestamp.isoformat(), | |
'content': first_part.content, | |
} | |
raise UnexpectedModelBehavior(f'Unexpected message type for chat app: {m}') | |
async def post_chat( | |
prompt: Annotated[str, fastapi.Form()], database: Database = Depends(get_db) | |
) -> StreamingResponse: | |
async def stream_messages(): | |
"""Streams new line delimited JSON `Message`s to the client.""" | |
# stream the user prompt so that can be displayed straight away | |
yield ( | |
json.dumps( | |
{ | |
'role': 'user', | |
'timestamp': datetime.now(tz=timezone.utc).isoformat(), | |
'content': prompt, | |
} | |
).encode('utf-8') | |
+ b'\n' | |
) | |
# get the chat history so far to pass as context to the agent | |
messages = await database.get_messages() | |
# run the agent with the user prompt and the chat history | |
async with agent.run_stream(prompt, message_history=messages) as result: | |
async for text in result.stream(debounce_by=0.01): | |
# text here is a `str` and the frontend wants | |
# JSON encoded ModelResponse, so we create one | |
m = ModelResponse.from_text(content=text, timestamp=result.timestamp()) | |
yield json.dumps(to_chat_message(m)).encode('utf-8') + b'\n' | |
# add new messages (e.g. the user prompt and the agent response in this case) to the database | |
#print("---",result.new_messages_json(),"---") | |
#print("***",prompt,"***") | |
await database.add_messages(result.new_messages_json()) | |
async def ds_messages(prompt1): | |
#Nama pembeli: Bu Lurah, alamat Bekasi Barat, hari ini membeli Teh Putih dua kaleng harga 110000 per kaleng, juga membeli Teh Hitam 3 kaleng, harga per kaleng 60000. Ongkos kirim ke Bekasi Barat sebesar 36 ribu | |
#nama pembeli Mas Anang alamat di Jalan Cisitu no.5 Bandung, membeli Chocobar 5 batang harga 15 ribu per batang, dan membeli Rice Cracker 4 buah harga 20 ribu per buah, ongkos kirim ke Jalan Cisitu no.5 sebesar 7 ribu rupiah | |
try: | |
#prompt2=f"Ekstrak data Sales Order dari teks: {prompt1}. Format output yang diinginkan hanya berupa JSON saja sesuai class Sales Order. Tidak usah ada penjelasan lain. Sekali lagi: Output hanya JSON saja. Hari ini adalah tanggal {date.today()}. untuk Nomor Sales Order pasangkan dengan key ds_salesOrderId, untuk Tanggal Sales pasangkan dengan key ds_salesDate, untuk Nama Customer pasangkan dengan Key ds_customerName, untuk Alamat Customer pasangkan dengan key ds_customerAddress, untuk Daftar Item Barang pasangkan dengan key ds_items, untuk Kode Barang pasangkan dengan key ds_productCode, untuk Nama Barang pasangkan dengan key ds_productName, untuk Quantity pasangkan dengan key ds_quantity, untuk Unit Price pasangkan dengan key ds_unitPrice, untuk Total Nilai Per Baris Barang pasangkan dengan key ds_itemAmt. Setelah Daftar Item Barang, selanjutnya untuk Ongkos Kirim pasangkan dengan key ds_shippingCost, untuk Alamat Pengiriman pasangkan dengan key ds_shippingAddress, untuk Total Nilai Sales Order pasangkan dengan key ds_totalAmount" | |
#prompt2=f"Ekstrak data Sales Order dari teks: {prompt1}. Hari ini adalah tanggal {date.today()}" | |
prompt2=f"{prompt1}. Hari ini adalah tanggal {date.today()}" | |
yield ( | |
json.dumps( | |
{ | |
'role': 'user', | |
'timestamp': datetime.now(tz=timezone.utc).isoformat(), | |
'content': prompt1, | |
} | |
).encode('utf-8') | |
+ b'\n' | |
) | |
#messages = await database.get_messages() | |
async with agent.run_stream(prompt2) as result: | |
async for text in result.stream(debounce_by=0.1): | |
m = ModelResponse.from_text(content=text, timestamp=result.timestamp()) | |
yield json.dumps(to_ds_message(m)).encode('utf-8') + b'\n' | |
##print(result.usage()) | |
#await database.add_messages(result.new_messages_json()) | |
darso = json.loads(result.new_messages_json()) | |
#print(darso) | |
darso0= darso[0] | |
#darso0.pop(darso0['parts'][0]) | |
#print("00|------------------") | |
#print("01|", darso0 ) | |
#print("01a|", darso0['parts'][0] ) | |
#print("01b|", darso0['parts'][1] ) | |
darso0['parts'][0] = darso0['parts'][1] | |
#print("01?|", darso0 ) | |
#print("01??|", darso ) | |
#print("02|------------------") | |
darso1= darso[1] | |
#print("1|", darso1) | |
darso2= json.loads(json.dumps(darso1)) | |
#print("2|",darso2['parts'][0]) | |
darso3= darso2['parts'][0] | |
darso4= json.loads(json.dumps(darso3)) | |
#print("4|",darso4['content']) | |
darso5= darso4['content'] | |
darso5=darso5.split('```', 2) | |
darso5=darso5[1] | |
#print("5a|",darso5) | |
darso5=darso5.replace('json', '') | |
#print("5|",darso5,"|") | |
try: | |
darso6= json.loads(darso5) #json | |
darso7= SalesOrder.model_validate(darso6) | |
except: | |
darso6= "ERR" | |
#print("6|",darso6,"|") | |
if "ds_items" in darso5: | |
cek_str="ds_items" | |
else: | |
cek_str="--" | |
if darso6=="ERR": | |
ds_id = time.time() | |
ds_salesOrderId = "ERR" | |
ds_salesDate = 'ERR' | |
ds_customerName="-" | |
ds_customerAddress="-" | |
ds_productName1 = "Produk1 --- " | |
ds_quantity1 = 1 | |
ds_unitPrice1 = 0 | |
ds_itemAmt1 = 0 | |
ds_productName2 = "Produk2 --- " | |
ds_quantity2 = 0 | |
ds_unitPrice2 = 0 | |
ds_itemAmt2 = 0 | |
ds_productName3 = "Produk3 --- " | |
ds_quantity3 = 0 | |
ds_unitPrice3 = 0 | |
ds_itemAmt3 = 0 | |
ds_shippingAddress="" | |
ds_shippingCost=0 | |
ds_totalAmount=0 | |
else: | |
ds_id = time.time() | |
ds_salesOrderId = "OK" | |
ds_salesDate = 'OK' | |
try: | |
ds_salesOrderId = darso7.ds_salesOrderId | |
#print("7|ds_salesOrderId") | |
ds_salesDate = darso7.ds_salesDate | |
#print("7|ds_salesDate") | |
ds_customerName=f"""{darso7.ds_customerName}""" | |
#print("7|ds_customerName:",ds_customerName) | |
ds_customerAddress=f"""{darso7.ds_customerAddress}""" | |
#print("7|ds_customerAddress:", len(darso7.ds_items)) | |
ds_productName1 = darso7.ds_items[0].ds_productName | |
#print("7|ds_productName1") | |
ds_quantity1 = darso7.ds_items[0].ds_quantity | |
#print("7|ds_quantity1") | |
ds_unitPrice1 = darso7.ds_items[0].ds_unitPrice | |
#print("7|ds_unitPrice1") | |
ds_itemAmt1 = darso7.ds_items[0].ds_itemAmt | |
#print("7|ds_itemAmt1") | |
ds_productName2 = "-" | |
ds_quantity2 = 0 | |
ds_unitPrice2 = 0 | |
ds_itemAmt2 = 0 | |
ds_productName3 = "-" | |
ds_quantity3 = 0 | |
ds_unitPrice3 = 0 | |
ds_itemAmt3 = 0 | |
if len(darso7.ds_items)>1: | |
ds_productName2 = darso7.ds_items[1].ds_productName | |
ds_quantity2 = darso7.ds_items[1].ds_quantity | |
ds_unitPrice2 = darso7.ds_items[1].ds_unitPrice | |
ds_itemAmt2 = darso7.ds_items[1].ds_itemAmt | |
if len(darso7.ds_items)>2: | |
ds_productName3 = darso7.ds_items[2].ds_productName | |
ds_quantity3 = darso7.ds_items[2].ds_quantity | |
ds_unitPrice3 = darso7.ds_items[2].ds_unitPrice | |
ds_itemAmt3 = darso7.ds_items[2].ds_itemAmt | |
ds_shippingCost=darso7.ds_shippingCost | |
#print("7|ds_shippingCost") | |
ds_shippingAddress=f"""{darso7.ds_shippingAddress}""" | |
#print("7|ds_shippingAddress") | |
ds_totalAmount=darso7.ds_totalAmount | |
#print("7|ds_totalAmount") | |
except: | |
ds_salesOrderId = "OK2" | |
ds_salesDate = 'OK2' | |
ds_customerName="-" | |
ds_customerAddress="-" | |
ds_productName1 = "Produk1" | |
ds_quantity1 = 0 | |
ds_unitPrice1 = 0 | |
ds_itemAmt1 = 0 | |
ds_productName2 = "Produk2" | |
ds_quantity2 = 0 | |
ds_unitPrice2 = 0 | |
ds_itemAmt2 = 0 | |
ds_productName3 = "Produk3" | |
ds_quantity3 = 0 | |
ds_unitPrice3 = 0 | |
ds_itemAmt3 = 0 | |
ds_shippingAddress="" | |
ds_shippingCost=0 | |
ds_totalAmount=0 | |
formDs = f""" | |
<form id="myaiForm{ds_id}" action="javascript:abcde({ds_id});" class="form-container"> | |
<h3>Pesanan</h3> | |
<table> | |
<tr> | |
<td><label for="ds_salesOrderId"><b>SO#</b></label><input type="text" placeholder="" name="ds_salesOrderId" value="{ds_salesOrderId}"></td> | |
<td><label for="ds_salesDate"><b>Date</b></label><input type="text" placeholder="" name="ds_salesDate" value="{ds_salesDate}"></td> | |
</tr> | |
<tr> | |
<td colspan="2"><label for="ds_customerName"><b>Customer</b></label><input type="text" placeholder="" name="ds_customerName" value="{ds_customerName}"></td> | |
</tr> | |
<tr> | |
<td colspan="2"><label for="ds_customerAddress"><b>Alamat</b></label><input type="text" placeholder="" name="ds_customerAddress" value="{ds_customerAddress}"></td> | |
</tr> | |
</table style="width:100%"> | |
<b>Item Barang:</b> | |
<table> | |
<tr><th>Prod</th><th style="text-align: center;">Qty</th><th style="text-align: center;">Prc</th><th style="text-align: center;">Rp</th></tr> | |
<tr> | |
<td><input type="text" placeholder="-" name="ds_productName1" value="{ds_productName1}"></td> | |
<td><input type="text" style="text-align: center;" name="ds_quantity1" value={ds_quantity1}></td> | |
<td><input type="text" style="text-align: center;" name="ds_unitPrice1" value={ds_unitPrice1}></td> | |
<td><input type="text" style="text-align: center;" name="ds_itemAmt1" value={ds_itemAmt1}></td> | |
</tr> | |
<tr> | |
<td><input type="text" placeholder="-" name="ds_productName2" value="{ds_productName2}"></td> | |
<td><input type="text" style="text-align: center;" name="ds_quantity2" value={ds_quantity2}></td> | |
<td><input type="text" style="text-align: center;" name="ds_unitPrice2" value={ds_unitPrice2}></td> | |
<td><input type="text" style="text-align: center;" name="ds_itemAmt2" value={ds_itemAmt2}></td> | |
</tr> | |
<tr> | |
<td><input type="text" placeholder="-" name="ds_productName3" value="{ds_productName3}"></td> | |
<td><input type="text" style="text-align: center;" name="ds_quantity3" value={ds_quantity3}></td> | |
<td><input type="text" style="text-align: center;" name="ds_unitPrice3" value={ds_unitPrice3}></td> | |
<td><input type="text" style="text-align: center;" name="ds_itemAmt3" value={ds_itemAmt3}></td> | |
</tr> | |
</table> | |
<table> | |
<tr> | |
<td style="text-align: center;"><b>Ongkir</b></td> | |
<td style="text-align: center;"><b>Total</b></td> | |
</tr> | |
<tr> | |
<td><input type="text" style="text-align: center;" placeholder="0" name="ds_shippingCost" value={ds_shippingCost}></td> | |
<td><input type="text" style="text-align: center;" placeholder="0" name="ds_totalAmount" value={ds_totalAmount}></td> | |
</tr> | |
<tr> | |
<td colspan="2"><label for="ds_shippingAddress"><b></b></label><input type="text" placeholder="" name="ds_shippingAddress" value="{ds_shippingAddress}"></td> | |
</tr> | |
</table> | |
<button type="submit" class="btn">Submit</button> | |
<button type="button" class="btn cancel" onclick="closeAiForm({ds_id})">Close</button> | |
</form> | |
<form id="myaiForm2{ds_id}" class="form-container" style="display:none;"> | |
<button type="button" class="btn umum" onclick="openAiForm({ds_id})">Open Form</button> | |
</form> | |
""" | |
m = ModelResponse.from_text(content=formDs, timestamp=result.timestamp()) | |
yield json.dumps(to_ds_message(m)).encode('utf-8') + b'\n' | |
print("OK") | |
#await database.add_messages(result.new_messages_json()) | |
await database.add_messages(json.dumps(darso)) | |
##print(len(items)) | |
#darso7 = SalesOrder.model_validate(darso6) | |
#print("[--",darso7.ds_customerName,"--]") | |
#darso8 = darso7.ds_items[0] | |
##, len(darso7.ds_items) | |
#print("[--",darso8.ds_productName,"--]") | |
except ValueError as e: | |
print(e) | |
if prompt[0] == "@" : | |
#print("@@@", prompt, "@@@") | |
nn = len(prompt) | |
prompt = prompt[1:nn] | |
print(">>>", prompt, "<<<") | |
return StreamingResponse(ds_messages(prompt), media_type='text/plain') | |
elif prompt[0] != "@" : | |
#print("biasa") | |
return StreamingResponse(ds_messages(prompt), media_type='text/plain') | |
print("** selesai **") | |
return StreamingResponse(ds_messages(prompt), media_type='text/plain') | |
P = ParamSpec('P') | |
R = TypeVar('R') | |
class Database: | |
"""Rudimentary database to store chat messages in SQLite. | |
The SQLite standard library package is synchronous, so we | |
use a thread pool executor to run queries asynchronously. | |
""" | |
con: sqlite3.Connection | |
_loop: asyncio.AbstractEventLoop | |
_executor: ThreadPoolExecutor | |
async def connect( | |
cls, file: Path = THIS_DIR / '.chat_messages.sqlite' | |
) -> AsyncIterator[Database]: | |
with logfire.span('connect to DB'): | |
loop = asyncio.get_event_loop() | |
executor = ThreadPoolExecutor(max_workers=1) | |
con = await loop.run_in_executor(executor, cls._connect, file) | |
slf = cls(con, loop, executor) | |
try: | |
yield slf | |
finally: | |
await slf._asyncify(con.close) | |
def _connect(file: Path) -> sqlite3.Connection: | |
con = sqlite3.connect(str(file)) | |
con = logfire.instrument_sqlite3(con) | |
cur = con.cursor() | |
cur.execute( | |
'CREATE TABLE IF NOT EXISTS messages (id INT PRIMARY KEY, message_list TEXT);' | |
) | |
con.commit() | |
return con | |
async def add_messages(self, messages: bytes): | |
await self._asyncify( | |
self._execute, | |
'INSERT INTO messages (message_list) VALUES (?);', | |
messages, | |
commit=True, | |
) | |
await self._asyncify(self.con.commit) | |
async def get_messages(self) -> list[ModelMessage]: | |
c = await self._asyncify( | |
self._execute, 'SELECT message_list FROM messages order by id asc' | |
) | |
rows = await self._asyncify(c.fetchall) | |
messages: list[ModelMessage] = [] | |
for row in rows: | |
messages.extend(ModelMessagesTypeAdapter.validate_json(row[0])) | |
return messages | |
def _execute( | |
self, sql: LiteralString, *args: Any, commit: bool = False | |
) -> sqlite3.Cursor: | |
cur = self.con.cursor() | |
cur.execute(sql, args) | |
if commit: | |
self.con.commit() | |
return cur | |
async def _asyncify( | |
self, func: Callable[P, R], *args: P.args, **kwargs: P.kwargs | |
) -> R: | |
return await self._loop.run_in_executor( # type: ignore | |
self._executor, | |
partial(func, **kwargs), | |
*args, # type: ignore | |
) | |
if __name__ == '__main__': | |
import uvicorn | |
uvicorn.run( | |
'app:app', reload=True, host="0.0.0.0", port=7860, reload_dirs=[str(THIS_DIR)] | |
) |