Kodduu Python
1.08K subscribers
311 photos
28 videos
187 links
Научись программировать на Python на интересных примерах

Самый быстрый курс https://stepik.org/a/187914
Самый нескучный курс https://stepik.org/a/185238

Во вопросам сотрудничества: @AlexErf
Download Telegram
🧠 Умная CLI-загрузка: прогресс-бар с ETA, скоростью и цветами
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦

from tqdm import tqdm
import time
from colorama import Fore, Style, init

init(autoreset=True)

for i in tqdm(range(100), desc=Fore.CYAN + "Загрузка данных" + Style.RESET_ALL,
bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)):
time.sleep(0.05)


📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через colorama, бар от tqdm

📦 Установка:

pip install tqdm colorama


🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌

Подпишись 👉🏻 @KodduuPython 🤖
⚙️ Умный таймер-функция: логирует время, ошибки и возвращает результат
Полезный @decorator для любых боевых функций — отслеживает время выполнения и ловит баги без лишнего кода 🔥

import time
import functools
import logging

logging.basicConfig(level=logging.INFO)

def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.time()
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} finished in {time.time() - t0:.2f}s")
return result
except Exception as e:
logging.exception(f"{func.__name__} failed after {time.time() - t0:.2f}s")
raise
return wrapper

# 🔧 Пример использования
@timed
def process_data():
time.sleep(1)
return " Done"

process_data()


📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA

📦 Всё стандартное: time, functools, logging — ничего ставить не надо
Работает и в скриптах, и в backend API, и в пайплайнах.

Подпишись 👉🏻 @KodduuPython 🤖
🛡 Безопасный вызов стороннего API с автоповторами и таймаутом
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)

# 🔧 Пример запроса
try:
response = session.get("https://api.example.com/data", timeout=3)
print(response.json())
except requests.RequestException as e:
print(" Ошибка запроса:", e)


📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (backoff_factor)
— Не виснет бесконечно — timeout=3 сек
— Работает со всеми методами: .get(), .post() и т.д.

📦 Библиотека: requests
Установка:

pip install requests


🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.

Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔐 Безопасное скачивание файла с логами и таймаутом

import requests
import logging
from pathlib import Path
from requests.exceptions import RequestException, Timeout

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def download_file(url: str, dest: Path, timeout: float = 5.0):
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()

dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

logging.info(f" Файл сохранён: {dest.resolve()}")
except (RequestException, Timeout) as e:
logging.error(f" Ошибка при скачивании: {e}")

# Пример использования
download_file(
url="https://example.com/data/report.csv",
dest=Path("./downloads/report.csv")
)


📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.

Подпишись 👉🏻 @KodduuPython 🤖
⏱️ Декоратор с логгингом времени выполнения

import time
import logging
from functools import wraps

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper

# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"

slow_operation()


📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.

🛠 Ничего устанавливать не нужно — только стандартная библиотека.

Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок

import fcntl
import time
from pathlib import Path

def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator

# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)

critical_section()


📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).

🛠 Всё из коробки — только fcntl и pathlib.

Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами

import requests
import logging
import time
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f" Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f" Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error(" Все попытки исчерпаны")
raise

# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)


📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.

🛠 pip install requests — всё остальное в стандартной библиотеке.

Подпишись 👉🏻 @KodduuPython 🤖
Весенняя распродажа на Stepik 🔥🔥🔥

Самое время пройти Python: самый быстрый курс 🧐

Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓

И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑‍🎓

Подпишись 👉🏻 @KodduuPython 🤖
🧠 Кеш в памяти с TTL — умный memoize-декоратор

import time
from functools import wraps

_cache = {}

def memoize_ttl(ttl: float = 60.0):
def decorator(func):
@wraps(func)
def wrapper(*args):
key = (func.__name__, args)
now = time.time()
if key in _cache:
result, timestamp = _cache[key]
if now - timestamp < ttl:
return result
result = func(*args)
_cache[key] = (result, now)
return result
return wrapper
return decorator

# Пример использования
@memoize_ttl(ttl=10)
def heavy_compute(x):
print(f"🔄 Вычисляю для {x}...")
time.sleep(2)
return x * x

# Повторные вызовы быстро
heavy_compute(4)
heavy_compute(4)


📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.

🛠 Без зависимостей — всё на стандартной библиотеке.

Подпишись 👉🏻 @KodduuPython 🤖
👍2
Вот минимальный пример кода на Python, демонстрирующий реализацию стриминга ответа от LLM в Telegram в реальном времени. Мы используем:

* OpenAI API (через stream=True) для получения ответа частями
* python-telegram-bot для отправки сообщений в Telegram

🔧 Установка зависимостей

pip install openai python-telegram-bot


💬 Пример кода

import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes

# Настройки
TELEGRAM_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY

# ID чата можно узнать после отправки /start боту
USER_CHAT_ID = 123456789

# Основная функция генерации ответа с потоком
async def stream_response(prompt, bot: Bot, chat_id: int):
# Отправим пустое сообщение, которое будем редактировать
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""

# Запрос к OpenAI с потоковой генерацией
response = openai.ChatCompletion.create(
model="gpt-4", # или gpt-3.5-turbo
messages=[{"role": "user", "content": prompt}],
stream=True
)

# Обрабатываем поток
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
# Периодически обновляем сообщение
if len(full_text) % 20 == 0:
await msg.edit_text(full_text)

# Финальное обновление
await msg.edit_text(full_text)


# Обёртка для OpenAI-стрима (сделаем из генератора асинхронный)
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)

# Обработчик команды /ask
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return

await stream_response(prompt, context.bot, update.effective_chat.id)

# Запуск бота
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()

if __name__ == "__main__":
main()


💡 Советы

* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через send_chat_action для отображения typing....
* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.

Подпишись 👉🏻 @KodduuPython 🤖
🆒1
This media is not supported in your browser
VIEW IN TELEGRAM
Стриминг по коду выше 👆👆👆

Как это применяется реальных проектах читайте тут @aigentto

Подпишись 👉🏻 @KodduuPython 🤖
Flood limit в Telegram возникает, когда вы:

* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).

🛡 Как бороться с flood limit

1. Добавь `try/except` с `telegram.error.RetryAfter`

Telegram возвращает ошибку RetryAfter, в которой указано, сколько секунд нужно подождать.

2. Ограничь частоту `edit_message_text`

Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.

📦 Пример: анти-флуд логика в Telegram-боте

import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.error import RetryAfter

TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY


async def stream_response(prompt, bot: Bot, chat_id: int):
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
last_edit_time = asyncio.get_event_loop().time()

response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)

async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
# Ограничим частоту до 1 обновления в секунду
if now - last_edit_time >= 1:
await safe_edit(bot, msg, full_text)
last_edit_time = now

await safe_edit(bot, msg, full_text) # Финальное обновление


async def safe_edit(bot: Bot, msg, text: str):
try:
await bot.edit_message_text(chat_id=msg.chat_id, message_id=msg.message_id, text=text)
except RetryAfter as e:
print(f"⚠️ Flood limit hit. Sleeping for {e.retry_after} seconds.")
await asyncio.sleep(e.retry_after)
await safe_edit(bot, msg, text)
except Exception as e:
print(f"⚠️ Unexpected error: {e}")


async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)


async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)


def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()


if __name__ == "__main__":
main()


🚀 Оптимальные параметры

| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
| send_message | 30–60 сообщений/мин |
| edit_message_text | 1 раз в 0.8–1.5 сек |
| send_chat_action | не чаще 1 в 5 сек |


Подпишись 👉🏻 @KodduuPython 🤖
Весна закончилась, и распродажа на Stepik тоже закончится сегодня в 23:59 🔥🔥🔥

👉 Python: самый быстрый курс 🧐

👉 Python Data Science: самый быстрый курс 🤓

👉 Junior Python Developer и Data Scientist +интервью тест 🧑‍🎓

Подпишись 👉🏻 @KodduuPython 🤖
Ниже пример, как реализовать стриминг из LLM в Telegram с защитой от flood limit с помощью `aiogram`, и объяснение, почему `aiogram` лучше для продвинутых Telegram-ботов.

📦 Установка

pip install openai aiogram


📜 Пример: aiogram + OpenAI Streaming + Flood Control

import asyncio
import logging
from aiogram import Bot, Dispatcher, types
from aiogram.types import Message
from aiogram.filters import Command
from aiogram.exceptions import TelegramRetryAfter
import openai

# Настройки
BOT_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_KEY"
openai.api_key = OPENAI_API_KEY

# Логгирование
logging.basicConfig(level=logging.INFO)

bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()

# Обертка для стриминга OpenAI
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)

# Функция безопасного редактирования (борьба с flood)
async def safe_edit(message: Message, text: str):
try:
await message.edit_text(text)
except TelegramRetryAfter as e:
logging.warning(f"Flood control! Sleep for {e.timeout} seconds.")
await asyncio.sleep(e.timeout)
await safe_edit(message, text)
except Exception as e:
logging.warning(f"Unexpected error during edit: {e}")

# Основная логика генерации и стриминга
@dp.message(Command("ask"))
async def handle_ask(message: Message):
prompt = message.text.replace("/ask", "").strip()
if not prompt:
await message.reply("❗️ Пожалуйста, укажи запрос после /ask")
return

reply = await message.answer("⌛️ Думаю...")
full_text = ""
last_edit = asyncio.get_event_loop().time()

response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)

async for chunk in wrap_stream(response):
delta = chunk["choices"][0].get("delta", {}).get("content")
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
if now - last_edit > 1:
await safe_edit(reply, full_text)
last_edit = now

await safe_edit(reply, full_text)

# Запуск бота
async def main():
await dp.start_polling(bot)

if __name__ == "__main__":
asyncio.run(main())


Что ты получаешь с aiogram:

* 🌐 Полностью async на asyncio — эффективнее и отзывчивее.
* 🧠 Легко масштабируется с middlewares, filters, states.
* 🚀 Совместим с FastAPI и BackgroundTasks.
* 📉 Безопасен к flood limit через TelegramRetryAfter.

Подпишись 👉🏻 @KodduuPython 🤖
👨‍💻1
Вот 🔥 очень простой и наглядный пример асинхронности в Python с asyncio. Демонстрация, как async работает быстрее, чем обычный sleep.

⚡️ Пример: Асинхронный vs Синхронный вызов

import asyncio
import time

# Синхронная версия
def sync_task(name, delay):
print(f"[{name}] Начинаю задачу")
time.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")

# Асинхронная версия
async def async_task(name, delay):
print(f"[{name}] Начинаю задачу")
await asyncio.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")

# Сравнение
def run_sync():
start = time.time()
sync_task("A", 2)
sync_task("B", 2)
print(f"⏱️ Синхронно заняло {time.time() - start:.2f} сек")

async def run_async():
start = time.time()
await asyncio.gather(
async_task("A", 2),
async_task("B", 2),
)
print(f"⏱️ Асинхронно заняло {time.time() - start:.2f} сек")

# Запуск
if __name__ == "__main__":
print("=== Синхронный запуск ===")
run_sync()

print("\n=== Асинхронный запуск ===")
asyncio.run(run_async())


🔍 Что происходит?

* Синхронно: задачи идут одна за другой → итог 4 секунды.
* Асинхронно: обе задачи запускаются одновременно → итог 2 секунды.

---

## 📦 Вывод для Telegram-поста

Синхронность:
[Task A] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Начинаю задачу
[Task B] Закончил через 2 сек
⏱️ Синхронно заняло 4.00 сек

Асинхронность:
[Task A] Начинаю задачу
[Task B] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Закончил через 2 сек
⏱️ Асинхронно заняло 2.00 сек


Подпишись 👉🏻 @KodduuPython 🤖
Вот второй, тоже 🔥наглядный пример: асинхронная загрузка нескольких URLов. Он отлично показывает, как asyncio + aiohttp позволяет скачать сразу кучу страниц в разы быстрее, чем по одной.

⚡️ Пример: Асинхронная загрузка страниц

import asyncio
import aiohttp
import time

urls = [
"https://example.com",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
]

# Асинхронная загрузка
async def fetch(session, url):
print(f"📡 Загружаю: {url}")
async with session.get(url) as response:
text = await response.text()
print(f" Готово: {url} ({len(text)} символов)")
return text

async def main():
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
print(f"\n⏱️ Все загружено за {time.time() - start:.2f} сек")

if __name__ == "__main__":
asyncio.run(main())


🧠 Что это демонстрирует

* Каждое fetch() ждет ответ от сервера, но другие задачи не простаивают.
* Даже если 1 сайт тормозит (например, delay/3), остальные уже выполняются.
* Всё работает одновременно, а не "по очереди".

📦 Как это будет выглядеть в Telegram

📡 Загружаю: https://example.com
📡 Загружаю: https://httpbin.org/delay/2
📡 Загружаю: https://httpbin.org/delay/3
📡 Загружаю: https://httpbin.org/delay/1
Готово: https://example.com (1256 символов)
Готово: https://httpbin.org/delay/1 (437 символов)
Готово: https://httpbin.org/delay/2 (437 символов)
Готово: https://httpbin.org/delay/3 (437 символов)

⏱️ Все загружено за 3.00 сек


Если бы это был синхронный код — заняло бы 1 + 2 + 3 = 6 секунд, а не 3. Вот и вся магия асинхронности

Подпишись 👉🏻 @KodduuPython 🤖