Spaces:
Sleeping
Sleeping
import streamlit as st | |
import pandas as pd | |
import re | |
from openai import OpenAI | |
import concurrent.futures | |
import json | |
import os | |
def extract_and_parse_json_from_markdown(markdown_text: str) -> dict:
    """Parse JSON out of text that may wrap it in a Markdown code fence.

    If the text contains a fenced block (opened with three backticks,
    optionally followed by ``json``), the contents of the first such block
    are parsed. Otherwise the whole text, stripped of surrounding
    whitespace, is parsed.

    Args:
        markdown_text: Raw text that contains JSON, fenced or bare.

    Returns:
        Whatever ``json.loads`` produces for the selected text (a dict when
        the text is a JSON object).

    Raises:
        ValueError: If the selected text is not valid JSON. The underlying
            ``json.JSONDecodeError`` is attached as ``__cause__``.
    """
    code_block_pattern = r"```(?:json)?\s*([\s\S]*?)```"
    code_block_match = re.search(code_block_pattern, markdown_text)
    if code_block_match:
        json_str = code_block_match.group(1).strip()
    else:
        json_str = markdown_text.strip()
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        # Chain explicitly so the decoder's position info stays attached as
        # the cause instead of showing up as an incidental second exception.
        raise ValueError(f"Invalid JSON format: {e}") from e
def process_event(event):
    """Ask the LLM to convert one campaign text block into a metrics dict.

    Builds a prompt containing the target JSON schema and the raw ``event``
    text, sends it to the chat-completions endpoint, and parses the reply
    with ``extract_and_parse_json_from_markdown``. If the reply cannot be
    parsed as JSON the request is sent once more.

    Args:
        event: Raw text describing a single event; it is interpolated
            verbatim into the prompt.

    Returns:
        The parsed JSON value on success, or ``None`` when both attempts
        produced a reply that could not be parsed.

    NOTE(review): ``process_all_events`` in this file runs this function on
    ``ThreadPoolExecutor`` workers. Streamlit commands issued from threads
    without a script-run context may not render; confirm that the
    ``st.warning`` / ``st.error`` calls below actually reach the page.
    """
    client = OpenAI(
        api_key=os.environ.get('DEEP_API_KEY'),
        base_url="https://api.deepinfra.com/v1/openai",
    )
    llm_prompt = f"""
You are a digital marketing campaign analyst designed to analyze and report digital marketing campaign data for Rod Wave concerts. Your job is to convert the given text into JSON.
Don't make any assumptions; if a value doesn't exist, consider it as zero.
{{
"market": "str",
"total_spend": "float",
"impressions": "float",
"clicks": "float",
"metrics_cpc": "float",
"metrics_cpm": "float",
"metrics_ctr": "float",
"metrics_cpa": "float",
"platform_spend_meta_total": "float",
"platform_spend_meta_instagram": "float",
"platform_spend_meta_facebook": "float",
"platform_spend_google_total": "float",
"platform_spend_google_youtube": "float",
"platform_spend_google_search_display": "float",
"platform_spend_programmatic": "float",
"revenue_average_ticket_price": "float",
"revenue_total_revenue": "float",
"revenue_roi": "float"
}}
Here is the text for it:
{event}
Return in only JSON adhering to the above schema.
"""
    # Attempt to process the event and validate JSON
    for attempt in range(2):  # Try twice
        chat_completion = client.chat.completions.create(
            model="Qwen/Qwen2.5-Coder-32B-Instruct",
            messages=[{"role": "user", "content": llm_prompt}],
        )
        json_output = chat_completion.choices[0].message.content
        try:
            # The reply content can be None; an empty string fails JSON
            # parsing with ValueError and so follows the same retry path
            # instead of raising an uncaught TypeError inside re.search.
            return extract_and_parse_json_from_markdown(json_output or "")
        except ValueError:
            if attempt == 0:
                st.warning("JSON validation failed, retrying...")
            else:
                st.error("Failed to validate JSON after retrying.")
    return None  # Return None if it fails after retrying
def process_all_events(events):
    """Run ``process_event`` over all events concurrently.

    Events are submitted to a pool of up to 8 worker threads. A Streamlit
    progress bar is advanced each time a task finishes.

    Args:
        events: Iterable of raw event texts.

    Returns:
        A list with one entry per input event, in the same order as
        ``events``. Each entry is whatever ``process_event`` returned for
        that event (``None`` when it could not be parsed).

    Raises:
        Any exception raised inside ``process_event`` is re-raised here by
        ``future.result()``.
    """
    # Materialise once so len() and indexing work for any iterable.
    events = list(events)
    total = len(events)
    results = [None] * total
    progress_bar = st.progress(0)
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        # Remember each future's input position: as_completed yields in
        # completion order, which would otherwise shuffle the output rows.
        position_of = {
            executor.submit(process_event, event): position
            for position, event in enumerate(events)
        }
        completed = concurrent.futures.as_completed(position_of)
        for finished, future in enumerate(completed, start=1):
            progress_bar.progress(finished / total)
            results[position_of[future]] = future.result()
    return results
def main():
    """Render the Streamlit page and drive the processing workflow.

    The user supplies text either through a text area or by uploading a
    ``.txt`` file. The text is split into events at every newline that is
    immediately followed by "Rod Wave Concert". When the "Process Data"
    button is pressed, the events are passed to ``process_all_events``,
    entries that came back as ``None`` are dropped, and the remainder is
    shown as a DataFrame preview and offered as a CSV download.
    """
    st.title("Rod Wave Concert Marketing Data Processor")
    input_method = st.radio("Choose input method:", ["Text Area", "File Upload"])
    text = None
    if input_method == "Text Area":
        text = st.text_area("Enter concert marketing data:", height=300)
    else:
        uploaded_file = st.file_uploader("Choose a text file", type="txt")
        if uploaded_file is not None:
            try:
                # utf-8-sig accepts plain UTF-8 and also strips a leading
                # BOM, which some editors write at the start of .txt files.
                text = uploaded_file.read().decode("utf-8-sig")
            except UnicodeDecodeError:
                # Report the problem on the page instead of crashing the app.
                st.error("The uploaded file is not valid UTF-8 text.")

    # Nothing to do until some text has been provided.
    if not text:
        return

    events = re.split(r'\n(?=Rod Wave Concert)', text)
    events = [event for event in events if event.strip()]
    st.write(f"Found **{len(events)}** events to process")

    if not st.button("Process Data"):
        return

    with st.spinner("Processing events..."):
        json_all = process_all_events(events)

    # Only keep events that produced valid JSON.
    json_sanity = [ele for ele in json_all if ele is not None]
    df = pd.DataFrame(json_sanity)
    st.success("Processing complete!")
    st.write("Preview of processed data:")
    st.dataframe(df.head())
    csv = df.to_csv(index=False)
    st.download_button(
        label="Download CSV",
        data=csv,
        file_name="processed_concert_data.csv",
        mime="text/csv"
    )
# Script entry point: start the app only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()