import streamlit as st import pandas as pd import re from openai import OpenAI import concurrent.futures import json import os def extract_and_parse_json_from_markdown(markdown_text: str) -> dict: code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```" code_block_match = re.search(code_block_pattern, markdown_text) if code_block_match: json_str = code_block_match.group(1).strip() else: json_str = markdown_text.strip() try: return json.loads(json_str) except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON format: {e}") def process_event(event): openai = OpenAI( api_key=os.environ.get('DEEP_API_KEY'), base_url="https://api.deepinfra.com/v1/openai", ) llm_prompt = f""" You are a digital marketing campaign analyst designed to analyze and report digital marketing campaign data for Rod Wave concerts. Your job is to convert the given text into JSON. Don't make any assumptions; if a value doesn't exist, consider it as zero. {{ "market": "str", "total_spend": "float", "impressions": "float", "clicks": "float", "metrics_cpc": "float", "metrics_cpm": "float", "metrics_ctr": "float", "metrics_cpa": "float", "platform_spend_meta_total": "float", "platform_spend_meta_instagram": "float", "platform_spend_meta_facebook": "float", "platform_spend_google_total": "float", "platform_spend_google_youtube": "float", "platform_spend_google_search_display": "float", "platform_spend_programmatic": "float", "revenue_average_ticket_price": "float", "revenue_total_revenue": "float", "revenue_roi": "float" }} Here is the text for it: {event} Return in only JSON adhering to the above schema. """ # Attempt to process the event and validate JSON for attempt in range(2): # Try twice chat_completion = openai.chat.completions.create( model="Qwen/Qwen2.5-Coder-32B-Instruct", messages=[{"role": "user", "content": llm_prompt}], ) json_output = chat_completion.choices[0].message.content try: return extract_and_parse_json_from_markdown(json_output) except ValueError: if attempt == 0: st.warning("JSON validation failed, retrying...") else: st.error("Failed to validate JSON after retrying.") return None # Return None if it fails after retrying def process_all_events(events): json_all = [] progress_bar = st.progress(0) with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: futures = [executor.submit(process_event, event) for event in events] for i, future in enumerate(concurrent.futures.as_completed(futures)): progress = (i + 1) / len(events) progress_bar.progress(progress) json_all.append(future.result()) return json_all def main(): st.title("Rod Wave Concert Marketing Data Processor") input_method = st.radio("Choose input method:", ["Text Area", "File Upload"]) text = None if input_method == "Text Area": text = st.text_area("Enter concert marketing data:", height=300) else: uploaded_file = st.file_uploader("Choose a text file", type="txt") if uploaded_file is not None: text = uploaded_file.read().decode("utf-8") if text: events = re.split(r'\n(?=Rod Wave Concert)', text) events = [event for event in events if event.strip()] st.write(f"Found **{len(events)}** events to process") if st.button("Process Data"): with st.spinner("Processing events..."): json_all = process_all_events(events) json_sanity = [] for ele in json_all: if ele is not None: # Only process valid JSON json_sanity.append(ele) df = pd.DataFrame(json_sanity) st.success("Processing complete!") st.write("Preview of processed data:") st.dataframe(df.head()) csv = df.to_csv(index=False) st.download_button( label="Download CSV", data=csv, file_name="processed_concert_data.csv", mime="text/csv" ) if __name__ == "__main__": main()