Spaces:
Running
Running
import requests | |
import json | |
import os | |
from PIL import Image | |
import io, time | |
class OtherModel():
    """Callable client for Midjourney image generation over the xdai.online HTTP API.

    Calling an instance with ``prompt=...`` submits an "imagine" task, polls
    the image endpoint once per second until it answers with HTTP 200, and
    returns the top-left quarter of the downloaded picture.

    Attributes:
        model_name: ``"Midjourney-v6.0"`` or ``"Midjourney-v5.0"``; any other
            name makes a text2image call raise ``NotImplementedError``.
        model_type: ``"text2image"`` is the only type that produces a result.
        image_url: Endpoint that accepts task submissions.
        get_image_url: URL prefix; the task id is appended to fetch the image.
        key: Bearer token read from the ``MIDJOURNEY_KEY`` environment
            variable (``None`` when the variable is unset).
        repeat_num: Maximum number of submission attempts per call.
    """

    # Seconds allowed for a single HTTP request; without a timeout a stalled
    # connection would block the caller forever.
    request_timeout = 60
    # Maximum number of one-second polls while waiting for the image.
    poll_attempts = 800
    # Version flag appended to the prompt for each supported model name.
    _VERSION_FLAGS = {
        "Midjourney-v6.0": "--v 6.0",
        "Midjourney-v5.0": "--v 5.0",
    }

    def __init__(self, model_name, model_type):
        self.model_name = model_name
        self.model_type = model_type
        self.image_url = "https://www.xdai.online/mj/submit/imagine"
        self.key = os.environ.get('MIDJOURNEY_KEY')
        self.get_image_url = "https://www.xdai.online/mj/image/"
        self.repeat_num = 5

    def __call__(self, *args, **kwargs):
        """Generate an image for ``kwargs["prompt"]``.

        Returns:
            For ``model_type == "text2image"``: a ``PIL.Image.Image`` holding
            the top-left quarter of the downloaded picture.
            For ``model_type == "text2video"``: ``None`` (nothing is generated).

        Raises:
            AssertionError: ``prompt`` was not passed as a keyword argument.
            NotImplementedError: ``model_name`` is not a supported model.
            ValueError: the submission or the image download kept failing
                until the retry budget ran out, or ``model_type`` is neither
                ``"text2image"`` nor ``"text2video"``.
        """
        if self.model_type == "text2image":
            assert "prompt" in kwargs, "prompt is required for text2image model"
            payload = self._build_payload(kwargs["prompt"])
            result_url = self._submit(payload)
            return self._fetch_image(result_url)

        if self.model_type == "text2video":
            assert "prompt" in kwargs, "prompt is required for text2video model"
            # NOTE(review): this branch only validates the prompt and yields
            # None; no video generation is implemented here.
            return None

        raise ValueError("model_type must be text2image")

    def _build_payload(self, prompt):
        """Return the submission payload for ``prompt`` with the model's version flag."""
        version_flag = self._VERSION_FLAGS.get(self.model_name)
        if version_flag is None:
            raise NotImplementedError
        return {
            "base64Array": [],
            "notifyHook": "",
            "prompt": "{} {}".format(prompt, version_flag),
            "state": "",
            "botType": "MID_JOURNEY",
        }

    def _submit(self, payload):
        """Post the task and return the URL at which the finished image will appear."""
        headers = {
            "Authorization": "Bearer {}".format(self.key),
            "Content-Type": "application/json"
        }
        body = json.dumps(payload)
        # The attempt counter is local on purpose: keeping it on the instance
        # left it exhausted after a failure, so the next call never hit the
        # stop condition and retried forever.
        for _ in range(self.repeat_num):
            response = requests.post(
                self.image_url,
                data=body,
                headers=headers,
                timeout=self.request_timeout,
            )
            if response.status_code == 200:
                print("Submit success!")
                response_json = json.loads(response.content.decode('utf-8'))
                result_url = self.get_image_url + response_json["result"]
                print(result_url)
                return result_url
        raise ValueError("API request failed.")

    def _fetch_image(self, result_url):
        """Poll ``result_url`` until it returns an image, then crop and return it."""
        for _ in range(self.poll_attempts):
            # Wait before every poll, including the first one.
            time.sleep(1)
            img_response = requests.get(result_url, timeout=self.request_timeout)
            if img_response.status_code == 200:
                image = Image.open(io.BytesIO(img_response.content))
                width, height = image.size
                # Keep only the top-left quarter of the picture
                # (presumably the first tile of a 2x2 result grid - confirm).
                return image.crop((0, 0, width // 2, height // 2))
        raise ValueError("Image request failed.")
def load_other_model(model_name, model_type):
    """Create and return an ``OtherModel`` for the given model name and type."""
    model = OtherModel(model_name, model_type)
    return model
if __name__ == "__main__":
    # Manual smoke test: submits one real prompt over the network and prints
    # the resulting image object. Uses the MIDJOURNEY_KEY environment variable
    # for authorization.
    pipe = load_other_model("Midjourney-v5.0", "text2image")
    result = pipe(prompt="An Impressionist illustration depicts a river winding through a meadow")
    print(result)
    # Stop here explicitly. SystemExit is used instead of the site-provided
    # exit() helper, which is meant for the interactive shell only.
    raise SystemExit