# Spaces: Running  (status text left over from the hosting page; not part of the program)
import io | |
import json | |
import os | |
import time | |
import uuid | |
from datetime import datetime | |
from io import BytesIO | |
from tempfile import NamedTemporaryFile | |
from xmlrpc.client import Binary | |
import jwt | |
import numpy as np | |
import requests | |
import scipy | |
import soundfile as sf | |
import streamlit as st | |
import streamlit_vertical_slider as svs | |
from pydub import AudioSegment | |
from scipy.signal import butter, sosfilt | |
from streamlit import session_state as st_state | |
from woocommerce import API | |
from wordpress_xmlrpc import Client | |
from wordpress_xmlrpc.compat import xmlrpc_client | |
from wordpress_xmlrpc.methods import media | |
#import extra_streamlit_components as stx | |
#cookie_manager = stx.CookieManager(key='cookie_manager') | |
# The inference endpoint is read from the environment. There is no default:
# when the variable is missing, the problem is shown in the UI and this
# script run is halted.
API_URL = os.environ.get("API_URL")
if API_URL is None:
    st.error("API_URL environment variable is not set.")
    st.stop()
print("API_URL:", API_URL)
# Page-wide background image, injected as raw CSS that targets the app root.
page_bg_img = (
    "\n"
    "<style>\n"
    ".stApp {\n"
    'background-image: url("https://songlabai.com/wp-content/uploads/2024/03/4.png");\n'
    "background-size: cover;\n"
    "}\n"
    "</style>\n"
)
# unsafe_allow_html is required so the <style> tag is not escaped.
st.markdown(body=page_bg_img, unsafe_allow_html=True)
def save_to_wordpress(channel1, channel2, sample_rate):
    """Upload a stereo track to WordPress and register it as a WooCommerce product.

    The two channels are combined into a WAV file, uploaded to the WordPress
    media library over XML-RPC, and then attached as the downloadable file of
    a newly created (status "pending") WooCommerce product.

    Credentials are read from the environment so that no secret lives in the
    source code:
        WORDPRESS_USERNAME, WORDPRESS_PASSWORD,
        WOOCOMMERCE_CONSUMER_KEY, WOOCOMMERCE_CONSUMER_SECRET

    Args:
        channel1: Samples of the first channel (any sequence np.array accepts).
        channel2: Samples of the second channel, same length as channel1.
        sample_rate: Sample rate written into the WAV header.

    Returns:
        A ``(permalink, product_code)`` tuple on success, where product_code
        is whatever follows ``"p="`` in the permalink. ``None`` when a
        credential is missing, the upload or product calls report a failure,
        or every attempt raised.
    """
    # NOTE(review): the credentials that used to be hard-coded here have been
    # exposed in source control and should be rotated.
    try:
        username = os.environ["WORDPRESS_USERNAME"]
        password = os.environ["WORDPRESS_PASSWORD"]
        consumer_key = os.environ["WOOCOMMERCE_CONSUMER_KEY"]
        consumer_secret = os.environ["WOOCOMMERCE_CONSUMER_SECRET"]
    except KeyError as missing:
        print("Error: missing credential environment variable:", missing)
        return None

    wordpress_url = "https://songlabai.com/xmlrpc.php"
    woocommerce_url = "https://songlabai.com"

    channel1 = np.array(channel1).astype(np.float32)
    channel2 = np.array(channel2).astype(np.float32)
    stereo_audio = np.column_stack((channel1, channel2))

    # Encode the audio as WAV entirely in memory.
    wav_bytes = BytesIO()
    sf.write(wav_bytes, stereo_audio, samplerate=sample_rate, format="WAV")
    title = f"generated_audio_{datetime.now().timestamp()}.wav"
    file_data = {
        "name": title,
        "type": "audio/x-wav",  # Change the MIME type according to your file type
        "bits": xmlrpc_client.Binary(wav_bytes.getvalue()),
    }

    wp_client = Client(wordpress_url, username, password)

    # Up to four attempts. Only an exception triggers another attempt; a
    # failure reported by the server ends the function immediately.
    # NOTE(review): a retry repeats the media upload as well, so an exception
    # raised after a successful upload can leave duplicate media behind.
    for _ in range(4):
        try:
            # Upload the file to WordPress Media Library
            media_response = wp_client.call(media.UploadFile(file_data))
            if not media_response:
                print("Error uploading file to WordPress.")
                return None
            print(
                "File successfully uploaded to WordPress with attachment ID:",
                media_response,
            )

            # Create product data for WooCommerce
            product_data = {
                "status": "pending",
                "name": title,
                "type": "simple",
                "regular_price": "1.00",  # Set the price as needed
                "sku": str(uuid.uuid4()),
                "downloadable": True,
                "download_limit": -1,
                "download_expiry": -1,
            }
            # Authenticate with WooCommerce API
            wc_api = API(
                url=woocommerce_url,
                consumer_key=consumer_key,
                consumer_secret=consumer_secret,
                version="wc/v3",
            )

            # Create the product
            response = wc_api.post("products", product_data)
            if response.status_code != 201:
                print("Error creating product in WooCommerce:", response.text)
                return None
            created = response.json()
            print("Product successfully created in WooCommerce:", created)

            # Update product to add downloadable file URL
            product_update_data = {
                "downloads": [
                    {
                        "name": media_response["title"],
                        "file": media_response["link"],
                    }
                ]
            }
            product_id = created.get("id")
            response = wc_api.put(f"products/{product_id}", product_update_data)
            if response.status_code != 200:
                print(
                    "Error adding downloadable file URL to product:",
                    response.text,
                )
                return None
            updated = response.json()
            print("Downloadable file URL added to product:", updated)
            permalink = updated["permalink"]
            return permalink, permalink.split("p=")[-1]
        except Exception as e:
            # Best-effort boundary: log the failure and try again.
            print("Error:", e)
    return None
# Streamlit app title
st.title("Songlab AI")
#cookies = cookie_manager.get_all(key='init_get_all')
# Ensure session state keys are initialized
if "jwt_token" not in st.session_state:
    st.session_state["jwt_token"] = None
if "session_id" not in st.session_state:
    st.session_state["session_id"] = None

if st.session_state["jwt_token"] is None:
    # Not logged in: render only the login form, then halt the script so the
    # rest of the app is not shown.
    st.header("Please log in to continue")
    username = st.text_input("Username", key="username_input")
    password = st.text_input("Password", type="password", key="password_input")
    col1, col2 = st.columns([1, 1])
    with col1:
        if st.button("Log in", key="login_button"):
            login_url = "https://songlabai.com/wp-json/jwt-auth/v1/token"
            data = {
                "username": username,
                "password": password
            }
            try:
                # A timeout keeps the page from hanging forever when the
                # login service does not answer.
                response = requests.post(login_url, data=data, timeout=30)
            except requests.RequestException as exc:
                print("Login request failed:", exc)
                st.error("Could not reach the login service. Please try again.")
                st.stop()
            # Parse the body once; a non-JSON body is treated as a failed login.
            try:
                result = response.json()
            except ValueError:
                result = None
            if (
                response.status_code == 200
                and isinstance(result, dict)
                and "token" in result
            ):
                st.session_state["jwt_token"] = result["token"]
                st.session_state["session_id"] = str(uuid.uuid4())  # Generate new session ID
                st.success("Logged in successfully!")
                st.rerun()
            else:
                st.error("Invalid username or password")
                st.stop()
    with col2:
        if st.button("Don't have an account?", key="register_button"):
            # Redirect to the registration page via an injected script.
            js = "window.location.href = 'https://songlabai.com/register/';"
            html = '<script>{}</script>'.format(js)
            st.components.v1.html(html)
    st.stop()
else:
    # User is logged in, continue with the app
    if st.button("Log out", key="logout_button"):
        # Clear all user-specific session state
        st.session_state.clear()
        st.success("Logged out successfully!")
        st.rerun()
# Build the HTTP headers for backend calls from the current session.
def get_auth_headers():
    """Return JSON request headers, with a Bearer token when one is stored.

    The token is read from ``st.session_state["jwt_token"]``. When it is
    missing or empty, the ``Authorization`` header is left out.
    """
    headers = {}
    token = st.session_state.get("jwt_token")
    if token:
        headers["Authorization"] = f"Bearer {token}"
    headers["Content-Type"] = "application/json"
    headers["Cache-Control"] = "no-store"
    return headers
# Fallback: make sure a session ID exists before it is displayed.
if "session_id" not in st.session_state:
    st.session_state["session_id"] = str(uuid.uuid4())
st.write("Session ID:", st.session_state["session_id"])
# The bearer token and the raw session-state dump (which contains the token)
# are deliberately not rendered: anything written to the page can be copied
# from a screenshot or a shared screen and replayed against the API.
# Give every audio-related session-state slot a None default on first run,
# so later code can test "is None" instead of checking for the key.
for _state_key in (
    "vocal_audio",
    "vocal_sample_rate",
    "audio",
    "audio_pydub",
    "audio_sample_rate",
    "augmented_audio",
    "augmented_audio_pydub",
    "augmented_audio_sample_rate",
):
    if _state_key not in st_state:
        st_state[_state_key] = None
# Prompt controls: each widget below supplies one field of the text prompt
# that is sent to the generation endpoint.
genres = [
    "Pop", "Rock", "Hip Hop", "Jazz", "Blues",
    "Country", "Classical", "Electronic", "Reggae", "Folk",
    "R&B", "Metal", "Punk", "Indie", "Dance",
    "World", "Gospel", "Soul", "Funk", "Ambient",
    "Techno", "Disco", "House", "Trance", "Dubstep",
]
genre = st.selectbox(label="Select Genre:", options=genres)

energy_levels = ["Low", "Medium", "High"]
energy_level = st.radio(label="Energy Level:", options=energy_levels, horizontal=True)

description = st.text_input(label="Description:", value="")

tempo = st.slider(
    label="Tempo (in bpm):",
    min_value=40,
    max_value=100,
    value=60,
    step=5,
)

# Duration input
duration = st.slider(
    label="Duration (in seconds):",
    min_value=15,
    max_value=300,
    value=30,
    step=1,
)

# Sample prompt string; nothing in the visible code reads it.
prompt2 = "Classical ,Low, 40, slow calm"
def convert_audio_segment_to_float_array(audio_pydub):
    """
    Convert a pydub AudioSegment to a NumPy array of type float32.

    Samples are scaled by the full-scale magnitude of the segment's sample
    width, so 8-, 16- and 32-bit audio all map into [-1.0, 1.0). Channels
    are not separated: the result is the flat sample sequence exactly as
    returned by ``get_array_of_samples()``.

    Args:
        audio_pydub (AudioSegment): The AudioSegment object to be converted.
            Its ``sample_width`` (bytes per sample) selects the scale; an
            object without that attribute is treated as 16-bit.
    Returns:
        np.ndarray: A 1-D NumPy array containing the audio data as float32.
    """
    # Get the raw audio data as a sequence of integer samples
    samples = audio_pydub.get_array_of_samples()
    audio_array = np.array(samples).astype(np.float32)
    # Full-scale magnitude of a signed integer of this width (2**15 for 16-bit).
    sample_width = getattr(audio_pydub, "sample_width", 2)
    max_val = float(2 ** (8 * sample_width - 1))
    audio_array /= max_val
    return audio_array
def time_post_request(api_url, headers=None, payload=None):
    """
    Send a JSON POST request and log how long it took.

    Parameters:
    - api_url (str): The URL to which the POST request is sent.
    - headers (dict): The headers to include in the POST request.
    - payload (dict): The payload, sent as the JSON body of the request.

    Returns:
    - response (requests.Response): The response object returned by the POST request.

    The execution time is printed to stdout only; it is not returned.
    """
    # perf_counter is monotonic, so the measurement is not affected by
    # system clock adjustments made while the request is in flight.
    start_time = time.perf_counter()
    response = requests.post(api_url, headers=headers, json=payload)
    execution_time = time.perf_counter() - start_time
    print(f"Execution time: {execution_time} seconds")
    return response
def generate_audio(genre, energy_level, tempo, description, duration):
    """Check the user's entitlement, then request and play a generated track.

    Steps, in order:
      1. Refuse immediately when no JWT is stored in the session.
      2. Show a 60-second "preparing the server" countdown.
      3. Fetch the current user ID and the subscription status.
      4. Reject users who have exceeded their limit or whose plan is not
         recognised; otherwise increment the user's generation count.
      5. POST the prompt to ``API_URL`` and pass the response to
         ``load_and_play_generated_audio``.

    Every failure is reported with ``st.error`` and ends the function early.

    Args:
        genre: Genre text placed in the prompt.
        energy_level: Energy level text placed in the prompt.
        tempo: Tempo value placed in the prompt.
        description: Free-text description placed in the prompt.
        duration: Requested duration, sent alongside the prompt.

    Returns:
        None.
    """
    # Fail fast: do not make the user sit through the countdown only to be
    # told they are not logged in.
    if st.session_state.get("jwt_token") is None:
        st.error("You must be logged in to generate audio.")
        return

    # Start the countdown
    count_down = 60
    count_down_placeholder = st.empty()
    for seconds in range(count_down):
        count_down_placeholder.info(
            f"Preparing the server... Please wait ⏳ {count_down - seconds} seconds."
        )
        time.sleep(1)
    count_down_placeholder.empty()

    # Headers for authenticated requests. The token itself is intentionally
    # not written to the page.
    auth_headers = get_auth_headers()

    # Get user ID
    user_id_url = "https://songlabai.com/wp-json/custom-api/v1/current-user-id"
    user_response = requests.get(user_id_url, headers=auth_headers)
    if user_response.status_code != 200:
        st.error("Failed to retrieve your user information.")
        return
    user_data = user_response.json()
    user_id = user_data["user_id"]
    st.write("User Data:", user_data)
    st.write("user_id:", user_id)

    # Get subscription status
    subscription_url = "https://songlabai.com/wp-json/custom-api/subscription"
    subscription_response = requests.get(subscription_url, headers=auth_headers)
    if subscription_response.status_code != 200:
        st.error("Failed to retrieve your subscription status.")
        return
    subscription_data = subscription_response.json()
    subscription_plan_id = subscription_data["subscription_plan_id"]
    st.write("subscription_plan_id:", subscription_plan_id)
    has_exceeded_generation_limit = subscription_data["has_exceeded_generation_limit"]
    st.write("has_exceeded_generation_limit:", has_exceeded_generation_limit)

    # Check subscription plan and limits. Free and paid users follow the same
    # path and differ only in the wording of the limit message.
    if subscription_plan_id in [None, ""]:  # Free user
        limit_message = "You have reached your generation limit."
    elif subscription_plan_id in ["304", "305", "306"]:
        limit_message = "You have reached your daily generation limit."
    else:
        st.error("Your subscription does not allow you to generate music.")
        return
    if has_exceeded_generation_limit:
        st.error(limit_message)
        return

    # Update user's generation count
    update_generation_url = "https://songlabai.com/wp-json/custom-api/update-generation-count"
    update_response = requests.post(update_generation_url, headers=auth_headers)
    if update_response.status_code != 200:
        st.error("Failed to update your generation count.")
        return

    # Proceed to generate audio after checks
    prompt = f"Genre: {genre}, Energy Level: {energy_level}, Tempo: {tempo}, Description: {description},"
    payload = {"inputs": {"prompt": prompt, "duration": duration}}
    with st.spinner("Generating audio ..."):
        response = time_post_request(API_URL, headers=auth_headers, payload=payload)
    placeholder1 = st.empty()
    if response.status_code != 200:
        st.error("Failed to generate audio.")
        return
    placeholder1.success(f"✔️ Audio Generated, Loading...")
    load_and_play_generated_audio(response)
    placeholder1.empty()
def load_and_play_generated_audio(response):
    """Decode a generation response, store it in session state and play it.

    The response body is parsed as JSON; its first element supplies the two
    channels under ``"generated_audio"`` and a ``"sample_rate"`` value. The
    channels are combined into a stereo pydub segment, stored in session
    state, uploaded through ``save_to_wordpress`` and rendered with an audio
    player plus publish/download links.

    A failed store upload no longer aborts playback: the track is still
    played and a warning replaces the permalink and product-code text.

    Args:
        response: HTTP response whose ``content`` is the JSON payload.

    Returns:
        None.
    """
    response_eval = json.loads(response.content)
    channel1 = response_eval[0]["generated_audio"][0]
    channel2 = response_eval[0]["generated_audio"][1]

    # Round-trip each channel through a temporary WAV file so pydub can load
    # it; the segment must be read before the file is removed on exit.
    # NOTE(review): the WAV files and the upload use a fixed 32000 Hz while the
    # response also reports a "sample_rate" — confirm the two always agree.
    with NamedTemporaryFile() as f1:
        scipy.io.wavfile.write(
            f1.name, rate=32000, data=np.array(channel1).astype(np.float32)
        )
        channel1_temp = AudioSegment.from_file(f1.name)
    with NamedTemporaryFile() as f2:
        scipy.io.wavfile.write(
            f2.name, rate=32000, data=np.array(channel2).astype(np.float32)
        )
        channel2_temp = AudioSegment.from_file(f2.name)

    audio_pydub = AudioSegment.from_mono_audiosegments(channel1_temp, channel2_temp)
    st_state.audio_pydub = audio_pydub
    st_state.audio_pydub_sample_rate = audio_pydub.frame_rate
    st_state.audio = np.array(st_state.audio_pydub.get_array_of_samples()).astype(np.float32)
    sample_rate = response_eval[0]["sample_rate"]

    # save_to_wordpress returns None when the upload fails; unpacking that
    # directly would raise TypeError and lose the generated track.
    upload_result = save_to_wordpress(channel1, channel2, 32000)
    perm_link, product_code = upload_result if upload_result else (None, None)

    st_state.audio_sample_rate = sample_rate
    st_state.augmented_audio_pydub = st_state.audio_pydub
    st.audio(st_state.audio_pydub.export().read())

    col_btn, col_text = st.columns([2,4])
    with col_btn:
        st.link_button("Publish your Song", "https://songlabai.com/contact_us/")
        st.link_button("Download Song", "https://songlabai.com/download-music/")
    with col_text:
        if perm_link:
            st.write(
                f"To Publish, please contact the admin by sending the following link: {perm_link}"
            )
            st.write(f"To download use the following product code: {product_code}")
        else:
            st.warning(
                "Your song was generated but could not be saved to the store. "
                "Please try again later."
            )
# Generation is attempted only when every prompt field has a value.
if st.button("Generate Audio", key="generate_audio_button"):
    if not all((genre, energy_level, description, tempo)):
        st.info("Description field is required.")
    else:
        generate_audio(genre, energy_level, tempo, description, duration)
# Post-processing options
st.header("Post-processing Options")
vocal_file = st.file_uploader(
    "Upload Vocal File", type=["mp3", "wav", "ogg", "flac", "aac"]
)
if vocal_file:
    # getvalue() returns the whole upload regardless of the current read
    # position, so a rerun never ends up with an empty byte string.
    st_state.vocal_audio = vocal_file.getvalue()
# st.audio(st_state.vocal_audio, format="audio/wav")
# Mixing
mix_vocals = st.checkbox("Mix Vocals")
if mix_vocals and st_state.vocal_audio is not None:
    if st_state.augmented_audio_pydub is None:
        # Nothing to overlay onto until a track has been generated.
        st.info("Generate audio before mixing vocals.")
    else:
        # Decode only raw bytes; an AudioSegment left over from an earlier
        # run is reused as it is.
        if isinstance(st_state.vocal_audio, (bytes, bytearray)):
            with NamedTemporaryFile() as f:
                f.write(st_state.vocal_audio)
                # Push buffered bytes to disk before the file is reopened by
                # name; otherwise the decoder can see a truncated file.
                f.flush()
                st_state.vocal_audio = AudioSegment.from_file(f.name)
        # NOTE(review): the overlay targets the already-augmented track, so
        # every rerun with the box ticked layers the vocals again — confirm
        # whether that is intended.
        st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.overlay(
            st_state.vocal_audio, position=100
        )
        # st.audio(st_state.augmented_audio_pydub.export().read())
        st_state.augmented_audio = convert_audio_segment_to_float_array(
            st_state.augmented_audio_pydub
        )
        st_state.augmented_audio_sample_rate = st_state.augmented_audio_pydub.frame_rate
elif (
    not mix_vocals
    and st_state.vocal_audio is not None
    and st_state.audio_pydub is not None
):
    # Mixing switched off: fall back to the unmixed generated track.
    st_state.augmented_audio_pydub = st_state.audio_pydub
    st_state.augmented_audio_sample_rate = st_state.audio_pydub.frame_rate
# Mastering
st.header("Mastering")
st.markdown("")

# Three side-by-side panels: volume slider, pitch slider, one-shot effect buttons.
vol_col, pitch_shift, buttons_col = st.columns([2, 2, 2.5])

# Appearance settings shared by both vertical sliders.
_slider_look = {
    "track_color": "lightgray",
    "thumb_color": "red",
    "thumb_shape": "pill",
}

with buttons_col, st.container(height=371, border=True):
    # Empty markdown calls act as vertical spacers between the buttons.
    st.markdown("")
    apply_stereo = st.button("Apply Stereo Effect")
    st.markdown("")
    reverse = st.button("Apply Audio Reverse ")
    st.markdown("")
    reset_post_processing = st.button("Undo All Post-processings")
    st.markdown("")

with vol_col, st.container(border=True):
    volume_balance = svs.vertical_slider(
        "Volume Balance",
        min_value=-10.0,
        max_value=10.0,
        default_value=0.0,
        step=0.1,
        slider_color="green",
        **_slider_look,
    )
    vol_button = st.button("Apply Vol-Balance")

# Pitch shifting
with pitch_shift, st.container(border=True):
    pitch_semitones = svs.vertical_slider(
        label="Pitch (semitones)",
        min_value=-12,
        max_value=12,
        default_value=0,
        step=1,
        slider_color="red",
        **_slider_look,
    )
    pitch_shift_button = st.button("Apply Pitch Shift")
# Apply whichever one-shot effect button was pressed in this run to the
# working (augmented) track. Nothing happens until a track exists.
if st_state.augmented_audio_pydub is not None:
    if vol_button:
        # Adding a number to the segment applies the slider value as a gain.
        st_state.augmented_audio_pydub = st_state.augmented_audio_pydub + volume_balance
    if apply_stereo:
        # Overlay a left-panned copy with a right-panned copy.
        st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.pan(
            -0.5
        ).overlay(st_state.augmented_audio_pydub.pan(0.5))
    if reverse:
        st_state.augmented_audio_pydub = st_state.augmented_audio_pydub.reverse()
    if pitch_shift_button:
        # Relabel the same samples with a scaled frame rate to change the
        # pitch, then resample back to the original rate so the track keeps a
        # standard sample rate for export and later mixing.
        original_frame_rate = st_state.augmented_audio_pydub.frame_rate
        shifted = st_state.augmented_audio_pydub._spawn(
            st_state.augmented_audio_pydub.raw_data,
            overrides={
                "frame_rate": int(
                    original_frame_rate * (2 ** (pitch_semitones / 12.0))
                )
            },
        )
        st_state.augmented_audio_pydub = shifted.set_frame_rate(original_frame_rate)
# Apply "Undo All Post-processings" before rendering, so the player below
# shows the restored track in this run rather than only after the next rerun.
if reset_post_processing and st_state.audio_pydub is not None:
    st_state.augmented_audio_pydub = st_state.audio_pydub
# Display the final audio
if st_state.augmented_audio_pydub is not None:
    st.audio(st_state.augmented_audio_pydub.export().read())
    # sample_rate = st_state.augmented_audio_sample_rate
    # st.audio(st_state.augmented_audio, format="audio/wav", sample_rate=sample_rate*2, start_time=0)
    st.link_button(
        label="⬇️ Download/Save",
        url="https://songlabai.com/subcribe/",
        type="primary",
        use_container_width=True,
    )
# m = st.markdown("""
# <style>
# div.stButton > button:first-child {
# # color: rgb(204, 49, 49);
# # background-color: ;
# # border-radius: 5%;
# backgroud-color: #00ff00;
# }
# # </style>""", unsafe_allow_html=True)