Commit c9b8baf3 authored by Roman Alifanov's avatar Roman Alifanov

Add log/progress D-Bus signals and PlayListUpdates method

- Emit non-progress stdout/stderr lines as NOTIFICATION signals - Read stdout/stderr concurrently via tokio::select - Parse wget progress (bar and dot formats) from stderr - Add PlayListUpdates D-Bus method (epm play --list-updates) - Add play-query list-updates CLI command - Filter PROGRESS events from GUI log view
parent 2895c86c
......@@ -163,6 +163,9 @@ pub enum PlayQueryCommands {
/// List all available play applications
ListAll,
/// List apps with available updates
ListUpdates,
/// List system scripts
ListScripts,
......
......@@ -28,6 +28,7 @@ trait EpmQuery {
// Play methods
async fn play_list(&self) -> zbus::fdo::Result<String>;
async fn play_list_all(&self) -> zbus::fdo::Result<String>;
async fn play_list_updates(&self) -> zbus::fdo::Result<String>;
async fn play_list_scripts(&self) -> zbus::fdo::Result<String>;
async fn play_search(&self, query: String) -> zbus::fdo::Result<String>;
async fn play_info(&self, app: String) -> zbus::fdo::Result<String>;
......@@ -214,6 +215,12 @@ impl EpmClient {
Ok(result)
}
pub async fn play_list_updates(&self) -> Result<String> {
let proxy = EpmQueryProxy::new(&self.connection).await?;
let result = proxy.play_list_updates().await?;
Ok(result)
}
pub async fn play_list_scripts(&self) -> Result<String> {
let proxy = EpmQueryProxy::new(&self.connection).await?;
let result = proxy.play_list_scripts().await?;
......@@ -374,27 +381,32 @@ fn parse_notification_signal(msg: &Message) -> Option<EventData> {
}
fn print_progress(event: &EventData, last_line_len: &mut usize) {
if event.event_type != "PROGRESS" {
return;
}
let progress_bar_width = 30;
let filled = (event.progress / 100.0 * progress_bar_width as f64) as usize;
let empty = progress_bar_width - filled;
let bar = format!(
"[{}{}] {:>3.0}% {}",
"#".repeat(filled),
"-".repeat(empty),
event.progress,
&event.view
);
if *last_line_len > 0 {
eprint!("\r{}\r", " ".repeat(*last_line_len));
if event.event_type == "PROGRESS" {
let progress_bar_width = 30;
let filled = (event.progress / 100.0 * progress_bar_width as f64) as usize;
let empty = progress_bar_width - filled;
let bar = format!(
"[{}{}] {:>3.0}% {}",
"#".repeat(filled),
"-".repeat(empty),
event.progress,
&event.view
);
if *last_line_len > 0 {
eprint!("\r{}\r", " ".repeat(*last_line_len));
}
eprint!("\r{}", bar);
let _ = stderr().flush();
*last_line_len = bar.len();
} else if event.event_type == "NOTIFICATION" && !event.view.is_empty() {
// Print log lines (e.g. from epm play)
if *last_line_len > 0 {
eprint!("\r{}\r", " ".repeat(*last_line_len));
*last_line_len = 0;
}
eprintln!("{}", &event.view);
}
eprint!("\r{}", bar);
let _ = stderr().flush();
*last_line_len = bar.len();
}
......@@ -166,6 +166,15 @@ impl QueryInterface {
to_json(&ApiResponse::success(result))
}
async fn play_list_updates(&self) -> zbus::fdo::Result<String> {
let config = self.config.read().await;
let result = play::list_updates(&config)
.await
.map_err(|e| zbus::fdo::Error::Failed(e.to_string()))?;
to_json(&ApiResponse::success(result))
}
async fn play_list_scripts(&self) -> zbus::fdo::Result<String> {
let config = self.config.read().await;
let result = play::list_scripts(&config)
......
......@@ -141,6 +141,19 @@ impl SignalEmitter {
self.emit(&event).await
}
pub async fn emit_log(
&self,
message: &str,
transaction: &str,
) -> crate::error::Result<()> {
let event = EventData::new("system.log")
.with_state(STATE_BEFORE)
.with_message(message)
.with_transaction(transaction);
self.emit(&event).await
}
#[allow(dead_code)]
pub async fn emit_task_started(
&self,
......@@ -188,6 +201,8 @@ pub fn parse_progress_line(line: &str) -> Option<ProgressEvent> {
static DOWNLOAD_RE: OnceLock<Regex> = OnceLock::new();
static INSTALL_RE: OnceLock<Regex> = OnceLock::new();
static GENERIC_RE: OnceLock<Regex> = OnceLock::new();
static WGET_RE: OnceLock<Regex> = OnceLock::new();
static WGET_DOT_RE: OnceLock<Regex> = OnceLock::new();
// "2% [10 speed-dreams-data 39368080/2037MB 1%]"
let download_re = DOWNLOAD_RE.get_or_init(|| {
......@@ -204,6 +219,16 @@ pub fn parse_progress_line(line: &str) -> Option<ProgressEvent> {
Regex::new(r"^(?P<msg>[A-Za-z]+\.{3})\s+[#]+\s*\[\s*(?P<percent>\d+)%\]").unwrap()
});
// "code-1.112.0.el8.x86_64.rpm 45%[==========> ] 75.00M 10.2MB/s"
let wget_re = WGET_RE.get_or_init(|| {
Regex::new(r"^(?P<file>\S+\.rpm)\s+(?P<percent>\d+)%\[").unwrap()
});
// " 1250K .......... .......... .......... .......... 0% 11.0M 2m47"
let wget_dot_re = WGET_DOT_RE.get_or_init(|| {
Regex::new(r"^\s*\d+K\s+[.\s]+\s+(?P<percent>\d+)%").unwrap()
});
let line = line.trim();
if let Some(caps) = download_re.captures(line) {
......@@ -253,6 +278,35 @@ pub fn parse_progress_line(line: &str) -> Option<ProgressEvent> {
return Some(ProgressEvent::Generic { message, percent });
}
// "code-1.112.0.el8.x86_64.rpm 45%[==========> ] 75.00M 10.2MB/s"
if let Some(caps) = wget_re.captures(line) {
let file = caps.name("file")
.map(|m| m.as_str().to_string())
.unwrap_or_default();
let percent = caps.name("percent")
.and_then(|m| m.as_str().parse().ok())
.unwrap_or(0.0);
return Some(ProgressEvent::Download {
global_percent: percent,
package: file,
local_percent: percent,
});
}
// " 1250K .......... .......... .......... 45% 11.0M 2m47"
if let Some(caps) = wget_dot_re.captures(line) {
let percent = caps.name("percent")
.and_then(|m| m.as_str().parse().ok())
.unwrap_or(0.0);
return Some(ProgressEvent::Download {
global_percent: percent,
package: String::new(),
local_percent: percent,
});
}
None
}
......@@ -300,4 +354,54 @@ mod tests {
panic!("Expected Download event");
}
}
#[test]
fn test_parse_wget_progress() {
let line = "code-1.112.0-1773778396.el8.x86_64.rpm 0%[ ] 400.00K 651KB/s";
let result = parse_progress_line(line);
assert!(result.is_some());
if let Some(ProgressEvent::Download { global_percent, package, .. }) = result {
assert_eq!(global_percent, 0.0);
assert_eq!(package, "code-1.112.0-1773778396.el8.x86_64.rpm");
} else {
panic!("Expected Download event");
}
}
#[test]
fn test_parse_wget_dot_progress() {
let line = " 1250K .......... .......... .......... .......... .......... 0% 11.0M 2m47";
let result = parse_progress_line(line);
assert!(result.is_some());
if let Some(ProgressEvent::Download { global_percent, .. }) = result {
assert_eq!(global_percent, 0.0);
} else {
panic!("Expected Download event");
}
}
#[test]
fn test_parse_wget_dot_progress_50() {
let line = " 86400K .......... .......... .......... .......... .......... 50% 10.2M 8s";
let result = parse_progress_line(line);
assert!(result.is_some());
if let Some(ProgressEvent::Download { global_percent, .. }) = result {
assert_eq!(global_percent, 50.0);
} else {
panic!("Expected Download event");
}
}
#[test]
fn test_parse_wget_progress_midway() {
let line = "code-1.112.0-1773778396.el8.x86_64.rpm 100%[========================================================================================================>] 168.49M 10.2MB/s in 18s";
let result = parse_progress_line(line);
assert!(result.is_some());
if let Some(ProgressEvent::Download { global_percent, package, .. }) = result {
assert_eq!(global_percent, 100.0);
assert_eq!(package, "code-1.112.0-1773778396.el8.x86_64.rpm");
} else {
panic!("Expected Download event");
}
}
}
......@@ -3,7 +3,7 @@ use crate::dbus::SignalEmitter;
use crate::eepm::{EpmExecutor, parser};
use crate::error::{EpmError, Result};
use crate::types::{
PlayListResponse, PlaySearchResponse, PlayInfoResponse,
PlayListResponse, PlayListUpdatesResponse, PlaySearchResponse, PlayInfoResponse,
PlayActionResponse, PlayVersionResponse,
};
......@@ -41,6 +41,20 @@ pub async fn list_all(config: &Config) -> Result<PlayListResponse> {
})
}
/// List applications with available updates
pub async fn list_updates(config: &Config) -> Result<PlayListUpdatesResponse> {
let executor = EpmExecutor::new(config.clone());
let output = executor.play_list_updates().await?;
let apps = parser::parse_play_list_updates_output(&output.stdout);
Ok(PlayListUpdatesResponse {
message: format!("Found {} applications with updates", apps.len()),
total_count: apps.len() as u32,
apps,
})
}
/// List system scripts
pub async fn list_scripts(config: &Config) -> Result<PlayListResponse> {
let executor = EpmExecutor::new(config.clone());
......
......@@ -122,13 +122,88 @@ impl EpmExecutor {
let signal_emitter = self.signal_emitter.clone();
let tx_str = transaction.to_string();
// Process stdout with progress detection
while let Ok(Some(line)) = stdout_reader.next_line().await {
debug!("stdout: {}", line);
// Read stdout and stderr concurrently
loop {
tokio::select! {
result = stdout_reader.next_line() => {
match result {
Ok(Some(line)) => {
debug!("stdout: {}", line);
if let Some(ref emitter) = signal_emitter {
if let Some(event) = parse_progress_line(&line) {
let (name, msg, percent) = match event {
ProgressEvent::Download { global_percent, package, .. } => (
"system.downloadProgress",
format!("Downloading {}", package),
global_percent,
),
ProgressEvent::Install { package, percent, .. } => (
"system.installProgress",
format!("Installing {}", package),
percent,
),
ProgressEvent::Generic { message, percent } => (
"system.progress",
message,
percent,
),
};
let _ = emitter.emit_progress(name, &msg, percent, &tx_str).await;
} else if !line.trim().is_empty() {
let _ = emitter.emit_log(&line, &tx_str).await;
}
}
stdout_lines.push(line);
}
_ => break,
}
}
result = stderr_reader.next_line() => {
match result {
Ok(Some(line)) => {
debug!("stderr: {}", line);
if let Some(ref emitter) = signal_emitter {
if let Some(event) = parse_progress_line(&line) {
let (name, msg, percent) = match event {
ProgressEvent::Download { global_percent, package, .. } => (
"system.downloadProgress",
format!("Downloading {}", package),
global_percent,
),
ProgressEvent::Install { package, percent, .. } => (
"system.installProgress",
format!("Installing {}", package),
percent,
),
ProgressEvent::Generic { message, percent } => (
"system.progress",
message,
percent,
),
};
let _ = emitter.emit_progress(name, &msg, percent, &tx_str).await;
} else if !line.trim().is_empty() {
let _ = emitter.emit_log(&line, &tx_str).await;
}
}
stderr_lines.push(line);
}
_ => {}
}
}
}
}
// Drain remaining stderr after stdout is done
while let Ok(Some(line)) = stderr_reader.next_line().await {
debug!("stderr: {}", line);
// Check for progress
if let Some(event) = parse_progress_line(&line) {
if let Some(ref emitter) = signal_emitter {
if let Some(ref emitter) = signal_emitter {
if let Some(event) = parse_progress_line(&line) {
let (name, msg, percent) = match event {
ProgressEvent::Download { global_percent, package, .. } => (
"system.downloadProgress",
......@@ -147,15 +222,11 @@ impl EpmExecutor {
),
};
let _ = emitter.emit_progress(name, &msg, percent, &tx_str).await;
} else if !line.trim().is_empty() {
let _ = emitter.emit_log(&line, &tx_str).await;
}
}
stdout_lines.push(line);
}
// Read stderr
while let Ok(Some(line)) = stderr_reader.next_line().await {
debug!("stderr: {}", line);
stderr_lines.push(line);
}
......@@ -386,6 +457,9 @@ impl EpmExecutor {
pub async fn play_available_version(&self, app: &str) -> Result<CommandOutput> {
self.execute(&["play", "--quiet", "--available-version", app]).await
}
pub async fn play_list_updates(&self) -> Result<CommandOutput> {
self.execute(&["play", "--quiet", "--list-updates"]).await
}
pub async fn play_list_scripts(&self) -> Result<CommandOutput> {
self.execute(&["play", "--quiet", "--list-scripts"]).await
}
......
......@@ -8,7 +8,7 @@ use regex::Regex;
use crate::error::{EpmError, Result};
use crate::types::{
Package, PackageChanges, PackageDependency, PackageFile, KernelUpdateInfo,
PlayApp, PlayAppInfo,
PlayApp, PlayAppInfo, PlayUpdateApp,
};
/// Parse apt-cache show style output (Key: Value format).
......@@ -615,6 +615,26 @@ pub fn parse_play_version_output(output: &str) -> Option<String> {
.map(|s| s.to_string())
}
/// Parse output of `epm play --quiet --list-updates`
/// Format: "zen-browser 1.18 1.19" (name installed_version available_version)
pub fn parse_play_list_updates_output(output: &str) -> Vec<PlayUpdateApp> {
output
.lines()
.filter_map(|line| {
let parts: Vec<&str> = line.trim().split_whitespace().collect();
if parts.len() >= 3 {
Some(PlayUpdateApp {
name: parts[0].to_string(),
installed_version: parts[1].to_string(),
available_version: parts[2].to_string(),
})
} else {
None
}
})
.collect()
}
#[cfg(test)]
mod tests {
use super::*;
......@@ -762,4 +782,15 @@ Url: https://github.com/telegramdesktop/tdesktop"#;
let version = parse_play_version_output(output_with_prefix);
assert_eq!(version, Some("1.106.3".to_string()));
}
#[test]
fn test_parse_play_list_updates_output() {
let output = "zen-browser 1.18 1.19\ntelegram 4.7.0 4.7.1\n";
let apps = parse_play_list_updates_output(output);
assert_eq!(apps.len(), 2);
assert_eq!(apps[0].name, "zen-browser");
assert_eq!(apps[0].installed_version, "1.18");
assert_eq!(apps[0].available_version, "1.19");
assert_eq!(apps[1].name, "telegram");
}
}
......@@ -114,6 +114,7 @@ async fn run_client_command(cmd: Commands, format: OutputFormat, show_progress:
Commands::PlayQuery { cmd: pq } => match pq {
PlayQueryCommands::List => client.play_list().await,
PlayQueryCommands::ListAll => client.play_list_all().await,
PlayQueryCommands::ListUpdates => client.play_list_updates().await,
PlayQueryCommands::ListScripts => client.play_list_scripts().await,
PlayQueryCommands::Search { query } => client.play_search(&query).await,
PlayQueryCommands::Info { app } => client.play_info(&app).await,
......
......@@ -59,6 +59,22 @@ pub struct PlayActionResponse {
pub transaction: String,
}
/// Application with available update
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct PlayUpdateApp {
pub name: String,
pub installed_version: String,
pub available_version: String,
}
/// Response for play list-updates command
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayListUpdatesResponse {
pub message: String,
pub apps: Vec<PlayUpdateApp>,
pub total_count: u32,
}
/// Response for play version queries
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PlayVersionResponse {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment