📌 Задача
Три тела взаимодействуют гравитационно в пустом пространстве. Мы хотим смоделировать их движение и визуализировать траектории.
## ✅ Требуемые библиотеки
## 🧠 Код: Решение и Визуализация
## ⚙️ Что делает код:
* Использует метод Рунге-Кутты для численного интегрирования движения.
* Визуализирует траектории всех трех тел в 3D.
* Можно изменить массы, начальные координаты и скорости для экспериментов.
Подпишись 👉🏻 @KodduuPython 🤖
Три тела взаимодействуют гравитационно в пустом пространстве. Мы хотим смоделировать их движение и визуализировать траектории.
## ✅ Требуемые библиотеки
pip install numpy matplotlib
## 🧠 Код: Решение и Визуализация
import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
G = 1.0 # Гравитационная постоянная
# Массы тел
m1, m2, m3 = 1.0, 1.0, 1.0
# Начальные координаты (x, y, z) и скорости (vx, vy, vz)
# Можно изменить под свой случай
r1 = np.array([1.0, 0.0, 0.0])
r2 = np.array([-1.0, 0.0, 0.0])
r3 = np.array([0.0, 1.0, 0.0])
v1 = np.array([0.0, 0.3, 0.0])
v2 = np.array([0.0, -0.3, 0.0])
v3 = np.array([0.0, 0.0, 0.0])
# Функция для расчета ускорений
def acceleration(ri, rj, mj):
r = rj - ri
dist = np.linalg.norm(r) + 1e-5
return G * mj * r / dist**3
# Метод Рунге-Кутты 4-го порядка
def rk4_step(r, v, m, dt):
a = np.zeros_like(r)
for i in range(3):
for j in range(3):
if i != j:
a[i] += acceleration(r[i], r[j], m[j])
r_new = r + v * dt + 0.5 * a * dt**2
v_new = v + a * dt
return r_new, v_new
# Инициализация массивов
steps = 5000
dt = 0.001
r = np.array([r1, r2, r3])
v = np.array([v1, v2, v3])
m = np.array([m1, m2, m3])
trajectories = [[], [], []]
# Основной цикл
for _ in range(steps):
for i in range(3):
trajectories[i].append(r[i].copy())
r, v = rk4_step(r, v, m, dt)
trajectories = np.array(trajectories)
# 🎥 Визуализация в 3D
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')
for i in range(3):
ax.plot(trajectories[i][:, 0], trajectories[i][:, 1], trajectories[i][:, 2], label=f"Body {i+1}")
ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Three-Body Problem Simulation")
ax.legend()
plt.tight_layout()
plt.show()
## ⚙️ Что делает код:
* Использует метод Рунге-Кутты для численного интегрирования движения.
* Визуализирует траектории всех трех тел в 3D.
* Можно изменить массы, начальные координаты и скорости для экспериментов.
Подпишись 👉🏻 @KodduuPython 🤖
Больше кейсов по RAG системам и ИИ-агентам в канале @AIGENTTO 🧐
Telegram
AIGENTTO
ИИ-агенты и RAG системы для бизнеса. Автоматизация сотрудников. Снижение ФОТ. Выявление намерений клиентов и продажи в AI агентах.
Forwarded from AIGENTTO
Вышла статья Как сделать RAG/ИИ-Асистента без кода 🔥
Делаем генератор постов для канала📖, поиск рейсов ✈️ и бронирование отелей через Yandex Travel API 🛎
Подпишись 👉🏻 @aigentto 🤖
Делаем генератор постов для канала📖, поиск рейсов ✈️ и бронирование отелей через Yandex Travel API 🛎
Подпишись 👉🏻 @aigentto 🤖
Хабр
Как сделать RAG/ИИ-ассистента без кода
Если вам нужно сконфигурировать персонального или командного AI-ассистента без единой строчки кода, то инфраструктура OpenAI позволяет это сделать. В этой статье мы сконфигурируем ассистента для...
🧠 Умная CLI-загрузка: прогресс-бар с ETA, скоростью и цветами
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦
📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через
📦 Установка:
🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌
Подпишись 👉🏻 @KodduuPython 🤖
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦
from tqdm import tqdm
import time
from colorama import Fore, Style, init
init(autoreset=True)
for i in tqdm(range(100), desc=Fore.CYAN + "Загрузка данных" + Style.RESET_ALL,
bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)):
time.sleep(0.05)
📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через
colorama, бар от tqdm📦 Установка:
pip install tqdm colorama
🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌
Подпишись 👉🏻 @KodduuPython 🤖
⚙️ Умный таймер-функция: логирует время, ошибки и возвращает результат
Полезный
📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA
📦 Всё стандартное:
Работает и в скриптах, и в backend API, и в пайплайнах.
Подпишись 👉🏻 @KodduuPython 🤖
Полезный
@decorator для любых боевых функций — отслеживает время выполнения и ловит баги без лишнего кода 🔥import time
import functools
import logging
logging.basicConfig(level=logging.INFO)
def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.time()
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} finished in {time.time() - t0:.2f}s")
return result
except Exception as e:
logging.exception(f"{func.__name__} failed after {time.time() - t0:.2f}s")
raise
return wrapper
# 🔧 Пример использования
@timed
def process_data():
time.sleep(1)
return "✅ Done"
process_data()
📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA
📦 Всё стандартное:
time, functools, logging — ничего ставить не надоРаботает и в скриптах, и в backend API, и в пайплайнах.
Подпишись 👉🏻 @KodduuPython 🤖
🛡 Безопасный вызов стороннего API с автоповторами и таймаутом
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀
📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (
— Не виснет бесконечно —
— Работает со всеми методами:
📦 Библиотека:
Установка:
🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.
Подпишись 👉🏻 @KodduuPython 🤖
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 🔧 Пример запроса
try:
response = session.get("https://api.example.com/data", timeout=3)
print(response.json())
except requests.RequestException as e:
print("❌ Ошибка запроса:", e)
📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (
backoff_factor)— Не виснет бесконечно —
timeout=3 сек— Работает со всеми методами:
.get(), .post() и т.д.📦 Библиотека:
requestsУстановка:
pip install requests
🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.
Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔐 Безопасное скачивание файла с логами и таймаутом
📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.
Подпишись 👉🏻 @KodduuPython 🤖
import requests
import logging
from pathlib import Path
from requests.exceptions import RequestException, Timeout
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def download_file(url: str, dest: Path, timeout: float = 5.0):
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()
dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
logging.info(f"✅ Файл сохранён: {dest.resolve()}")
except (RequestException, Timeout) as e:
logging.error(f"❌ Ошибка при скачивании: {e}")
# Пример использования
download_file(
url="https://example.com/data/report.csv",
dest=Path("./downloads/report.csv")
)
📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.
Подпишись 👉🏻 @KodduuPython 🤖
⏱️ Декоратор с логгингом времени выполнения
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper
# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"
slow_operation()
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
Подпишись 👉🏻 @KodduuPython 🤖
import fcntl
import time
from pathlib import Path
def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator
# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)
critical_section()
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
fcntl и pathlib.Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
Подпишись 👉🏻 @KodduuPython 🤖
import requests
import logging
import time
from requests.exceptions import RequestException
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f"✅ Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f"⏳ Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error("❌ Все попытки исчерпаны")
raise
# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
pip install requests — всё остальное в стандартной библиотеке.Подпишись 👉🏻 @KodduuPython 🤖
Весенняя распродажа на Stepik 🔥🔥🔥
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
🧠 Кеш в памяти с TTL — умный memoize-декоратор
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
import time
from functools import wraps
_cache = {}
def memoize_ttl(ttl: float = 60.0):
def decorator(func):
@wraps(func)
def wrapper(*args):
key = (func.__name__, args)
now = time.time()
if key in _cache:
result, timestamp = _cache[key]
if now - timestamp < ttl:
return result
result = func(*args)
_cache[key] = (result, now)
return result
return wrapper
return decorator
# Пример использования
@memoize_ttl(ttl=10)
def heavy_compute(x):
print(f"🔄 Вычисляю для {x}...")
time.sleep(2)
return x * x
# Повторные вызовы быстро
heavy_compute(4)
heavy_compute(4)
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
👍2
Вот минимальный пример кода на Python, демонстрирующий реализацию стриминга ответа от LLM в Telegram в реальном времени. Мы используем:
*
*
🔧 Установка зависимостей
💬 Пример кода
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
*
OpenAI API (через stream=True) для получения ответа частями*
python-telegram-bot для отправки сообщений в Telegram🔧 Установка зависимостей
pip install openai python-telegram-bot
💬 Пример кода
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
# Настройки
TELEGRAM_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
# ID чата можно узнать после отправки /start боту
USER_CHAT_ID = 123456789
# Основная функция генерации ответа с потоком
async def stream_response(prompt, bot: Bot, chat_id: int):
# Отправим пустое сообщение, которое будем редактировать
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
# Запрос к OpenAI с потоковой генерацией
response = openai.ChatCompletion.create(
model="gpt-4", # или gpt-3.5-turbo
messages=[{"role": "user", "content": prompt}],
stream=True
)
# Обрабатываем поток
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
# Периодически обновляем сообщение
if len(full_text) % 20 == 0:
await msg.edit_text(full_text)
# Финальное обновление
await msg.edit_text(full_text)
# Обёртка для OpenAI-стрима (сделаем из генератора асинхронный)
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
# Обработчик команды /ask
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
# Запуск бота
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
send_chat_action для отображения typing....* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
🆒1
This media is not supported in your browser
VIEW IN TELEGRAM
Стриминг по коду выше 👆👆👆
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Flood limit в Telegram возникает, когда вы:
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
|
|
Подпишись 👉🏻 @KodduuPython 🤖
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
RetryAfter, в которой указано, сколько секунд нужно подождать.✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.error import RetryAfter
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
async def stream_response(prompt, bot: Bot, chat_id: int):
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
last_edit_time = asyncio.get_event_loop().time()
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
# Ограничим частоту до 1 обновления в секунду
if now - last_edit_time >= 1:
await safe_edit(bot, msg, full_text)
last_edit_time = now
await safe_edit(bot, msg, full_text) # Финальное обновление
async def safe_edit(bot: Bot, msg, text: str):
try:
await bot.edit_message_text(chat_id=msg.chat_id, message_id=msg.message_id, text=text)
except RetryAfter as e:
print(f"⚠️ Flood limit hit. Sleeping for {e.retry_after} seconds.")
await asyncio.sleep(e.retry_after)
await safe_edit(bot, msg, text)
except Exception as e:
print(f"⚠️ Unexpected error: {e}")
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
send_message | 30–60 сообщений/мин ||
edit_message_text | 1 раз в 0.8–1.5 сек ||
send_chat_action | не чаще 1 в 5 сек |Подпишись 👉🏻 @KodduuPython 🤖
Весна закончилась, и распродажа на Stepik тоже закончится сегодня в 23:59 🔥🔥🔥
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖