"""FastAPI service that re-hosts images after optimising or converting them.

Two endpoints are exposed:

* ``POST /pngopt_by_urls/`` downloads the images, runs ``oxipng`` on them,
  uploads the results to an image host and returns the new URLs.
* ``POST /jpegs_by_urls/`` immediately returns links to placeholder
  Telegraph / Rentry pages and fills them in from a background task once the
  images have been converted to JPEG and uploaded.
"""
from asyncio import Task, create_subprocess_exec, create_subprocess_shell, create_task, gather, run, sleep, to_thread
from dataclasses import dataclass
from json import dumps
from logging import ERROR, INFO, basicConfig, getLogger
from pathlib import Path
from shutil import rmtree
from subprocess import CalledProcessError, PIPE
from typing import Any, List
from uuid import uuid4

from PIL import Image
from aiorentry.client import Client as RentryClient
from fastapi import FastAPI, HTTPException
from fastapi.responses import PlainTextResponse
from httpx import AsyncClient, HTTPStatusError, RequestError
from lxml import html
from markdown import markdown
from pydantic import BaseModel, HttpUrl
from uvicorn import run as uvicorn_run

proxy_endpoint = 'https://telegraphprxver.vercel.app'
need_logging = False
basicConfig(level=INFO if need_logging else ERROR)
logger = getLogger(__name__)

# The oxipng binary is expected to sit next to this file; make sure it is
# executable before the first optimisation request arrives.
oxipng_bin = Path(__file__).parent / 'oxipng'
if not oxipng_bin.stat().st_mode & 0o111:
    oxipng_bin.chmod(0o755)

tokens = [  # imgbb API keys; the author does not particularly care about these tokens
    '425e0d892f8359d1b063c0ca3ff59d8a',
    'fe3b20d8b57083670e0ee0e40bf7f05d',
    'cca6cb31ebd4a45408ab6beb8c441179',
    '9b5bce29f09030e4395a11decffb85e1',
    'c45b37aba4506082be529faf6e74de75',
]

# Options shared by every outgoing httpx client.
# NOTE(review): verify=False disables TLS certificate verification; it is kept
# exactly as the original code had it -- confirm it is still required.
_HTTP_CLIENT_OPTIONS: dict[str, Any] = {'verify': False, 'follow_redirects': True, 'timeout': 30.0}

# Strong references to fire-and-forget tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_background_tasks: set[Task] = set()


@dataclass
class UserInfo:
    """Credentials and profile data returned by Telegraph's ``createAccount``."""

    token: str
    short_name: str
    author_name: str
    author_url: str
    auth_link: str

    # The getters are kept for backward compatibility with existing callers.
    def get_token(self) -> str:
        return self.token

    def get_short_name(self) -> str:
        return self.short_name

    def get_author_name(self) -> str:
        return self.author_name

    def get_author_url(self) -> str:
        return self.author_url

    def get_auth_link(self) -> str:
        return self.auth_link


async def create_account(short_name: str, author_name: str, author_url: str) -> UserInfo | None:
    """Create a Telegraph account through the proxy endpoint.

    Returns:
        The new account's data, or ``None`` when the API response is not ``ok``.
    """
    params = {
        'short_name': short_name,
        'author_name': author_name,
        'author_url': author_url,
    }
    async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
        response = await client.get(f'{proxy_endpoint}/createAccount', params=params)
    json_response: dict = response.json()
    if not json_response.get('ok'):
        logger.error('createAccount failed: %s', json_response)
        return None
    result = json_response.get('result', {})
    return UserInfo(
        result['access_token'],
        result['short_name'],
        result['author_name'],
        result['author_url'],
        result['auth_url'],
    )


# Created once at import time; ``None`` when account creation was rejected.
tgph_acc: UserInfo | None = run(create_account('prolapse', 'prolapse', ''))


def md_to_dom(markdown_text: str) -> list[str | dict[str, Any]]:
    """Convert Markdown into the node list sent as Telegraph page content.

    The Markdown is rendered to HTML and every top-level element is mapped to a
    ``{'tag': ..., 'attrs': ..., 'children': ...}`` dictionary. ``h1``/``h2``
    become ``h3``/``h4`` and deeper headings become a bold paragraph.

    Returns:
        The list of top-level nodes (dictionaries, or strings for bare text).
    """

    def keep_text(text: str | None) -> str | None:
        """Return *text* if it should become a text node, otherwise ``None``."""
        if not text:
            return None
        if text.strip():
            # Kept verbatim so that the spaces around inline tags survive.
            return text
        # Whitespace-only: a newline is treated as layout between block tags,
        # a plain space as a separator between two inline nodes.
        return None if '\n' in text else ' '

    def parse_children(element) -> list[str | dict[str, Any]]:
        children: list[str | dict[str, Any]] = []
        leading = keep_text(element.text)
        if leading is not None:
            children.append(leading)
        for child in element.iterchildren():
            # Comments and processing instructions have a callable ``tag``.
            if isinstance(child.tag, str):
                children.append(parse_element(child))
            # A child's tail is the text that FOLLOWS it inside this element,
            # so it is a sibling of the child, not one of the child's children.
            trailing = keep_text(child.tail)
            if trailing is not None:
                children.append(trailing)
        return children

    def handle_heading(element) -> dict[str, Any]:
        if element.tag == 'h1':
            return {'tag': 'h3', 'children': parse_children(element)}
        if element.tag == 'h2':
            return {'tag': 'h4', 'children': parse_children(element)}
        return {'tag': 'p', 'children': [{'tag': 'strong', 'children': parse_children(element)}]}

    def handle_list(element) -> dict[str, Any]:
        items = [{'tag': 'li', 'children': parse_children(li)} for li in element.xpath('./li')]
        return {'tag': element.tag, 'children': items}

    def handle_link(element) -> dict[str, Any]:
        return {'tag': 'a', 'attrs': {'href': element.get('href')}, 'children': parse_children(element)}

    def handle_media(element) -> dict[str, Any]:
        return {'tag': element.tag, 'attrs': {'src': element.get('src')}}

    def handle_generic(element) -> dict[str, Any]:
        return {'tag': element.tag, 'children': parse_children(element)}

    handlers = {
        'h1': handle_heading, 'h2': handle_heading, 'h3': handle_heading,
        'h4': handle_heading, 'h5': handle_heading, 'h6': handle_heading,
        'ul': handle_list, 'ol': handle_list,
        'a': handle_link,
        'img': handle_media, 'iframe': handle_media,
    }

    def parse_element(element) -> dict[str, Any]:
        return handlers.get(element.tag, handle_generic)(element)

    html_content = markdown(markdown_text, extensions=['extra', 'sane_lists'])
    nodes: list[str | dict[str, Any]] = []
    # fragments_fromstring yields the top-level elements themselves (plus any
    # leading text), so no artificial wrapper element ends up in the output.
    for fragment in html.fragments_fromstring(html_content):
        if isinstance(fragment, str):
            nodes.append(fragment)
            continue
        if isinstance(fragment.tag, str):
            nodes.append(parse_element(fragment))
        if fragment.tail and fragment.tail.strip():
            nodes.append(fragment.tail)
    return nodes


async def tgph_create_page(token: str, title: str, markdown_text: str) -> str | None:
    """Create a Telegraph page from Markdown.

    Returns:
        The ``path`` of the new page, or ``None`` when the API call failed.
    """
    content = dumps(md_to_dom(markdown_text))
    params = {
        'access_token': token,
        'title': title,
        'content': content,
        'return_content': False,
    }
    async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
        response = await client.get(f'{proxy_endpoint}/createPage', params=params)
    json_response: dict = response.json()
    if json_response.get('ok'):
        result = json_response.get('result', {})
    else:
        result = {}
        print(f'ошибка создания страницы: {json_response}')
        print(markdown_text)
        print(content)
    return result.get('path')


async def tgph_edit_page(token: str, page: str, title: str, markdown_text: str) -> str:
    """Replace the content of an existing Telegraph page.

    Returns:
        The page ``path`` reported by the API, or ``''`` when the call failed.
    """
    content = dumps(md_to_dom(markdown_text))
    data = {
        'access_token': token,
        'path': page,
        'title': title,
        'content': content,
        'return_content': False,
    }
    async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
        response = await client.post(f'{proxy_endpoint}/editPage', data=data)
    json_response = response.json()
    result = json_response.get('result', {}) if json_response.get('ok') else {}
    return result.get('path', '')


async def download_png(url: str, folder: str, client: AsyncClient, retries: int = 5) -> Path:
    """Download *url* into ``<module dir>/<folder>/<uuid>.png``.

    Failed requests are retried with exponential back-off (1, 2, 4, ... s).

    Raises:
        HTTPStatusError, RequestError: when the last attempt fails as well.
    """
    attempts = max(1, retries)  # always try at least once
    for attempt in range(attempts):
        try:
            response = await client.get(url)
            response.raise_for_status()
        except (HTTPStatusError, RequestError):
            if attempt >= attempts - 1:
                raise
            await sleep(2 ** attempt)
        else:
            break
    file_path = Path(__file__).parent / folder / f'{uuid4()}.png'
    file_path.parent.mkdir(parents=True, exist_ok=True)
    file_path.write_bytes(response.content)
    return file_path


def _is_valid_url(url: str) -> bool:
    """Return True unless *url* is blank or contains a real or escaped newline."""
    return bool(url) and '\n' not in url and '\\n' not in url and url.strip() != ''


async def download_pngs(urls: str | list[str]) -> list[Path]:
    """Download every valid URL into one fresh temporary folder.

    Returns:
        The downloaded file paths, in the order of the valid input URLs.

    Raises:
        The first download error; the temporary folder is removed beforehand.
    """
    urls = [urls] if isinstance(urls, str) else urls
    # The coze bot has a bug: with chat context enabled the workflow appends
    # the previous answer to the links, so such entries are dropped here.
    valid_urls = [url for url in urls if _is_valid_url(url)]
    if len(valid_urls) != len(urls):
        print(f'некорректные ссылки удалены из списка: {set(urls) - set(valid_urls)}')
    if not valid_urls:
        return []

    # One folder per batch, so that removing it later cleans up every file.
    folder = str(uuid4())
    async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
        # return_exceptions=True waits for all downloads before cleaning up.
        results = await gather(
            *(download_png(url, folder, client) for url in valid_urls),
            return_exceptions=True,
        )
    failures = [result for result in results if isinstance(result, BaseException)]
    if failures:
        rmtree(Path(__file__).parent / folder, ignore_errors=True)
        raise failures[0]
    return list(results)


async def optimize_png(image_path: Path, retries: int = 3) -> None:
    """Optimise *image_path* in place with oxipng, retrying on failure.

    Raises:
        CalledProcessError: when oxipng still fails on the last attempt.
    """
    # An argument list (no shell) keeps paths with spaces or shell
    # metacharacters intact.
    command = [
        str(oxipng_bin.resolve()), '--opt', '2', '--strip', 'safe',
        '--out', str(image_path), str(image_path),
    ]
    for attempt in range(retries):
        process = await create_subprocess_exec(*command, stdout=PIPE, stderr=PIPE)
        stdout, stderr = await process.communicate()
        if process.returncode == 0:
            return
        print(f'ошибка при оптимизации {image_path}')
        if attempt >= retries - 1:
            raise CalledProcessError(process.returncode, command, output=stdout, stderr=stderr)
        await sleep(2 ** attempt)


def _convert_to_jpeg_sync(image_path: Path) -> Path:
    """Write a JPEG copy next to *image_path* and delete the original file."""
    output_path = image_path.with_suffix('.jpg')
    with Image.open(image_path) as image:
        # JPEG has no alpha channel or palette, so such images are converted.
        writable = image if image.mode in ('RGB', 'L') else image.convert('RGB')
        writable.save(output_path, 'JPEG', quality=98, optimize=True)
    image_path.unlink(missing_ok=True)
    return output_path


async def convert_to_jpeg(image_path: Path) -> Path:
    """Convert an image to JPEG without blocking the event loop.

    Returns:
        The path of the JPEG, or the unchanged *image_path* when the
        conversion failed (best effort).
    """
    try:
        return await to_thread(_convert_to_jpeg_sync, image_path)
    except Exception:
        print(f'ошибка при конвертации {image_path}')
        return image_path


def _as_path_list(image_paths: list[str | Path] | str | Path) -> list[Path]:
    """Normalise a single path or a list of paths into a list of ``Path``."""
    items = image_paths if isinstance(image_paths, list) else [image_paths]
    return [Path(item) for item in items]


async def convert_to_jpegs(image_paths: list[str | Path] | str | Path) -> list[Path]:
    """Convert the given images to JPEG; results keep the input order."""
    return await gather(*(convert_to_jpeg(path) for path in _as_path_list(image_paths)))


async def optimize_pngs(image_paths: list[str | Path] | str | Path) -> None:
    """Run :func:`optimize_png` on every given image."""
    await gather(*(optimize_png(path) for path in _as_path_list(image_paths)))


async def upload_image_to_imgbb(file_path: Path, file_type: str = 'png') -> str | None:
    """Upload a file to imgbb, trying each API key in ``tokens`` in turn.

    Returns:
        The hosted image URL, or ``None`` when every key failed.
    """
    async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
        for token in tokens:
            url = f'https://api.imgbb.com/1/upload?key={token}'
            try:
                with file_path.open('rb') as file:
                    files = {'image': (file_path.name, file, f'image/{file_type}')}
                    response = await client.post(url, files=files, data={}, timeout=30)
                response.raise_for_status()
                payload = response.json()
                if payload.get('success'):
                    return payload['data']['url']
            except Exception as e:
                print(f"ошибка при загрузке с {token}: {e}")
    return None


async def upload_image_to_freeimagehost(image_path: Path, file_type: str = 'png') -> str | None:
    """Upload a file to freeimage.host.

    Returns:
        The hosted image URL, or ``None`` on any error.
    """
    try:
        async with AsyncClient(**_HTTP_CLIENT_OPTIONS) as client:
            with image_path.open("rb") as image_file:
                files = {'source': (image_path.name, image_file, f'image/{file_type}')}
                payload = {'key': '6d207e02198a847aa98d0a2a901485a5', 'action': 'upload', 'format': 'json'}
                response = await client.post('https://freeimage.host/api/1/upload', data=payload, files=files)
            response.raise_for_status()
            return response.json()['image']['url']
    except Exception as e:
        print(f'ошибка при загрузке {image_path}: {e}')
        return None


async def upload_image(file_path: Path | str, file_type: str = 'png') -> str | None:
    """Upload to imgbb and fall back to freeimage.host when that fails."""
    file_path = Path(file_path)
    return (
        await upload_image_to_imgbb(file_path, file_type)
        or await upload_image_to_freeimagehost(file_path, file_type)
    )


async def process_image(old_url: str, image_path: Path, convert: bool) -> tuple[str, Path]:
    """Upload one local image and delete the local file afterwards.

    Returns:
        ``(url, image_path)`` where ``url`` is the new hosted URL, or
        *old_url* when the upload failed.
    """
    # convert_to_jpeg hands back the original PNG when conversion failed, so
    # the MIME type follows the actual file rather than the flag alone.
    is_jpeg = convert and image_path.suffix.lower() == '.jpg'
    new_url = await upload_image(image_path, 'jpeg' if is_jpeg else 'png')
    if not new_url:
        new_url = old_url
        print(f'не удалось загрузить изображение {image_path}, оставим старую ссылку: {old_url}')
    try:
        image_path.unlink()
    except Exception as e:
        print(f'не удалось удалить файл {image_path}: {e}')
    return new_url, image_path


def _finish_background_task(task: Task) -> None:
    """Drop the reference to a finished background task and log its failure."""
    _background_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error('background image processing failed', exc_info=error)


async def optimize_and_upload(images_urls: List[str] | str, convert: bool = False) -> List[str]:
    """Re-host the given images.

    With ``convert=False`` the images are optimised and uploaded before
    returning, and the new image URLs are returned.

    With ``convert=True`` placeholder pages are created on Telegraph (best
    effort) and Rentry, the conversion runs in a background task, and the
    links to those pages are returned immediately.
    """
    images_urls = [images_urls] if isinstance(images_urls, str) else images_urls
    if not convert:
        return await continue_optimizing_and_uploading(images_urls)

    ph_link = None
    try:
        # tgph_acc is None when account creation failed at import time; the
        # resulting AttributeError is handled like any other Telegraph error.
        ph_link = await tgph_create_page(
            tgph_acc.get_token(), 'DAll-E v3', '*изображения скоро появятся, обнови страницу...*'
        )
    except Exception as e:
        print(f'не получилось создать страницу на телеграфе: {e}')

    async with RentryClient('https://rentry.org') as client:
        page = await client.new_page('изображения скоро появятся, обнови страницу...')
    page_id, code = page.url, page.edit_code

    task = create_task(continue_optimizing_and_uploading(images_urls, page_id, code, ph_link))
    _background_tasks.add(task)
    task.add_done_callback(_finish_background_task)

    links = [f'https://rentry.co/{page_id}', f'https://rentry.org/{page_id}']
    if ph_link:
        links.insert(0, f'https://telegra.ph/{ph_link}')
    print(*links)
    return links


async def continue_optimizing_and_uploading(
    images_urls: list[str],
    page_id: str | None = None,
    code: str | None = None,
    ph_link: str | None = None,
) -> list[str]:
    """Download, process and upload the images, then fill the result pages.

    When *page_id* is given the images are converted to JPEG, otherwise they
    are optimised as PNG. The Telegraph page *ph_link* and the Rentry page
    *page_id* (edited with *code*) are updated with the new image links when
    they are provided.

    Returns:
        The new image URLs, or *images_urls* when nothing usable was produced.
    """
    convert = bool(page_id)
    # download_pngs drops invalid links; apply the same filter here so that
    # every URL stays paired with its own downloaded file.
    valid_urls = [url for url in images_urls if _is_valid_url(url)]
    images_paths = await download_pngs(images_urls)
    work_dir = images_paths[0].parent if images_paths else None
    try:
        if convert:
            images_paths = await convert_to_jpegs(images_paths)
        else:
            await optimize_pngs(images_paths)
        results = await gather(
            *(process_image(old_url, image_path, convert)
              for old_url, image_path in zip(valid_urls, images_paths))
        )
    finally:
        # Remove the temporary folder even when a step above failed.
        if work_dir is not None:
            try:
                rmtree(work_dir)
            except Exception as e:
                print(f'не удалось удалить файл {work_dir}: {e}')

    new_images_urls = [result[0] for result in results]
    print(f'новые ссылки: ({len(new_images_urls)}): {new_images_urls}')

    content = '\n\n'.join(f'![]({url})' for url in new_images_urls)
    print(content)

    if ph_link:
        try:
            await tgph_edit_page(tgph_acc.get_token(), ph_link, 'DAll-E v3', content)
        except Exception as e:
            print(f'не удалось отредактировать на телеграфе: {e}')
    else:
        print(f'страница телеграф не предоставлена: {ph_link}')

    if page_id and code:
        try:
            async with RentryClient('https://rentry.org') as client:
                await client.edit_page(text=content, url=page_id, edit_code=code)
        except Exception as e:
            print(f'не удалось создать страницу в rentry: {e}')

    return new_images_urls if new_images_urls and new_images_urls[0] else images_urls


app = FastAPI()


class ImageURLs(BaseModel):
    """Request body shared by both endpoints: the image URLs to process."""

    urls: List[HttpUrl]


@app.get('/')
async def read_root():
    """Plain-text liveness response."""
    return PlainTextResponse('ну пролапс, ну и что', status_code=200)


@app.post('/pngopt_by_urls/')
async def optimize_images_endpoint(image_urls: ImageURLs):
    """Optimise the PNGs and return the URLs of the re-hosted images."""
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls])
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"optimized_urls": optimized_urls}


# Renamed from a second ``optimize_images_endpoint`` that shadowed the handler
# above at module level; the route itself is unchanged.
@app.post('/jpegs_by_urls/')
async def convert_images_endpoint(image_urls: ImageURLs):
    """Start the JPEG conversion and return links to the result pages."""
    try:
        optimized_urls = await optimize_and_upload([str(url) for url in image_urls.urls], convert=True)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"optimized_urls": optimized_urls}


if __name__ == "__main__":
    uvicorn_run(app, host='0.0.0.0', port=7860, timeout_keep_alive=90, log_level='info', use_colors=False)