tasks: add SSE notifications for task state changes

parent d4a8c8ba
...@@ -31,7 +31,7 @@ def profile_settings_branch_kb(): ...@@ -31,7 +31,7 @@ def profile_settings_branch_kb():
return kb.get_markup() return kb.get_markup()
def settings_tasks_kb(watch_task, bugs_task, pkg_watch_task): def settings_tasks_kb(watch_task, bugs_task, pkg_watch_task, task_events_enabled):
kb = InlineKeyboard() kb = InlineKeyboard()
if watch_task: if watch_task:
...@@ -72,6 +72,19 @@ def settings_tasks_kb(watch_task, bugs_task, pkg_watch_task): ...@@ -72,6 +72,19 @@ def settings_tasks_kb(watch_task, bugs_task, pkg_watch_task):
)) ))
kb.row() kb.row()
if task_events_enabled:
kb.add(InlineButton(
"Уведомления о задачах: вкл",
callback_data="profile/settings/mailing/task_events/off"
))
else:
kb.add(InlineButton(
"Уведомления о задачах: выкл",
callback_data="profile/settings/mailing/task_events/on"
))
kb.row()
kb.add(InlineButton( kb.add(InlineButton(
"Мои пакеты", "Мои пакеты",
callback_data="profile/settings/packages" callback_data="profile/settings/packages"
......
...@@ -94,6 +94,10 @@ class UserMethod: ...@@ -94,6 +94,10 @@ class UserMethod:
return [user for user in User.select()] return [user for user in User.select()]
@classmethod @classmethod
def get_by_maintainer(cls, nickname: str) -> list[User]:
return list(User.select().where(User.maintainer == nickname))
@classmethod
def get_roles(cls, user_id: int): def get_roles(cls, user_id: int):
"""получение ролей пользователя""" """получение ролей пользователя"""
user = cls.get(user_id) user = cls.get(user_id)
......
...@@ -22,14 +22,18 @@ def _mailing_text_and_markup(user): ...@@ -22,14 +22,18 @@ def _mailing_text_and_markup(user):
watch_task = DB.scheduler.get(user, 'watch') watch_task = DB.scheduler.get(user, 'watch')
bugs_task = DB.scheduler.get(user, 'bugs') bugs_task = DB.scheduler.get(user, 'bugs')
pkg_watch_task = DB.scheduler.get(user, 'pkg_watch') pkg_watch_task = DB.scheduler.get(user, 'pkg_watch')
task_events = DB.scheduler.get(user, 'task_events')
text = ( text = (
f"{_bold('Рассылка:\n\n')}" f"{_bold('Рассылка:\n\n')}"
f"{format_task(watch_task, 'Отслеживание')}" f"{format_task(watch_task, 'Отслеживание')}"
f"{format_task(bugs_task, 'Баги')}" f"{format_task(bugs_task, 'Баги')}"
f"{format_task(pkg_watch_task, 'Watch по пакетам')}" f"{format_task(pkg_watch_task, 'Watch по пакетам')}"
f"Уведомления о задачах: {'вкл' if task_events else 'выкл'}\n"
)
markup = profile_keyboards.settings_tasks_kb(
watch_task, bugs_task, pkg_watch_task, bool(task_events)
) )
markup = profile_keyboards.settings_tasks_kb(watch_task, bugs_task, pkg_watch_task)
return text, markup return text, markup
...@@ -206,6 +210,29 @@ async def mailing_time_handler(cb: CallbackQuery, task_type: str): ...@@ -206,6 +210,29 @@ async def mailing_time_handler(cb: CallbackQuery, task_type: str):
await cb.answer(f"Время: {time_str}") await cb.answer(f"Время: {time_str}")
@dp.callback_query(PayloadMarkupRule("profile/settings/mailing/task_events/on"))
async def task_events_on_handler(cb: CallbackQuery):
user = DB.user.get(cb.from_user.id)
if not user:
return
if not DB.scheduler.get(user, "task_events"):
DB.scheduler.add(user, "task_events", "", "00:00")
text, markup = _mailing_text_and_markup(user)
await cb.edit_text(text, reply_markup=markup)
await cb.answer("Уведомления включены")
@dp.callback_query(PayloadMarkupRule("profile/settings/mailing/task_events/off"))
async def task_events_off_handler(cb: CallbackQuery):
user = DB.user.get(cb.from_user.id)
if not user:
return
DB.scheduler.delete(user, "task_events")
text, markup = _mailing_text_and_markup(user)
await cb.edit_text(text, reply_markup=markup)
await cb.answer("Уведомления выключены")
@dp.callback_query(PayloadMarkupRule("profile/settings/mailing/<task_type>/<action>")) @dp.callback_query(PayloadMarkupRule("profile/settings/mailing/<task_type>/<action>"))
async def mailing_action_handler(cb: CallbackQuery, task_type: str, action: str): async def mailing_action_handler(cb: CallbackQuery, task_type: str, action: str):
await edit_days_message(cb, task_type) await edit_days_message(cb, task_type)
......
...@@ -17,6 +17,7 @@ from middlewares import UserMiddleware ...@@ -17,6 +17,7 @@ from middlewares import UserMiddleware
from services.test_api_version import test_api_version from services.test_api_version import test_api_version
from services.update_maintainers import update_maintainers from services.update_maintainers import update_maintainers
from services.appstream.update_appstream_data import update_appstream_data from services.appstream.update_appstream_data import update_appstream_data
from services.task_events import start_task_events_listener
bot = Telegrinder(tg_api) bot = Telegrinder(tg_api)
bot.dispatch.load_from_dir("src/handlers") bot.dispatch.load_from_dir("src/handlers")
...@@ -37,6 +38,8 @@ async def startup(): ...@@ -37,6 +38,8 @@ async def startup():
await update_maintainers() await update_maintainers()
await update_appstream_data() await update_appstream_data()
start_task_events_listener()
logger.info("initializing Scheduler") logger.info("initializing Scheduler")
scheduler.register_handler( scheduler.register_handler(
"watch", print "watch", print
......
...@@ -70,6 +70,8 @@ class CustomScheduler: ...@@ -70,6 +70,8 @@ class CustomScheduler:
self.scheduler.remove_all_jobs() self.scheduler.remove_all_jobs()
for task in DB.scheduler.all(): for task in DB.scheduler.all():
if not task.days:
continue
hour = task.send_time.hour hour = task.send_time.hour
minute = task.send_time.minute minute = task.send_time.minute
for day in task.days.split(","): for day in task.days.split(","):
......
import asyncio
from telegrinder.modules import logger
from telegrinder.tools.formatting import HTMLFormatter, link
from altrepo.taskoteka.types import TaskState
from config import tg_api, altrepo, TASK_URL
from database.func import DB
NOTIFY_STATES = {TaskState.TESTED, TaskState.FAILED, TaskState.DONE}
async def _listen():
logger.info("Task events: connecting to SSE stream")
async for event in altrepo.taskoteka.events():
if event.event != "task_updated":
continue
data = event.data
if data.state == data.prev_state:
continue
if data.state not in NOTIFY_STATES:
continue
try:
task = await altrepo.taskoteka.tasks(task_id=data.id)
repo = task.repo
except Exception:
repo = "unknown"
task_url = f"{TASK_URL}{data.id}"
task_link = HTMLFormatter(link(task_url, text=f"{repo}/{data.id}"))
message = f"Таск {task_link}\n\nСтатус: {data.prev_state} -> {data.state}"
users = DB.user.get_by_maintainer(data.owner)
for user in users:
if not DB.scheduler.get(user, "task_events"):
continue
try:
await tg_api.send_message(chat_id=user.user_id, text=message)
except Exception:
pass
def start_task_events_listener():
asyncio.create_task(_listen())
logger.info("Task events: listener started")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment