"""ROS2 Expert Assistant: a Gradio chat app that answers ROS2 questions.

Documentation chunks are retrieved from a persisted Chroma vector store and
passed as context to a Llama instruct model, which generates the answer.
"""

import spaces
import os
import gradio as gr
import torch
from transformers import AutoTokenizer, TextStreamer, pipeline, AutoModelForCausalLM
from langchain_community.embeddings import HuggingFaceInstructEmbeddings
from langchain_community.vectorstores import Chroma
from langchain.prompts import PromptTemplate
from langchain.chains import RetrievalQA
from langchain_community.llms import HuggingFacePipeline

# System prompt used when building the model prompt; one tuple entry per line
# of the final text.
DEFAULT_SYSTEM_PROMPT = "\n".join(
    (
        "You are a ROS2 expert assistant. Based on the context provided, give direct and concise answers.",
        'If the information is not in the context, respond with "I don\'t find that information in the available documentation."',
        "Keep responses to 1-2 lines maximum.",
    )
)

# Choices offered by the question dropdown. The first entry is the placeholder
# shown before the user has picked anything.
PREDEFINED_QUESTIONS = [
    "Select a question...",
    "Tell me how can I navigate to a specific pose - include replanning aspects in your answer.",
    "Can you provide me with code for this task?",
    "How do I set up obstacle avoidance in ROS2 navigation?",
    "What are the key parameters for tuning the nav2 planner?",
    "How do I integrate custom recovery behaviors?",
]

def generate_prompt(context: str, question: str, system_prompt: str = DEFAULT_SYSTEM_PROMPT) -> str:
    """Assemble the instruction prompt sent to the language model.

    Args:
        context: Retrieved documentation text placed after ``Context:``.
        question: The user's question placed after ``Question:``.
        system_prompt: Text placed between the ``<<SYS>>`` markers.

    Returns:
        The prompt as a single newline-separated string, starting with
        ``[INST]`` and ending with ``[/INST]``.
    """
    sections = (
        "[INST] <<SYS>>",
        f"{system_prompt}",
        "<</SYS>>",
        f"Context: {context}",
        f"Question: {question}",
        "Answer: [/INST]",
    )
    return "\n".join(sections)

# Instructor embedding model, configured to run on the CPU. It is handed to
# the vector store below as its embedding function.
embeddings = HuggingFaceInstructEmbeddings(model_name="hkunlp/instructor-base", model_kwargs=dict(device="cpu"))

# Chroma vector store backed by the local "db" directory.
db = Chroma(persist_directory="db", embedding_function=embeddings)

def initialize_model():
    """Load the Llama 3.2 3B Instruct model together with its tokenizer.

    The Hugging Face access token is read from the ``HF_TOKEN`` environment
    variable (``None`` when it is not set). The model is placed on ``"cuda"``
    when ``torch.cuda.is_available()`` is true and on ``"cpu"`` otherwise.

    Returns:
        A ``(model, tokenizer)`` tuple.
    """
    checkpoint = "meta-llama/Llama-3.2-3B-Instruct"
    hf_token = os.environ.get("HF_TOKEN")

    tok = AutoTokenizer.from_pretrained(checkpoint, token=hf_token)

    if torch.cuda.is_available():
        target_device = "cuda"
    else:
        target_device = "cpu"
    lm = AutoModelForCausalLM.from_pretrained(checkpoint, token=hf_token, device_map=target_device)

    return lm, tok

def question_selected(question):
    """Translate a dropdown selection into text for the question box.

    Returns an empty string for the ``"Select a question..."`` placeholder and
    the selection itself for anything else.
    """
    is_placeholder = question == "Select a question..."
    return "" if is_placeholder else question

@spaces.GPU
def respond(message, history, system_message, max_tokens, temperature, top_p):
    """Answer a question using retrieved documentation as context.

    Args:
        message: Question text from the user; ``None`` or blank text produces
            a reminder instead of a model call.
        history: Chat history as a list of ``(user, assistant)`` tuples, or
            ``None``/empty for a fresh conversation. The list is extended in
            place.
        system_message: System prompt for the model; a blank value falls back
            to ``DEFAULT_SYSTEM_PROMPT``.
        max_tokens: Maximum number of new tokens to generate.
        temperature: Sampling temperature forwarded to the pipeline.
        top_p: Nucleus-sampling threshold forwarded to the pipeline.

    Returns:
        The history with one ``(message, reply)`` pair appended. Failures are
        logged and reported as the reply text rather than raised.
    """
    import logging

    # Normalised before the try block so the error handler can always append.
    history = history or []

    # Treat None like an empty question instead of failing on ``.strip()``.
    if not message or not message.strip():
        history.append((message, "Please enter a question or select one from the dropdown menu."))
        return history

    try:
        model, tokenizer = initialize_model()

        # Fetch the two most similar documentation chunks as context.
        # ``invoke`` replaces the deprecated ``get_relevant_documents``.
        retriever = db.as_retriever(search_kwargs={"k": 2})
        docs = retriever.invoke(message)
        context = "\n".join(doc.page_content for doc in docs)

        prompt = generate_prompt(
            context=context,
            question=message,
            system_prompt=system_message or DEFAULT_SYSTEM_PROMPT,
        )

        # Generation settings are given once, at construction time.
        text_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            # Defensive cast in case the UI delivers the slider value as a float.
            max_new_tokens=int(max_tokens),
            temperature=temperature,
            top_p=top_p,
            repetition_penalty=1.15,
        )

        # return_full_text=False keeps only the newly generated answer.
        output = text_pipeline(prompt, return_full_text=False)[0]["generated_text"]

        history.append((message, output.strip()))
        return history

    except Exception as e:
        # UI boundary: keep the traceback in the logs and show the user a
        # short message instead of crashing the event handler.
        logging.getLogger(__name__).exception("Failed to answer question")
        history.append((message, f"An error occurred: {str(e)}"))
        return history


def submit_and_clear(message, history, system_message, max_tokens, temperature, top_p):
    """Run ``respond`` and blank the question box.

    All arguments are forwarded unchanged to ``respond``.

    Returns:
        A ``(history, "")`` pair: the history returned by ``respond`` and an
        empty string.
    """
    return respond(message, history, system_message, max_tokens, temperature, top_p), ""

# Build the chat UI: question dropdown, chat window, question box, action
# buttons and generation settings.
with gr.Blocks(title="ROS2 Expert Assistant") as demo:
    gr.Markdown("# ROS2 Expert Assistant")
    gr.Markdown("Ask questions about ROS2, navigation, and robotics. I'll provide concise answers based on the available documentation.")

    question_dropdown = gr.Dropdown(
        choices=PREDEFINED_QUESTIONS,
        value="Select a question...",
        label="Pre-defined Questions"
    )

    chatbot = gr.Chatbot()

    msg = gr.Textbox(
        label="Your Question",
        placeholder="Type your question here or select one from the dropdown above...",
        lines=2
    )

    with gr.Row():
        submit = gr.Button("Submit")
        clear = gr.Button("Clear")

    # submit_and_clear takes the system prompt as its third argument. The
    # prompt is not user-editable, so it is supplied through a hidden State
    # component; without it the handlers receive one argument too few.
    system_message = gr.State(DEFAULT_SYSTEM_PROMPT)

    with gr.Accordion("Advanced Settings", open=False):
        max_tokens = gr.Slider(
            minimum=1,
            maximum=2048,
            value=500,
            step=1,
            label="Max new tokens"
        )
        temperature = gr.Slider(
            minimum=0.1,
            maximum=4.0,
            value=0.1,
            step=0.1,
            label="Temperature"
        )
        top_p = gr.Slider(
            minimum=0.1,
            maximum=1.0,
            value=0.95,
            step=0.05,
            label="Top-p"
        )

    # Order must match submit_and_clear's parameter list; the same lists are
    # shared by the button and the textbox so the two events cannot drift apart.
    handler_inputs = [msg, chatbot, system_message, max_tokens, temperature, top_p]
    handler_outputs = [chatbot, msg]

    # Picking a predefined question copies it into the question box.
    question_dropdown.change(
        question_selected,
        inputs=[question_dropdown],
        outputs=[msg]
    )

    submit.click(
        submit_and_clear,
        inputs=handler_inputs,
        outputs=handler_outputs
    )

    # Clear resets both the chat window and the question box.
    clear.click(lambda: (None, ""), None, [chatbot, msg], queue=False)

    # Submitting from the textbox behaves exactly like the Submit button.
    msg.submit(
        submit_and_clear,
        inputs=handler_inputs,
        outputs=handler_outputs
    )

if __name__ == "__main__":
    demo.launch()