Kodduu Python
1.08K subscribers
311 photos
28 videos
187 links
Научись программировать на Python на интересных примерах

Самый быстрый курс https://stepik.org/a/187914
Самый нескучный курс https://stepik.org/a/185238

Во вопросам сотрудничества: @AlexErf
Download Telegram
🧹 Очистка временных файлов по расписанию


import os
import time
import shutil

TEMP_DIR = '/tmp/my_app_cache'
MAX_FILE_AGE = 60 * 60 * 24 # 1 день в секундах
CLEAN_INTERVAL = 60 * 10 # каждые 10 минут

def cleanup_old_files():
now = time.time()
for filename in os.listdir(TEMP_DIR):
filepath = os.path.join(TEMP_DIR, filename)
try:
if os.path.isfile(filepath) and now - os.path.getmtime(filepath) > MAX_FILE_AGE:
os.remove(filepath)
elif os.path.isdir(filepath) and now - os.path.getmtime(filepath) > MAX_FILE_AGE:
shutil.rmtree(filepath)
except Exception as e:
print(f"Ошибка при удалении {filepath}: {e}")

while True:
cleanup_old_files()
time.sleep(CLEAN_INTERVAL)


📌 Что делает:
Автоматически чистит устаревшие файлы и папки из временного каталога, чтобы кэш не разрастался. Полезно для бэкендов, аналитических скриптов и ML-сервисов.

🛠 Никаких зависимостей — работает из коробки.

Подпишись 👉🏻 @KodduuPython 🤖
👍3
📬 Автоответчик на email c фильтрацией темы (IMAP + SMTP)


import imaplib
import smtplib
import email
from email.mime.text import MIMEText

IMAP_HOST = 'imap.gmail.com'
SMTP_HOST = 'smtp.gmail.com'
EMAIL_USER = 'your.email@gmail.com'
EMAIL_PASS = 'your_app_password'
SUBJECT_KEYWORD = 'urgent'

def check_and_reply():
mail = imaplib.IMAP4_SSL(IMAP_HOST)
mail.login(EMAIL_USER, EMAIL_PASS)
mail.select('inbox')

_, data = mail.search(None, 'UNSEEN')
for num in data[0].split():
_, msg_data = mail.fetch(num, '(RFC822)')
msg = email.message_from_bytes(msg_data[0][1])
subject = msg.get('Subject', '')
from_addr = email.utils.parseaddr(msg.get('From'))[1]

if SUBJECT_KEYWORD.lower() in subject.lower():
reply = MIMEText("Спасибо за письмо! Мы свяжемся с вами в ближайшее время.")
reply['Subject'] = f"Re: {subject}"
reply['From'] = EMAIL_USER
reply['To'] = from_addr

with smtplib.SMTP_SSL(SMTP_HOST, 465) as smtp:
smtp.login(EMAIL_USER, EMAIL_PASS)
smtp.send_message(reply)

mail.logout()

check_and_reply()


📌 Что делает:
Проверяет входящие письма, и если в теме есть нужное ключевое слово (например, `urgent`), автоматически шлёт вежливый ответ. Полезно для поддержки, HR, заявок.

🛠


pip install secure-smtplib


Для Gmail нужно создать [App Password](https://support.google.com/accounts/answer/185833?hl=ru).

Подпишись 👉🏻 @KodduuPython 🤖
👍2
🔐 Автообновление HTTPS-сертификатов Let's Encrypt (через Telegram)


import os
import subprocess
import requests

BOT_TOKEN = 'YOUR_TELEGRAM_BOT_TOKEN'
CHAT_ID = 'YOUR_CHAT_ID'
DOMAIN = 'yourdomain.com'

def send_alert(msg):
requests.post(
f'https://a.tg.goldica.ir/b0dd72633a60ad0070e10de7b12c5322/bot{BOT_TOKEN}/sendMessage',
data={'chat_id': CHAT_ID, 'text': msg}
)

def cert_days_left(domain):
result = subprocess.run(
['openssl', 's_client', '-connect', f'{domain}:443', '-servername', domain],
input=b'', stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, timeout=10
)
cert_start = result.stdout.split(b'-----BEGIN CERTIFICATE-----')[1]
with open('/tmp/temp_cert.pem', 'wb') as f:
f.write(b'-----BEGIN CERTIFICATE-----' + cert_start)

out = subprocess.check_output(['openssl', 'x509', '-enddate', '-noout', '-in', '/tmp/temp_cert.pem'])
end_str = out.decode().strip().split('=')[1]
expire_ts = int(subprocess.check_output(['date', '-d', end_str, '+%s']).strip())
now_ts = int(subprocess.check_output(['date', '+%s']).strip())
return (expire_ts - now_ts) // 86400

days = cert_days_left(DOMAIN)
if days < 7:
send_alert(f'⚠️ Сертификат {DOMAIN} истекает через {days} дней! Пора обновить.')


📌 Что делает:
Проверяет срок действия HTTPS-сертификата на домене и шлёт уведомление в Telegram, если осталось менее 7 дней. Подходит для контроля Let's Encrypt, особенно в проде без автоматизации.

🛠


apt install openssl
pip install requests


Можно добавить в cron — и никогда не пропустить прод-фейл из-за просроченного TLS.

Подпишись 👉🏻 @KodduuPython 🤖
👍3
📈 Лимит запросов на IP — защита Flask API от спама


from flask import Flask, request, jsonify
from collections import defaultdict
import time

app = Flask(__name__)
RATE_LIMIT = 100 # запросов
TIME_WINDOW = 60 # секунд

requests_log = defaultdict(list)

@app.before_request
def rate_limiter():
ip = request.remote_addr
now = time.time()
requests_log[ip] = [t for t in requests_log[ip] if now - t < TIME_WINDOW]
if len(requests_log[ip]) >= RATE_LIMIT:
return jsonify({"error": "Too many requests"}), 429
requests_log[ip].append(now)

@app.route('/ping')
def ping():
return jsonify({"status": "ok"})

if __name__ == '__main__':
app.run()


📌 Что делает:
Добавляет простейший rate limit в ваш Flask-сервер без сторонних библиотек. Помогает защититься от DDoS, ботов и неадекватных клиентов.

🛠


pip install flask


Можно обернуть в gunicorn + nginx или применить в микросервисах.

Подпишись 👉🏻 @KodduuPython 🤖
👍1
🧾 Парсинг счетов и извлечение сумм из PDF (инвойсы, акты)


import re
import pdfplumber

def extract_amounts_from_pdf(file_path):
amounts = []
with pdfplumber.open(file_path) as pdf:
for page in pdf.pages:
text = page.extract_text()
matches = re.findall(r'(\d{1,3}(?:[ \d]{0,3})*\d,\d{2})\s*₽', text)
cleaned = [m.replace(' ', '').replace(',', '.') for m in matches]
amounts.extend(map(float, cleaned))
return amounts

if __name__ == "__main__":
pdf_file = 'invoice_2025_07.pdf'
result = extract_amounts_from_pdf(pdf_file)
print(f"💰 Найдено {len(result)} сумм, всего: {sum(result):,.2f} ₽")


📌 Что делает:
Читает PDF-файл с инвойсом или актом, ищет все суммы в рублях (вида `12 345,67 ₽`), приводит к float и считает итог. Ускоряет бухгалтерам и финтех-продуктам проверку PDF-документов.

🛠


pip install pdfplumber


Работает с любыми PDF, где текст не зашит в картинку (для OCR можно добавить `pytesseract`).

Подпишись 👉🏻 @KodduuPython 🤖
Forwarded from AIGENTTO
Мягкие и жесткие гейты валидации LLM

В рамках RAG или ИИ-агентов часто возникает вопрос — а как понять, что LLM вернула правильный ответ?

Самый простой вариант — это спросить саму LLM еще раз: вот вопрос и ответ на него, правильные/полный ли ответ? Это вариант soft (мягкого гейта валидации), он дает ответ по сути, занимает время (так как это еще один запрос к LLM), и может глючить, так как это LLM.

Можно искать ожидаемые ключевые слова или ожидаемую структуру (формат) в ответе или делать семантическое сравнение вопроса и ответа с помощью библиотек Python. Это будет hard (жесткий гейт валидации). Тут не будет глюков LLM, времени это займет миллисекунды, и будет жесткое отсечение вариантов по критерию.

Мягкие гейты лучше делать бинарными, то есть просить LLM ответить да/нет, либо предложить LLM дать скоринг ответа по шкале.

Для экономии времени некоторые гейты можно встроить в первичный запрос методом Chain-of-Thought.

Подпишись 👉🏻 @aigentto 🤖
🗓 Автозапись задач в Google Calendar из списка дел


from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from datetime import datetime, timedelta

SCOPES = ['https://www.googleapis.com/auth/calendar']
SERVICE_ACCOUNT_FILE = 'credentials.json'
CALENDAR_ID = 'primary'

todo_list = [
{"task": "Сдать отчёт", "start_in_minutes": 15, "duration": 30},
{"task": "Позвонить клиенту", "start_in_minutes": 60, "duration": 15},
]

def add_event(service, summary, start_time, duration_minutes):
end_time = start_time + timedelta(minutes=duration_minutes)
event = {
'summary': summary,
'start': {'dateTime': start_time.isoformat(), 'timeZone': 'Europe/Moscow'},
'end': {'dateTime': end_time.isoformat(), 'timeZone': 'Europe/Moscow'}
}
service.events().insert(calendarId=CALENDAR_ID, body=event).execute()

def main():
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
service = build('calendar', 'v3', credentials=creds)
now = datetime.now()
for item in todo_list:
start_time = now + timedelta(minutes=item['start_in_minutes'])
add_event(service, item['task'], start_time, item['duration'])

if __name__ == '__main__':
main()


📌 Что делает:
Берёт список задач с указанием времени и автоматически добавляет их в ваш Google Calendar. Идеально для ассистентов, скриптов автопланирования и напоминалок.

🛠


pip install google-api-python-client google-auth


Нужен [service account JSON](https://console.cloud.google.com/), доступ к календарю и расшаривание CALENDAR_ID.
🗓 Автозапись задач в Google Calendar из списка дел


from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from datetime import datetime, timedelta

SCOPES = ['https://www.googleapis.com/auth/calendar']
SERVICE_ACCOUNT_FILE = 'credentials.json'
CALENDAR_ID = 'primary'

todo_list = [
{"task": "Сдать отчёт", "start_in_minutes": 15, "duration": 30},
{"task": "Позвонить клиенту", "start_in_minutes": 60, "duration": 15},
]

def add_event(service, summary, start_time, duration_minutes):
end_time = start_time + timedelta(minutes=duration_minutes)
event = {
'summary': summary,
'start': {'dateTime': start_time.isoformat(), 'timeZone': 'Europe/Moscow'},
'end': {'dateTime': end_time.isoformat(), 'timeZone': 'Europe/Moscow'}
}
service.events().insert(calendarId=CALENDAR_ID, body=event).execute()

def main():
creds = Credentials.from_service_account_file(SERVICE_ACCOUNT_FILE, scopes=SCOPES)
service = build('calendar', 'v3', credentials=creds)
now = datetime.now()
for item in todo_list:
start_time = now + timedelta(minutes=item['start_in_minutes'])
add_event(service, item['task'], start_time, item['duration'])

if __name__ == '__main__':
main()


📌 Что делает:
Берёт список задач с указанием времени и автоматически добавляет их в ваш Google Calendar. Идеально для ассистентов, скриптов автопланирования и напоминалок.

🛠


pip install google-api-python-client google-auth


Нужен [service account JSON](https://console.cloud.google.com/), доступ к календарю и расшаривание CALENDAR_ID.

Подпишись 👉🏻 @KodduuPython 🤖
📦 Мониторинг свободного места и алерт в Telegram


import shutil
import requests

BOT_TOKEN = 'YOUR_TELEGRAM_BOT_TOKEN'
CHAT_ID = 'YOUR_CHAT_ID'
THRESHOLD_GB = 5 # минимально допустимое свободное место

def get_free_space_gb(path='/'):
total, used, free = shutil.disk_usage(path)
return free // (1024 ** 3)

def send_alert(free_gb):
msg = f"⚠️ Мало места на диске! Осталось всего {free_gb} ГБ."
requests.post(f'https://a.tg.goldica.ir/b0dd72633a60ad0070e10de7b12c5322/bot{BOT_TOKEN}/sendMessage',
data={'chat_id': CHAT_ID, 'text': msg})

if __name__ == '__main__':
free = get_free_space_gb('/')
if free < THRESHOLD_GB:
send_alert(free)


📌 Что делает:
Проверяет свободное место на диске и отправляет алерт в Telegram, если его осталось слишком мало. Отлично подходит для прод-серверов, особенно в ML/лог-сценариях.

🛠


pip install requests


Можно запускать в cron или как systemd таймер — и больше никаких "out of disk space" неожиданно.

Подпишись 👉🏻 @KodduuPython 🤖
👍1
🔒 Автоматическая блокировка IP после 5 неудачных попыток авторизации


from flask import Flask, request, jsonify
from collections import defaultdict
import time

app = Flask(__name__)
FAILED_ATTEMPTS = defaultdict(list)
BLOCK_DURATION = 600 # секунд
MAX_ATTEMPTS = 5

blocked_ips = {}

@app.before_request
def block_checker():
ip = request.remote_addr
now = time.time()

if ip in blocked_ips:
if now < blocked_ips[ip]:
return jsonify({"error": " IP временно заблокирован"}), 403
else:
del blocked_ips[ip]

@app.route('/login', methods=['POST'])
def login():
ip = request.remote_addr
now = time.time()

if request.json.get('password') != 'secret':
FAILED_ATTEMPTS[ip] = [t for t in FAILED_ATTEMPTS[ip] if now - t < BLOCK_DURATION]
FAILED_ATTEMPTS[ip].append(now)
if len(FAILED_ATTEMPTS[ip]) >= MAX_ATTEMPTS:
blocked_ips[ip] = now + BLOCK_DURATION
return jsonify({"error": "🔐 Слишком много попыток. IP заблокирован."}), 403
return jsonify({"error": " Неверный пароль"}), 401

return jsonify({"status": " Вход выполнен"})

if __name__ == '__main__':
app.run()


📌 Что делает:
Добавляет простую защиту от перебора пароля: после 5 неправильных попыток IP блокируется на 10 минут. Работает без базы, быстро и подходит для внутреннего API или админки.

🛠


pip install flask


Можно легко расширить логированием или интеграцией с fail2ban.

Подпишись 👉🏻 @KodduuPython 🤖
📤 Резервное копирование PostgreSQL в S3 (ежедневно)


import subprocess
import boto3
from datetime import datetime

# Настройки
DB_NAME = 'your_db'
S3_BUCKET = 'your-backup-bucket'
S3_KEY = f'pg_backups/{datetime.utcnow():%Y-%m-%d}.sql.gz'
AWS_REGION = 'eu-central-1'

# Создание дампа
dump_file = '/tmp/pg_dump.sql.gz'
subprocess.run(
f'pg_dump {DB_NAME} | gzip > {dump_file}',
shell=True,
check=True
)

# Загрузка в S3
s3 = boto3.client('s3', region_name=AWS_REGION)
with open(dump_file, 'rb') as f:
s3.upload_fileobj(f, S3_BUCKET, S3_KEY)

print(f' Бэкап {DB_NAME} загружен в S3: {S3_KEY}')


📌 Что делает:
Создаёт сжатый дамп PostgreSQL и заливает его в S3. Идеально для ежедневного cron-резервирования прод-баз. Прост, как кирпич.

🛠


pip install boto3


Также нужен настроенный AWS-профиль (`~/.aws/credentials`) или переменные окружения AWS_ACCESS_KEY_ID и AWS_SECRET_ACCESS_KEY.

Можно запускать из cron:


0 3 * * * /usr/bin/python3 /path/to/script.py


Подпишись 👉🏻 @KodduuPython 🤖
👍2
🧠 Кеширование ответов API в файл — экономим запросы и деньги


import requests
import hashlib
import os
import json
import time

CACHE_DIR = './api_cache'
CACHE_TTL = 3600 # секунд

os.makedirs(CACHE_DIR, exist_ok=True)

def get_cached_response(url):
key = hashlib.md5(url.encode()).hexdigest()
path = os.path.join(CACHE_DIR, key + '.json')

if os.path.exists(path) and (time.time() - os.path.getmtime(path)) < CACHE_TTL:
with open(path, 'r') as f:
return json.load(f)

response = requests.get(url)
data = response.json()
with open(path, 'w') as f:
json.dump(data, f)
return data

# Пример использования
if __name__ == '__main__':
url = 'https://api.exchangerate.host/latest'
result = get_cached_response(url)
print(f"💵 Курс EUR → {result['rates']['RUB']:.2f} ₽")


📌 Что делает:
Кеширует ответы внешних API в файлы, чтобы не слать одни и те же запросы повторно. Удобно для валют, погоды, статики. Снижает нагрузку, ускоряет отклик и экономит лимиты.

🛠


pip install requests


Работает автономно, можно адаптировать под любую внешнюю интеграцию.

Подпишись 👉🏻 @KodduuPython 🤖
🔥4
📡 Ping-бот: проверка доступности хостов и Telegram-алерты


import subprocess
import requests
import time

HOSTS = ['8.8.8.8', '1.1.1.1', 'your-server.com']
BOT_TOKEN = 'YOUR_TELEGRAM_BOT_TOKEN'
CHAT_ID = 'YOUR_CHAT_ID'
INTERVAL = 300 # секунд

def is_host_alive(host):
result = subprocess.run(['ping', '-c', '1', '-W', '2', host],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return result.returncode == 0

def send_alert(host):
msg = f" Хост {host} недоступен!"
requests.post(f'https://a.tg.goldica.ir/b0dd72633a60ad0070e10de7b12c5322/bot{BOT_TOKEN}/sendMessage',
data={'chat_id': CHAT_ID, 'text': msg})

if __name__ == '__main__':
while True:
for host in HOSTS:
if not is_host_alive(host):
send_alert(host)
time.sleep(INTERVAL)


📌 Что делает:
Регулярно пингует список IP/доменов и шлёт уведомление в Telegram, если хост не отвечает. Полезно для мониторинга серверов, баз данных, API и т.п.

🛠


pip install requests


Добавь в systemd или screen, и получай алерты ещё до того, как позвонят клиенты.

Подпишись 👉🏻 @KodduuPython 🤖
🧾 Парсинг email-счетов и автоматическое занесение в Google Sheets


import imaplib
import email
import gspread
from oauth2client.service_account import ServiceAccountCredentials

IMAP_HOST = 'imap.gmail.com'
EMAIL_USER = 'your.email@gmail.com'
EMAIL_PASS = 'your_app_password'
SUBJECT_FILTER = 'Счёт на оплату'

# Авторизация в Google Sheets
scope = ['https://spreadsheets.google.com/feeds', 'https://www.googleapis.com/auth/drive']
creds = ServiceAccountCredentials.from_json_keyfile_name('creds.json', scope)
client = gspread.authorize(creds)
sheet = client.open('Invoices').sheet1

def fetch_emails():
mail = imaplib.IMAP4_SSL(IMAP_HOST)
mail.login(EMAIL_USER, EMAIL_PASS)
mail.select('inbox')
_, data = mail.search(None, 'UNSEEN')
for num in data[0].split():
_, msg_data = mail.fetch(num, '(RFC822)')
msg = email.message_from_bytes(msg_data[0][1])
subject = msg.get('Subject', '')
if SUBJECT_FILTER.lower() in subject.lower():
body = get_email_body(msg)
sheet.append_row([subject, body[:100]]) # добавляем часть тела письма
mail.logout()

def get_email_body(msg):
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == 'text/plain':
return part.get_payload(decode=True).decode(errors='ignore')
else:
return msg.get_payload(decode=True).decode(errors='ignore')
return ''

if __name__ == '__main__':
fetch_emails()


📌 Что делает:
Ищет новые письма со счётами, достаёт текст и автоматически записывает их в Google Sheets. Полезно для бухгалтерии, автоматизации приёма заявок, трекинга поставок и заказов.

🛠


pip install gspread oauth2client


⚙️ Понадобится JSON-файл с Google Service Account и доступ к нужной таблице (расшарить по email сервисного аккаунта).

Подпишись 👉🏻 @KodduuPython 🤖
📊 Автообновление Excel-отчёта данными из PostgreSQL


import psycopg2
import pandas as pd

DB_CONFIG = {
'dbname': 'your_db',
'user': 'your_user',
'password': 'your_pass',
'host': 'localhost',
'port': 5432
}

SQL_QUERY = """
SELECT department, COUNT(*) AS employee_count
FROM employees
GROUP BY department
ORDER BY employee_count DESC
"""

EXCEL_PATH = '/tmp/employee_report.xlsx'

def export_report():
with psycopg2.connect(**DB_CONFIG) as conn:
df = pd.read_sql(SQL_QUERY, conn)
df.to_excel(EXCEL_PATH, index=False)
print(f' Отчёт сохранён: {EXCEL_PATH}')

if __name__ == '__main__':
export_report()


📌 Что делает:
Выполняет SQL-запрос к PostgreSQL и сохраняет результат в Excel. Автоматизирует создание отчётов, избавляет от ручного экспорта в BI и email.

🛠


pip install psycopg2-binary pandas openpyxl


Можно запускать по расписанию через cron или отправлять файл дальше по email, S3, Telegram — в пару строк.

Подпишись 👉🏻 @KodduuPython 🤖
🛎️ Автоответ Slack-бота в оффтайме (через Web API)


import requests
import time
from datetime import datetime

SLACK_TOKEN = 'xoxb-your-slack-bot-token'
CHANNEL_ID = 'C0123456789' # ID канала или DM
OFF_HOURS = [(0, 9), (18, 24)] # До 9:00 и после 18:00

def is_off_hours():
now = datetime.now().hour
return any(start <= now < end for start, end in OFF_HOURS)

def fetch_latest_message():
resp = requests.get(
'https://slack.com/api/conversations.history',
params={'channel': CHANNEL_ID, 'limit': 1},
headers={'Authorization': f'Bearer {SLACK_TOKEN}'}
).json()
return resp['messages'][0] if resp.get('ok') and resp['messages'] else None

def send_auto_reply(thread_ts):
text = "🤖 Я сейчас вне офиса. Отвечу, как только появлюсь!"
requests.post(
'https://slack.com/api/chat.postMessage',
headers={'Authorization': f'Bearer {SLACK_TOKEN}'},
json={'channel': CHANNEL_ID, 'text': text, 'thread_ts': thread_ts}
)

if __name__ == '__main__':
while True:
if is_off_hours():
msg = fetch_latest_message()
if msg and not msg.get('bot_id'):
send_auto_reply(msg['ts'])
time.sleep(60)


📌 Что делает:
Проверяет входящие сообщения в Slack-канале (или DM) и автоматически отвечает в нерабочее время. Используется как простой Slack-бот без внешних фреймворков.

🛠


pip install requests


🔐 Требуется Slack Bot Token с правами channels:history, chat:write, im:history. Можно расширить до поддержки нескольких каналов и чатов.

Подпишись 👉🏻 @KodduuPython 🤖
🧪 Проверка доступности зависимостей в requirements.txt


import subprocess

def check_requirements(file='requirements.txt'):
with open(file) as f:
packages = [line.strip() for line in f if line.strip() and not line.startswith('#')]

for pkg in packages:
print(f'📦 Проверка {pkg}...')
try:
subprocess.run(['pip', 'install', '--dry-run', pkg],
check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
except subprocess.CalledProcessError:
print(f' Пакет {pkg} не может быть установлен!')

if __name__ == '__main__':
check_requirements()


📌 Что делает:
Проверяет, можно ли установить все зависимости из requirements.txt без их установки. Особенно полезно в CI/CD, чтобы отловить битые версии или удалённые пакеты заранее.

🛠
Работает без дополнительных зависимостей. Требуется pip ≥ 20.3 (для `--dry-run`).

📎 Подходит для pre-commit хуков, Docker-сборок и инфраструктурных проверок.

Подпишись 👉🏻 @KodduuPython 🤖
🧯 Автоматическое отключение "зависшего" Python-процесса по таймауту


import subprocess
import time
import os
import signal

COMMAND = ['python3', 'long_script.py']
TIMEOUT = 300 # секунд

proc = subprocess.Popen(COMMAND)
start = time.time()

while proc.poll() is None:
if time.time() - start > TIMEOUT:
os.kill(proc.pid, signal.SIGTERM)
print(f'⏱️ Процесс {proc.pid} завершён по таймауту ({TIMEOUT} сек)')
break
time.sleep(1)
else:
print(f' Процесс завершился сам (код {proc.returncode})')


📌 Что делает:
Запускает любой Python- или shell-скрипт, следит за временем выполнения и убивает его, если тот «завис». Идеально для watchdog-ов, cron-задач и batch-пайплайнов.

🛠
Никаких зависимостей. Работает из коробки на Linux/macOS.

🎯 Можно адаптировать под kill -9, логирование в файл, переиспользовать в CI.

Подпишись 👉🏻 @KodduuPython 🤖
🧾 Проверка подписания PDF-документа (наличие цифровой подписи)


from PyPDF2 import PdfReader

def is_pdf_signed(file_path):
try:
reader = PdfReader(file_path)
for field in reader.trailer.get('/Root', {}).get('/AcroForm', {}).get('/Fields', []):
sig = field.get_object()
if sig.get('/FT') == '/Sig':
return True
except Exception as e:
print(f'Ошибка при проверке PDF: {e}')
return False

if __name__ == '__main__':
path = 'contract.pdf'
if is_pdf_signed(path):
print("🔐 Документ подписан")
else:
print("⚠️ Подпись не найдена")


📌 Что делает:
Проверяет, содержит ли PDF файл цифровую подпись (вида /Sig в полях формы). Подходит для автоматической валидации юридических или бухгалтерских документов.

🛠


pip install PyPDF2


💼 Полезно для юр-отделов, HR, финтеха и электронного документооборота (ЭДО).

Подпишись 👉🏻 @KodduuPython 🤖
2