Kodduu Python
1.08K subscribers
311 photos
28 videos
187 links
Научись программировать на Python на интересных примерах

Самый быстрый курс https://stepik.org/a/187914
Самый нескучный курс https://stepik.org/a/185238

Во вопросам сотрудничества: @AlexErf
Download Telegram
📌 Задача

Три тела взаимодействуют гравитационно в пустом пространстве. Мы хотим смоделировать их движение и визуализировать траектории.

## Требуемые библиотеки

pip install numpy matplotlib



## 🧠 Код: Решение и Визуализация

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

G = 1.0 # Гравитационная постоянная

# Массы тел
m1, m2, m3 = 1.0, 1.0, 1.0

# Начальные координаты (x, y, z) и скорости (vx, vy, vz)
# Можно изменить под свой случай
r1 = np.array([1.0, 0.0, 0.0])
r2 = np.array([-1.0, 0.0, 0.0])
r3 = np.array([0.0, 1.0, 0.0])

v1 = np.array([0.0, 0.3, 0.0])
v2 = np.array([0.0, -0.3, 0.0])
v3 = np.array([0.0, 0.0, 0.0])

# Функция для расчета ускорений
def acceleration(ri, rj, mj):
r = rj - ri
dist = np.linalg.norm(r) + 1e-5
return G * mj * r / dist**3

# Метод Рунге-Кутты 4-го порядка
def rk4_step(r, v, m, dt):
a = np.zeros_like(r)

for i in range(3):
for j in range(3):
if i != j:
a[i] += acceleration(r[i], r[j], m[j])

r_new = r + v * dt + 0.5 * a * dt**2
v_new = v + a * dt

return r_new, v_new

# Инициализация массивов
steps = 5000
dt = 0.001

r = np.array([r1, r2, r3])
v = np.array([v1, v2, v3])
m = np.array([m1, m2, m3])

trajectories = [[], [], []]

# Основной цикл
for _ in range(steps):
for i in range(3):
trajectories[i].append(r[i].copy())

r, v = rk4_step(r, v, m, dt)

trajectories = np.array(trajectories)

# 🎥 Визуализация в 3D
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')

for i in range(3):
ax.plot(trajectories[i][:, 0], trajectories[i][:, 1], trajectories[i][:, 2], label=f"Body {i+1}")

ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Three-Body Problem Simulation")
ax.legend()
plt.tight_layout()
plt.show()


## ⚙️ Что делает код:

* Использует метод Рунге-Кутты для численного интегрирования движения.
* Визуализирует траектории всех трех тел в 3D.
* Можно изменить массы, начальные координаты и скорости для экспериментов.

Подпишись 👉🏻 @KodduuPython 🤖
This media is not supported in your browser
VIEW IN TELEGRAM
Анимация кода выше 👆👆👆

Подпишись 👉🏻 @KodduuPython 🤖
🧠 Умная CLI-загрузка: прогресс-бар с ETA, скоростью и цветами
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦

from tqdm import tqdm
import time
from colorama import Fore, Style, init

init(autoreset=True)

for i in tqdm(range(100), desc=Fore.CYAN + "Загрузка данных" + Style.RESET_ALL,
bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)):
time.sleep(0.05)


📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через colorama, бар от tqdm

📦 Установка:

pip install tqdm colorama


🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌

Подпишись 👉🏻 @KodduuPython 🤖
⚙️ Умный таймер-функция: логирует время, ошибки и возвращает результат
Полезный @decorator для любых боевых функций — отслеживает время выполнения и ловит баги без лишнего кода 🔥

import time
import functools
import logging

logging.basicConfig(level=logging.INFO)

def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.time()
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} finished in {time.time() - t0:.2f}s")
return result
except Exception as e:
logging.exception(f"{func.__name__} failed after {time.time() - t0:.2f}s")
raise
return wrapper

# 🔧 Пример использования
@timed
def process_data():
time.sleep(1)
return " Done"

process_data()


📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA

📦 Всё стандартное: time, functools, logging — ничего ставить не надо
Работает и в скриптах, и в backend API, и в пайплайнах.

Подпишись 👉🏻 @KodduuPython 🤖
🛡 Безопасный вызов стороннего API с автоповторами и таймаутом
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)

# 🔧 Пример запроса
try:
response = session.get("https://api.example.com/data", timeout=3)
print(response.json())
except requests.RequestException as e:
print(" Ошибка запроса:", e)


📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (backoff_factor)
— Не виснет бесконечно — timeout=3 сек
— Работает со всеми методами: .get(), .post() и т.д.

📦 Библиотека: requests
Установка:

pip install requests


🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.

Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔐 Безопасное скачивание файла с логами и таймаутом

import requests
import logging
from pathlib import Path
from requests.exceptions import RequestException, Timeout

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def download_file(url: str, dest: Path, timeout: float = 5.0):
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()

dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

logging.info(f" Файл сохранён: {dest.resolve()}")
except (RequestException, Timeout) as e:
logging.error(f" Ошибка при скачивании: {e}")

# Пример использования
download_file(
url="https://example.com/data/report.csv",
dest=Path("./downloads/report.csv")
)


📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.

Подпишись 👉🏻 @KodduuPython 🤖
⏱️ Декоратор с логгингом времени выполнения

import time
import logging
from functools import wraps

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper

# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"

slow_operation()


📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.

🛠 Ничего устанавливать не нужно — только стандартная библиотека.

Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок

import fcntl
import time
from pathlib import Path

def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator

# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)

critical_section()


📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).

🛠 Всё из коробки — только fcntl и pathlib.

Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами

import requests
import logging
import time
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f" Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f" Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error(" Все попытки исчерпаны")
raise

# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)


📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.

🛠 pip install requests — всё остальное в стандартной библиотеке.

Подпишись 👉🏻 @KodduuPython 🤖
Весенняя распродажа на Stepik 🔥🔥🔥

Самое время пройти Python: самый быстрый курс 🧐

Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓

И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑‍🎓

Подпишись 👉🏻 @KodduuPython 🤖
🧠 Кеш в памяти с TTL — умный memoize-декоратор

import time
from functools import wraps

_cache = {}

def memoize_ttl(ttl: float = 60.0):
def decorator(func):
@wraps(func)
def wrapper(*args):
key = (func.__name__, args)
now = time.time()
if key in _cache:
result, timestamp = _cache[key]
if now - timestamp < ttl:
return result
result = func(*args)
_cache[key] = (result, now)
return result
return wrapper
return decorator

# Пример использования
@memoize_ttl(ttl=10)
def heavy_compute(x):
print(f"🔄 Вычисляю для {x}...")
time.sleep(2)
return x * x

# Повторные вызовы быстро
heavy_compute(4)
heavy_compute(4)


📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.

🛠 Без зависимостей — всё на стандартной библиотеке.

Подпишись 👉🏻 @KodduuPython 🤖
👍2
Вот минимальный пример кода на Python, демонстрирующий реализацию стриминга ответа от LLM в Telegram в реальном времени. Мы используем:

* OpenAI API (через stream=True) для получения ответа частями
* python-telegram-bot для отправки сообщений в Telegram

🔧 Установка зависимостей

pip install openai python-telegram-bot


💬 Пример кода

import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes

# Настройки
TELEGRAM_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY

# ID чата можно узнать после отправки /start боту
USER_CHAT_ID = 123456789

# Основная функция генерации ответа с потоком
async def stream_response(prompt, bot: Bot, chat_id: int):
# Отправим пустое сообщение, которое будем редактировать
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""

# Запрос к OpenAI с потоковой генерацией
response = openai.ChatCompletion.create(
model="gpt-4", # или gpt-3.5-turbo
messages=[{"role": "user", "content": prompt}],
stream=True
)

# Обрабатываем поток
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
# Периодически обновляем сообщение
if len(full_text) % 20 == 0:
await msg.edit_text(full_text)

# Финальное обновление
await msg.edit_text(full_text)


# Обёртка для OpenAI-стрима (сделаем из генератора асинхронный)
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)

# Обработчик команды /ask
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return

await stream_response(prompt, context.bot, update.effective_chat.id)

# Запуск бота
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()

if __name__ == "__main__":
main()


💡 Советы

* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через send_chat_action для отображения typing....
* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.

Подпишись 👉🏻 @KodduuPython 🤖
🆒1
This media is not supported in your browser
VIEW IN TELEGRAM
Стриминг по коду выше 👆👆👆

Как это применяется реальных проектах читайте тут @aigentto

Подпишись 👉🏻 @KodduuPython 🤖
Flood limit в Telegram возникает, когда вы:

* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).

🛡 Как бороться с flood limit

1. Добавь `try/except` с `telegram.error.RetryAfter`

Telegram возвращает ошибку RetryAfter, в которой указано, сколько секунд нужно подождать.

2. Ограничь частоту `edit_message_text`

Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.

📦 Пример: анти-флуд логика в Telegram-боте

import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.error import RetryAfter

TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY


async def stream_response(prompt, bot: Bot, chat_id: int):
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
last_edit_time = asyncio.get_event_loop().time()

response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)

async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
# Ограничим частоту до 1 обновления в секунду
if now - last_edit_time >= 1:
await safe_edit(bot, msg, full_text)
last_edit_time = now

await safe_edit(bot, msg, full_text) # Финальное обновление


async def safe_edit(bot: Bot, msg, text: str):
try:
await bot.edit_message_text(chat_id=msg.chat_id, message_id=msg.message_id, text=text)
except RetryAfter as e:
print(f"⚠️ Flood limit hit. Sleeping for {e.retry_after} seconds.")
await asyncio.sleep(e.retry_after)
await safe_edit(bot, msg, text)
except Exception as e:
print(f"⚠️ Unexpected error: {e}")


async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)


async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)


def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()


if __name__ == "__main__":
main()


🚀 Оптимальные параметры

| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
| send_message | 30–60 сообщений/мин |
| edit_message_text | 1 раз в 0.8–1.5 сек |
| send_chat_action | не чаще 1 в 5 сек |


Подпишись 👉🏻 @KodduuPython 🤖
Весна закончилась, и распродажа на Stepik тоже закончится сегодня в 23:59 🔥🔥🔥

👉 Python: самый быстрый курс 🧐

👉 Python Data Science: самый быстрый курс 🤓

👉 Junior Python Developer и Data Scientist +интервью тест 🧑‍🎓

Подпишись 👉🏻 @KodduuPython 🤖