Commit 1c726c10 authored by Roman Alifanov's avatar Roman Alifanov

remove old settings code

parent 7cc73949
from .base import Backend
from .gsettings import GSettingsBackend
from .file import FileBackend
from .binary import BinaryBackend
class BackendFactory:
def __init__(self):
self.backends = {
'gsettings': GSettingsBackend,
'file': FileBackend,
'binary': BinaryBackend,
}
def get_backend(self, backend_name, params=None):
backend_class = self.backends.get(backend_name)
if backend_class:
# Передаем параметры в конструктор бэкенда, если они есть
return backend_class(params) if params else backend_class()
return None
backend_factory = BackendFactory()
class RootBackendFactory:
def __init__(self):
self.backends = {
'file': FileBackend,
'binary': BinaryBackend,
}
def get_backend(self, backend_name, params=None):
backend_class = self.backends.get(backend_name)
if backend_class:
# Передаем параметры в конструктор бэкенда, если они есть
return backend_class(params) if params else backend_class()
return None
root_backend_factory = RootBackendFactory()
import logging
class Backend:
def __init__(self, params=None):
# Параметры, передаваемые при инициализации
self.params = params or {}
self.logger = logging.getLogger(f"{self.__class__.__name__}")
def get_value(self, key, gtype):
raise NotImplementedError("Метод get_value должен быть реализован")
def get_range(self, key, gtype):
raise NotImplementedError("Метод get_range должен быть реализован")
def set_value(self, key, value, gtype):
raise NotImplementedError("Метод set_value должен быть реализован")
import ast
import os
import subprocess
from .base import Backend
class BinaryBackend(Backend):
def __init__(self, params=None):
super().__init__(params)
self.binary_path = os.path.join(
self.params.get('module_path'),
self.params.get('binary_path')
)
self.binary_name = self.params.get('binary_name')
def _run_binary(self, command, *args):
try:
full_command = (
[self.binary_path + self.binary_name, command]
+ [x for x in args if x is not None]
)
result = subprocess.run(full_command, capture_output=True, text=True, check=True)
return result.stdout.strip()
except subprocess.CalledProcessError as e:
self.logger.error(f"Ошибка при выполнении команды {command}: {e}")
return None
def get_value(self, key, gtype):
self.logger.debug(f"Получение значения: key={key}, gtype={gtype}")
result = self._run_binary('get_value', key)
if result:
try:
return ast.literal_eval(result)
except (ValueError, SyntaxError) as e:
self.logger.warning(f"Ошибка при преобразовании результата {result}: {e}")
return result
return None
def get_range(self, key, gtype):
self.logger.debug(f"Получение диапазона: key={key}, gtype={gtype}")
result = self._run_binary('get_range', key)
if not result:
self.logger.error(f"Пустой результат или ошибка при выполнении команды get_range для ключа {key}")
return None
try:
parsed_result = ast.literal_eval(result)
return parsed_result
except (ValueError, SyntaxError) as e:
self.logger.error(f"Ошибка при преобразовании результата {result} для ключа {key}: {e}")
return None
def set_value(self, key, value, gtype):
self.logger.debug(f"Установка значения: key={key}, value={value}, gtype={gtype}")
result = self._run_binary('set_value', key, str(value))
if result:
try:
return ast.literal_eval(result)
except (ValueError, SyntaxError) as e:
self.logger.error(f"Ошибка при преобразовании результата {result}: {e}")
return None
import os
import re
from .base import Backend
class FileBackend(Backend):
def __init__(self, params=None):
super().__init__(params)
self.file_path = os.path.expanduser(params.get('file_path'))
self.file_path = os.path.expandvars(self.file_path)
self.lines = []
self.vars = {}
self._parse_file()
def _parse_file(self):
if os.path.exists(self.file_path):
with open(self.file_path, 'r') as f:
self.lines = f.readlines()
for line_num, line in enumerate(self.lines):
self._parse_line(line_num, line)
def _parse_line(self, line_num, line):
line = line.rstrip('\n')
parsed = {
'raw': line,
'active': True,
'var_name': None,
'value': None,
'comment': '',
'style': {}
}
if re.match(r'^\s*#', line):
parsed['active'] = False
line = re.sub(r'^\s*#', '', line, count=1)
var_match = re.match(
r'^\s*([A-Za-z_][A-Za-z0-9_]*)\s*([=])\s*(.*?)(\s*(#.*)?)$',
line
)
if var_match:
parsed['var_name'] = var_match.group(1)
parsed['value'] = self._parse_value(var_match.group(3))
parsed['comment'] = var_match.group(4) or ''
value_part = var_match.group(3)
parsed['style'] = {
'space_before': ' ' if ' ' in var_match.group(0).split('=')[0][-1:] else '',
'space_after': ' ' if ' ' in var_match.group(0).split('=')[1][:1] else '',
'quote': self._detect_quote(var_match.group(3)),
'commented': not parsed['active']
}
if parsed['var_name'] not in self.vars:
self.vars[parsed['var_name']] = []
self.vars[parsed['var_name']].append((line_num, parsed))
@staticmethod
def _parse_value(value_str):
value_str = value_str.strip()
for quote in ['"', "'"]:
if value_str.startswith(quote) and value_str.endswith(quote):
return value_str[1:-1]
return value_str
@staticmethod
def _detect_quote(value_str):
value_str = value_str.strip()
if value_str.startswith('"') and value_str.endswith('"'):
return '"'
if value_str.startswith("'") and value_str.endswith("'"):
return "'"
return ''
def _get_style_template(self):
if not self.vars:
return {
'space_before': '',
'space_after': ' ',
'quote': '"',
'commented': False
}
last_var = next(reversed(self.vars.values()))[-1][1]
return last_var['style']
@staticmethod
def _build_line(key, value, style):
quote = style['quote']
value_str = f"{quote}{value}{quote}" if quote else str(value)
return (
f"{key}{style['space_before']}="
f"{style['space_after']}{value_str}"
)
def _save_file(self):
with open(self.file_path, 'w') as f:
f.writelines(self.lines)
def get_value(self, key, gtype):
entries = self.vars.get(key, [])
for entry in reversed(entries):
return entry[1]['value']
return None
def set_value(self, key, value, gtype):
entries = self.vars.get(key, [])
style = self._get_style_template()
key = os.path.expanduser(key)
key = os.path.expandvars(key)
if entries:
line_num, last_entry = entries[-1]
style = last_entry['style']
line = self._build_line(key, value, style)
self.lines[line_num] = line + '\n'
if last_entry['style']['commented']:
self.lines[line_num] = self.lines[line_num].lstrip('#')
else:
line = self._build_line(key, value, style)
self.lines.append(line + '\n')
self._save_file()
from gi.repository import Gio, GLib
from .base import Backend
class GSettingsBackend(Backend):
def _get_schema(self, schema_name):
source = Gio.SettingsSchemaSource.get_default()
if source.lookup(schema_name, True) is None:
self.logger.error(f"Scheme {schema_name} is not installed")
return None
return Gio.Settings.new(schema_name)
def _get_single_value(self, key):
"""Get value for a single key."""
schema_name, key_name = key.rsplit('.', 1)
schema = self._get_schema(schema_name)
if not schema:
return None
try:
value = schema.get_value(key_name)
return value.unpack()
except Exception as e:
self.logger.error(f"Error when getting the value {key}: {e}")
return None
def _set_single_value(self, key, value, gtype):
"""Set value for a single key."""
schema_name, key_name = key.rsplit('.', 1)
schema = self._get_schema(schema_name)
if not schema:
return False
try:
schema.set_value(key_name, GLib.Variant(gtype, value))
return True
except Exception as e:
self.logger.error(f"Error when setting the value {key}: {e}")
return False
def get_value(self, key, gtype):
# Support for list of keys
if isinstance(key, list):
return [self._get_single_value(k) for k in key]
return self._get_single_value(key)
def get_range(self, key, gtype):
# For list of keys, get range from first key only
if isinstance(key, list):
key = key[0]
schema_name, key_name = key.rsplit('.', 1)
schema = self._get_schema(schema_name)
if not schema:
return None
try:
value = schema.get_range(key_name)
return value.unpack()[1]
except Exception as e:
self.logger.error(f"Error when getting the range {key}: {e}")
return None
def set_value(self, key, value, gtype):
# Support for list of keys
if isinstance(key, list):
gtypes = gtype if isinstance(gtype, list) else [gtype] * len(key)
values = value if isinstance(value, list) else [value] * len(key)
for k, v, g in zip(key, values, gtypes):
self._set_single_value(k, v, g)
return
self._set_single_value(key, value, gtype)
import logging
import dbus
import ast
class DaemonClient:
def __new__(cls, bus_name="ru.ximperlinux.TuneIt.Daemon", object_path="/Daemon"):
"""
Создает экземпляр клиента только в случае, если сервис доступен.
:param bus_name: Имя D-Bus сервиса.
:param object_path: Путь объекта в D-Bus.
:return: Экземпляр DaemonClient или None, если сервис недоступен.
"""
logger = logging.getLogger(f"DaemonClient")
try:
bus = dbus.SystemBus()
bus.get_object(bus_name, object_path) # Проверка доступности объекта
return super(DaemonClient, cls).__new__(cls)
except dbus.DBusException:
logger.debug(f"Service '{bus_name}' is not running.")
return None
def __init__(self, bus_name="ru.ximperlinux.TuneIt.Daemon", object_path="/Daemon"):
"""
Инициализация клиента для взаимодействия с D-Bus сервисом.
:param bus_name: Имя D-Bus сервиса.
:param object_path: Путь объекта в D-Bus.
"""
self.logger = logging.getLogger(f"{self.__class__.__name__}")
self.bus_name = bus_name
self.object_path = object_path
self.bus = dbus.SystemBus()
self.proxy = self.bus.get_object(bus_name, object_path)
self.interface = dbus.Interface(
self.proxy, dbus_interface="ru.ximperlinux.TuneIt.DaemonInterface"
)
self.logger.debug("dbus client connected")
self.backend_name = None
self.backend_params = None
def set_backend_name(self, backend_name):
"""
Устанавливает имя backend.
:param backend_name: Имя backend.
"""
self.backend_name = backend_name
def set_backend_params(self, backend_params):
"""
Устанавливает параметры backend.
:param backend_params: Параметры backend в формате JSON.
"""
self.backend_params = str(backend_params)
def get_value(self, key, gtype):
"""
Вызывает метод GetValue на D-Bus сервисе.
:param key: Ключ для получения значения.
:param gtype: Тип значения.
:return: Полученное значение.
"""
try:
return ast.literal_eval(str(self.interface.GetValue(self.backend_name, str(self.backend_params), key, gtype)))
except dbus.DBusException as e:
self.logger.error(f"Error in GetValue: {e}")
return None
def set_value(self, key, value, gtype):
"""
Вызывает метод SetValue на D-Bus сервисе.
:param key: Ключ для установки значения.
:param value: Устанавливаемое значение.
:param gtype: Тип значения.
:return: Результат операции.
"""
try:
self.interface.SetValue(self.backend_name, str(self.backend_params), key, str(value), gtype)
except dbus.DBusException as e:
self.logger.error(f"Error in SetValue: {e}")
def get_range(self, key, gtype):
"""
Вызывает метод GetRange на D-Bus сервисе.
:param key: Ключ для получения диапазона.
:param gtype: Тип значения.
:return: Диапазон значений.
"""
try:
return ast.literal_eval(str(self.interface.GetRange(self.backend_name, str(self.backend_params), key, gtype)))
except dbus.DBusException as e:
self.logger.error(f"Error in GetRange: {e}")
return None
dclient = DaemonClient()
import logging
from .os import OSReleaseChecker
from .path import PathChecker
from .binary import BinaryChecker
class DependencyCheckerFactory:
def __init__(self):
self._checkers = {
'os': OSReleaseChecker,
'path': PathChecker,
'binary': BinaryChecker,
}
def create_checker(self, dependency_type):
checker_class = self._checkers.get(dependency_type)
if not checker_class:
raise ValueError(f"Unfinished type of dependence: {dependency_type}")
return checker_class()
class DependencyManager:
def __init__(self):
self.factory = DependencyCheckerFactory()
self.logger = logging.getLogger(f"{self.__class__.__name__}")
def _verify(self, items, check_type):
results = []
for item_type, expected_value in items.items():
try:
checker = self.factory.create_checker(item_type)
result = checker.check(expected_value, is_conflict=(check_type == "conflict"))
results.append({
'type': check_type,
'name': item_type,
'success': result['success'],
'actual': result['actual'],
'expected': result['expected'],
'error': result.get('error', '')
})
except Exception as e:
self.logger.error(f"Error when checking {item_type}: {str(e)}")
results.append({
'type': check_type,
'name': item_type,
'success': False,
'error': str(e)
})
return results
def verify_deps(self, dependencies):
return self._verify(dependencies, "dependency")
def verify_conflicts(self, conflicts):
return self._verify(conflicts, "conflict")
def format_results(self, results):
message = ""
for result in results:
label = {
'dependency': f"{result['name']} {_("dependency")}",
'conflict': f"{result['name']} {_("conflict")}"
}[result['type']]
status = "✓" if result['success'] else "✕"
message += f"{label} {status}\n"
if 'actual' in result:
message += f" {_("Actual")} {result['actual']}\n"
message += f" {_("Expected")}: {result['expected']}\n"
if result['error']:
message += f" {_("Error")} {result['error']}\n"
return message
class DependencyChecker:
def check(self, expected_value, is_conflict=False):
raise NotImplementedError("check() method must be implemented in the subclass!")
from .base import DependencyChecker
class BinaryChecker(DependencyChecker):
def check(self, expected_value, is_conflict=False):
from shutil import which
binary_path = which(expected_value)
if isinstance(expected_value, (str, bytes)):
expected_values = (expected_value,)
else:
expected_values = tuple(expected_value)
if is_conflict:
success = binary_path is None
else:
success = binary_path is not None
return {
'success': success,
'actual': binary_path,
'expected': expected_values
}
from .base import DependencyChecker
class OSReleaseChecker(DependencyChecker):
def check(self, expected_value='Etersoft Ximper', is_conflict=False):
from ..backends.file import FileBackend
actual_name = FileBackend({'file_path': '/etc/os-release'}).get_value('NAME', 's')
if isinstance(expected_value, (str, bytes)):
expected_values = (expected_value,)
else:
expected_values = tuple(expected_value)
if is_conflict:
success = actual_name not in expected_values
else:
success = actual_name in expected_values
return {
'success': success,
'actual': actual_name,
'expected': expected_values
}
from .base import DependencyChecker
class PathChecker(DependencyChecker):
def check(self, expected_value, is_conflict=False):
from os import path
file_path = path.expanduser(expected_value)
file_path = path.expandvars(file_path)
file_exists = path.exists(file_path)
if isinstance(expected_value, (str, bytes)):
expected_values = (expected_value,)
else:
expected_values = tuple(expected_value)
if is_conflict:
success = not file_exists
else:
success = file_exists
return {
'success': success,
'actual': file_path,
'expected': expected_values,
'exists': file_exists
}
import logging
from gi.repository import GLib
import traceback
from .module import Module
from .page import Page
from .sections import SectionFactory
from .tools.yml_tools import load_modules
from .widgets.deps_alert_dialog import TuneItDepsAlertDialog
from .deps import DependencyManager
logger = logging.getLogger("init_settings_stack")
def init_settings_stack(stack, listbox, split_view):
yaml_data = load_modules()
section_factory = SectionFactory()
modules_dict = {}
pages_dict = {}
dep_manager = DependencyManager()
if stack.get_pages():
logger.info("Clear pages...")
listbox.remove_all()
for page in stack.get_pages(): stack.remove(page)
else:
logger.info("First init...")
current_module_index = 0
modules = list(yaml_data)
window = listbox.get_root()
def process_next_module():
nonlocal current_module_index
if current_module_index >= len(modules):
finalize_processing()
return
module_data = modules[current_module_index]
current_module_index += 1
try:
deps_results = dep_manager.verify_deps(module_data.get('deps', {}))
conflicts_results = dep_manager.verify_conflicts(module_data.get('conflicts', {}))
deps_message = dep_manager.format_results(deps_results)
conflicts_message = dep_manager.format_results(conflicts_results)
all_deps_ok = all(r['success'] for r in deps_results)
all_conflicts_ok = all(r['success'] for r in conflicts_results)
if all_deps_ok and all_conflicts_ok:
process_module(module_data)
GLib.idle_add(process_next_module)
else:
show_dialog(module_data, deps_message, conflicts_message)
except Exception as e:
handle_error(e, module_data)
GLib.idle_add(process_next_module)
def show_dialog(module_data, deps_msg, conflicts_msg):
dialog = TuneItDepsAlertDialog()
dialog.set_body(module_data['name'])
dialog.deps_message_textbuffer.set_text(f"{deps_msg}\n{conflicts_msg}")
def handle_response(response):
if response == "skip":
GLib.idle_add(process_next_module)
else:
process_module(module_data)
GLib.idle_add(process_next_module)
dialog.ask_user(window, handle_response)
def process_module(module_data):
module = Module(module_data)
modules_dict[module.name] = module
for section_data in module_data.get('sections', []):
page_name = module.get_translation(section_data.get('page', 'Default'))
module_page_name = section_data.get('page', 'Default')
if page_name not in pages_dict:
page_info = module.pages.get(f"_{module_page_name}", {}) or module.pages.get(module_page_name, {})
page = Page(name=page_name, icon=page_info.get('icon'))
pages_dict[page_name] = page
section = section_factory.create_section(section_data, module)
pages_dict[page_name].add_section(section)
def finalize_processing():
pages = sorted(pages_dict.values(), key=lambda p: p.name)
for page in pages:
page.sort_sections()
page.create_stack_page(stack, listbox)
if not stack:
logger.error("settings_pagestack не найден.")
def on_row_selected(listbox, row):
if row:
page_id = row.props.name
visible_child = stack.get_child_by_name(page_id)
if visible_child:
stack.set_visible_child(visible_child)
split_view.set_show_content(True)
listbox.connect("row-selected", on_row_selected)
def handle_error(e, module_data):
from ..main import get_error
error = get_error()
full_traceback = traceback.format_exc()
error_msg = f"Module '{module_data['name']}' loading error\nError: {e}\nFull traceback:\n{full_traceback}"
error(error_msg)
GLib.idle_add(process_next_module)
\ No newline at end of file
import gettext
import locale
import logging
import os
class Module:
def __init__(self, module_data):
self.name = module_data['name']
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.weight = module_data.get('weight', 0)
self.path = module_data.get("module_path")
self.logger.debug(self.path)
self.pages = {
page['name']: page for page in module_data.get('pages', [])
}
self.sections = []
self.system_lang_code = self.get_system_language()
def add_section(self, section):
self.sections.append(section)
def get_sorted_sections(self):
return sorted(self.sections, key=lambda s: s.weight)
@staticmethod
def get_system_language():
lang, _ = locale.getdefaultlocale()
return lang.split('_')[0] if lang else 'en'
def get_translation(self, text, lang_code=None):
if text.startswith('_'):
text = text[1:]
locales_path = os.path.join(self.path, "locale")
if os.path.exists(locales_path):
text = gettext.translation(
domain='messages',
localedir=locales_path,
fallback=True
).gettext(text)
return text
import logging
from gi.repository import GLib, Adw
from .widgets.panel_row import TuneItPanelRow
class Page:
def __init__(self, name, icon=None):
self.name = name
self.icon = icon or "preferences-system" # Значение по умолчанию
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.sections = []
def add_section(self, section):
self.sections.append(section)
def sort_sections(self):
self.sections = sorted(self.sections, key=lambda s: s.weight)
def create_stack_page(self, stack, listbox):
pref_page = Adw.PreferencesPage()
not_empty = False
for section in self.sections:
preferences_group = section.create_preferences_group()
if preferences_group:
pref_page.add(preferences_group)
not_empty = True
else:
self.logger.warn(f"Секция {section.name} не создала виджетов.")
if not_empty:
self.update_ui(stack, listbox, pref_page)
else:
self.logger.warn(f"the page {self.name} is empty, ignored")
def update_ui(self, stack, listbox, pref_page):
stack.add_titled(pref_page, self.name, self.name)
row = TuneItPanelRow()
row.props.name = self.name
row.props.title = self.name
row.props.icon_name = self.icon
listbox.append(row)
import logging
import os
import fnmatch
from .backends import FileBackend
class Searcher:
def __init__(self, search_paths, exclude_paths, exclude_names, recursive):
self.search_paths = [
os.path.expanduser(os.path.expandvars(path))
for path in search_paths
]
self.exclude_paths = [
os.path.expanduser(os.path.expandvars(path))
for path in exclude_paths
]
self.exclude_names = exclude_names
self.recursive = recursive
def is_excluded(self, path):
abs_path = os.path.abspath(path)
for excluded in self.exclude_paths:
abs_excluded = os.path.abspath(excluded)
if abs_path == abs_excluded or abs_path.startswith(abs_excluded + os.sep):
return True
path_parts = os.path.normpath(abs_path).split(os.sep)
if any(part in self.exclude_names for part in path_parts):
return True
return False
class DirSearcher(Searcher):
def search(self):
result = []
for base_path in self.search_paths:
if not os.path.isdir(base_path) or self.is_excluded(base_path):
continue
if self.recursive:
for root, dirs, _ in os.walk(base_path):
dirs[:] = [d for d in dirs if not self.is_excluded(os.path.join(root, d))]
if self.is_excluded(root):
continue
result.append(root)
else:
try:
result.extend([
os.path.join(base_path, d)
for d in os.listdir(base_path)
if os.path.isdir(os.path.join(base_path, d))
and not self.is_excluded(os.path.join(base_path, d))
])
except PermissionError:
pass
return result
class FileSearcher(Searcher):
def __init__(self, search_paths, exclude_paths, exclude_names, recursive, pattern):
super().__init__(search_paths, exclude_paths, exclude_names, recursive)
self.pattern = pattern
def search(self):
result = []
for base_path in self.search_paths:
if not os.path.exists(base_path):
continue
if self.recursive and os.path.isdir(base_path):
for root, dirs, files in os.walk(base_path):
dirs[:] = [d for d in dirs if not self.is_excluded(os.path.join(root, d))]
if self.is_excluded(root):
continue
result.extend([
os.path.join(root, f)
for f in files
if fnmatch.fnmatch(f, self.pattern)
and not self.is_excluded(os.path.join(root, f))
])
else:
if os.path.isfile(base_path):
filename = os.path.basename(base_path)
if fnmatch.fnmatch(filename, self.pattern) and not self.is_excluded(base_path):
result.append(base_path)
else:
try:
result.extend([
os.path.join(base_path, f)
for f in os.listdir(base_path)
if os.path.isfile(os.path.join(base_path, f))
and fnmatch.fnmatch(f, self.pattern)
and not self.is_excluded(os.path.join(base_path, f))
])
except PermissionError:
pass
return result
class ValueInFileSearcher(Searcher):
def __init__(self, search_paths, exclude_paths, exclude_names, recursive,
file_pattern, key, exclude_neighbor_files=None):
super().__init__(search_paths, exclude_paths, exclude_names, recursive)
self.logger = logging.getLogger(f"{self.__class__.__name__}")
self.file_pattern = file_pattern
self.key = key
self.exclude_neighbor_files = exclude_neighbor_files or []
def has_exclude_neighbor(self, file_path):
dir_path = os.path.dirname(file_path)
return any(
os.path.exists(os.path.join(dir_path, neighbor))
for neighbor in self.exclude_neighbor_files
)
def search(self):
result = []
file_searcher = FileSearcher(
self.search_paths,
self.exclude_paths,
self.exclude_names,
self.recursive,
self.file_pattern
)
for file_path in file_searcher.search():
try:
if self.exclude_neighbor_files and self.has_exclude_neighbor(file_path):
continue
result.append(FileBackend(params={'file_path': file_path}).get_value(self.key, 's'))
except Exception as e:
self.logger.error(f"Error processing {file_path}: {str(e)}")
continue
return result
class SearcherFactory:
_searchers = {
'dir': DirSearcher,
'file': FileSearcher,
'value_in_file': ValueInFileSearcher
}
@classmethod
def create(cls, config):
searcher_type = config['type']
params = {
'search_paths': config['search_paths'],
'exclude_paths': config.get('exclude_paths', []),
'exclude_names': config.get('exclude_names', []),
'recursive': config.get('recursive', True)
}
if searcher_type == 'file':
params['pattern'] = config['pattern']
elif searcher_type == 'value_in_file':
params['file_pattern'] = config['file_pattern']
params['key'] = config['key']
params['exclude_neighbor_files'] = config.get('exclude_neighbor_files', [])
return cls._searchers[searcher_type](**params)
from .classic import ClassicSection
from .custom import CustomSection
from .dynamic import DynamicSection
class SectionFactory:
def __init__(self):
self.sections = {
'classic': ClassicSection,
'custom': CustomSection,
'dynamic': DynamicSection,
}
def create_section(self, section_data, module):
section_type = section_data.get('type', 'classic')
section = self.sections.get(section_type)
if not section:
raise ValueError(f"Unknown type of section: {section_type}")
return section(section_data, module)
\ No newline at end of file
class BaseSection():
def __init__(self, section_data, module):
self.section_data = section_data
self.module = module
self.settings = []
self.name = module.get_translation(section_data['name'])
self.weight = section_data.get('weight', 0)
self.page = section_data.get('page')
import logging
from gi.repository import Adw
from ..setting.setting import Setting
from .base import BaseSection
class ClassicSection(BaseSection):
def __init__(self, section_data, module):
super().__init__(section_data, module)
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.settings = [Setting(s, module) for s in section_data.get('settings', [])]
self.settings = sorted(self.settings, key=lambda s: s.weight, reverse=True)
self.module.add_section(self)
def create_preferences_group(self):
group = Adw.PreferencesGroup(title=self.name, description=self.module.name)
not_empty = False
for setting in self.settings:
row = setting.create_row()
if row:
self.logger.debug(f"Adding a row for setting: {setting.name}")
group.add(row)
not_empty = True
else:
self.logger.debug(f"Failed to create a row for setting: {setting.name}")
if not_empty:
return group
else:
return None
from gi.repository import Adw
from ..setting.custom_setting import CustomSetting
from .base import BaseSection
import logging
class CustomSection(BaseSection):
settings = []
def __init__(self, section_data, module):
super().__init__(section_data, module)
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.settings = [CustomSetting(s, module, self) for s in section_data.get('settings', [])]
self.settings_dict = {s.orig_name: s for s in self.settings}
self.module.add_section(self)
self._callback_buffer = []
def create_preferences_group(self):
group = Adw.PreferencesGroup(title=self.name, description=self.module.name)
not_empty = False
for setting in self.settings:
try:
row = setting.create_row()
if row:
self.logger.debug(f"Adding a row for setting: {setting.name}")
group.add(row)
not_empty = True
except Exception as e:
self.logger.error(f"Error creating row for {setting.orig_name}: {str(e)}")
self._process_buffered_callbacks()
return group if not_empty else None
def handle_callback(self, action, target, value):
self.logger.debug(f"handled callback action={action}, target={target}, value={value}")
try:
if target not in self.settings_dict:
self._callback_buffer.append((action, target, value))
self.logger.debug(f"Buffering callback for {target}")
return
self._apply_callback(action, target, value)
except Exception as e:
self.logger.error(f"Callback handling error: {str(e)}")
def _apply_callback(self, action, target, value):
setting = self.settings_dict[target]
if action == 'set':
setting.create_row = value
setting._update_widget()
elif action == 'set_apply':
setting.set_value(value)
elif action == 'visible':
if setting.row:
setting.row.set_visible(value.lower() == 'true')
elif action == 'enabled':
if setting.row:
setting.row.set_sensitive(value.lower() == 'true')
else:
self.logger.warning(f"Unknown callback action: {action}")
def _process_buffered_callbacks(self):
while self._callback_buffer:
action, target, value = self._callback_buffer.pop(0)
try:
if target in self.settings_dict:
self._apply_callback(action, target, value)
else:
self.logger.warning(f"Unknown target after processing buffer: {target}")
except Exception as e:
self.logger.error(f"Error processing buffered callback: {str(e)}")
def get_all_values(self):
return {
setting.orig_name: setting._current_value
for setting in self.settings
}
import json
import logging
import re
import subprocess
from .base import BaseSection
from .custom import CustomSection
from ..setting.custom_setting import CustomSetting
class DynamicSection(CustomSection):
"""
Section that dynamically generates settings from a command output.
The generator_command should return a JSON array of objects.
Each object is used to populate the setting_template.
Example YAML:
sections:
- name: "Network Interfaces"
type: dynamic
generator_command: "ip -j link show"
setting_template:
name: "{ifname}"
type: boolean
get_command: "ip link show {ifname} | grep -q UP && echo True || echo False"
set_command: "ip link set {ifname} {value}"
"""
def __init__(self, section_data, module):
# Вызываем BaseSection напрямую, минуя CustomSection.__init__,
# который создаёт settings из section_data
BaseSection.__init__(self, section_data, module)
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.generator_command = section_data.get('generator_command')
self.setting_template = section_data.get('setting_template', {})
self._generate_settings()
self.settings_dict = {s.orig_name: s for s in self.settings}
self._callback_buffer = []
self.module.add_section(self)
def _generate_settings(self):
items = self._execute_generator()
if not items:
self.logger.warning("Generator returned no items")
return
for item in items:
try:
setting_data = self._apply_template(self.setting_template, item)
setting = CustomSetting(setting_data, self.module, self)
self.settings.append(setting)
except Exception as e:
self.logger.error(f"Error creating setting from item {item}: {e}")
self.settings = sorted(self.settings, key=lambda s: s.weight, reverse=True)
self.logger.info(f"Generated {len(self.settings)} settings")
def _execute_generator(self):
if not self.generator_command:
self.logger.error("No generator_command specified")
return []
try:
result = subprocess.run(
self.generator_command,
shell=True,
capture_output=True,
text=True,
timeout=30
)
if result.returncode != 0:
self.logger.error(f"Generator command failed: {result.stderr}")
return []
output = result.stdout.strip()
if not output:
return []
data = json.loads(output)
# Normalize to list
if isinstance(data, dict):
# Single object -> wrap in list
return [data]
elif isinstance(data, list):
# Normalize simple lists: ["a", "b"] -> [{"_item": "a"}, {"_item": "b"}]
if data and not isinstance(data[0], dict):
return [{"_item": item, "_index": i} for i, item in enumerate(data)]
return data
else:
# Scalar value
return [{"_item": data, "_index": 0}]
except json.JSONDecodeError as e:
self.logger.error(f"Failed to parse generator output as JSON: {e}")
# Try line-by-line parsing
lines = result.stdout.strip().split('\n')
if lines:
return [{"_item": line.strip(), "_index": i} for i, line in enumerate(lines) if line.strip()]
return []
except subprocess.TimeoutExpired:
self.logger.error("Generator command timed out")
return []
except Exception as e:
self.logger.error(f"Generator execution error: {e}")
return []
def _apply_template(self, template, item):
"""
Recursively apply item values to template.
- "{key}" in string context -> string substitution
- "{key}" as entire value -> preserves type (dict, list, etc.)
- Supports nested paths: "{a.b.c}"
"""
if isinstance(template, str):
# Check if entire string is a single placeholder
match = re.fullmatch(r'\{(\w+(?:\.\w+)*)\}', template.strip())
if match:
# Return value as-is (preserves type)
value = self._get_nested(item, match.group(1))
if value is not None:
return value
return template
# String interpolation with multiple placeholders
return self._format_string(template, item)
elif isinstance(template, dict):
return {k: self._apply_template(v, item) for k, v in template.items()}
elif isinstance(template, list):
return [self._apply_template(v, item) for v in template]
return template
def _format_string(self, template, item):
"""Format string with item values, supporting nested paths"""
def replacer(match):
path = match.group(1)
value = self._get_nested(item, path)
if value is not None:
return str(value)
return match.group(0)
return re.sub(r'\{(\w+(?:\.\w+)*)\}', replacer, template)
def _get_nested(self, data, path):
"""Get value by dot-separated path: 'a.b.c' -> data['a']['b']['c']"""
try:
for key in path.split('.'):
if isinstance(data, dict):
data = data.get(key)
elif isinstance(data, (list, tuple)) and key.isdigit():
data = data[int(key)]
else:
return None
if data is None:
return None
return data
except (KeyError, IndexError, TypeError):
return None
import logging
import threading
import time
from ..searcher import SearcherFactory
from gi.repository import GLib
class BaseSetting:
def __init__(self, setting_data, module):
self._ = module.get_translation
self.module = module
self.name = self._(setting_data['name'])
self.orig_name = setting_data['name']
self.logger = logging.getLogger(f"{self.__class__.__name__}[{self.name}]")
self.weight = setting_data.get('weight', 1)
self.widget = None
self.root = setting_data.get('root', False)
self.params = {
**setting_data.get('params', {}),
'module_path': module.path
}
self.type = setting_data['type']
self.help = setting_data.get('help', None)
if self.help is not None:
self.help = self._(self.help)
self.default = setting_data.get('default')
self.gtype = setting_data.get('gtype', [])
self._current_value = None
self.search_target = setting_data.get('search_target', None)
self.map = setting_data.get('map')
self.previews = setting_data.get('previews')
self.prepare_map()
self.update_interval = setting_data.get('update_interval', None)
if self.update_interval:
self._start_update_thread()
def prepare_map(self):
if self.map is None:
if self.search_target is not None:
self.map = SearcherFactory.create(self.search_target).search()
else:
self.map = self._default_map()
if isinstance(self.map, list) and ('choice' in self.type or 'list_dual' in self.type):
self.map = {
item.title(): item for item in self.map
if item is not None
}
if not self.map:
self.logger.warning(f"Warning: 'map' is empty for setting {self.name}. Check data source.")
if isinstance(self.map, dict) and ('choice' in self.type or 'list_dual' in self.type):
self.map = {
self._(key) if isinstance(key, str) else key: value
for key, value in self.map.items()
}
if not self.map:
self.logger.warning(f"Warning: 'map' is empty for setting {self.name}. Check data source.")
if len(self.gtype) > 2:
self.gtype = self.gtype[0]
else:
self.gtype = self.gtype
def _default_map(self):
if self.type == 'boolean':
# Дефолтная карта для булевых настроек
return {True: True, False: False}
if 'choice' in self.type or 'list_dual' in self.type:
map = {}
range = self._get_backend_range()
if range is None:
return {}
for var in range:
key = var
display_name = var[0].upper() + var[1:] if var else ''
map[key] = display_name
return map
if self.type == 'number':
map = {}
range = self._get_backend_range()
if range is None:
return {}
map["upper"] = range[1]
map["lower"] = range[0]
# Кол-во после запятой
map["digits"] = len(str(range[0]).split('.')[-1]) if '.' in str(range[0]) else 0
# Минимальное число с этим количеством
map["step"] = 10 ** -map["digits"] if map["digits"] > 0 else 0
return map
return {}
def _get_selected_row_index(self):
current_value = self._get_backend_value()
return list(self.map.values()).index(current_value) if current_value in self.map.values() else 0
def _get_default_row_index(self):
return list(self.map.values()).index(self.default) if self.default in self.map.values() else None
def _start_update_thread(self):
def update_loop():
while True:
time.sleep(self.update_interval)
prev_value = self._current_value
current_value = self._get_backend_value(force=True)
if current_value != prev_value:
GLib.idle_add(self._update_widget)
thread = threading.Thread(target=update_loop, daemon=True)
thread.start()
def _update_widget(self):
if self.widget:
self.widget.update_display()
return False
from .base import BaseSetting
from .widgets import WidgetFactory
import logging
import subprocess
import ast
class CustomSetting(BaseSetting):
def __init__(self, setting_data, module, section):
self.section = section
self.get_command = setting_data.get('get_command')
self.get_range_command = setting_data.get('get_range_command')
self.set_command = setting_data.get('set_command')
super().__init__(setting_data, module)
def create_row(self):
try:
self.widget = WidgetFactory.create_widget(self)
if self.widget:
self.row = self.widget.create_row()
return self.row
except Exception as e:
self.logger.error(f"Error creating row: {str(e)}")
return None
def get_value(self):
if self._current_value is None:
self._current_value = self._execute_get_command()
return self._current_value
def set_value(self, value):
success = self._execute_set_command(value)
if success:
self._current_value = value
self._update_widget()
def get_range(self):
return self._execute_get_range_command()
def _execute_command(self, cmd, capture_output=True):
with subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
shell=True,
bufsize=1
) as p:
output = []
def process_line(line):
line = line.strip()
if not line:
return
if line.startswith('CALLBACK:'):
self._handle_callback(line)
elif line.startswith('NOTIFY:'):
self._handle_notify(line)
elif capture_output:
output.append(line)
while p.poll() is None:
line = p.stdout.readline()
process_line(line)
for line in p.stdout.read().splitlines():
process_line(line)
stderr = p.stderr.read().strip()
if p.returncode != 0:
raise subprocess.CalledProcessError(p.returncode, cmd, stderr=stderr)
return '\n'.join(output) if capture_output else ''
def _execute_get_command(self):
if not self.get_command:
return self.default
try:
cmd = self._format_command(self.get_command)
output = self._execute_command(cmd)
try: value = ast.literal_eval(output)
except Exception as e:
value = output
self.logger.info(f"GET: {output} with error {e}")
return value
except subprocess.CalledProcessError as e:
self.logger.error(f"Get command failed: {e.stderr}")
return self.default
def _execute_set_command(self, value):
if not self.set_command:
return False
try:
cmd = self._format_command(self.set_command, value)
self._execute_command(cmd, capture_output=False)
return True
except subprocess.CalledProcessError as e:
self.logger.error(f"Set command failed: {e.stderr}")
return False
def _execute_get_range_command(self):
if not self.get_range_command:
return None
try:
cmd = self._format_command(self.get_range_command)
output = self._execute_command(cmd)
return ast.literal_eval(output)
except subprocess.CalledProcessError as e:
self.logger.error(f"Get range command failed: {e.stderr}")
return None
def _format_command(self, template, value=None):
variables = {
'value': value,
**self.params,
**self.section.get_all_values()
}
return template.format(**variables)
def _handle_callback(self, line):
try:
_, action, target, value = line.split(':', 3)
self.section.handle_callback(
action.strip(),
target.strip(),
value.strip()
)
except ValueError:
self.logger.error(f"Invalid callback format: {line}")
def _handle_notify(self, line):
self.logger.debug("handled notify")
try:
parts = line.split(':', 2)
parts += [None] * (3 - len(parts))
_, notification, seconds = parts
from ...main import get_main_window
get_main_window().setting_notify(
self.module.name,
self._(notification),
int(seconds) if seconds else None
)
except ValueError:
self.logger.error(f"Invalid notify format: {line}")
@property
def current_value(self):
return self.get_value()
def _get_backend_value(self):
return self.get_value()
def _set_backend_value(self, value):
self.set_value(value)
def _get_backend_range(self):
return self.get_range()
from .base import BaseSetting
from .widgets import WidgetFactory
from ..backends import backend_factory
from ..daemon_client import dclient
from ..tools.gvariant import convert_by_gvariant
from ..widgets.service_dialog import ServiceNotStartedDialog
service_stopped = False
class Setting(BaseSetting):
def __init__(self, setting_data, module):
self.backend = setting_data.get('backend')
self.key = setting_data.get('key')
super().__init__(setting_data, module)
def create_row(self):
if self.root is True:
self.logger.info("Root is true")
if dclient is not None:
self.widget = WidgetFactory.create_widget(self)
return self.widget.create_row() if self.widget else None
else:
global service_stopped
if service_stopped is False:
from ...main import get_main_window
dialog = ServiceNotStartedDialog()
dialog.present(get_main_window())
service_stopped = True
return None
self.widget = WidgetFactory.create_widget(self)
return self.widget.create_row() if self.widget else None
def _get_backend_value(self, force=False):
if self._current_value is None or force is True:
backend = self._get_backend()
value = self.default
if backend:
backend_value = backend.get_value(self.key, self.gtype)
if isinstance(backend_value, list):
if all(v is not None for v in backend_value):
value = backend_value
elif backend_value is not None:
value = backend_value
self._current_value = value
return self._current_value
def _get_backend_range(self):
backend = self._get_backend()
if backend:
return backend.get_range(self.key, self.gtype)
def _set_backend_value(self, value):
self.logger.info(f"SET VALUE {value}")
backend = self._get_backend()
if backend:
backend.set_value(self.key, value, self.gtype)
self._current_value = value
def _get_backend(self):
if self.root is True:
backend = dclient
backend.set_backend_name(self.backend)
backend.set_backend_params(self.params)
else:
backend = backend_factory.get_backend(self.backend, self.params)
if not backend:
self.logger.error(f"Бекенд {self.backend} не зарегистрирован.")
return backend
from gi.repository import Adw, Gtk, Gdk
from gi.repository import Gio
import os
from .BaseWidget import BaseWidget
class AvatarWidget(BaseWidget):
def create_row(self):
row = Adw.PreferencesRow(
activatable=False,
focusable=False
)
control_box = Gtk.Box(
spacing=6,
orientation=Gtk.Orientation.HORIZONTAL,
halign=Gtk.Align.CENTER,
hexpand=True,
margin_top=12,
margin_bottom=12,
)
self.avatar = Adw.Avatar(
size=128,
text=None
)
self.change_button = Gtk.Button(
icon_name="edit-symbolic",
valign=Gtk.Align.END,
halign=Gtk.Align.END,
margin_bottom=4,
margin_end=4,
tooltip_text="Change your avatar"
)
self.change_button.get_style_context().add_class("circular")
self.change_button.get_style_context().add_class("secondary")
self.change_button.connect("clicked", self._on_change_clicked)
self.reset_button = Gtk.Button(
icon_name="user-trash-symbolic",
valign=Gtk.Align.START,
halign=Gtk.Align.END,
margin_top=4,
margin_end=4,
tooltip_text="Reset the avatar"
)
self.reset_button.get_style_context().add_class("circular")
self.reset_button.get_style_context().add_class("destructive-action")
self.reset_button.connect("clicked", self._on_reset_clicked)
self.reset_revealer = Gtk.Revealer(
transition_type=Gtk.RevealerTransitionType.CROSSFADE,
transition_duration=150,
child=self.reset_button,
reveal_child=False,
valign=Gtk.Align.START,
halign=Gtk.Align.END
)
overlay = Gtk.Overlay()
overlay.set_child(self.avatar)
overlay.add_overlay(self.reset_revealer)
overlay.add_overlay(self.change_button)
control_box.append(overlay)
row.set_child(control_box)
self.update_display()
self._update_reset_visibility()
return row
def update_display(self):
current = self.setting._get_backend_value()
if current and isinstance(current, str) and current.startswith("file://"):
current = current[7:]
self.value_separated = True
if current and os.path.exists(current):
file = Gio.File.new_for_path(current)
try:
texture = Gdk.Texture.new_from_file(file)
self.avatar.set_custom_image(texture)
except Exception as e:
self.avatar.set_custom_image(None)
else:
self.avatar.set_custom_image(None)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
if isinstance(default_value, str) and default_value.startswith("file://"):
default_value = default_value[7:]
self.setting._set_backend_value(default_value)
self.update_display()
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_backend_value()
default_value = self.setting.default
if isinstance(current_value, str) and current_value.startswith("file://"):
current_value = current_value[7:]
if current_value:
current_value = os.path.expanduser(current_value)
current_value = os.path.expandvars(current_value)
if default_value:
default_value = os.path.expanduser(default_value)
default_value = os.path.expandvars(default_value)
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None
else False
)
def _on_change_clicked(self, button):
dialog = Gtk.FileDialog()
# Настройка фильтров
if 'extensions' in self.setting.map:
filters = self._create_file_filters()
if filters.get_n_items() > 0:
dialog.set_filters(filters)
dialog.set_default_filter(filters.get_item(0))
try:
dialog.open(
parent=button.get_root(),
callback=self._on_file_selected
)
except Exception as e:
self.logger.error(f"File dialog error: {e}")
def _create_file_filters(self):
filters = Gio.ListStore.new(Gtk.FileFilter)
patterns = [p for p in self.setting.map.get('extensions', [])]
if patterns:
file_filter = Gtk.FileFilter()
file_filter.set_name("Supported files")
for pattern in patterns:
if pattern.startswith('.'):
file_filter.add_suffix(pattern[1:])
else:
file_filter.add_pattern(pattern)
filters.append(file_filter)
return filters
def _on_file_selected(self, dialog, result):
try:
file = dialog.open_finish(result)
if file:
self.setting._set_backend_value(file.get_path())
self.update_display()
except Exception as e:
self.logger.error(f"File selection error: {e}")
import logging
from gi.repository import Gtk
class BaseWidget:
def __init__(self, setting):
self.setting = setting
self.logger = logging.getLogger(f"{self.__class__.__name__}")
self.reset_button = Gtk.Button(
icon_name="edit-undo-symbolic",
valign=Gtk.Align.CENTER,
halign=Gtk.Align.CENTER,
tooltip_text=_("Restore Default")
)
self.reset_button.add_css_class('flat')
self.reset_button.connect("clicked", self._on_reset_clicked)
self.reset_revealer = Gtk.Revealer(
transition_type=Gtk.RevealerTransitionType.CROSSFADE,
transition_duration=150,
child=self.reset_button,
reveal_child=False,
halign=Gtk.Align.END
)
def update_display(self):
raise NotImplementedError("update_display method should be implemented in the subclass")
def set_visible(self, visible: bool):
self.row.set_visible(visible)
def set_enabled(self, enabled: bool):
self.row.set_sensitive(enabled)
def create_row(self):
raise NotImplementedError("create_row method should be implemented in the subclass")
def _on_reset_clicked(self, button):
raise NotImplementedError("_on_reset_clicked method should be implemented in the subclass")
from gi.repository import Adw, Gtk
from .BaseWidget import BaseWidget
class BooleanWidget(BaseWidget):
def create_row(self):
self.row = Adw.ActionRow(title=self.setting.name, subtitle=self.setting.help)
self.switch = Gtk.Switch(
valign=Gtk.Align.CENTER,
halign=Gtk.Align.CENTER,
)
self.handler_id = self.switch.connect("notify::active", self._on_boolean_toggled)
self.row.set_activatable_widget(self.switch)
control_box = Gtk.Box(spacing=6, orientation=Gtk.Orientation.HORIZONTAL)
control_box.append(self.reset_revealer)
control_box.append(self.switch)
self.row.add_suffix(control_box)
self._update_initial_state()
return self.row
def _update_initial_state(self):
current_value = self.setting._get_backend_value()
is_active = current_value == self.setting.map.get(True)
with self.switch.handler_block(self.handler_id):
self.switch.set_active(is_active)
self._update_reset_visibility()
def update_display(self):
self._update_initial_state()
def _on_boolean_toggled(self, switch, _):
value = self.setting.map.get(True) if switch.get_active() else self.setting.map.get(False)
self.setting._set_backend_value(value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.map.get(self.setting.default)
self.setting._set_backend_value(default_value)
with self.switch.handler_block(self.handler_id):
self.switch.set_active(self.setting.default)
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_backend_value()
default_value = self.setting.map.get(self.setting.default)
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None
else False
)
from gi.repository import Adw, Gtk
from .BaseWidget import BaseWidget
class ButtonWidget(BaseWidget):
def create_row(self):
self.row = Adw.ButtonRow(
title=self.setting.name,
)
self.row.connect("activated", self._on_button_clicked)
return self.row
def _on_button_clicked(self, button):
self.setting._set_backend_value(True)
def update_display(self):
pass
from gi.repository import Adw, Gtk
from .BaseWidget import BaseWidget
class ChoiceWidget(BaseWidget):
def create_row(self):
items = list(self.setting.map.keys())
self.row = Adw.ActionRow(title=self.setting.name, subtitle=self.setting.help)
self.dropdown = Gtk.DropDown.new_from_strings(items)
self.dropdown.set_halign(Gtk.Align.CENTER)
self.dropdown.set_valign(Gtk.Align.CENTER)
self.handler_id = self.dropdown.connect("notify::selected", self._on_choice_changed)
self.row.set_activatable_widget(self.dropdown)
self._update_dropdown_selection()
control_box = Gtk.Box(spacing=6, orientation=Gtk.Orientation.HORIZONTAL)
control_box.append(self.reset_revealer)
control_box.append(self.dropdown)
self.row.add_suffix(control_box)
self._update_reset_visibility()
return self.row
def update_display(self):
self._update_dropdown_selection()
self._update_reset_visibility()
def _update_dropdown_selection(self):
current_index = self.setting._get_selected_row_index()
with self.dropdown.handler_block(self.handler_id):
self.dropdown.set_selected(current_index)
def _on_choice_changed(self, dropdown, _):
selected = dropdown.get_selected()
if selected < 0 or selected >= len(self.setting.map):
return
selected_value = list(self.setting.map.values())[selected]
self.setting._set_backend_value(selected_value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting._get_default_row_index()
if default_value is not None:
with self.dropdown.handler_block(self.handler_id):
self.dropdown.set_selected(default_value)
self.setting._set_backend_value(self.setting.default)
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_selected_row_index()
default_value = self.setting._get_default_row_index()
self.reset_revealer.set_reveal_child(
current_value != default_value
if default_value is not None
else False
)
\ No newline at end of file
from gi.repository import Gtk, Adw
from .BaseWidget import BaseWidget
class EntryWidget(BaseWidget):
def create_row(self):
self.row = Adw.ActionRow(title=self.setting.name)
self.entry = Gtk.Entry()
self.entry.set_halign(Gtk.Align.CENTER)
self.entry.set_text(str(self.setting._get_backend_value() or ""))
self.entry.connect("activate", self._on_text_changed)
control_box = Gtk.Box(
orientation=Gtk.Orientation.HORIZONTAL,
spacing=6,
margin_start=12,
valign=Gtk.Align.CENTER,
halign=Gtk.Align.CENTER,
)
control_box.append(self.reset_revealer)
control_box.append(self.entry)
self.row.add_suffix(control_box)
self._update_reset_visibility()
return self.row
def update_display(self):
current_value = self.setting._get_backend_value()
self.entry.set_text(str(current_value) if current_value is not None else "")
self._update_reset_visibility()
def _on_text_changed(self, entry):
new_value = entry.get_text()
self.setting._set_backend_value(new_value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
self.setting._set_backend_value(default_value)
self.entry.set_text(str(default_value) if default_value is not None else "")
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.entry.get_text()
default_value = str(self.setting.default) if self.setting.default is not None else ""
has_default = self.setting.default is not None
is_default = current_value == default_value
self.reset_revealer.set_reveal_child(not is_default and has_default)
\ No newline at end of file
from gi.repository import Adw, Gtk
from gi.repository import Gio
import os
from .BaseWidget import BaseWidget
class FileChooser(BaseWidget):
def create_row(self):
self.value_separated = False
self.multiple_mode = self.setting.map.get('multiple', False)
self.folder_mode = 'folder' in self.setting.map.get('extensions', [])
row = Adw.ActionRow(
title=self.setting.name,
subtitle=self.setting.help,
subtitle_selectable=True
)
control_box = Gtk.Box(
spacing=6,
margin_end=12,
halign=Gtk.Align.END
)
control_box.append(self.reset_revealer)
if not self.multiple_mode and not self.folder_mode:
self.entry = Gtk.Entry(
placeholder_text="Enter path or click to browse",
hexpand=True,
valign=Gtk.Align.CENTER,
halign=Gtk.Align.END,
)
self.entry_handler_id = self.entry.connect("activate", self._on_entry_changed)
control_box.append(self.entry)
else:
self.info_label = Gtk.Label(
label="No selection" if self.folder_mode else "No files selected",
valign=Gtk.Align.CENTER,
css_classes=["dim-label"]
)
control_box.append(self.info_label)
self.select_button = Gtk.Button.new_from_icon_name(
icon_name="folder-open-symbolic"
)
self.select_button.set_valign(Gtk.Align.CENTER)
row.set_activatable_widget(self.select_button)
self.select_button.connect("clicked", self._on_button_clicked)
control_box.append(self.select_button)
row.add_suffix(control_box)
self.update_display()
self._update_reset_visibility()
return row
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
if isinstance(default_value, str) and default_value.startswith("file://"):
default_value = default_value[7:]
self.setting._set_backend_value(default_value)
self.update_display()
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_backend_value()
default_value = self.setting.default
if isinstance(current_value, str) and current_value.startswith("file://"):
current_value = current_value[7:]
if current_value:
current_value = os.path.expanduser(current_value)
current_value = os.path.expandvars(current_value)
if default_value:
default_value = os.path.expanduser(default_value)
default_value = os.path.expandvars(default_value)
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None
else False
)
def update_display(self):
current = self.setting._get_backend_value()
if current and isinstance(current, str) and current.startswith("file://"):
current = current[7:]
self.value_separated = True
if self.folder_mode:
self._update_folder_display(current)
elif self.multiple_mode:
self._update_multiple_files_display(current)
else:
self._update_single_file_display(current)
self._update_reset_visibility()
def _on_button_clicked(self, button):
dialog = Gtk.FileDialog()
# Настройка фильтров
if not self.folder_mode and 'extensions' in self.setting.map:
filters = self._create_file_filters()
if filters.get_n_items() > 0:
dialog.set_filters(filters)
dialog.set_default_filter(filters.get_item(0))
# Установка начальной директории
current = self.setting._get_backend_value()
if current:
try:
current = os.path.expanduser(current)
current = os.path.expandvars(current)
current_file = Gio.File.new_for_path(current)
parent = current_file.get_parent() if not self.folder_mode else current_file
dialog.set_initial_folder(parent)
except Exception as e:
self.logger.error(f"Error setting initial folder: {e}")
# Выбор метода открытия
try:
if self.folder_mode:
dialog.select_folder(
parent=button.get_root(),
callback=self._on_folder_selected
)
elif self.multiple_mode:
dialog.open_multiple(
parent=button.get_root(),
callback=self._on_files_selected
)
else:
dialog.open(
parent=button.get_root(),
callback=self._on_file_selected
)
except Exception as e:
self.logger.error(f"File dialog error: {e}")
def _create_file_filters(self):
filters = Gio.ListStore.new(Gtk.FileFilter)
patterns = [p for p in self.setting.map.get('extensions', []) if p != 'folder']
if patterns:
file_filter = Gtk.FileFilter()
file_filter.set_name("Supported files")
for pattern in patterns:
if pattern.startswith('.'):
file_filter.add_suffix(pattern[1:])
else:
file_filter.add_pattern(pattern)
filters.append(file_filter)
return filters
def _on_file_selected(self, dialog, result):
try:
file = dialog.open_finish(result)
if file:
self.setting._set_backend_value(file.get_path())
self.update_display()
except Exception as e:
self.logger.error(f"File selection error: {e}")
def _on_files_selected(self, dialog, result):
try:
file_list = dialog.open_multiple_finish(result)
if file_list:
paths = [f.get_path() for f in file_list]
self.setting._set_backend_value(paths)
self.update_display()
except Exception as e:
self.logger.error(f"Multiple files selection error: {e}")
def _on_folder_selected(self, dialog, result):
try:
folder = dialog.select_folder_finish(result)
if folder:
self.setting._set_backend_value(folder.get_path())
self.update_display()
except Exception as e:
self.logger.error(f"Folder selection error: {e}")
def _update_folder_display(self, current):
if current:
folder = Gio.File.new_for_path(current)
self.info_label.set_label(folder.get_basename() or current)
else:
self.info_label.set_label("No folder selected")
def _update_multiple_files_display(self, current):
count = len(current) if current else 0
self.info_label.set_label(f"{count} files selected" if count else "No files selected")
def _update_single_file_display(self, current):
with self.entry.handler_block(self.entry_handler_id):
self.entry.set_text(current or "")
if current:
file = Gio.File.new_for_path(current)
self.entry.set_tooltip_text(file.get_parse_name())
else:
self.entry.set_tooltip_text(None)
def _on_entry_changed(self, entry):
if not self.folder_mode and not self.multiple_mode:
path = entry.get_text().strip()
self.setting._set_backend_value(path)
self._update_reset_visibility()
\ No newline at end of file
from gi.repository import Adw, Gtk, Gdk
from gi.repository import Gio
import os
from .BaseWidget import BaseWidget
class ImageChooserWidget(BaseWidget):
def create_row(self):
self.has_file_prefix = False
self.row = Adw.PreferencesRow(
activatable=False,
focusable=False
)
main_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=12,
margin_top=12,
margin_bottom=12,
margin_start=12,
margin_end=12,
)
if self.setting.name:
title_box = Gtk.Box(
orientation=Gtk.Orientation.HORIZONTAL,
spacing=6,
)
title_label = Gtk.Label(
label=self.setting.name,
halign=Gtk.Align.START,
hexpand=True,
css_classes=["title-4"]
)
title_box.append(title_label)
if self.setting.help:
help_label = Gtk.Label(
label=self.setting.help,
halign=Gtk.Align.START,
css_classes=["dim-label"]
)
title_box.append(help_label)
main_box.append(title_box)
image_frame = Gtk.Frame(
css_classes=["view"],
halign=Gtk.Align.CENTER,
)
self.picture = Gtk.Picture(
content_fit=Gtk.ContentFit.COVER,
can_shrink=True,
)
self.picture.set_size_request(320, 180)
self.placeholder = Adw.StatusPage(
icon_name="image-missing-symbolic",
)
self.placeholder.set_size_request(320, 180)
self.image_stack = Gtk.Stack(
transition_type=Gtk.StackTransitionType.CROSSFADE,
transition_duration=150,
)
self.image_stack.add_named(self.placeholder, "placeholder")
self.image_stack.add_named(self.picture, "picture")
overlay = Gtk.Overlay()
overlay.set_child(self.image_stack)
self.change_button = Gtk.Button(
icon_name="document-edit-symbolic",
valign=Gtk.Align.END,
halign=Gtk.Align.END,
margin_bottom=5,
margin_end=5,
css_classes=["circular", "secondary"],
)
self.change_button.connect("clicked", self._on_change_clicked)
self.image_reset_button = Gtk.Button(
icon_name="edit-undo-symbolic",
margin_top=5,
margin_end=5,
css_classes=["circular", "secondary"],
)
self.image_reset_button.connect("clicked", self._on_reset_clicked)
self.image_reset_revealer = Gtk.Revealer(
transition_type=Gtk.RevealerTransitionType.CROSSFADE,
transition_duration=150,
child=self.image_reset_button,
reveal_child=False,
valign=Gtk.Align.START,
halign=Gtk.Align.END
)
overlay.add_overlay(self.change_button)
overlay.add_overlay(self.image_reset_revealer)
image_frame.set_child(overlay)
main_box.append(image_frame)
self.row.set_child(main_box)
self.update_display()
self._update_reset_visibility()
return self.row
def _normalize_path(self, path):
if not path or not isinstance(path, str):
return None
if path.startswith("file://"):
self.has_file_prefix = True
path = path[7:]
return os.path.expandvars(os.path.expanduser(path))
def update_display(self):
current = self._normalize_path(self.setting._get_backend_value())
if current and os.path.exists(current):
file = Gio.File.new_for_path(current)
try:
texture = Gdk.Texture.new_from_file(file)
self.picture.set_paintable(texture)
self.image_stack.set_visible_child_name("picture")
except Exception as e:
self.logger.error(f"Error loading image: {e}")
self.picture.set_paintable(None)
self.image_stack.set_visible_child_name("placeholder")
else:
self.picture.set_paintable(None)
self.image_stack.set_visible_child_name("placeholder")
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
self.setting._set_backend_value(default_value)
self.update_display()
def _update_reset_visibility(self):
current_value = self._normalize_path(self.setting._get_backend_value())
default_value = self._normalize_path(self.setting.default)
self.image_reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None
else False
)
def _on_change_clicked(self, button):
dialog = Gtk.FileDialog()
if 'extensions' in self.setting.map:
filters = self._create_file_filters()
if filters.get_n_items() > 0:
dialog.set_filters(filters)
dialog.set_default_filter(filters.get_item(0))
current = self._normalize_path(self.setting._get_backend_value())
if current:
try:
current_file = Gio.File.new_for_path(current)
parent = current_file.get_parent()
if parent:
dialog.set_initial_folder(parent)
except Exception as e:
self.logger.error(f"Error setting initial folder: {e}")
try:
dialog.open(
parent=button.get_root(),
callback=self._on_file_selected
)
except Exception as e:
self.logger.error(f"File dialog error: {e}")
def _create_file_filters(self):
filters = Gio.ListStore.new(Gtk.FileFilter)
patterns = [p for p in self.setting.map.get('extensions', [])]
if patterns:
file_filter = Gtk.FileFilter()
file_filter.set_name(_("Image files"))
for pattern in patterns:
if pattern.startswith('.'):
file_filter.add_suffix(pattern[1:])
else:
file_filter.add_pattern(pattern)
filters.append(file_filter)
return filters
def _on_file_selected(self, dialog, result):
try:
file = dialog.open_finish(result)
if file:
path = file.get_path()
if self.has_file_prefix:
path = f"file://{path}"
self.setting._set_backend_value(path)
self.update_display()
except Exception as e:
if "dismissed" not in str(e).lower():
self.logger.error(f"File selection error: {e}")
from gi.repository import Adw, Gtk
from .BaseWidget import BaseWidget
class InfoDictWidget(BaseWidget):
def create_row(self):
main_box = Adw.PreferencesRow()
main_box.set_activatable(False)
content_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
margin_top=8,
margin_bottom=8,
margin_start=12,
margin_end=12
)
main_box.set_child(content_box)
title_label = Gtk.Label(
label=self.setting.name,
halign=Gtk.Align.START,
margin_bottom=6
)
content_box.append(title_label)
if self.setting.help:
subtitle_label = Gtk.Label(
label=self.setting.help,
halign=Gtk.Align.START,
wrap=True,
margin_bottom=8
)
subtitle_label.add_css_class("dim-label")
content_box.append(subtitle_label)
self.main_list = Gtk.ListBox()
self.main_list.set_selection_mode(Gtk.SelectionMode.NONE)
self.main_list.add_css_class('boxed-list')
content_box.append(self.main_list)
self._update_initial_state()
return main_box
def _create_value_label(self, value, level):
label = Gtk.Label(
label=str(value),
halign=Gtk.Align.END,
hexpand=True,
margin_start=12,
margin_end=12 * level
)
label.add_css_class('dim-label')
return label
def _render_value(self, value, container, level=0):
if isinstance(value, dict):
self._render_dict(value, container, level)
elif isinstance(value, (list, tuple)):
self._render_list(value, container, level)
else:
container.append(self._create_value_label(value, level))
def _render_dict(self, data, parent_container, level):
for key, value in data.items():
row = Adw.ActionRow()
row.set_title(key)
row.set_activatable(False)
if isinstance(value, (dict, list, tuple)):
value_widget = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self._render_value(value, value_widget, level + 1)
else:
value_widget = self._create_value_label(value, level)
row.add_suffix(value_widget)
parent_container.append(row)
def _render_list(self, data, parent_container, level):
for item in data:
row = Adw.ActionRow()
row.set_title("•")
row.set_activatable(False)
if isinstance(item, (dict, list, tuple)):
value_widget = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
self._render_value(item, value_widget, level + 1)
else:
value_widget = self._create_value_label(item, level)
row.add_suffix(value_widget)
parent_container.append(row)
def _update_initial_state(self):
current_value = self.setting._get_backend_value() or {}
while child := self.main_list.get_first_child():
self.main_list.remove(child)
self._render_value(current_value, self.main_list, 0)
def update_display(self):
self._update_initial_state()
from gi.repository import Adw, Gtk, GLib
from .BaseWidget import BaseWidget
class InfoLabelWidget(BaseWidget):
def create_row(self):
self.row = Adw.ActionRow(title=self.setting.name, subtitle=self.setting.help)
self.label = Gtk.Label(
valign=Gtk.Align.CENTER,
halign=Gtk.Align.CENTER,
)
self.label.add_css_class("dim-label")
control_box = Gtk.Box(spacing=6, orientation=Gtk.Orientation.HORIZONTAL)
control_box.append(self.label)
self.row.add_suffix(control_box)
self._update_initial_state()
return self.row
def _update_initial_state(self):
current_value = self.setting._get_backend_value()
self.label.set_label(current_value)
def update_display(self):
self._update_initial_state()
from gi.repository import Adw, Gtk
from .BaseWidget import BaseWidget
class NumStepper(BaseWidget):
def create_row(self):
map = self.setting.map
map_keys = list(map.keys())
row = Adw.ActionRow(
title=self.setting.name,
subtitle=self.setting.help,
activatable=False
)
self.spin = Gtk.SpinButton(
valign=Gtk.Align.CENTER,
halign=Gtk.Align.CENTER,
)
adjustment = Gtk.Adjustment(
value=self.setting._get_backend_value(),
lower=map["lower"],
upper=map["upper"],
step_increment=map["step"],
)
self.spin.set_adjustment(adjustment)
if "digits" in map_keys:
self.spin.set_digits(map["digits"])
control_box = Gtk.Box(
orientation=Gtk.Orientation.HORIZONTAL,
spacing=6,
margin_start=12
)
control_box.append(self.reset_revealer)
control_box.append(self.spin)
row.add_suffix(control_box)
self.spin_handler_id = self.spin.connect("value-changed", self._on_num_changed)
self._update_reset_visibility()
return row
def update_display(self):
current_value = self.setting._get_backend_value()
with self.spin.handler_block(self.spin_handler_id):
self.spin.set_value(float(current_value))
self._update_reset_visibility()
def _on_num_changed(self, widget):
selected_value = widget.get_value()
self.setting._set_backend_value(selected_value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
with self.spin.handler_block(self.spin_handler_id):
self.setting._set_backend_value(default_value)
self.spin.set_value(float(default_value))
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = float(self.setting._get_backend_value())
default_value = self.setting.default
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None
else False
)
\ No newline at end of file
from gi.repository import Gtk, Adw, GObject
from .BaseWidget import BaseWidget
class RadioChoiceWidget(BaseWidget):
def create_row(self):
main_box = Adw.PreferencesRow()
content_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=6,
margin_top=8,
margin_bottom=8,
margin_start=12,
margin_end=12
)
main_box.set_child(content_box)
title_horizontal_box = Gtk.Box(
orientation=Gtk.Orientation.HORIZONTAL,
hexpand=True,
)
content_box.append(title_horizontal_box)
title_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
hexpand=True,
spacing=1,
)
title_horizontal_box.append(title_box)
title_label = Gtk.Label(
label=self.setting.name,
halign=Gtk.Align.START,
)
title_box.append(title_label)
if self.setting.help:
subtitle_label = Gtk.Label(
label=self.setting.help,
halign=Gtk.Align.START,
wrap=True,
margin_bottom=4
)
subtitle_label.add_css_class("caption")
subtitle_label.add_css_class("dim-label")
title_box.append(subtitle_label)
radio_container = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=8
)
content_box.append(radio_container)
self.radio_buttons = {}
current_value = self.setting._get_backend_value()
group = None
# Создаем радио-кнопки
for label, value in self.setting.map.items():
radio = Gtk.CheckButton(
label=label,
halign=Gtk.Align.START,
active=(value == current_value)
)
radio.add_css_class('selection-mode')
if group:
radio.set_group(group)
else:
group = radio
handler_id = radio.connect("toggled", self._on_toggle, value)
self.radio_buttons[value] = (radio, handler_id)
radio_container.append(radio)
self.reset_revealer.set_halign(Gtk.Align.END)
title_horizontal_box.append(self.reset_revealer)
self._update_reset_visibility()
return main_box
def update_display(self):
current_value = self.setting._get_backend_value()
for value, (radio, handler_id) in self.radio_buttons.items():
with GObject.signal_handler_block(radio, handler_id):
radio.set_active(value == current_value)
self._update_reset_visibility()
def _on_toggle(self, button, value):
if button.get_active():
self.setting._set_backend_value(value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
self.setting._set_backend_value(default_value)
if default_value in self.radio_buttons:
radio, handler_id = self.radio_buttons[default_value]
with GObject.signal_handler_block(radio, handler_id):
radio.set_active(True)
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_backend_value()
default_value = self.setting.default
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None else False
)
from gi.repository import Gtk, Adw, GObject, Gdk
from .BaseWidget import BaseWidget
class ThemeChooserWidget(BaseWidget):
"""
Widget for choosing themes with visual previews.
YAML usage:
- name: Style
type: theme_chooser
gtype: string
backend: gsettings
key: org.gnome.desktop.interface.color-scheme
default: "default"
map:
Light: prefer-light
Dark: prefer-dark
previews: # optional, auto-generates if not specified
prefer-light: {tuneit_images}/light.png
prefer-dark: {tuneit_images}/dark.png
Path templates:
{tuneit_images} - expands to resource:///ru.ximperlinux.TuneIt/images
Example: {tuneit_images}/theme-dark.png
"""
# Built-in color schemes for common theme types
THEME_COLORS = {
'prefer-light': ('#EBEBED', '#FFFFFF'), # Light theme
'prefer-dark': ('#222226', '#2E2E32'), # Dark theme
}
# Path template expansions
PATH_TEMPLATES = {
'{tuneit_images}': 'resource:///ru.ximperlinux.TuneIt/images',
}
@classmethod
def _expand_path(cls, path):
"""Expand path templates like {tuneit_images} to full resource paths."""
if path is None:
return None
for template, replacement in cls.PATH_TEMPLATES.items():
if template in path:
path = path.replace(template, replacement)
return path
def create_row(self):
self.row = Adw.PreferencesRow(
activatable=False,
focusable=False
)
content_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=12,
margin_top=12,
margin_bottom=12,
margin_start=12,
margin_end=12
)
self.row.set_child(content_box)
# Header with title and reset button
header_box = Gtk.Box(
orientation=Gtk.Orientation.HORIZONTAL,
hexpand=True,
)
content_box.append(header_box)
title_box = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
hexpand=True,
spacing=2,
)
header_box.append(title_box)
title_label = Gtk.Label(
label=self.setting.name,
halign=Gtk.Align.START,
)
title_box.append(title_label)
if self.setting.help:
subtitle_label = Gtk.Label(
label=self.setting.help,
halign=Gtk.Align.START,
wrap=True,
)
subtitle_label.add_css_class("caption")
subtitle_label.add_css_class("dim-label")
title_box.append(subtitle_label)
self.reset_revealer.set_halign(Gtk.Align.END)
header_box.append(self.reset_revealer)
# FlowBox for theme cards
self.flowbox = Gtk.FlowBox(
homogeneous=True,
selection_mode=Gtk.SelectionMode.SINGLE,
min_children_per_line=2,
max_children_per_line=2,
row_spacing=12,
column_spacing=12,
)
content_box.append(self.flowbox)
self.theme_cards = {}
current_value = self.setting._get_backend_value()
previews = self.setting.previews or {}
for label, value in self.setting.map.items():
card = self._create_theme_card(label, value, previews.get(value))
flowbox_child = Gtk.FlowBoxChild()
flowbox_child.set_child(card)
flowbox_child.value = value
self.flowbox.append(flowbox_child)
self.theme_cards[value] = flowbox_child
if value == current_value:
self.flowbox.select_child(flowbox_child)
self.handler_id = self.flowbox.connect(
"child-activated", self._on_theme_selected)
self._update_reset_visibility()
return self.row
def _create_theme_card(self, label, value, preview_path=None):
"""Create a theme preview card."""
card = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=8,
width_request=120,
)
card.add_css_class("card")
# Preview area
preview_path = self._expand_path(preview_path)
if preview_path and preview_path.startswith('resource://'):
# Load image from resource
preview = Gtk.Picture()
preview.set_resource(preview_path.replace('resource://', ''))
preview.set_size_request(100, 90)
preview.set_content_fit(Gtk.ContentFit.COVER)
else:
# Generate color preview
preview = self._create_color_preview(value)
preview_frame = Gtk.Frame()
preview_frame.set_child(preview)
preview_frame.add_css_class("theme-preview-frame")
card_inner = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
spacing=6,
margin_top=8,
margin_bottom=8,
margin_start=8,
margin_end=8,
)
card_inner.append(preview_frame)
# Label
name_label = Gtk.Label(
label=label,
halign=Gtk.Align.CENTER,
)
name_label.add_css_class("caption")
card_inner.append(name_label)
card.append(card_inner)
return card
def _create_color_preview(self, value):
"""Create a colored preview box for the theme."""
preview = Gtk.Box(
orientation=Gtk.Orientation.VERTICAL,
hexpand=True,
)
preview.set_size_request(100, 90)
colors = self.THEME_COLORS.get(value, ('#f6f5f4', '#deddda'))
bg_color, accent_color = colors
# Create a drawing area for the preview
drawing = Gtk.DrawingArea()
drawing.set_size_request(100, 90)
drawing.set_draw_func(self._draw_theme_preview,
(bg_color, accent_color))
preview.append(drawing)
return preview
def _draw_theme_preview(self, area, cr, width, height, colors):
"""Draw a simple theme preview."""
bg_color, accent_color = colors
# Parse colors
bg_rgba = Gdk.RGBA()
bg_rgba.parse(bg_color)
accent_rgba = Gdk.RGBA()
accent_rgba.parse(accent_color)
# Draw background
cr.set_source_rgba(bg_rgba.red, bg_rgba.green, bg_rgba.blue, 1.0)
cr.rectangle(0, 0, width, height)
cr.fill()
# Draw sidebar
sidebar_width = 28
cr.set_source_rgba(accent_rgba.red, accent_rgba.green,
accent_rgba.blue, 0.5)
cr.rectangle(0, 0, sidebar_width, height)
cr.fill()
# Draw header bar simulation
cr.set_source_rgba(accent_rgba.red, accent_rgba.green,
accent_rgba.blue, 1.0)
cr.rectangle(0, 0, width, 16)
cr.fill()
# Draw window controls (circles)
cr.set_source_rgba(bg_rgba.red, bg_rgba.green, bg_rgba.blue, 0.8)
for x in [width - 12, width - 24, width - 36]:
cr.arc(x, 8, 4, 0, 2 * 3.14159)
cr.fill()
# Draw sidebar items
cr.set_source_rgba(bg_rgba.red, bg_rgba.green, bg_rgba.blue, 0.6)
for y in [26, 40, 54, 68]:
cr.rectangle(6, y, sidebar_width - 12, 8)
cr.fill()
# Draw content lines
content_start = sidebar_width + 8
cr.set_source_rgba(accent_rgba.red, accent_rgba.green,
accent_rgba.blue, 0.6)
for y in [28, 46, 64]:
cr.rectangle(content_start, y, width - content_start - 8, 6)
cr.fill()
def update_display(self):
current_value = self.setting._get_backend_value()
with GObject.signal_handler_block(self.flowbox, self.handler_id):
if current_value in self.theme_cards:
self.flowbox.select_child(self.theme_cards[current_value])
self._update_reset_visibility()
def _on_theme_selected(self, flowbox, child):
value = child.value
self.setting._set_backend_value(value)
self._update_reset_visibility()
def _on_reset_clicked(self, button):
default_value = self.setting.default
if default_value is not None:
self.setting._set_backend_value(default_value)
if default_value in self.theme_cards:
with GObject.signal_handler_block(self.flowbox, self.handler_id):
self.flowbox.select_child(self.theme_cards[default_value])
self._update_reset_visibility()
def _update_reset_visibility(self):
current_value = self.setting._get_backend_value()
default_value = self.setting.default
self.reset_revealer.set_reveal_child(
current_value != default_value if default_value is not None else False
)
from .AvatarWidget import AvatarWidget
from .BooleanWidget import BooleanWidget
from .ChoiceWidget import ChoiceWidget
from .RadioChoiceWidget import RadioChoiceWidget
from .ThemeChooserWidget import ThemeChooserWidget
from .EntryWidget import EntryWidget
from .NumStepper import NumStepper
from .FileChooser import FileChooser
from .ButtonWidget import ButtonWidget
from .InfoLabelWidget import InfoLabelWidget
from .InfoDictWidget import InfoDictWidget
from .DualListWidget import DualListWidget
from .ImageChooserWidget import ImageChooserWidget
from .DualImageChooserWidget import DualImageChooserWidget
import logging
logger = logging.getLogger(f"{__name__}")
class WidgetFactory:
widget_map = {
'avatar': AvatarWidget,
'file': FileChooser,
'image': ImageChooserWidget,
'image_dual': DualImageChooserWidget,
'choice': ChoiceWidget,
'choice_radio': RadioChoiceWidget,
'theme_chooser': ThemeChooserWidget,
'boolean': BooleanWidget,
'entry': EntryWidget,
'number': NumStepper,
'button': ButtonWidget,
'info_label': InfoLabelWidget,
'info_dict': InfoDictWidget,
'list_dual': DualListWidget,
}
@staticmethod
def create_widget(setting):
widget_class = WidgetFactory.widget_map.get(setting.type)
if widget_class:
return widget_class(setting)
else:
logger.error(f"Неизвестный тип виджета: {setting.type}")
return None
import logging
logger = logging.getLogger(f"{__name__}")
def convert_by_gvariant(value, gtype):
"""
Приводит значение к нужному типу в зависимости от GVariant gtype.
:param value: Исходное значение
:param gtype: Тип GVariant ('b', 'y', 'n', 'q', 'i', 'u', 'x', 't', 'd', 's')
:return: Значение, приведенное к указанному типу
"""
try:
if gtype == 'b': # Boolean
return bool(value)
elif gtype == 'y': # Byte
return max(0, min(255, int(value))) # Ограничение диапазона
elif gtype == 'n': # Int16
return max(-32768, min(32767, int(value))) # Ограничение диапазона
elif gtype == 'q': # Uint16
return max(0, min(65535, int(value))) # Ограничение диапазона
elif gtype == 'i': # Int32
return max(-2147483648, min(2147483647, int(value)))
elif gtype == 'u': # Uint32
return max(0, min(4294967295, int(value)))
elif gtype == 'x': # Int64
return max(-9223372036854775808, min(9223372036854775807, int(value)))
elif gtype == 't': # Uint64
return max(0, min(18446744073709551615, int(value)))
elif gtype == 'd': # Double
return float(value)
elif gtype == 's': # String
return str(value)
else:
raise ValueError(f"Неизвестный GVariant тип: {gtype}")
except (ValueError, TypeError) as e:
logger.error(f"Ошибка приведения типа: {e}")
return None
import os
import yaml
import logging
logger = logging.getLogger(f"{__name__}")
def get_local_module_directory():
home_directory = os.path.expanduser("~")
return os.path.join(home_directory, ".local", "share", "tuneit", "modules")
def get_module_directory():
return os.path.join(tuneit_config.pkgdatadir, "modules")
def load_modules():
modules = []
local_modules_directory = get_local_module_directory()
global_modules_directory = get_module_directory()
all_modules = set(os.listdir(global_modules_directory))
if os.path.exists(local_modules_directory) and os.path.isdir(local_modules_directory):
for module_name in os.listdir(local_modules_directory):
module_path = os.path.join(local_modules_directory, module_name)
if os.path.isdir(module_path):
modules += load_yaml_files_from_directory(module_path)
all_modules.discard(module_name)
for module_name in all_modules:
module_path = os.path.join(global_modules_directory, module_name)
if os.path.isdir(module_path):
modules += load_yaml_files_from_directory(module_path)
return modules
def load_yaml_files_from_directory(directory):
yaml_data = []
for file in os.listdir(directory):
if file.endswith(".yml") or file.endswith(".yaml"):
file_path = os.path.join(directory, file)
with open(file_path, 'r', encoding='utf-8') as f:
try:
data = yaml.safe_load(f)
if data:
for item in data:
item['module_path'] = directory
yaml_data.extend(data)
except yaml.YAMLError as e:
logger.error(f"Ошибка при чтении файла {file_path}: {e}")
sections_data = []
sections_directory = os.path.join(directory, 'sections')
if os.path.exists(sections_directory) and os.path.isdir(sections_directory):
for file in os.listdir(sections_directory):
if file.endswith(".yml") or file.endswith(".yaml"):
file_path = os.path.join(sections_directory, file)
with open(file_path, 'r', encoding='utf-8') as f:
try:
data = yaml.safe_load(f)
if data:
sections_data.extend(data)
except yaml.YAMLError as e:
logger.error(f"Ошибка при чтении файла {file_path}: {e}")
for module in yaml_data:
if 'sections' in module:
module['sections'].extend(sections_data)
else:
module['sections'] = sections_data
return yaml_data
using Gtk 4.0;
using Adw 1;
template $TuneItDepsAlertDialog: Adw.AlertDialog {
heading: _("The module has unmet dependencies");
responses [
close: _("Ignore") destructive,
skip: _("Skip module") suggested,
]
close-response: "close";
extra-child: Gtk.TextView{
wrap-mode: word_char;
buffer: Gtk.TextBuffer deps_message_textbuffer {};
};
}
from gi.repository import GLib, Adw, Gtk
@Gtk.Template(resource_path='/ru.ximperlinux.TuneIt/settings/widgets/deps_alert_dialog.ui')
class TuneItDepsAlertDialog(Adw.AlertDialog):
__gtype_name__ = "TuneItDepsAlertDialog"
deps_message_textbuffer = Gtk.Template.Child()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.callback = None
def ask_user(self, window, callback):
self.callback = callback
self.present(window)
self.connect('response', self.on_response)
def on_response(self, dialog, response):
if self.callback:
self.callback(response)
using Gtk 4.0;
using Adw 1;
template $TuneItErrorDialog: Adw.AlertDialog {
heading: _("Error in Tune it!");
body: _("You can write in an official telegram chat by applying this log.");
responses [
close: _("Ignore") destructive,
copy: _("Copy the log and go to telegram chat") suggested,
]
close-response: "close";
extra-child: Gtk.ScrolledWindow {
hscrollbar-policy: never;
vscrollbar-policy: automatic;
height-request: 185;
Gtk.TextView{
wrap-mode: word_char;
buffer: Gtk.TextBuffer textbuffer {};
}
};
}
import webbrowser
from gi.repository import Adw, Gtk, Gdk, Gio
@Gtk.Template(resource_path='/ru.ximperlinux.TuneIt/settings/widgets/error_dialog.ui')
class TuneItErrorDialog(Adw.AlertDialog):
__gtype_name__ = "TuneItErrorDialog"
textbuffer = Gtk.Template.Child()
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.connect('response', self.on_response)
def on_response(self, _, response):
if response == 'copy':
self.on_copy()
def on_copy(self):
app_info = Gio.AppInfo.get_default_for_uri_scheme('tg')
if app_info:
Gio.AppInfo.launch_default_for_uri('tg://resolve?domain=tuneit', None)
else:
webbrowser.open("https://t.me/tuneit")
def copy_error(self):
display = Gdk.Display.get_default()
clipboard = display.get_clipboard()
clipboard.set(self.textbuffer.get_text(
self.textbuffer.get_start_iter(),
self.textbuffer.get_end_iter(),
False
))
using Gtk 4.0;
template $TuneItPanelRow: ListBoxRow {
Box {
spacing: 12;
margin-top: 6;
margin-bottom: 6;
margin-start: 2;
margin-end: 2;
Image thumbnail_image {
icon-name: bind template.icon-name;
}
Box {
orientation: vertical;
valign: center;
spacing: 2;
Label title_label {
label: bind template.title;
halign: start;
justify: left;
wrap: true;
}
Label subtitle_label {
styles [
"caption",
"dim-label",
]
label: bind template.subtitle;
visible: bind template.subtitle-visible;
halign: start;
justify: left;
wrap: true;
}
}
}
}
from gi.repository import GObject, Adw, Gtk
@Gtk.Template(resource_path='/ru.ximperlinux.TuneIt/settings/widgets/panel_row.ui')
class TuneItPanelRow(Gtk.ListBoxRow):
__gtype_name__ = "TuneItPanelRow"
name = GObject.Property(type=str, default='')
title = GObject.Property(type=str, default='')
subtitle = GObject.Property(type=str, default='')
subtitle_visible = GObject.Property(type=bool, default=False)
icon_name = GObject.Property(type=str, default='')
import os
import subprocess
import sys
from time import sleep
from gi.repository import GLib, Adw
import logging
logger = logging.getLogger(f"{__name__}")
class ServiceNotStartedDialog(Adw.AlertDialog):
response = ""
def __init__(self):
super().__init__()
self.sname = 'tuneit-daemon'
self.set_heading(_("Dbus service is disabled or unresponsive."))
self.set_body(_("It is needed for modules that require root permissions.\nDo you want to try to turn on the service?\nTune It will restart after enabling the service."))
self.add_response("yes", _("Yes"))
self.add_response("no", _("No"))
self.connect('response', self.on_response)
def on_response(self, dialog, response):
if response == "yes":
self.service_enable_with_restart()
elif response in ("no", "close"):
dialog.close()
def service_status(self):
try:
# Запускаем команду systemctl is-active <service_name>
result = subprocess.run(
['systemctl', 'is-active', self.sname],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
# Проверяем статус
if result.stdout.decode('utf-8').strip() == 'active':
return True
else:
return False
except Exception as e:
logger.error(f"An error occurred: {e}")
return False
def service_enable(self):
try:
subprocess.run(
['pkexec', 'systemctl', '--now', 'enable', self.sname],
stdout=subprocess.PIPE, stderr=subprocess.PIPE
)
except Exception as e:
logger.error(f"An error occurred: {e}")
def service_enable_with_restart(self):
self.service_enable()
self.close()
os.execv(sys.argv[0], sys.argv)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment