|
import streamlit as st |
|
from streamlit_elements import elements, mui, editor, dashboard |
|
from stqdm import stqdm |
|
import textgrad as tg |
|
import os |
|
|
|
|
|
class MathSolution: |
|
def __init__(self, data) -> None: |
|
self.data = data |
|
if 'default_initial_solution' not in st.session_state: |
|
st.session_state.default_initial_solution = self.data["default_initial_solution"] |
|
if 'loss_system_prompt' not in st.session_state: |
|
st.session_state.loss_system_prompt = self.data["default_loss_system_prompt"] |
|
if 'instruction' not in st.session_state: |
|
st.session_state.instruction = self.data["instruction"] |
|
if 'current_solution' not in st.session_state: |
|
st.session_state.current_solution = self.data["default_initial_solution"] |
|
|
|
self.llm_engine = tg.get_engine("gpt-4o") |
|
print("="*50, "init", "="*50) |
|
self.loss_value = "" |
|
self.gradients = "" |
|
if 'iteration' not in st.session_state: |
|
st.session_state.iteration = 0 |
|
st.session_state.results = [] |
|
tg.set_backward_engine(self.llm_engine, override=True) |
|
|
|
def load_layout(self): |
|
|
|
def update_solution_content(value): |
|
if st.session_state.iteration == 0: |
|
st.session_state.current_solution = value |
|
|
|
|
|
col1, col2 = st.columns([1, 1]) |
|
with col1: |
|
|
|
|
|
solution = st.text_area("Initial solution:", st.session_state.default_initial_solution, height=300) |
|
|
|
if solution is not None and st.session_state.default_initial_solution != solution: |
|
update_solution_content(solution) |
|
with col2: |
|
self.loss_system_prompt = st.text_area("Loss system prompt:", st.session_state.loss_system_prompt, height=300) |
|
|
|
def _run(self): |
|
|
|
current_solution = st.session_state.current_solution |
|
|
|
self.response = tg.Variable(current_solution, |
|
requires_grad=True, |
|
role_description="solution to the math question") |
|
|
|
loss_fn = tg.TextLoss(tg.Variable(self.loss_system_prompt, |
|
requires_grad=False, |
|
role_description="system prompt")) |
|
optimizer = tg.TGD([self.response]) |
|
|
|
loss = loss_fn(self.response) |
|
self.loss_value = loss.value |
|
self.graph = loss.generate_graph() |
|
|
|
loss.backward() |
|
self.gradients = self.response.gradients |
|
|
|
optimizer.step() |
|
st.session_state.current_solution = self.response.value |
|
|
|
def show_results(self): |
|
self._run() |
|
st.session_state.iteration += 1 |
|
st.session_state.results.append({ |
|
'iteration': st.session_state.iteration, |
|
'loss_value': self.loss_value, |
|
'response': self.response.value, |
|
'gradients': self.gradients |
|
}) |
|
|
|
tabs = st.tabs([f"Iteration {i+1}" for i in range(st.session_state.iteration)]) |
|
|
|
for i, tab in enumerate(tabs): |
|
with tab: |
|
result = st.session_state.results[i] |
|
st.markdown(f"Current iteration: **{result['iteration']}**") |
|
st.markdown("## Current solution:") |
|
st.markdown(result['response']) |
|
col1, col2 = st.columns([1, 1]) |
|
with col1: |
|
st.markdown("### Loss value") |
|
st.markdown("**Loss value is based on previous code.**") |
|
st.markdown(result['loss_value']) |
|
with col2: |
|
st.markdown("### Code gradients") |
|
for j, g in enumerate(result['gradients']): |
|
st.markdown(f"### Gradient") |
|
st.markdown(g.value) |