news: add /news_range command for aggregated package news over date range

parent 500f1bc4
import aiohttp
from datetime import date
from typing import List, Literal
from . import models
from .news import urls_parser, packages_parser, bugs_parser
from .news import urls_parser, urls_for_range, packages_parser, bugs_parser
from .packages import ftbfs_parser, watch_parser
......@@ -53,6 +54,69 @@ class NewsInfo:
async def p10(self) -> models.PackagesModel | None:
return await self._get_packages("p10")
async def packages_by_range(
self, date_from: date, date_to: date
) -> models.PackagesModel | None:
urls = await urls_for_range(self.client, date_from, date_to)
if not urls:
return None
all_packages = []
for _, url in urls:
html = await self.client.get(url, "koi8-r")
packages = await packages_parser(html, url, self.client)
if packages and isinstance(packages, models.PackagesModel):
all_packages.append(packages)
if not all_packages:
return None
if len(all_packages) == 1:
return all_packages[0]
return _aggregate_packages(all_packages)
def _aggregate_packages(
packages_list: list[models.PackagesModel],
) -> models.PackagesModel:
latest_added = {}
latest_removed = {}
latest_updated = {}
for packages in packages_list:
for pkg in (packages.added or []):
latest_added[pkg.name] = pkg
for pkg in (packages.removed or []):
latest_removed[pkg.name] = pkg
for pkg in (packages.updated or []):
latest_updated[pkg.name] = pkg
# Добавлен + удалён → убираем из обоих
cancelled = set(latest_added) & set(latest_removed)
for name in cancelled:
del latest_added[name]
del latest_removed[name]
# Добавлен + обновлён → оставляем в "добавлено" с последними данными
for name in list(latest_updated):
if name in latest_added:
latest_added[name] = latest_updated.pop(name)
elif name in latest_removed:
del latest_updated[name]
added = list(latest_added.values())
removed = list(latest_removed.values())
updated = list(latest_updated.values())
return models.PackagesModel(
url=packages_list[-1].url,
total=packages_list[-1].total,
added=added or None,
removed=removed or None,
updated=updated or None,
)
class PackagesInfo:
def __init__(self, client: BaseParser):
......
from .urls import urls_parser
from .urls import urls_parser, urls_for_range
from .packages import packages_parser
from .bugs import bugs_parser
from bs4 import BeautifulSoup
from datetime import datetime
import re
from datetime import datetime, date
from .. import models
......@@ -40,6 +41,43 @@ async def urls_parser(client):
return models.NewsURL(**result)
async def urls_for_range(client, date_from: date, date_to: date) -> list[tuple[date, str]]:
results = []
current = date_from.replace(day=1)
while current <= date_to:
year_month = f"{current.year}-{current.strftime('%B')}"
base_url = CYBERTALK_URL.format(year_month)
try:
html = await client.get(f"{base_url}date.html", "koi8-r")
except:
current = _next_month(current)
continue
soup = BeautifulSoup(html, "html.parser")
for li in soup.find_all("li"):
a = li.find("a")
if not a or not a.get("href"):
continue
text = a.get_text(strip=True)
match = re.search(r"Sisyphus-(\d{8}) packages", text)
if match:
news_date = datetime.strptime(match.group(1), "%Y%m%d").date()
if date_from <= news_date <= date_to:
results.append((news_date, base_url + a["href"]))
current = _next_month(current)
return sorted(results)
def _next_month(d: date) -> date:
if d.month == 12:
return d.replace(year=d.year + 1, month=1)
return d.replace(month=d.month + 1)
async def _check_date(url, client):
html = await client.get(url, "koi8-r")
soup = BeautifulSoup(html, "html.parser")
......
......@@ -75,5 +75,6 @@ async def help_handler(m: Message) -> None:
f"{_bold("Прочее:")}\n"
" /statistics [branch] — статистика репозитория\n"
" /news — меню новостей\n"
" /news_range дата_от дата_до — новости за период\n"
)
from telegrinder import Dispatch, Message, CallbackQuery
from telegrinder.rules import Command, Argument, CallbackDataMarkup, Text, IsPrivate
from datetime import datetime
from altrepo import altrepo
from services.news import format_packages, format_bugs
......@@ -99,3 +101,65 @@ async def news_handler(
await m.ctx_api.send_message(
chat_id=m.from_user.id, text=f"{info_message}"
)
def _parse_date(s: str):
for fmt in ("%Y-%m-%d", "%d.%m.%Y"):
try:
return datetime.strptime(s, fmt).date()
except ValueError:
continue
return None
@dp.message(
Command(
"news_range",
Argument("date_from", optional=True),
Argument("date_to", optional=True),
), IsPrivate()
)
async def news_range_handler(
m: Message,
date_from: str | None = None,
date_to: str | None = None,
) -> None:
if not date_from or not date_to:
await m.answer("Использование: /news_range дата_от дата_до\nФормат: 2026-03-01 или 01.03.2026")
return
d_from = _parse_date(date_from)
d_to = _parse_date(date_to)
if not d_from or not d_to:
await m.answer("Неверный формат даты. Используйте YYYY-MM-DD или DD.MM.YYYY")
return
if d_from > d_to:
d_from, d_to = d_to, d_from
delta = (d_to - d_from).days
if delta > 180:
await m.answer("Максимальный диапазон — 180 дней.")
return
await m.answer(f"Загрузка новостей за {delta + 1} дн...")
packages_data = await altrepo.parser.news.packages_by_range(d_from, d_to)
if not packages_data:
await m.answer("Новостей за этот период не найдено.")
return
added, removed, updated, info_message = await format_packages(
packages_data, "sisyphus"
)
chat_id = m.from_user.id
for msg in added:
await m.ctx_api.send_message(chat_id=chat_id, text=msg)
if removed:
await m.ctx_api.send_message(chat_id=chat_id, text=removed)
for msg in updated:
await m.ctx_api.send_message(chat_id=chat_id, text=msg)
await m.ctx_api.send_message(chat_id=chat_id, text=info_message)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment