🧠 Умная CLI-загрузка: прогресс-бар с ETA, скоростью и цветами
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦
📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через
📦 Установка:
🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌
Подпишись 👉🏻 @KodduuPython 🤖
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦
from tqdm import tqdm
import time
from colorama import Fore, Style, init
init(autoreset=True)
for i in tqdm(range(100), desc=Fore.CYAN + "Загрузка данных" + Style.RESET_ALL,
bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)):
time.sleep(0.05)
📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через
colorama, бар от tqdm📦 Установка:
pip install tqdm colorama
🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌
Подпишись 👉🏻 @KodduuPython 🤖
⚙️ Умный таймер-функция: логирует время, ошибки и возвращает результат
Полезный
📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA
📦 Всё стандартное:
Работает и в скриптах, и в backend API, и в пайплайнах.
Подпишись 👉🏻 @KodduuPython 🤖
Полезный
@decorator для любых боевых функций — отслеживает время выполнения и ловит баги без лишнего кода 🔥import time
import functools
import logging
logging.basicConfig(level=logging.INFO)
def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.time()
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} finished in {time.time() - t0:.2f}s")
return result
except Exception as e:
logging.exception(f"{func.__name__} failed after {time.time() - t0:.2f}s")
raise
return wrapper
# 🔧 Пример использования
@timed
def process_data():
time.sleep(1)
return "✅ Done"
process_data()
📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA
📦 Всё стандартное:
time, functools, logging — ничего ставить не надоРаботает и в скриптах, и в backend API, и в пайплайнах.
Подпишись 👉🏻 @KodduuPython 🤖
🛡 Безопасный вызов стороннего API с автоповторами и таймаутом
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀
📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (
— Не виснет бесконечно —
— Работает со всеми методами:
📦 Библиотека:
Установка:
🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.
Подпишись 👉🏻 @KodduuPython 🤖
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 🔧 Пример запроса
try:
response = session.get("https://api.example.com/data", timeout=3)
print(response.json())
except requests.RequestException as e:
print("❌ Ошибка запроса:", e)
📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (
backoff_factor)— Не виснет бесконечно —
timeout=3 сек— Работает со всеми методами:
.get(), .post() и т.д.📦 Библиотека:
requestsУстановка:
pip install requests
🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.
Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔐 Безопасное скачивание файла с логами и таймаутом
📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.
Подпишись 👉🏻 @KodduuPython 🤖
import requests
import logging
from pathlib import Path
from requests.exceptions import RequestException, Timeout
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def download_file(url: str, dest: Path, timeout: float = 5.0):
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logging.info(f"✅ Файл сохранён: {dest.resolve()}")
except (RequestException, Timeout) as e:
logging.error(f"❌ Ошибка при скачивании: {e}")
# Пример использования
download_file(
url="https://example.com/data/report.csv",
dest=Path("./downloads/report.csv")
)
📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.
Подпишись 👉🏻 @KodduuPython 🤖
⏱️ Декоратор с логгингом времени выполнения
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper
# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"
slow_operation()
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
Подпишись 👉🏻 @KodduuPython 🤖
import fcntl
import time
from pathlib import Path
def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator
# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)
critical_section()
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
fcntl и pathlib.Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
Подпишись 👉🏻 @KodduuPython 🤖
import requests
import logging
import time
from requests.exceptions import RequestException
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f"✅ Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f"⏳ Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error("❌ Все попытки исчерпаны")
raise
# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
pip install requests — всё остальное в стандартной библиотеке.Подпишись 👉🏻 @KodduuPython 🤖
Весенняя распродажа на Stepik 🔥🔥🔥
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
🧠 Кеш в памяти с TTL — умный memoize-декоратор
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
import time
from functools import wraps
_cache = {}
def memoize_ttl(ttl: float = 60.0):
def decorator(func):
@wraps(func)
def wrapper(*args):
key = (func.__name__, args)
now = time.time()
if key in _cache:
result, timestamp = _cache[key]
if now - timestamp < ttl:
return result
result = func(*args)
_cache[key] = (result, now)
return result
return wrapper
return decorator
# Пример использования
@memoize_ttl(ttl=10)
def heavy_compute(x):
print(f"🔄 Вычисляю для {x}...")
time.sleep(2)
return x * x
# Повторные вызовы быстро
heavy_compute(4)
heavy_compute(4)
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
👍2
Вот минимальный пример кода на Python, демонстрирующий реализацию стриминга ответа от LLM в Telegram в реальном времени. Мы используем:
*
*
🔧 Установка зависимостей
💬 Пример кода
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
*
OpenAI API (через stream=True) для получения ответа частями*
python-telegram-bot для отправки сообщений в Telegram🔧 Установка зависимостей
pip install openai python-telegram-bot
💬 Пример кода
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
# Настройки
TELEGRAM_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
# ID чата можно узнать после отправки /start боту
USER_CHAT_ID = 123456789
# Основная функция генерации ответа с потоком
async def stream_response(prompt, bot: Bot, chat_id: int):
# Отправим пустое сообщение, которое будем редактировать
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
# Запрос к OpenAI с потоковой генерацией
response = openai.ChatCompletion.create(
model="gpt-4", # или gpt-3.5-turbo
messages=[{"role": "user", "content": prompt}],
stream=True
)
# Обрабатываем поток
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
# Периодически обновляем сообщение
if len(full_text) % 20 == 0:
await msg.edit_text(full_text)
# Финальное обновление
await msg.edit_text(full_text)
# Обёртка для OpenAI-стрима (сделаем из генератора асинхронный)
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
# Обработчик команды /ask
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
# Запуск бота
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
send_chat_action для отображения typing....* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
🆒1
This media is not supported in your browser
VIEW IN TELEGRAM
Стриминг по коду выше 👆👆👆
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Flood limit в Telegram возникает, когда вы:
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
|
|
Подпишись 👉🏻 @KodduuPython 🤖
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
RetryAfter, в которой указано, сколько секунд нужно подождать.✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.error import RetryAfter
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
async def stream_response(prompt, bot: Bot, chat_id: int):
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
last_edit_time = asyncio.get_event_loop().time()
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
# Ограничим частоту до 1 обновления в секунду
if now - last_edit_time >= 1:
await safe_edit(bot, msg, full_text)
last_edit_time = now
await safe_edit(bot, msg, full_text) # Финальное обновление
async def safe_edit(bot: Bot, msg, text: str):
try:
await bot.edit_message_text(chat_id=msg.chat_id, message_id=msg.message_id, text=text)
except RetryAfter as e:
print(f"⚠️ Flood limit hit. Sleeping for {e.retry_after} seconds.")
await asyncio.sleep(e.retry_after)
await safe_edit(bot, msg, text)
except Exception as e:
print(f"⚠️ Unexpected error: {e}")
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
send_message | 30–60 сообщений/мин ||
edit_message_text | 1 раз в 0.8–1.5 сек ||
send_chat_action | не чаще 1 в 5 сек |Подпишись 👉🏻 @KodduuPython 🤖
Весна закончилась, и распродажа на Stepik тоже закончится сегодня в 23:59 🔥🔥🔥
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
Ниже пример, как реализовать стриминг из LLM в Telegram с защитой от flood limit с помощью `aiogram`, и объяснение, почему `aiogram` лучше для продвинутых Telegram-ботов.
📦 Установка
📜 Пример: aiogram + OpenAI Streaming + Flood Control
✅ Что ты получаешь с
* 🌐 Полностью async на
* 🧠 Легко масштабируется с
* 🚀 Совместим с FastAPI и BackgroundTasks.
* 📉 Безопасен к flood limit через
Подпишись 👉🏻 @KodduuPython 🤖
📦 Установка
pip install openai aiogram
📜 Пример: aiogram + OpenAI Streaming + Flood Control
import asyncio
import logging
from aiogram import Bot, Dispatcher, types
from aiogram.types import Message
from aiogram.filters import Command
from aiogram.exceptions import TelegramRetryAfter
import openai
# Настройки
BOT_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_KEY"
openai.api_key = OPENAI_API_KEY
# Логгирование
logging.basicConfig(level=logging.INFO)
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
# Обертка для стриминга OpenAI
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
# Функция безопасного редактирования (борьба с flood)
async def safe_edit(message: Message, text: str):
try:
await message.edit_text(text)
except TelegramRetryAfter as e:
logging.warning(f"Flood control! Sleep for {e.timeout} seconds.")
await asyncio.sleep(e.timeout)
await safe_edit(message, text)
except Exception as e:
logging.warning(f"Unexpected error during edit: {e}")
# Основная логика генерации и стриминга
@dp.message(Command("ask"))
async def handle_ask(message: Message):
prompt = message.text.replace("/ask", "").strip()
if not prompt:
await message.reply("❗️ Пожалуйста, укажи запрос после /ask")
return
reply = await message.answer("⌛️ Думаю...")
full_text = ""
last_edit = asyncio.get_event_loop().time()
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in wrap_stream(response):
delta = chunk["choices"][0].get("delta", {}).get("content")
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
if now - last_edit > 1:
await safe_edit(reply, full_text)
last_edit = now
await safe_edit(reply, full_text)
# Запуск бота
async def main():
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())
✅ Что ты получаешь с
aiogram:* 🌐 Полностью async на
asyncio — эффективнее и отзывчивее.* 🧠 Легко масштабируется с
middlewares, filters, states.* 🚀 Совместим с FastAPI и BackgroundTasks.
* 📉 Безопасен к flood limit через
TelegramRetryAfter.Подпишись 👉🏻 @KodduuPython 🤖
👨💻1
Вот 🔥 очень простой и наглядный пример асинхронности в Python с
⚡️ Пример: Асинхронный vs Синхронный вызов
🔍 Что происходит?
* Синхронно: задачи идут одна за другой → итог 4 секунды.
* Асинхронно: обе задачи запускаются одновременно → итог 2 секунды.
---
## 📦 Вывод для Telegram-поста
Подпишись 👉🏻 @KodduuPython 🤖
asyncio. Демонстрация, как async работает быстрее, чем обычный sleep.⚡️ Пример: Асинхронный vs Синхронный вызов
import asyncio
import time
# Синхронная версия
def sync_task(name, delay):
print(f"[{name}] Начинаю задачу")
time.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")
# Асинхронная версия
async def async_task(name, delay):
print(f"[{name}] Начинаю задачу")
await asyncio.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")
# Сравнение
def run_sync():
start = time.time()
sync_task("A", 2)
sync_task("B", 2)
print(f"⏱️ Синхронно заняло {time.time() - start:.2f} сек")
async def run_async():
start = time.time()
await asyncio.gather(
async_task("A", 2),
async_task("B", 2),
)
print(f"⏱️ Асинхронно заняло {time.time() - start:.2f} сек")
# Запуск
if __name__ == "__main__":
print("=== Синхронный запуск ===")
run_sync()
print("\n=== Асинхронный запуск ===")
asyncio.run(run_async())
🔍 Что происходит?
* Синхронно: задачи идут одна за другой → итог 4 секунды.
* Асинхронно: обе задачи запускаются одновременно → итог 2 секунды.
---
## 📦 Вывод для Telegram-поста
Синхронность:
[Task A] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Начинаю задачу
[Task B] Закончил через 2 сек
⏱️ Синхронно заняло 4.00 сек
Асинхронность:
[Task A] Начинаю задачу
[Task B] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Закончил через 2 сек
⏱️ Асинхронно заняло 2.00 сек
Подпишись 👉🏻 @KodduuPython 🤖
Вот второй, тоже 🔥наглядный пример: асинхронная загрузка нескольких URLов. Он отлично показывает, как
⚡️ Пример: Асинхронная загрузка страниц
🧠 Что это демонстрирует
* Каждое
* Даже если 1 сайт тормозит (например,
* Всё работает одновременно, а не "по очереди".
📦 Как это будет выглядеть в Telegram
Если бы это был синхронный код — заняло бы 1 + 2 + 3 = 6 секунд, а не 3. Вот и вся магия асинхронности ✨
Подпишись 👉🏻 @KodduuPython 🤖
asyncio + aiohttp позволяет скачать сразу кучу страниц в разы быстрее, чем по одной.⚡️ Пример: Асинхронная загрузка страниц
import asyncio
import aiohttp
import time
urls = [
"https://example.com",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
]
# Асинхронная загрузка
async def fetch(session, url):
print(f"📡 Загружаю: {url}")
async with session.get(url) as response:
text = await response.text()
print(f"✅ Готово: {url} ({len(text)} символов)")
return text
async def main():
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
print(f"\n⏱️ Все загружено за {time.time() - start:.2f} сек")
if __name__ == "__main__":
asyncio.run(main())
🧠 Что это демонстрирует
* Каждое
fetch() ждет ответ от сервера, но другие задачи не простаивают.* Даже если 1 сайт тормозит (например,
delay/3), остальные уже выполняются.* Всё работает одновременно, а не "по очереди".
📦 Как это будет выглядеть в Telegram
📡 Загружаю: https://example.com
📡 Загружаю: https://httpbin.org/delay/2
📡 Загружаю: https://httpbin.org/delay/3
📡 Загружаю: https://httpbin.org/delay/1
✅ Готово: https://example.com (1256 символов)
✅ Готово: https://httpbin.org/delay/1 (437 символов)
✅ Готово: https://httpbin.org/delay/2 (437 символов)
✅ Готово: https://httpbin.org/delay/3 (437 символов)
⏱️ Все загружено за 3.00 сек
Если бы это был синхронный код — заняло бы 1 + 2 + 3 = 6 секунд, а не 3. Вот и вся магия асинхронности ✨
Подпишись 👉🏻 @KodduuPython 🤖