Spaces:
Sleeping
Sleeping
import io | |
import os | |
import time | |
import streamlit as st | |
import requests | |
import zipfile | |
from azure.core.credentials import AzureKeyCredential | |
from azure.ai.translation.document import DocumentTranslationClient | |
from dotenv import load_dotenv | |
from streamlit_pdf_viewer import pdf_viewer | |
from utils import blob_service_client, upload_to_azure, download_from_azure, delete_from_azure | |
from streamlit_msal import Msal | |
# Pull configuration from a local .env file before any os.environ lookups.
load_dotenv()
st.set_page_config(layout="wide")

# Sign-in widget (Azure Active Directory via MSAL) lives in the sidebar.
with st.sidebar:
    auth_data = Msal.initialize_ui(
        client_id=os.environ["AZURE_CLIENT_ID"],
        authority=os.environ["AZURE_AUTHORITY_URL"],
        scopes=[],
        connecting_label="Connecting",
        disconnected_label="Disconnected",
        sign_in_label="Sign in",
        sign_out_label="Sign out",
    )

# Guard clause: st.stop() ends this script run, so nothing below executes
# until the user has signed in.
if not auth_data:
    st.warning("Please login to continue")
    st.stop()

# ---- Main page -----------------------------------------------------------
st.title("Azure Translation Tools")
uploaded_files = st.file_uploader(
    "Upload files to start the process",
    accept_multiple_files=True,
)

# Client used for the blob-storage based translation jobs.
credential = AzureKeyCredential(os.environ["AZURE_AI_TRANSLATOR_KEY"])
client = DocumentTranslationClient(os.environ["AZURE_AI_ENDPOINT_URL"], credential)
sourceUri = "https://cbdtranslation.blob.core.windows.net/source"
targetUri = "https://cbdtranslation.blob.core.windows.net/target"

# Supported target languages as (code, display name) pairs; the select box
# shows them as "<code> - <name>".
_LANGUAGES = (
    ("id", "Indonesian"),
    ("en", "English"),
    ("es", "Spanish"),
    ("zh", "Chinese"),
    ("ar", "Arabic"),
    ("fr", "French"),
    ("ru", "Russian"),
    ("hi", "Hindi"),
    ("pt", "Portuguese"),
    ("de", "German"),
    ("ms", "Malay"),
    ("ta", "Tamil"),
    ("ko", "Korean"),
    ("th", "Thai"),
)
langs = tuple(f"{code} - {name}" for code, name in _LANGUAGES)

# Split the chosen label back into its code (e.g. 'en') and name (e.g. 'English').
lang = st.selectbox("Target language selection:", langs, key="lang")
lang_id, lang_name = lang.split(" - ", 1)
def process_sync(file_name, file_content):
    """Translate one document with a single synchronous HTTP request.

    Posts the document as multipart form data to the
    ``/translator/document:translate`` route of ``AZURE_AI_ENDPOINT_URL``,
    using the module-level ``lang_id`` as the target language.

    Args:
        file_name: Name sent as the multipart filename.
        file_content: Raw content of the document to translate.

    Returns:
        A ``(ok, payload)`` tuple. ``ok`` is True only when the service
        answered with HTTP 200. ``payload`` is the raw response body as
        bytes; if the request itself failed (timeout, connection error)
        it is a UTF-8 encoded description of that failure.
    """
    # Set up Azure Translator API headers
    headers = {
        "Ocp-Apim-Subscription-Key": os.environ["AZURE_AI_TRANSLATOR_KEY"],
    }
    # Prepare file for translation
    # NOTE(review): the content type below looks like an unfilled template
    # placeholder; it is kept unchanged because the service's handling of
    # other values is not visible from this file — confirm before changing.
    files = {
        "document": (file_name, file_content, "ContentType/file-extension"),
    }
    # Let requests build and encode the query string.
    url = f"{os.environ['AZURE_AI_ENDPOINT_URL']}/translator/document:translate"
    params = {
        "targetLanguage": lang_id,
        "api-version": os.environ["AZURE_AI_API_VERSION"],
    }
    try:
        # (connect, read) timeout in seconds: without one, a stalled
        # connection would block the Streamlit run indefinitely.
        response = requests.post(
            url,
            headers=headers,
            params=params,
            files=files,
            timeout=(10, 300),
        )
    except requests.RequestException as exc:
        # Report network-level failures through the normal return shape so
        # one bad request does not abort the whole batch.
        return False, f"Request to Azure Translator failed: {exc}".encode("utf-8")
    return response.status_code == 200, response.content
def process_async(file_name, file_content):
    """Translate one document through the blob-storage translation job.

    Uploads the file to the "source" container, starts a translation from
    ``sourceUri`` to ``targetUri`` with the module-level ``lang_id``, waits
    for the job, and downloads the blob of the same name from "target".

    Args:
        file_name: Blob name used in both containers.
        file_content: Raw content of the document to translate.

    Returns:
        A ``(ok, payload)`` tuple. On success ``payload`` is the downloaded
        translated content. On failure it is a UTF-8 encoded message
        describing why no translation is available.

    Notes:
        Only the first document reported by the job is inspected, as in
        the original implementation.
        NOTE(review): the job is started on the whole source container
        URL, so other blobs present there may be picked up too — confirm.
    """
    # Upload the original file to Azure Blob Storage source container
    upload_to_azure(blob_service_client, "source", file_content, file_name)
    target_written = False
    try:
        # Start the translation job and block until it finishes.
        poller = client.begin_translation(sourceUri, targetUri, lang_id)
        document = next(iter(poller.result()), None)
        if document is None:
            # No per-document status was reported, so there is nothing to
            # download.
            return False, b"Translation job returned no documents"
        if document.status != 'Succeeded':
            # Surface the service's error message when one is attached.
            error = getattr(document, "error", None)
            message = getattr(error, "message", None) or (
                f"Translation finished with status {document.status}"
            )
            return False, str(message).encode("utf-8")
        # Only a succeeded document is expected to have a target blob.
        target_written = True
        translated_content = download_from_azure(blob_service_client, "target", file_name)
        return True, translated_content
    finally:
        # Clean up even when the job or the download raised, so a stale
        # source blob is not left behind for later jobs.
        delete_from_azure(blob_service_client, "source", file_name)
        if target_written:
            delete_from_azure(blob_service_client, "target", file_name)
# Extensions sent through the single-request (synchronous) endpoint.
_SYNC_FILE_TYPES = frozenset({
    'txt', 'tsv', 'tab', 'csv', 'html', 'htm', 'mhtml', 'mht',
    'pptx', 'xlsx', 'docx', 'msg', 'xlf', 'xliff',
})
# Extensions sent through the blob-storage translation job.
_ASYNC_FILE_TYPES = frozenset({'pdf', 'odt', 'odp', 'ods', 'rtf'})
# Extensions for which a side-by-side preview is rendered.
_PREVIEW_FILE_TYPES = frozenset({'pdf', 'docx', 'txt'})


def _show_preview(file_type, original_name, translated_name, original, translated):
    """Render the original and translated document side by side."""
    if file_type not in _PREVIEW_FILE_TYPES:
        return
    col1, col2 = st.columns(2)
    panes = (
        (col1, f"Original File: {original_name}", original),
        (col2, f"Translated File: {translated_name}", translated),
    )
    for column, caption, payload in panes:
        with column:
            st.write(caption)
            st.divider()
            if file_type == 'pdf':
                pdf_viewer(payload)
            elif file_type == 'docx':
                # No docx renderer yet; placeholder text only.
                st.write("On development")
            else:
                st.write(payload)


if uploaded_files:
    submit = st.button("Get Result", key='submit')
    if submit:
        # Create an in-memory zip file to store translated documents
        zip_buffer = io.BytesIO()
        translated_count = 0
        with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
            # Add progress bar for translation status
            progress_bar = st.progress(0)
            for idx, uploaded_file in enumerate(uploaded_files):
                # Start timing
                start_time = time.time()
                file_name = uploaded_file.name
                file_content = uploaded_file.read()
                # Lower-cased so "REPORT.PDF" is handled like "report.pdf".
                file_type = os.path.splitext(file_name)[1].lstrip('.').lower()

                # Check file extension to determine translation method
                if file_type in _SYNC_FILE_TYPES:
                    translate = process_sync
                elif file_type in _ASYNC_FILE_TYPES:
                    translate = process_async
                else:
                    translate = None

                if translate is None:
                    # Previously this fell through with `result`/`response`
                    # undefined (or left over from the previous file).
                    st.warning(f"Skipped {file_name}: unsupported file type '{file_type}'")
                else:
                    result, response = translate(file_name, file_content)
                    # Calculate duration
                    duration = time.time() - start_time
                    translated_name = f"{lang_name}-translated-{file_name}"
                    if result:
                        # Add successfully translated file to zip archive
                        zip_file.writestr(translated_name, response)
                        translated_count += 1
                        st.success(f"Successfully translated: {file_name} (Time taken: {duration:.2f} seconds)")
                        # Preview only real translations, never an error body.
                        _show_preview(file_type, file_name, translated_name, file_content, response)
                    else:
                        # Both helpers return bytes, not a requests.Response,
                        # so decode the payload instead of reading
                        # .status_code / .text from it.
                        if isinstance(response, (bytes, bytearray)):
                            detail = bytes(response).decode('utf-8', errors='replace')
                        else:
                            detail = response
                        st.error(f"Failed to translate {file_name}: {detail} (Time taken: {duration:.2f} seconds)")

                # Update progress bar based on completed translations
                progress_bar.progress((idx + 1) / len(uploaded_files))

        # The archive is read only after the ZipFile context has closed, so
        # the central directory has been written. Skip the button when
        # nothing was translated, since the archive would be empty.
        if translated_count:
            st.download_button(
                label="Download All Translated Files",
                data=zip_buffer.getvalue(),
                file_name=f"{lang_name}-translated-files.zip",
                mime="application/zip"
            )