⏱️ Декоратор с логгингом времени выполнения
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
import time
import logging
from functools import wraps
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper
# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"
slow_operation()
📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.
🛠 Ничего устанавливать не нужно — только стандартная библиотека.
Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
Подпишись 👉🏻 @KodduuPython 🤖
import fcntl
import time
from pathlib import Path
def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator
# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)
critical_section()
📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).
🛠 Всё из коробки — только
fcntl и pathlib.Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
Подпишись 👉🏻 @KodduuPython 🤖
import requests
import logging
import time
from requests.exceptions import RequestException
logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f"✅ Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f"⏳ Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error("❌ Все попытки исчерпаны")
raise
# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)
📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.
🛠
pip install requests — всё остальное в стандартной библиотеке.Подпишись 👉🏻 @KodduuPython 🤖
Весенняя распродажа на Stepik 🔥🔥🔥
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
Самое время пройти Python: самый быстрый курс 🧐
Или окунутся в мир Data Science на курсе Python Data Science: самый быстрый курс 🤓
И наконец узнать все сразу в программе курсов Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
🧠 Кеш в памяти с TTL — умный memoize-декоратор
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
import time
from functools import wraps
_cache = {}
def memoize_ttl(ttl: float = 60.0):
def decorator(func):
@wraps(func)
def wrapper(*args):
key = (func.__name__, args)
now = time.time()
if key in _cache:
result, timestamp = _cache[key]
if now - timestamp < ttl:
return result
result = func(*args)
_cache[key] = (result, now)
return result
return wrapper
return decorator
# Пример использования
@memoize_ttl(ttl=10)
def heavy_compute(x):
print(f"🔄 Вычисляю для {x}...")
time.sleep(2)
return x * x
# Повторные вызовы быстро
heavy_compute(4)
heavy_compute(4)
📌 Временный кеш в памяти с автоочисткой по TTL. Ускоряет дорогие функции без внешних зависимостей. Полезно в API, CLI, веб-сервисах и чат-ботах.
🛠 Без зависимостей — всё на стандартной библиотеке.
Подпишись 👉🏻 @KodduuPython 🤖
👍2
Вот минимальный пример кода на Python, демонстрирующий реализацию стриминга ответа от LLM в Telegram в реальном времени. Мы используем:
*
*
🔧 Установка зависимостей
💬 Пример кода
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
*
OpenAI API (через stream=True) для получения ответа частями*
python-telegram-bot для отправки сообщений в Telegram🔧 Установка зависимостей
pip install openai python-telegram-bot
💬 Пример кода
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
# Настройки
TELEGRAM_TOKEN = "YOUR_TELEGRAM_BOT_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
# ID чата можно узнать после отправки /start боту
USER_CHAT_ID = 123456789
# Основная функция генерации ответа с потоком
async def stream_response(prompt, bot: Bot, chat_id: int):
# Отправим пустое сообщение, которое будем редактировать
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
# Запрос к OpenAI с потоковой генерацией
response = openai.ChatCompletion.create(
model="gpt-4", # или gpt-3.5-turbo
messages=[{"role": "user", "content": prompt}],
stream=True
)
# Обрабатываем поток
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
# Периодически обновляем сообщение
if len(full_text) % 20 == 0:
await msg.edit_text(full_text)
# Финальное обновление
await msg.edit_text(full_text)
# Обёртка для OpenAI-стрима (сделаем из генератора асинхронный)
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
# Обработчик команды /ask
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
# Запуск бота
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
💡 Советы
* Не редактируйте сообщение слишком часто — Telegram ограничивает это (лучше каждые 10–30 символов).
* Можно реализовать альтернативу через
send_chat_action для отображения typing....* Для продвинутой UX: добавьте кнопку отмены генерации и историю запросов.
Подпишись 👉🏻 @KodduuPython 🤖
🆒1
This media is not supported in your browser
VIEW IN TELEGRAM
Стриминг по коду выше 👆👆👆
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Как это применяется реальных проектах читайте тут @aigentto
Подпишись 👉🏻 @KodduuPython 🤖
Flood limit в Telegram возникает, когда вы:
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
|
|
Подпишись 👉🏻 @KodduuPython 🤖
* слишком часто отправляете сообщения одному пользователю или в один чат;
* редактируете одно и то же сообщение слишком часто (например, при стриминге LLM-ответа).
🛡 Как бороться с flood limit
✅1. Добавь `try/except` с `telegram.error.RetryAfter`
Telegram возвращает ошибку
RetryAfter, в которой указано, сколько секунд нужно подождать.✅ 2. Ограничь частоту `edit_message_text`
Не вызывай её слишком часто — безопасно 1 раз в 0.8–1 сек или по достижении порога символов.
📦 Пример: анти-флуд логика в Telegram-боте
import openai
import asyncio
from telegram import Bot
from telegram.ext import Application, CommandHandler, ContextTypes
from telegram.error import RetryAfter
TELEGRAM_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_API_KEY"
openai.api_key = OPENAI_API_KEY
async def stream_response(prompt, bot: Bot, chat_id: int):
msg = await bot.send_message(chat_id=chat_id, text="⌛️ Generating...")
full_text = ""
last_edit_time = asyncio.get_event_loop().time()
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in wrap_stream(response):
delta = chunk['choices'][0].get('delta', {}).get('content')
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
# Ограничим частоту до 1 обновления в секунду
if now - last_edit_time >= 1:
await safe_edit(bot, msg, full_text)
last_edit_time = now
await safe_edit(bot, msg, full_text) # Финальное обновление
async def safe_edit(bot: Bot, msg, text: str):
try:
await bot.edit_message_text(chat_id=msg.chat_id, message_id=msg.message_id, text=text)
except RetryAfter as e:
print(f"⚠️ Flood limit hit. Sleeping for {e.retry_after} seconds.")
await asyncio.sleep(e.retry_after)
await safe_edit(bot, msg, text)
except Exception as e:
print(f"⚠️ Unexpected error: {e}")
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
async def ask(update, context: ContextTypes.DEFAULT_TYPE):
prompt = " ".join(context.args)
if not prompt:
await update.message.reply_text("❗️ Введите текст после /ask")
return
await stream_response(prompt, context.bot, update.effective_chat.id)
def main():
app = Application.builder().token(TELEGRAM_TOKEN).build()
app.add_handler(CommandHandler("ask", ask))
app.run_polling()
if __name__ == "__main__":
main()
🚀 Оптимальные параметры
| Метод | Безопасная частота |
| ------------------————-- | ------------------—————- |
|
send_message | 30–60 сообщений/мин ||
edit_message_text | 1 раз в 0.8–1.5 сек ||
send_chat_action | не чаще 1 в 5 сек |Подпишись 👉🏻 @KodduuPython 🤖
Весна закончилась, и распродажа на Stepik тоже закончится сегодня в 23:59 🔥🔥🔥
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
👉 Python: самый быстрый курс 🧐
👉 Python Data Science: самый быстрый курс 🤓
👉 Junior Python Developer и Data Scientist +интервью тест 🧑🎓
Подпишись 👉🏻 @KodduuPython 🤖
Ниже пример, как реализовать стриминг из LLM в Telegram с защитой от flood limit с помощью `aiogram`, и объяснение, почему `aiogram` лучше для продвинутых Telegram-ботов.
📦 Установка
📜 Пример: aiogram + OpenAI Streaming + Flood Control
✅ Что ты получаешь с
* 🌐 Полностью async на
* 🧠 Легко масштабируется с
* 🚀 Совместим с FastAPI и BackgroundTasks.
* 📉 Безопасен к flood limit через
Подпишись 👉🏻 @KodduuPython 🤖
📦 Установка
pip install openai aiogram
📜 Пример: aiogram + OpenAI Streaming + Flood Control
import asyncio
import logging
from aiogram import Bot, Dispatcher, types
from aiogram.types import Message
from aiogram.filters import Command
from aiogram.exceptions import TelegramRetryAfter
import openai
# Настройки
BOT_TOKEN = "YOUR_TELEGRAM_TOKEN"
OPENAI_API_KEY = "YOUR_OPENAI_KEY"
openai.api_key = OPENAI_API_KEY
# Логгирование
logging.basicConfig(level=logging.INFO)
bot = Bot(token=BOT_TOKEN)
dp = Dispatcher()
# Обертка для стриминга OpenAI
async def wrap_stream(generator):
loop = asyncio.get_event_loop()
for chunk in generator:
yield await loop.run_in_executor(None, lambda: chunk)
# Функция безопасного редактирования (борьба с flood)
async def safe_edit(message: Message, text: str):
try:
await message.edit_text(text)
except TelegramRetryAfter as e:
logging.warning(f"Flood control! Sleep for {e.timeout} seconds.")
await asyncio.sleep(e.timeout)
await safe_edit(message, text)
except Exception as e:
logging.warning(f"Unexpected error during edit: {e}")
# Основная логика генерации и стриминга
@dp.message(Command("ask"))
async def handle_ask(message: Message):
prompt = message.text.replace("/ask", "").strip()
if not prompt:
await message.reply("❗️ Пожалуйста, укажи запрос после /ask")
return
reply = await message.answer("⌛️ Думаю...")
full_text = ""
last_edit = asyncio.get_event_loop().time()
response = openai.ChatCompletion.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}],
stream=True
)
async for chunk in wrap_stream(response):
delta = chunk["choices"][0].get("delta", {}).get("content")
if delta:
full_text += delta
now = asyncio.get_event_loop().time()
if now - last_edit > 1:
await safe_edit(reply, full_text)
last_edit = now
await safe_edit(reply, full_text)
# Запуск бота
async def main():
await dp.start_polling(bot)
if __name__ == "__main__":
asyncio.run(main())
✅ Что ты получаешь с
aiogram:* 🌐 Полностью async на
asyncio — эффективнее и отзывчивее.* 🧠 Легко масштабируется с
middlewares, filters, states.* 🚀 Совместим с FastAPI и BackgroundTasks.
* 📉 Безопасен к flood limit через
TelegramRetryAfter.Подпишись 👉🏻 @KodduuPython 🤖
👨💻1
Вот 🔥 очень простой и наглядный пример асинхронности в Python с
⚡️ Пример: Асинхронный vs Синхронный вызов
🔍 Что происходит?
* Синхронно: задачи идут одна за другой → итог 4 секунды.
* Асинхронно: обе задачи запускаются одновременно → итог 2 секунды.
---
## 📦 Вывод для Telegram-поста
Подпишись 👉🏻 @KodduuPython 🤖
asyncio. Демонстрация, как async работает быстрее, чем обычный sleep.⚡️ Пример: Асинхронный vs Синхронный вызов
import asyncio
import time
# Синхронная версия
def sync_task(name, delay):
print(f"[{name}] Начинаю задачу")
time.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")
# Асинхронная версия
async def async_task(name, delay):
print(f"[{name}] Начинаю задачу")
await asyncio.sleep(delay)
print(f"[{name}] Закончил через {delay} сек")
# Сравнение
def run_sync():
start = time.time()
sync_task("A", 2)
sync_task("B", 2)
print(f"⏱️ Синхронно заняло {time.time() - start:.2f} сек")
async def run_async():
start = time.time()
await asyncio.gather(
async_task("A", 2),
async_task("B", 2),
)
print(f"⏱️ Асинхронно заняло {time.time() - start:.2f} сек")
# Запуск
if __name__ == "__main__":
print("=== Синхронный запуск ===")
run_sync()
print("\n=== Асинхронный запуск ===")
asyncio.run(run_async())
🔍 Что происходит?
* Синхронно: задачи идут одна за другой → итог 4 секунды.
* Асинхронно: обе задачи запускаются одновременно → итог 2 секунды.
---
## 📦 Вывод для Telegram-поста
Синхронность:
[Task A] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Начинаю задачу
[Task B] Закончил через 2 сек
⏱️ Синхронно заняло 4.00 сек
Асинхронность:
[Task A] Начинаю задачу
[Task B] Начинаю задачу
[Task A] Закончил через 2 сек
[Task B] Закончил через 2 сек
⏱️ Асинхронно заняло 2.00 сек
Подпишись 👉🏻 @KodduuPython 🤖
Вот второй, тоже 🔥наглядный пример: асинхронная загрузка нескольких URLов. Он отлично показывает, как
⚡️ Пример: Асинхронная загрузка страниц
🧠 Что это демонстрирует
* Каждое
* Даже если 1 сайт тормозит (например,
* Всё работает одновременно, а не "по очереди".
📦 Как это будет выглядеть в Telegram
Если бы это был синхронный код — заняло бы 1 + 2 + 3 = 6 секунд, а не 3. Вот и вся магия асинхронности ✨
Подпишись 👉🏻 @KodduuPython 🤖
asyncio + aiohttp позволяет скачать сразу кучу страниц в разы быстрее, чем по одной.⚡️ Пример: Асинхронная загрузка страниц
import asyncio
import aiohttp
import time
urls = [
"https://example.com",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
]
# Асинхронная загрузка
async def fetch(session, url):
print(f"📡 Загружаю: {url}")
async with session.get(url) as response:
text = await response.text()
print(f"✅ Готово: {url} ({len(text)} символов)")
return text
async def main():
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
await asyncio.gather(*tasks)
print(f"\n⏱️ Все загружено за {time.time() - start:.2f} сек")
if __name__ == "__main__":
asyncio.run(main())
🧠 Что это демонстрирует
* Каждое
fetch() ждет ответ от сервера, но другие задачи не простаивают.* Даже если 1 сайт тормозит (например,
delay/3), остальные уже выполняются.* Всё работает одновременно, а не "по очереди".
📦 Как это будет выглядеть в Telegram
📡 Загружаю: https://example.com
📡 Загружаю: https://httpbin.org/delay/2
📡 Загружаю: https://httpbin.org/delay/3
📡 Загружаю: https://httpbin.org/delay/1
✅ Готово: https://example.com (1256 символов)
✅ Готово: https://httpbin.org/delay/1 (437 символов)
✅ Готово: https://httpbin.org/delay/2 (437 символов)
✅ Готово: https://httpbin.org/delay/3 (437 символов)
⏱️ Все загружено за 3.00 сек
Если бы это был синхронный код — заняло бы 1 + 2 + 3 = 6 секунд, а не 3. Вот и вся магия асинхронности ✨
Подпишись 👉🏻 @KodduuPython 🤖
🎯 Как встроить семантический кэш в RAG и перестать зря гонять LLM
Ускорь ответы, сократи расходы и добавь ум в своего ассистента! 💡
Вот реальный пример кода, который:
* 🔐 Сохраняет пары "вопрос — ответ"
* 🧠 Находит похожие вопросы через cosine similarity
* ⚡️ Возвращает готовый ответ без вызова модели или retrieval
👇 Код:
🧠 Как это помогает в RAG:
* 💥 Если похожий вопрос уже был — мы возвращаем ответ мгновенно
* 💸 Экономим токены и не делаем повторный вызов LLM
* 📚 Учится на лету — чем больше вопросов, тем больше кэш
Пример использования:
Подпишись 👉🏻 @KodduuPython 🤖
Ускорь ответы, сократи расходы и добавь ум в своего ассистента! 💡
Вот реальный пример кода, который:
* 🔐 Сохраняет пары "вопрос — ответ"
* 🧠 Находит похожие вопросы через cosine similarity
* ⚡️ Возвращает готовый ответ без вызова модели или retrieval
👇 Код:
from sentence_transformers import SentenceTransformer, util
import re
import hashlib
import logging
logger = logging.getLogger(__name__)
def short_hash(text: str) -> str:
return hashlib.md5(text.encode("utf-8")).hexdigest()[:10]
class SemanticCache:
def __init__(self, model_name="paraphrase-multilingual-mpnet-base-v2"):
self.model = SentenceTransformer(model_name)
self.entries = [] # список словарей {"question": ..., "answer": ..., "embedding": ..., "hash": ...}
self.hash_map = {} # hash → normalized question
def normalize(self, text: str) -> str:
text = text.lower()
text = re.sub(r"[^\w\s]", "", text)
text = re.sub(r"\s+", " ", text).strip()
return text
def add(self, question: str, answer: str) -> str:
normalized = self.normalize(question)
embedding = self.model.encode(normalized, convert_to_tensor=True)
h = short_hash(normalized)
self.entries.append({
"question": question,
"normalized": normalized,
"answer": answer,
"embedding": embedding,
"hash": h
})
self.hash_map[h] = normalized
return h
def get(self, query: str, threshold: float = 0.89) -> str | None:
if not self.entries:
return None
normalized_query = self.normalize(query)
query_embedding = self.model.encode(normalized_query, convert_to_tensor=True)
scores = [util.pytorch_cos_sim(query_embedding, e["embedding"]).item() for e in self.entries]
best_idx = max(range(len(scores)), key=lambda i: scores[i])
if scores[best_idx] >= threshold:
return self.entries[best_idx]["answer"]
return None
def remove_by_hash(self, h: str):
before = len(self.entries)
self.entries = [e for e in self.entries if e["hash"] != h]
after = len(self.entries)
if before == after:
logger.info(f"❌ Хэш {h} не найден.")
else:
logger.info(f"🗑 Удалено {before - after} записей по хэшу {h}")
self.hash_map.pop(h, None)
🧠 Как это помогает в RAG:
* 💥 Если похожий вопрос уже был — мы возвращаем ответ мгновенно
* 💸 Экономим токены и не делаем повторный вызов LLM
* 📚 Учится на лету — чем больше вопросов, тем больше кэш
Пример использования:
cache = SemanticCache()
cache.add("Как работает кэш?", "Кэш хранит ответы на похожие вопросы.")
answer = cache.get("Объясни что такое кеширование") # ← вернёт готовый ответ
Подпишись 👉🏻 @KodduuPython 🤖
Forwarded from AIGENTTO
Новый обязательный вопрос на интервью разработчиков
Сейчас ведём найм backend-разработчиков, и на screening-интервью я добавил новый обязательный вопрос: А как вы используете LLM или IDE с LLM для написания кода?
Отвечать на такой вопрос — почти не использую, всё пишу сам — это примерно как отвечать 100 лет назад, что автомобиль пока не использую, лошадь надёжнее и понятнее.
LLM и всякие Cursor, Anthropic уже несколько лет с нами, вы просто обязаны их использовать для увеличения своей производительности 🚀
👉 Первый уровень — это обычно просто web-чат с ChatGPT по конкретному коду
👉 Второй уровень — это IDE с настройкой system prompt под свой проект (чтобы оно не тупило и не делало плохие штуки)
👉 Третий уровень — это подключение всяких JIRA, Wiki через MCP, чтобы бизнес-документация сама бралась в LLM, и задачки в JIRA сами двигались по workflow
Второй и третий уровень требуют усилий по настройке и тюнингу под ваши проекты, но если вы этого не сделаете, то и в XXI веке всё ещё будете использовать надёжную, хорошую, проверенную веками лошадь 🐴
Подпишись 👉🏻 @aigentto 🤖
Сейчас ведём найм backend-разработчиков, и на screening-интервью я добавил новый обязательный вопрос: А как вы используете LLM или IDE с LLM для написания кода?
Отвечать на такой вопрос — почти не использую, всё пишу сам — это примерно как отвечать 100 лет назад, что автомобиль пока не использую, лошадь надёжнее и понятнее.
LLM и всякие Cursor, Anthropic уже несколько лет с нами, вы просто обязаны их использовать для увеличения своей производительности 🚀
👉 Первый уровень — это обычно просто web-чат с ChatGPT по конкретному коду
👉 Второй уровень — это IDE с настройкой system prompt под свой проект (чтобы оно не тупило и не делало плохие штуки)
👉 Третий уровень — это подключение всяких JIRA, Wiki через MCP, чтобы бизнес-документация сама бралась в LLM, и задачки в JIRA сами двигались по workflow
Второй и третий уровень требуют усилий по настройке и тюнингу под ваши проекты, но если вы этого не сделаете, то и в XXI веке всё ещё будете использовать надёжную, хорошую, проверенную веками лошадь 🐴
Подпишись 👉🏻 @aigentto 🤖
🔥3❤2
Вот третий 🔥наглядный пример — асинхронный Telegram-бот на `aiogram`, который параллельно обрабатывает несколько пользователей. Отлично показывает, как
🤖 Асинхронный Telegram-бот на
⚙️ Как это работает
1. Пользователь пишет
2. Через 5 секунд отвечает "✅ Готово!"
3. Если 10 пользователей напишут одновременно — каждый получит ответ без задержек, параллельно.
🧪 Проверь сам
1. Запусти бота.
2. Открой 2 Telegram-аккаунта (или попроси друга).
3. Одновременно напиши
4. Каждый увидит мгновенный ответ, без блокировки друг друга.
🤔 Почему это важно
Если бы бот был синхронным, то пока обрабатывался один запрос (5 сек), все остальные бы тупо ждали. А с
Подпишись 👉🏻 @KodduuPython 🤖
asyncio даёт каждому пользователю "свою сессию", и никто не блокируется.🤖 Асинхронный Telegram-бот на
aiogramfrom aiogram import Bot, Dispatcher, types
from aiogram.types import Message
from aiogram.utils import executor
import asyncio
import logging
API_TOKEN = 'YOUR_TELEGRAM_BOT_TOKEN'
logging.basicConfig(level=logging.INFO)
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
# Эмуляция долгой операции (например, генерация отчета)
async def long_operation(user_id: int):
await bot.send_message(user_id, "⌛️ Обрабатываю ваш запрос (5 сек)...")
await asyncio.sleep(5)
await bot.send_message(user_id, "✅ Готово!")
@dp.message_handler(commands=["start", "go"])
async def handle_command(message: Message):
await long_operation(message.from_user.id)
if __name__ == "__main__":
executor.start_polling(dp, skip_updates=True)
⚙️ Как это работает
1. Пользователь пишет
/start — бот отвечает "⌛️ Обрабатываю..."2. Через 5 секунд отвечает "✅ Готово!"
3. Если 10 пользователей напишут одновременно — каждый получит ответ без задержек, параллельно.
🧪 Проверь сам
1. Запусти бота.
2. Открой 2 Telegram-аккаунта (или попроси друга).
3. Одновременно напиши
/start.4. Каждый увидит мгновенный ответ, без блокировки друг друга.
🤔 Почему это важно
Если бы бот был синхронным, то пока обрабатывался один запрос (5 сек), все остальные бы тупо ждали. А с
aiogram всё идёт в параллель, благодаря async def.Подпишись 👉🏻 @KodduuPython 🤖
❤1
Вот пример сложного и реалистичного случая асинхронщины, где всё может пойти не так — и это отлично демонстрирует, почему нужно уметь работать с ошибками, таймаутами и отменой задач в `asyncio`.
💣 Сценарий: параллельная загрузка API, где всё может сломаться
* Несколько API-эндпоинтов вызываются параллельно.
* Некоторые из них "висят", некоторые возвращают ошибку.
* Ты должен:
✅ получить успешные,
⚠️ обработать ошибки,
⏱️ отменить долгие.
⚙️ Код: асинхронная загрузка с таймаутами, отменой и ошибками
📦 Telegram-вывод:
🧠 Что демонстрирует
* 🔄
* ⏱️ Таймауты контролируют "висящие" запросы.
* ❌ Ошибки ловятся явно, без падения всего процесса.
* 🤯 Этот паттерн нужен в реальных ботах, где:
* LLM может зависнуть,
* вебхуки не отвечают,
* юзер отменил команду,
* Slack/Jira/GitHub API дали 500.
Подпишись 👉🏻 @KodduuPython 🤖
💣 Сценарий: параллельная загрузка API, где всё может сломаться
* Несколько API-эндпоинтов вызываются параллельно.
* Некоторые из них "висят", некоторые возвращают ошибку.
* Ты должен:
✅ получить успешные,
⚠️ обработать ошибки,
⏱️ отменить долгие.
⚙️ Код: асинхронная загрузка с таймаутами, отменой и ошибками
import asyncio
import aiohttp
import random
urls = [
"https://httpbin.org/delay/1", # быстрый
"https://httpbin.org/delay/5", # медленный (возможен таймаут)
"https://httpbin.org/status/500", # ошибка сервера
"https://httpbin.org/delay/2", # нормальный
]
async def fetch(session, url):
try:
timeout = aiohttp.ClientTimeout(total=3) # 3 секунды таймаут на каждый запрос
async with session.get(url, timeout=timeout) as resp:
if resp.status != 200:
raise Exception(f"Ошибка {resp.status}")
data = await resp.text()
print(f"✅ Успешно: {url[:30]}... ({len(data)} символов)")
return data
except asyncio.TimeoutError:
print(f"⏱️ Таймаут: {url}")
except Exception as e:
print(f"❌ Ошибка: {url} — {e}")
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
print("\n🏁 Готово.")
if __name__ == "__main__":
asyncio.run(main())
📦 Telegram-вывод:
✅ Успешно: https://httpbin.org/delay/1... (437 символов)
⏱️ Таймаут: https://httpbin.org/delay/5
❌ Ошибка: https://httpbin.org/status/500 — Ошибка 500
✅ Успешно: https://httpbin.org/delay/2... (437 символов)
🏁 Готово.
🧠 Что демонстрирует
* 🔄
gather(..., return_exceptions=True) позволяет не падать при ошибке.* ⏱️ Таймауты контролируют "висящие" запросы.
* ❌ Ошибки ловятся явно, без падения всего процесса.
* 🤯 Этот паттерн нужен в реальных ботах, где:
* LLM может зависнуть,
* вебхуки не отвечают,
* юзер отменил команду,
* Slack/Jira/GitHub API дали 500.
Подпишись 👉🏻 @KodduuPython 🤖
❤2
🌌 Крутая 3D-спираль на Python с Pygame — живой движок прямо у тебя на экране!
📌 Простая 3D-спираль с проекцией на 2D экран, вращающаяся в реальном времени.
📌 Использует Pygame для визуализации и базовую математику для 3D-проекций.
📌 Установи pygame: pip install pygame
📌 Запусти и наслаждайся плавной анимацией!
Подпишись 👉🏻 @KodduuPython 🤖
import pygame
import math
import sys
pygame.init()
screen = pygame.display.set_mode((600, 600))
clock = pygame.time.Clock()
points = []
for i in range(1000):
angle = i * 0.1
x = 200 * math.cos(angle)
y = 200 * math.sin(angle)
z = i * 0.5
points.append([x, y, z])
def project(x, y, z, screen_width, screen_height, fov, viewer_distance):
factor = fov / (viewer_distance + z)
x = x * factor + screen_width / 2
y = -y * factor + screen_height / 2
return int(x), int(y)
angle = 0
fov = 256
viewer_distance = 4
while True:
for event in pygame.event.get():
if event.type == pygame.QUIT:
pygame.quit()
sys.exit()
screen.fill((10, 10, 30))
angle += 0.01
for p in points:
# вращаем точки вокруг оси Y
x = p[0] * math.cos(angle) - p[2] * math.sin(angle)
z = p[0] * math.sin(angle) + p[2] * math.cos(angle)
y = p[1]
px, py = project(x, y, z, 600, 600, fov, viewer_distance)
shade = max(0, min(255, int(255 - z)))
pygame.draw.circle(screen, (shade, shade, 255), (px, py), 2)
pygame.display.flip()
clock.tick(60)
📌 Простая 3D-спираль с проекцией на 2D экран, вращающаяся в реальном времени.
📌 Использует Pygame для визуализации и базовую математику для 3D-проекций.
📌 Установи pygame: pip install pygame
📌 Запусти и наслаждайся плавной анимацией!
Подпишись 👉🏻 @KodduuPython 🤖
❤1⚡1