Kodduu Python
1.08K subscribers
311 photos
28 videos
187 links
Научись программировать на Python на интересных примерах

Самый быстрый курс https://stepik.org/a/187914
Самый нескучный курс https://stepik.org/a/185238

Во вопросам сотрудничества: @AlexErf
Download Telegram
Скидка на программу FullStack Developer and Data Scientist (Python+JS+Data) до 18 мая 🔥🔥🔥

Подпишись 👉🏻 @KodduuPython 🤖
⚡️ Мега-быстрая загрузка 10 000 файлов через aiohttp и asyncio

Если нужны реально тысячи параллельных скачиваний — только асинхронность спасёт.

Вот рабочий код:


import asyncio
import aiohttp
import aiofiles

urls = [f"https://example.com/file{i}.txt" for i in range(1, 10001)]

async def download(session, url):
try:
async with session.get(url, timeout=10) as resp:
if resp.status == 200:
fname = url.split('/')[-1]
async with aiofiles.open(fname, 'wb') as f:
await f.write(await resp.read())
except Exception as e:
print(f"Ошибка {url}: {e}")

async def main():
connector = aiohttp.TCPConnector(limit=100)
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [download(session, url) for url in urls]
await asyncio.gather(*tasks)

asyncio.run(main())


📌 Этот код:
- Открывает до 100 соединений одновременно (`limit=100`).
- Асинхронно читает сеть и сохраняет файлы без блокировки диска.
- Ловит ошибки загрузки, чтобы не падать целиком.

📦 Установка зависимостей:

pip install aiohttp aiofiles


---

Реальная сложность задачи:
- Асинхронное скачивание требует правильно держать много TCP-сессий одновременно.
- Нужно ограничивать коннекторы, чтобы не забить сеть и сервер.
- Нужно параллельно писать на диск без блокировки основного потока.

---

🔥 Что важно знать:
- Такой код может качать сотни файлов в секунду.
- Работает почти на уровне максимальной скорости вашего интернета.
- Если слишком агрессивно — можно получить 429 ("Too Many Requests") от сервера. Тогда надо добавить semaphore или задержку между запросами.

Подпишись 👉🏻 @KodduuPython 🤖
🔥3
✍️ Как за 20 строк научить Python исправлять опечатки в текстах

Мини-скрипт для автокоррекции человеческих ошибок в строках.


from textblob import TextBlob

def correct_text(text):
blob = TextBlob(text)
return str(blob.correct())

examples = [
"I havv goood speling",
"Ths is a smple txt with erors",
"Pythn is amazng!"
]

for sentence in examples:
print(f"Before: {sentence}")
print(f"After : {correct_text(sentence)}")
print()


📌 Этот код:
- Использует TextBlob для анализа и исправления текста.
- Работает из коробки на английском языке.
- Исправляет самые частые опечатки автоматически.

📦 Установка:

pip install textblob
python -m textblob.download_corpora


---

Реальная сложность задачи:
- Опечатки непредсказуемы → нужна статистическая модель исправления.
- Нужно быстро обрабатывать текст без больших моделей вроде GPT.
- Нужно уметь работать со словарями и контекстами без тонкой настройки.

---

💡 Этот трюк можно встроить в:
- Автоматические чат-боты
- Умные формы ввода
- Лингвистические анализаторы

Подпишись 👉🏻 @KodduuPython 🤖
2
Скидка 50% на программу FullStack Developer and Data Scientist (Python+JS+Data) до 18 мая 🔥🔥🔥

Подпишись 👉🏻 @KodduuPython 🤖
🚀 Как скачать 10 000 файлов быстро и без перегрева системы

Когда надо скачать тысячи файлов (логов, документов, изображений) — тупой requests.get в цикле будет очень медленным.
Решение — параллельная загрузка через пул потоков.

Вот рабочий код:


import requests
from concurrent.futures import ThreadPoolExecutor

urls = [f"https://example.com/file{i}.txt" for i in range(1, 10001)]

def download(url):
try:
r = requests.get(url, timeout=10)
if r.status_code == 200:
with open(url.split('/')[-1], 'wb') as f:
f.write(r.content)
except Exception as e:
print(f"Ошибка при скачивании {url}: {e}")

with ThreadPoolExecutor(max_workers=20) as executor:
executor.map(download, urls)


📌 Этот код:
- Скачивает до 20 файлов одновременно (параметр `max_workers`).
- Обрабатывает ошибки (например, таймауты и падение сети).
- Бережёт ресурсы: не создаёт слишком много потоков и не душит сервер.

📦 Нужно установить только requests, если ещё нет:

pip install requests


Реальная сложность задачи:
- Слишком много запросов → можно легко убить свой интернет или упасть по памяти.
- Нужна балансировка скорости и нагрузки (потому ThreadPoolExecutor с лимитом).
- Требуется аккуратно сохранять файлы и обрабатывать возможные сбои.

Подпишись 👉🏻 @KodduuPython 🤖
👍3
📌 Задача

Три тела взаимодействуют гравитационно в пустом пространстве. Мы хотим смоделировать их движение и визуализировать траектории.

## Требуемые библиотеки

pip install numpy matplotlib



## 🧠 Код: Решение и Визуализация

import numpy as np
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

G = 1.0 # Гравитационная постоянная

# Массы тел
m1, m2, m3 = 1.0, 1.0, 1.0

# Начальные координаты (x, y, z) и скорости (vx, vy, vz)
# Можно изменить под свой случай
r1 = np.array([1.0, 0.0, 0.0])
r2 = np.array([-1.0, 0.0, 0.0])
r3 = np.array([0.0, 1.0, 0.0])

v1 = np.array([0.0, 0.3, 0.0])
v2 = np.array([0.0, -0.3, 0.0])
v3 = np.array([0.0, 0.0, 0.0])

# Функция для расчета ускорений
def acceleration(ri, rj, mj):
r = rj - ri
dist = np.linalg.norm(r) + 1e-5
return G * mj * r / dist**3

# Метод Рунге-Кутты 4-го порядка
def rk4_step(r, v, m, dt):
a = np.zeros_like(r)

for i in range(3):
for j in range(3):
if i != j:
a[i] += acceleration(r[i], r[j], m[j])

r_new = r + v * dt + 0.5 * a * dt**2
v_new = v + a * dt

return r_new, v_new

# Инициализация массивов
steps = 5000
dt = 0.001

r = np.array([r1, r2, r3])
v = np.array([v1, v2, v3])
m = np.array([m1, m2, m3])

trajectories = [[], [], []]

# Основной цикл
for _ in range(steps):
for i in range(3):
trajectories[i].append(r[i].copy())

r, v = rk4_step(r, v, m, dt)

trajectories = np.array(trajectories)

# 🎥 Визуализация в 3D
fig = plt.figure(figsize=(10, 7))
ax = fig.add_subplot(111, projection='3d')

for i in range(3):
ax.plot(trajectories[i][:, 0], trajectories[i][:, 1], trajectories[i][:, 2], label=f"Body {i+1}")

ax.set_xlabel("X")
ax.set_ylabel("Y")
ax.set_zlabel("Z")
ax.set_title("Three-Body Problem Simulation")
ax.legend()
plt.tight_layout()
plt.show()


## ⚙️ Что делает код:

* Использует метод Рунге-Кутты для численного интегрирования движения.
* Визуализирует траектории всех трех тел в 3D.
* Можно изменить массы, начальные координаты и скорости для экспериментов.

Подпишись 👉🏻 @KodduuPython 🤖
This media is not supported in your browser
VIEW IN TELEGRAM
Анимация кода выше 👆👆👆

Подпишись 👉🏻 @KodduuPython 🤖
🧠 Умная CLI-загрузка: прогресс-бар с ETA, скоростью и цветами
Выглядит профессионально, пишется за 15 строк, реально полезен в любом скрипте ⚙️📦

from tqdm import tqdm
import time
from colorama import Fore, Style, init

init(autoreset=True)

for i in tqdm(range(100), desc=Fore.CYAN + "Загрузка данных" + Style.RESET_ALL,
bar_format="{l_bar}%s{bar}%s{r_bar}" % (Fore.GREEN, Style.RESET_ALL)):
time.sleep(0.05)


📌 Что делает:
— Красивый прогресс-бар в терминале
— Показывает % выполнения, ETA, скорость
— Цветной вывод через colorama, бар от tqdm

📦 Установка:

pip install tqdm colorama


🔧 Используй в своих скриптах для загрузки, скачивания, обработки файлов — это реально удобнее, чем print() и sleep() 🙌

Подпишись 👉🏻 @KodduuPython 🤖
⚙️ Умный таймер-функция: логирует время, ошибки и возвращает результат
Полезный @decorator для любых боевых функций — отслеживает время выполнения и ловит баги без лишнего кода 🔥

import time
import functools
import logging

logging.basicConfig(level=logging.INFO)

def timed(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
t0 = time.time()
try:
result = func(*args, **kwargs)
logging.info(f"{func.__name__} finished in {time.time() - t0:.2f}s")
return result
except Exception as e:
logging.exception(f"{func.__name__} failed after {time.time() - t0:.2f}s")
raise
return wrapper

# 🔧 Пример использования
@timed
def process_data():
time.sleep(1)
return " Done"

process_data()


📌 Что делает:
— Оборачивает любую функцию
— Логирует время выполнения
— Отлавливает и логирует ошибки (с трассировкой!)
— Используется в проде для мониторинга, отладки и контроля SLA

📦 Всё стандартное: time, functools, logging — ничего ставить не надо
Работает и в скриптах, и в backend API, и в пайплайнах.

Подпишись 👉🏻 @KodduuPython 🤖
🛡 Безопасный вызов стороннего API с автоповторами и таймаутом
Анти-фейл сессия для любого запроса в боевом сервисе — не упадёт и не зависнет 🚀

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

session = requests.Session()
retry = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
adapter = HTTPAdapter(max_retries=retry)
session.mount("http://", adapter)
session.mount("https://", adapter)

# 🔧 Пример запроса
try:
response = session.get("https://api.example.com/data", timeout=3)
print(response.json())
except requests.RequestException as e:
print(" Ошибка запроса:", e)


📌 Что делает:
— Повторяет запрос до 3 раз при временных ошибках (500+)
— Использует экспоненциальную задержку (backoff_factor)
— Не виснет бесконечно — timeout=3 сек
— Работает со всеми методами: .get(), .post() и т.д.

📦 Библиотека: requests
Установка:

pip install requests


🔥 Практично для микросервисов, загрузки данных, интеграций с внешними API.
Можешь встроить прямо в боевую систему — сэкономит часы отладки и нервов.

Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔐 Безопасное скачивание файла с логами и таймаутом

import requests
import logging
from pathlib import Path
from requests.exceptions import RequestException, Timeout

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def download_file(url: str, dest: Path, timeout: float = 5.0):
try:
response = requests.get(url, stream=True, timeout=timeout)
response.raise_for_status()

dest.parent.mkdir(parents=True, exist_ok=True)
with open(dest, "wb") as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)

logging.info(f" Файл сохранён: {dest.resolve()}")
except (RequestException, Timeout) as e:
logging.error(f" Ошибка при скачивании: {e}")

# Пример использования
download_file(
url="https://example.com/data/report.csv",
dest=Path("./downloads/report.csv")
)


📌 Скачивает файл по URL с таймаутом и логирует успех или ошибку. Создаёт нужные папки автоматически. Подходит для бэкенда, cron-задач, скриптов загрузки данных.

Подпишись 👉🏻 @KodduuPython 🤖
⏱️ Декоратор с логгингом времени выполнения

import time
import logging
from functools import wraps

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def log_runtime(func):
@wraps(func)
def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
result = func(*args, **kwargs)
return result
finally:
duration = time.perf_counter() - start
logging.info(f"⏱️ {func.__name__} завершилась за {duration:.3f} сек")
return wrapper

# Пример использования
@log_runtime
def slow_operation():
time.sleep(1.5)
return "Done"

slow_operation()


📌 Декоратор логирует, сколько времени заняло выполнение функции. Полезно для профилирования, анализа производительности, в том числе в проде.

🛠 Ничего устанавливать не нужно — только стандартная библиотека.

Подпишись 👉🏻 @KodduuPython 🤖
🆒2👍1
📁 Блокировка файла — безопасный доступ к ресурсу без гонок

import fcntl
import time
from pathlib import Path

def with_file_lock(lock_path: Path, timeout: float = 10.0):
def decorator(func):
def wrapper(*args, **kwargs):
with open(lock_path, 'w') as lock_file:
start = time.time()
while True:
try:
fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
break
except BlockingIOError:
if time.time() - start > timeout:
raise TimeoutError("🔒 Не удалось получить файл-лок вовремя")
time.sleep(0.1)
try:
return func(*args, **kwargs)
finally:
fcntl.flock(lock_file, fcntl.LOCK_UN)
return wrapper
return decorator

# Пример использования
@with_file_lock(Path("/tmp/my_script.lock"))
def critical_section():
print("🔧 Работаю в критической секции...")
time.sleep(3)

critical_section()


📌 Защита от параллельного запуска скрипта или доступа к ресурсу. Подходит для cron, пайплайнов, обработки файлов и очередей. Работает на Unix (Linux/macOS).

🛠 Всё из коробки — только fcntl и pathlib.

Подпишись 👉🏻 @KodduuPython 🤖
📬 Мини-клиент для ретраев запросов с бэкоффом и логами

import requests
import logging
import time
from requests.exceptions import RequestException

logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")

def fetch_with_retries(url, retries=3, backoff=2.0, timeout=5.0):
attempt = 0
while attempt < retries:
try:
response = requests.get(url, timeout=timeout)
response.raise_for_status()
logging.info(f" Успешно на {attempt+1} попытке")
return response.text
except RequestException as e:
logging.warning(f"⚠️ Попытка {attempt+1} не удалась: {e}")
attempt += 1
if attempt < retries:
sleep_time = backoff ** attempt
logging.info(f" Ожидаю {sleep_time:.1f} сек перед новой попыткой")
time.sleep(sleep_time)
else:
logging.error(" Все попытки исчерпаны")
raise

# Пример использования
if __name__ == "__main__":
content = fetch_with_retries("https://httpstat.us/503?sleep=1000", retries=4)


📌 HTTP-клиент с автоматическими повторами при ошибках, экспоненциальным бэкоффом и логированием. Применим для нестабильных API, ETL, микросервисов.

🛠 pip install requests — всё остальное в стандартной библиотеке.

Подпишись 👉🏻 @KodduuPython 🤖