Commit 31bfe20c authored by Roman Alifanov

Rewrite compiler: IR-based architecture replacing bootstrap

Replace the bootstrap mixin-based compiler with a new IR-based pipeline:

    Source → Lexer → Parser → Resolver → TypeChecker → IR Builder → Optimizer → Backend

New architecture (compiler/):
- semantics/: scope resolution, type system, type checker
- ir/: intermediate representation with stable node IDs
- optimizer/: DCE via call-graph BFS, CSE, constant folding
- backend/bash/: Bash code generation from IR
- backend/awk/: AWK code generation sharing the same IR
- symbols/: symbol table with LSP-ready serialization

Key improvements:
- Explicit shell-command detection (IRCall.is_shell_cmd)
- Namespace prefixing only in the Bash backend, not in the resolver
- DCE reduced from a 580-line fixed-point pass to a ~40-line BFS
- CSE uses stable node_id instead of Python id()
- Shell commands in assignments use || true for set -e safety
- No `local` keyword in global scope (when/foreach)
- Coproc uses exec to prevent orphan child processes

All 400 tests pass. Verified with the real-world unified-theme-switcher service.
parent 5f7339cd
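The commit message claims DCE shrank from a fixed-point pass to a short BFS over the call graph. A minimal standalone sketch of that idea (the dict-of-lists graph shape and function names here are illustrative assumptions, not the actual IR representation):

```python
from collections import deque


def reachable_functions(call_graph, roots):
    """BFS over the call graph: any function never reached from a root is dead.

    call_graph maps a function name to the list of names it calls;
    roots are the entry points (e.g. top-level statements).
    """
    seen = set(roots)
    queue = deque(roots)
    while queue:
        fn = queue.popleft()
        for callee in call_graph.get(fn, ()):
            if callee not in seen:
                seen.add(callee)
                queue.append(callee)
    return seen


# Hypothetical program: "unused" calls "helper" but is itself never called.
graph = {
    "main": ["helper", "log"],
    "helper": ["log"],
    "unused": ["helper"],
}
live = reachable_functions(graph, ["main"])
dead = set(graph) - live
```

Unlike a fixed-point pass that repeatedly re-scans until nothing changes, the BFS visits each edge once, which is why the pass can be so much shorter.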
@@ -38,7 +38,7 @@ git clone https://gitlab.eterfund.ru/ximperlinux/ContenT.git
cd content
```
Requires Python 3.8+ (bootstrap compiler).
Requires Python 3.8+.
### System-wide (Meson)
@@ -50,7 +50,7 @@ sudo meson install -C builddir
Installs:
- `/usr/bin/content` — CLI entry point
- `/usr/share/content/bootstrap/` — compiler (Python)
- `/usr/share/content/compiler/` — compiler (Python)
- `/usr/share/content/lib/cli.ct` — standard library
- `/usr/share/content/lib/cli.sh` — precompiled CLI library
@@ -572,58 +520,6 @@ Features:
- [Language Specification](LANGUAGE_SPEC.md)
## Project Structure
```
bootstrap/ # Bootstrap compiler (Python)
├── main.py # CLI entry point
├── lexer.py # Tokenizer
├── tokens.py # Token type definitions
├── parser.py # Recursive descent parser, AST generation
├── ast_nodes.py # AST node classes
├── errors.py # Error handling
├── constants.py # Codegen constants (RET_VAR, TMP_PREFIX, etc.)
├── methods/ # Unified method registry (bash + awk)
│ ├── base.py # Method dataclass
│ ├── string.py # String methods
│ ├── array.py # Array methods
│ ├── dict.py # Dict methods
│ └── ... # http, fs, json, logger, math, time, process_handle, etc.
├── dce.py # Dead code elimination
├── codegen.py # Main Bash code generator (mixin coordinator)
├── expr_codegen.py # Expression generation (mixin)
├── stmt_codegen.py # Statement generation (mixin)
├── class_codegen.py # Class/method generation (mixin)
├── dispatch_codegen.py # Method dispatch, assignments (mixin)
├── decorator_codegen.py # Decorator wrappers (mixin)
├── cse_codegen.py # Common subexpression elimination (mixin)
├── stdlib.py # Standard library generation (mixin)
└── awk_codegen.py # AWK generator for @awk (mixin)
lib/ # ContenT libraries
└── cli.ct # CLI library (urfave/cli style)
tests/ # Test suite
├── helpers.py # Shared test helpers (run_ct, compile_ct)
├── test_lexer.py # Lexer tests
├── test_parser.py # Parser tests
├── test_basics.py # Basic tests (print, variables, arithmetic, loops)
├── test_functions.py # Functions, lambdas, callbacks, parameter passing
├── test_classes.py # Classes, objects, field assignment
├── test_methods.py # String/array/dict methods, method validation
├── test_stdlib.py # Standard library (env, json, fs, with)
├── test_decorators.py # Decorators, typing, @test, user decorators
├── test_awk.py # AWK functions (map/filter, sync, assert)
├── test_shell.py # Shell commands, pipes, mixed pipes
├── test_async.py # Background processes (async/await/on, pid)
├── test_namespace.py # Namespace/using tests
├── test_busing.py # Bash library import tests
├── test_build_lib.py # Library build tests
└── test_autoscan.py # Auto-scan tests
examples/ # Example .ct programs
```
## License
[AGPL-3.0](LICENSE)
@@ -38,7 +38,7 @@ git clone https://gitlab.eterfund.ru/ximperlinux/ContenT.git
cd content
```
Requires Python 3.8+ (bootstrap compiler).
Requires Python 3.8+.
### System-wide installation (Meson)
@@ -50,7 +50,7 @@ sudo meson install -C builddir
Installs:
- `/usr/bin/content` — CLI entry point
- `/usr/share/content/bootstrap/` — compiler (Python)
- `/usr/share/content/compiler/` — compiler (Python)
- `/usr/share/content/lib/cli.ct` — standard library
- `/usr/share/content/lib/cli.sh` — precompiled CLI library
@@ -566,58 +514,6 @@ python3 content run examples/telegram_echobot/telegram.ct examples/telegram_echobot
- [Language Specification](LANGUAGE_SPEC.md)
## Project Structure
```
bootstrap/               # Bootstrap compiler (Python)
├── main.py              # CLI entry point
├── lexer.py             # Tokenizer
├── tokens.py            # Token type definitions
├── parser.py            # Recursive descent parser, AST generation
├── ast_nodes.py         # AST node classes
├── errors.py            # Error handling
├── constants.py         # Codegen constants (RET_VAR, TMP_PREFIX, etc.)
├── methods/             # Unified method registry (bash + awk)
│   ├── base.py          # Method dataclass
│   ├── string.py        # String methods
│   ├── array.py         # Array methods
│   ├── dict.py          # Dict methods
│   └── ...              # http, fs, json, logger, math, time, process_handle, etc.
├── dce.py               # Dead code elimination
├── codegen.py           # Main Bash code generator (mixin coordinator)
├── expr_codegen.py      # Expression generation (mixin)
├── stmt_codegen.py      # Statement generation (mixin)
├── class_codegen.py     # Class/method generation (mixin)
├── dispatch_codegen.py  # Method dispatch, assignments (mixin)
├── decorator_codegen.py # Decorator wrappers (mixin)
├── cse_codegen.py       # Common subexpression elimination (mixin)
├── stdlib.py            # Standard library generation (mixin)
└── awk_codegen.py       # AWK generator for @awk (mixin)
lib/                     # ContenT libraries
└── cli.ct               # CLI library (urfave/cli style)
tests/                   # Test suite
├── helpers.py           # Shared test helpers (run_ct, compile_ct)
├── test_lexer.py        # Lexer tests
├── test_parser.py       # Parser tests
├── test_basics.py       # Basic tests (print, variables, arithmetic)
├── test_functions.py    # Functions, lambdas, callbacks
├── test_classes.py      # Classes, objects, field assignment
├── test_methods.py      # String/array/dict methods
├── test_stdlib.py       # Standard library (env, json, fs, with)
├── test_decorators.py   # Decorators, typing, @test
├── test_awk.py          # AWK functions
├── test_shell.py        # Shell commands, pipes
├── test_async.py        # Background processes (async/await/on, pid)
├── test_namespace.py    # Namespace/using tests
├── test_busing.py       # Bash library import tests
├── test_build_lib.py    # Library build tests
└── test_autoscan.py     # Auto-scan tests
examples/                # Example .ct programs
```
## License
[AGPL-3.0](LICENSE)
__version__ = "0.1.0"
from dataclasses import dataclass, field
from typing import List, Optional, Any, Union


@dataclass
class SourceLocation:
    line: int
    column: int
    filename: str = "<stdin>"


@dataclass
class ASTNode:
    pass


@dataclass
class TypeAnnotation(ASTNode):
    name: str = ""
    is_array: bool = False
    element_type: Optional['TypeAnnotation'] = None
    key_type: Optional['TypeAnnotation'] = None
    value_type: Optional['TypeAnnotation'] = None
    param_types: List['TypeAnnotation'] = field(default_factory=list)
    return_type: Optional['TypeAnnotation'] = None
    location: Optional[SourceLocation] = None


@dataclass
class Expression(ASTNode):
    pass


@dataclass
class IntegerLiteral(Expression):
    value: int = 0
    location: Optional[SourceLocation] = None


@dataclass
class FloatLiteral(Expression):
    value: float = 0.0
    location: Optional[SourceLocation] = None


@dataclass
class StringLiteral(Expression):
    value: str = ""
    has_interpolation: bool = False
    location: Optional[SourceLocation] = None


@dataclass
class BoolLiteral(Expression):
    value: bool = False
    location: Optional[SourceLocation] = None


@dataclass
class NilLiteral(Expression):
    location: Optional[SourceLocation] = None


@dataclass
class Identifier(Expression):
    name: str = ""
    location: Optional[SourceLocation] = None


@dataclass
class ArrayLiteral(Expression):
    elements: List[Expression] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class DictLiteral(Expression):
    pairs: List[tuple] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class BinaryOp(Expression):
    left: Optional[Expression] = None
    operator: str = ""
    right: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class UnaryOp(Expression):
    operator: str = ""
    operand: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class CallExpr(Expression):
    callee: Optional[Expression] = None
    arguments: List[Expression] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class MemberAccess(Expression):
    object: Optional[Expression] = None
    member: str = ""
    location: Optional[SourceLocation] = None


@dataclass
class IndexAccess(Expression):
    object: Optional[Expression] = None
    index: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class Lambda(Expression):
    params: List[str] = field(default_factory=list)
    body: Union['Block', Expression, None] = None
    location: Optional[SourceLocation] = None


@dataclass
class ThisExpr(Expression):
    location: Optional[SourceLocation] = None


@dataclass
class BaseCall(Expression):
    arguments: List[Expression] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class NewExpr(Expression):
    class_name: str = ""
    arguments: List[Expression] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class AsyncExpr(Expression):
    expression: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class Statement(ASTNode):
    pass


@dataclass
class Block(Statement):
    statements: List[Statement] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class ExpressionStmt(Statement):
    expression: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class Assignment(Statement):
    target: Optional[Expression] = None
    type_annotation: Optional[TypeAnnotation] = None
    operator: str = "="
    value: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class ReturnStmt(Statement):
    value: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class BreakStmt(Statement):
    location: Optional[SourceLocation] = None


@dataclass
class ContinueStmt(Statement):
    location: Optional[SourceLocation] = None


@dataclass
class IfStmt(Statement):
    condition: Optional[Expression] = None
    then_branch: Optional[Block] = None
    elif_branches: List[tuple] = field(default_factory=list)
    else_branch: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class WhileStmt(Statement):
    condition: Optional[Expression] = None
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class ForStmt(Statement):
    variable: str = ""
    iterable: Optional[Expression] = None
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class ForeachStmt(Statement):
    variables: List[str] = field(default_factory=list)
    iterable: Optional[Expression] = None
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class WithStmt(Statement):
    variables: List[str] = field(default_factory=list)
    resources: List[Expression] = field(default_factory=list)
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class TryStmt(Statement):
    try_block: Optional[Block] = None
    except_clauses: List[tuple] = field(default_factory=list)
    finally_block: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class ThrowStmt(Statement):
    expression: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class DeferStmt(Statement):
    expression: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class NamespaceDecl(Statement):
    name: str = ""
    statements: List[Union[Statement, 'Declaration']] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class UsingStmt(Statement):
    namespace: str = ""
    alias: Optional[str] = None
    names: Optional[List[str]] = None
    location: Optional[SourceLocation] = None


@dataclass
class BusingStmt(Statement):
    name: Optional[str] = None
    path: str = ""
    location: Optional[SourceLocation] = None


@dataclass
class AwaitStmt(Statement):
    expression: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class OnSignalStmt(Statement):
    signal: str = ""
    body: Optional['Block'] = None
    location: Optional[SourceLocation] = None


@dataclass
class RangePattern(Expression):
    """Range pattern for when branches: 1..10"""
    start: Optional[Expression] = None
    end: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class WhenBranch:
    """Single branch of a when statement"""
    patterns: List[Expression] = field(default_factory=list)  # values, ranges, or 'else'
    is_else: bool = False
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class WhenStmt(Statement):
    """When statement (pattern matching)"""
    value: Optional[Expression] = None
    branches: List[WhenBranch] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class Declaration(ASTNode):
    pass


@dataclass
class Parameter:
    name: str = ""
    type_annotation: Optional[TypeAnnotation] = None
    default: Optional[Expression] = None
    is_variadic: bool = False


@dataclass
class Decorator:
    name: str = ""
    arguments: List[tuple] = field(default_factory=list)
    object: Optional[str] = None
    location: Optional[SourceLocation] = None


@dataclass
class FunctionDecl(Declaration):
    name: str = ""
    params: List[Parameter] = field(default_factory=list)
    return_type: Optional[TypeAnnotation] = None
    body: Optional[Block] = None
    decorators: List[Decorator] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class ClassField:
    name: str = ""
    type_annotation: Optional[TypeAnnotation] = None
    default: Optional[Expression] = None
    location: Optional[SourceLocation] = None


@dataclass
class ClassDecl(Declaration):
    name: str = ""
    parent: Optional[str] = None
    fields: List[ClassField] = field(default_factory=list)
    constructor: Optional['ConstructorDecl'] = None
    methods: List[FunctionDecl] = field(default_factory=list)
    location: Optional[SourceLocation] = None


@dataclass
class ConstructorDecl(Declaration):
    params: List[Parameter] = field(default_factory=list)
    body: Optional[Block] = None
    location: Optional[SourceLocation] = None


@dataclass
class Program(ASTNode):
    statements: List[Union[Statement, Declaration]] = field(default_factory=list)
    location: Optional[SourceLocation] = None
"""Constants for bash code generation."""
RET_VAR = "__CT_RET"
RET_ARR = "__CT_RET_ARR"
TMP_PREFIX = "__ct_tmp_"
CLASS_FUNC_PREFIX = "__ct_class_"
LAMBDA_PREFIX = "__ct_lambda_"
OBJ_STORE = "__CT_OBJ"
THIS_INSTANCE = "__ct_this_instance"
ARR_FUNC_PREFIX = "__ct_arr_"
DICT_FUNC_PREFIX = "__ct_dict_"
STR_FUNC_PREFIX = "__ct_str_"
FH_FUNC_PREFIX = "__ct_fh_"
HTTP_FUNC_PREFIX = "__ct_http_"
FS_FUNC_PREFIX = "__ct_fs_"
JSON_FUNC_PREFIX = "__ct_json_"
REGEX_FUNC_PREFIX = "__ct_regex_"
MATH_FUNC_PREFIX = "__ct_math_"
COPROC_PREFIX = "__ct_cp"
from typing import Dict, Any

from .ast_nodes import (
    Expression, CallExpr, MemberAccess, ThisExpr, Identifier,
    BinaryOp, UnaryOp, BoolLiteral
)
from .constants import RET_VAR


class NodeIdMap:
    """Mapping from AST nodes to values using id() with reference retention."""

    def __init__(self):
        self._map: Dict[int, Any] = {}
        self._refs = []

    def set(self, node, value):
        self._refs.append(node)
        self._map[id(node)] = value

    def get(self, node, default=None):
        return self._map.get(id(node), default)

    def __contains__(self, node):
        return id(node) in self._map

    def __getitem__(self, node):
        return self._map[id(node)]
class CseMixin:
    """Mixin for CSE optimization."""

    def collect_method_calls(self, expr: Expression, calls: list):
        """Collect this.method() calls from an expression."""
        if isinstance(expr, CallExpr):
            if isinstance(expr.callee, MemberAccess) and isinstance(expr.callee.object, ThisExpr):
                calls.append(expr)
            for arg in expr.arguments:
                self.collect_method_calls(arg, calls)
        elif isinstance(expr, BinaryOp):
            self.collect_method_calls(expr.left, calls)
            self.collect_method_calls(expr.right, calls)
        elif isinstance(expr, UnaryOp):
            self.collect_method_calls(expr.operand, calls)
        elif isinstance(expr, MemberAccess):
            self.collect_method_calls(expr.object, calls)

    def collect_all_calls(self, expr: Expression, calls: list):
        """Collect ALL function calls from an expression."""
        if isinstance(expr, CallExpr):
            calls.append(expr)
            for arg in expr.arguments:
                self.collect_all_calls(arg, calls)
        elif isinstance(expr, BinaryOp):
            self.collect_all_calls(expr.left, calls)
            self.collect_all_calls(expr.right, calls)
        elif isinstance(expr, UnaryOp):
            self.collect_all_calls(expr.operand, calls)
        elif isinstance(expr, MemberAccess):
            self.collect_all_calls(expr.object, calls)

    def precompute_condition_calls(self, condition: Expression) -> tuple:
        """Pre-compute method calls in condition."""
        calls = []
        self.collect_method_calls(condition, calls)
        seen = {}
        mapping = NodeIdMap()
        regen_code = []
        for call in calls:
            method = call.callee.member
            args = [self.generate_expr(arg) for arg in call.arguments]
            args_str = " ".join([f'"{a}"' for a in args])
            key = f"this.{method}({args_str})"
            if key not in seen:
                temp = self.new_temp()
                call_line = f'__ct_class_{self.current_class}_{method} "$this" {args_str} >/dev/null'
                assign_line = f'{temp}="${{{RET_VAR}}}"'
                self.emit(call_line)
                self.emit(assign_line)
                seen[key] = temp
                regen_code.append((call_line, assign_line))
            mapping.set(call, seen[key])
        return mapping, regen_code

    def precompute_all_calls(self, condition: Expression) -> tuple:
        """Pre-compute all function calls in condition."""
        calls = []
        self.collect_all_calls(condition, calls)
        seen = {}
        mapping = NodeIdMap()
        regen_code = []
        for call in calls:
            if isinstance(call.callee, MemberAccess):
                if isinstance(call.callee.object, ThisExpr):
                    method = call.callee.member
                    args = [self.generate_expr(arg) for arg in call.arguments]
                    args_str = " ".join([f'"{a}"' for a in args])
                    key = f"this.{method}({args_str})"
                    if key not in seen:
                        temp = self.new_temp()
                        call_line = f'__ct_class_{self.current_class}_{method} "$this" {args_str} >/dev/null'
                        assign_line = f'{temp}="${{{RET_VAR}}}"'
                        self.emit(call_line)
                        self.emit(assign_line)
                        seen[key] = temp
                        regen_code.append((call_line, assign_line))
                    mapping.set(call, seen[key])
                elif isinstance(call.callee.object, Identifier):
                    obj_name = call.callee.object.name
                    method = call.callee.member
                    args = [self.generate_expr(arg) for arg in call.arguments]
                    args_str = " ".join([f'"{a}"' for a in args])
                    key = f"{obj_name}.{method}({args_str})"
                    if key not in seen:
                        temp = self.new_temp()
                        call_expr = self.generate_expr(call)
                        if call_expr.startswith('$'):
                            call_line = f'{temp}="{call_expr}"'
                        else:
                            call_line = f'{temp}="$({call_expr})"'
                        self.emit(call_line)
                        seen[key] = temp
                        regen_code.append((call_line, ""))
                    mapping.set(call, seen[key])
            elif isinstance(call.callee, Identifier):
                func_name = call.callee.name
                if func_name in ('is_empty', 'is_number'):
                    continue
                args = [self.generate_expr(arg) for arg in call.arguments]
                args_str = " ".join([f'"{a}"' for a in args])
                key = f"{func_name}({args_str})"
                if key not in seen:
                    temp = self.new_temp()
                    resolved_call = self.generate_call_statement(call)
                    call_line = f'{resolved_call} >/dev/null'
                    assign_line = f'{temp}="${{{RET_VAR}}}"'
                    self.emit(call_line)
                    self.emit(assign_line)
                    seen[key] = temp
                    regen_code.append((call_line, assign_line))
                mapping.set(call, seen[key])
        return mapping, regen_code

    def generate_condition_with_precompute(self, expr: Expression, mapping: NodeIdMap) -> str:
        """Generate condition using pre-computed values."""
        if isinstance(expr, BinaryOp):
            left = self.generate_expr_with_precompute(expr.left, mapping)
            right = self.generate_expr_with_precompute(expr.right, mapping)
            op = expr.operator
            if op == "==":
                return f'[[ "{left}" == "{right}" ]]'
            elif op == "!=":
                return f'[[ "{left}" != "{right}" ]]'
            elif op == "<":
                if self.is_string_comparison(expr.left, expr.right):
                    return f'[[ "{left}" < "{right}" ]]'
                return f'[[ {left} -lt {right} ]]'
            elif op == ">":
                if self.is_string_comparison(expr.left, expr.right):
                    return f'[[ "{left}" > "{right}" ]]'
                return f'[[ {left} -gt {right} ]]'
            elif op == "<=":
                if self.is_string_comparison(expr.left, expr.right):
                    return f'[[ ! "{left}" > "{right}" ]]'
                return f'[[ {left} -le {right} ]]'
            elif op == ">=":
                if self.is_string_comparison(expr.left, expr.right):
                    return f'[[ ! "{left}" < "{right}" ]]'
                return f'[[ {left} -ge {right} ]]'
            elif op == "&&":
                l = self.generate_condition_with_precompute(expr.left, mapping)
                r = self.generate_condition_with_precompute(expr.right, mapping)
                return f'{{ {l} && {r}; }}'
            elif op == "||":
                l = self.generate_condition_with_precompute(expr.left, mapping)
                r = self.generate_condition_with_precompute(expr.right, mapping)
                return f'{{ {l} || {r}; }}'
        if isinstance(expr, UnaryOp) and expr.operator == "!":
            inner = self.generate_condition_with_precompute(expr.operand, mapping)
            return f'! {inner}'
        if isinstance(expr, Identifier):
            return f'[[ "${expr.name}" == "true" ]]'
        if isinstance(expr, BoolLiteral):
            return "true" if expr.value else "false"
        if isinstance(expr, CallExpr) and expr in mapping:
            return f'[[ "${{{mapping[expr]}}}" == "true" ]]'
        return self.generate_condition(expr)

    def generate_expr_with_precompute(self, expr: Expression, mapping: NodeIdMap) -> str:
        """Generate expression using pre-computed values."""
        if isinstance(expr, CallExpr) and expr in mapping:
            return f'${mapping[expr]}'
        if isinstance(expr, MemberAccess):
            if isinstance(expr.object, CallExpr) and expr.object in mapping:
                temp = mapping[expr.object]
                return f'${{__CT_OBJ["${temp}.{expr.member}"]}}'
            return self.generate_expr(expr)
        return self.generate_expr(expr)
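The `seen` dicts above all follow the same pattern: canonicalize each call into a textual key, and hand out one temp per distinct key so repeated calls are evaluated once. A minimal sketch of just that keying logic (the `__ct_tmp_` naming mirrors `TMP_PREFIX` from constants; the input strings are illustrative):

```python
def dedupe_calls(call_keys):
    """One temp per distinct canonical call text, as the seen-dict does."""
    seen = {}
    for key in call_keys:
        if key not in seen:
            seen[key] = f"__ct_tmp_{len(seen)}"
    return seen


# A condition like `this.size() > 0 && this.size() < max` repeats this.size():
temps = dedupe_calls(['this.size()', 'this.size()', 'this.name()'])
```

Because the key includes the generated argument strings, only calls that are textually identical after argument codegen are merged, which keeps the optimization conservative.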
import re
from typing import List

from .ast_nodes import Decorator, Parameter
from .constants import RET_VAR


class DecoratorMixin:
    """Mixin for decorator wrapper generation."""

    def generate_decorator_wrapper(self, decorator: Decorator, wrapped_name: str,
                                   wrapper_name: str, params: List[Parameter]):
        """Generate decorator wrapper for standalone function."""
        if decorator.name == "retry":
            self._generate_retry_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=False)
        elif decorator.name == "log":
            self._generate_log_wrapper(wrapped_name, wrapper_name, params, is_method=False)
        elif decorator.name == "cache":
            self._generate_cache_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=False)
        elif decorator.name == "validate":
            self._generate_validate_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=False)
        else:
            self._generate_passthrough_wrapper(wrapped_name, wrapper_name, params, is_method=False)

    def generate_method_decorator_wrapper(self, decorator: Decorator, wrapped_name: str,
                                          wrapper_name: str, params: List[Parameter]):
        """Generate decorator wrapper for class method."""
        if decorator.name == "retry":
            self._generate_retry_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=True)
        elif decorator.name == "log":
            self._generate_log_wrapper(wrapped_name, wrapper_name, params, is_method=True)
        elif decorator.name == "cache":
            self._generate_cache_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=True)
        elif decorator.name == "validate":
            self._generate_validate_wrapper(decorator, wrapped_name, wrapper_name, params, is_method=True)
        else:
            self._generate_passthrough_wrapper(wrapped_name, wrapper_name, params, is_method=True)
        self.current_class = None
        self.current_class_fields = set()

    def _generate_retry_wrapper(self, decorator: Decorator, wrapped_name: str,
                                wrapper_name: str, params: List[Parameter], is_method: bool):
        attempts = 3
        delay = 1
        for arg_name, arg_val in decorator.arguments:
            if arg_name == "attempts":
                attempts = self.generate_expr(arg_val)
            elif arg_name == "delay":
                delay = self.generate_expr(arg_val)
        self.emit(f"{wrapper_name} () {{")
        with self.indented():
            if is_method:
                self.emit('local this="$1"')
                self.emit('shift')
            self.emit(f"local __attempts={attempts}")
            self.emit(f"local __delay={delay}")
            self.emit("local __i")
            self.emit("for __i in $(seq 1 $__attempts); do")
            with self.indented():
                params_str = " ".join([f'"${{{i + 1}}}"' for i in range(len(params))])
                if is_method:
                    self.emit(f'if {wrapped_name} "$this" {params_str}; then')
                else:
                    self.emit(f'if {wrapped_name} {params_str}; then')
                with self.indented():
                    self.emit("return 0")
                self.emit("fi")
                self.emit('sleep "$__delay"')
            self.emit("done")
            self.emit("return 1")
        self.emit("}")
        self.emit()

    def _generate_log_wrapper(self, wrapped_name: str, wrapper_name: str,
                              params: List[Parameter], is_method: bool):
        self.emit(f"{wrapper_name} () {{")
        with self.indented():
            if is_method:
                self.emit('local this="$1"')
                self.emit('shift')
            self.emit(f'echo "[LOG] Calling {wrapped_name}" >&2')
            params_str = " ".join([f'"${{{i + 1}}}"' for i in range(len(params))])
            if is_method:
                self.emit(f'{wrapped_name} "$this" {params_str}')
            else:
                self.emit(f'{wrapped_name} {params_str}')
            self.emit('local __ret=$?')
            self.emit(f'echo "[LOG] {wrapped_name} returned $__ret" >&2')
            self.emit('return $__ret')
        self.emit("}")
        self.emit()

    def _generate_cache_wrapper(self, decorator: Decorator, wrapped_name: str,
                                wrapper_name: str, params: List[Parameter], is_method: bool):
        ttl = 60
        for arg_name, arg_val in decorator.arguments:
            if arg_name == "ttl":
                ttl = self.generate_expr(arg_val)
        self.emit(f"declare -gA __ct_cache_{wrapper_name}=()")
        self.emit(f"declare -g __ct_cache_time_{wrapper_name}=0")
        self.emit()
        self.emit(f"{wrapper_name} () {{")
        with self.indented():
            if is_method:
                self.emit('local this="$1"')
                self.emit('shift')
                self.emit(f'local __key="$this:$*"')
            else:
                self.emit(f'local __key="$*"')
            self.emit(f'local __now=$(date +%s)')
            self.emit(f'local __cache_age=$((__now - __ct_cache_time_{wrapper_name}))')
            self.emit(f'if [[ $__cache_age -lt {ttl} ]] && [[ -n "${{__ct_cache_{wrapper_name}[$__key]:-}}" ]]; then')
            with self.indented():
                self.emit(f'{RET_VAR}="${{__ct_cache_{wrapper_name}[$__key]}}"')
                self.emit(f'echo "${{{RET_VAR}}}"')
                self.emit("return 0")
            self.emit("fi")
            params_str = " ".join([f'"${{{i + 1}}}"' for i in range(len(params))])
            if is_method:
                self.emit(f'local __result=$({wrapped_name} "$this" {params_str})')
            else:
                self.emit(f'local __result=$({wrapped_name} {params_str})')
            self.emit(f'{RET_VAR}="$__result"')
            self.emit(f'__ct_cache_{wrapper_name}["$__key"]="$__result"')
            self.emit(f'__ct_cache_time_{wrapper_name}=$__now')
            self.emit('echo "$__result"')
        self.emit("}")
        self.emit()

    def _generate_passthrough_wrapper(self, wrapped_name: str, wrapper_name: str,
                                      params: List[Parameter], is_method: bool):
        self.emit(f"{wrapper_name} () {{")
        with self.indented():
            if is_method:
                self.emit('local this="$1"')
                self.emit('shift')
            params_str = " ".join([f'"${{{i + 1}}}"' for i in range(len(params))])
            if is_method:
                self.emit(f'{wrapped_name} "$this" {params_str}')
            else:
                self.emit(f'{wrapped_name} {params_str}')
        self.emit("}")
        self.emit()

    def _generate_validate_wrapper(self, decorator: Decorator, wrapped_name: str,
                                   wrapper_name: str, params: List[Parameter], is_method: bool):
        validations = {}
        for arg_name, arg_val in decorator.arguments:
            if arg_name and hasattr(arg_val, 'value'):
                validations[arg_name] = arg_val.value
        self.emit(f"{wrapper_name} () {{")
        with self.indented():
            if is_method:
                self.emit('local this="$1"')
                self.emit('shift')
            for i, param in enumerate(params):
                rule = validations.get(param.name)
                if rule:
                    self._emit_validation_check(param.name, rule, i + 1)
            params_str = " ".join([f'"${{{i + 1}}}"' for i in range(len(params))])
            if is_method:
                self.emit(f'{wrapped_name} "$this" {params_str}')
            else:
                self.emit(f'{wrapped_name} {params_str}')
        self.emit("}")
        self.emit()

    def _emit_validation_check(self, param_name: str, rule: str, arg_pos: int):
        self.emit(f'local {param_name}="${{{arg_pos}}}"')
        if "int" in rule:
            self.emit(f'if ! [[ "${param_name}" =~ ^-?[0-9]+$ ]]; then')
            with self.indented():
                self.emit(f'echo "Validation error: {param_name} must be integer" >&2')
                self.emit('return 1')
            self.emit('fi')
            for match in re.finditer(r'(>=|<=|>|<|==|!=)\s*(-?\d+)', rule):
                op, val = match.groups()
                bash_op = {'>': '-gt', '<': '-lt', '>=': '-ge', '<=': '-le', '==': '-eq', '!=': '-ne'}[op]
                self.emit(f'if ! [[ ${param_name} {bash_op} {val} ]]; then')
                with self.indented():
                    self.emit(f'echo "Validation error: {param_name} must be {op} {val}" >&2')
                    self.emit('return 1')
                self.emit('fi')
        elif "string" in rule:
            if "nonempty" in rule or "required" in rule:
                self.emit(f'if [[ -z "${param_name}" ]]; then')
                with self.indented():
                    self.emit(f'echo "Validation error: {param_name} cannot be empty" >&2')
                    self.emit('return 1')
                self.emit('fi')
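The `@validate` rule strings are parsed with a single regex whose alternation lists two-character operators before their one-character prefixes (`>=` before `>`), so `>=` is never mis-read as `>` followed by `=`. A standalone sketch of that extraction step (the rule string below is an illustrative example, not taken from the test suite):

```python
import re

# Same operator-to-test mapping the wrapper generator emits for bash [[ ]].
BASH_OPS = {'>': '-gt', '<': '-lt', '>=': '-ge', '<=': '-le', '==': '-eq', '!=': '-ne'}


def parse_int_rule(rule):
    """Extract (op, value, bash_test) constraints from a rule like 'int >= 1 <= 65535'.

    Alternation order matters: '>=' and '<=' must precede '>' and '<'.
    """
    return [(op, int(val), BASH_OPS[op])
            for op, val in re.findall(r'(>=|<=|>|<|==|!=)\s*(-?\d+)', rule)]


checks = parse_int_rule("int >= 1 <= 65535")
```

Each extracted triple then becomes one emitted `if ! [[ $param -ge 1 ]]; then … fi` style guard in the generated wrapper.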
from dataclasses import dataclass
from typing import List, Optional
import sys


@dataclass
class CompileError:
    message: str
    filename: str
    line: int
    column: int
    hint: Optional[str] = None

    def __str__(self):
        result = f"Error: {self.message}\n --> {self.filename}:{self.line}:{self.column}"
        if self.hint:
            result += f"\n Hint: {self.hint}"
        return result


class ErrorCollector:
    def __init__(self):
        self.errors: List[CompileError] = []
        self._seen: set = set()

    def _error_key(self, msg: str, filename: str, line: int, column: int) -> tuple:
        return (msg, filename, line, column)

    def add(self, error: CompileError):
        key = self._error_key(error.message, error.filename, error.line, error.column)
        if key not in self._seen:
            self._seen.add(key)
            self.errors.append(error)

    def add_error(self, message: str, filename: str, line: int, column: int,
                  hint: Optional[str] = None):
        key = self._error_key(message, filename, line, column)
        if key not in self._seen:
            self._seen.add(key)
            self.errors.append(CompileError(
                message=message,
                filename=filename,
                line=line,
                column=column,
                hint=hint
            ))

    def has_errors(self) -> bool:
        return len(self.errors) > 0

    def print_errors(self):
        for error in self.errors:
            print(str(error), file=sys.stderr)
        print(file=sys.stderr)

    def clear(self):
        self.errors = []
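`ErrorCollector` deduplicates by the `(message, filename, line, column)` tuple so that cascading passes re-reporting the same diagnostic only surface it once. A minimal standalone sketch of that keying (plain dicts stand in for `CompileError`; the messages are illustrative):

```python
def collect_unique(diagnostics):
    """Keep the first occurrence of each (message, filename, line, column)."""
    seen, unique = set(), []
    for err in diagnostics:
        key = (err['message'], err['filename'], err['line'], err['column'])
        if key not in seen:
            seen.add(key)
            unique.append(err)
    return unique


# Two passes report the same error at a.ct:3:5; a third hit is at a new location.
reported = [
    {'message': 'Unknown name x', 'filename': 'a.ct', 'line': 3, 'column': 5},
    {'message': 'Unknown name x', 'filename': 'a.ct', 'line': 3, 'column': 5},
    {'message': 'Unknown name x', 'filename': 'a.ct', 'line': 9, 'column': 1},
]
unique = collect_unique(reported)
```

The same message at a different location is deliberately kept, since the location is part of the key.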
from typing import List, Optional
from .tokens import Token, TokenType, KEYWORDS
from .errors import CompileError
class Lexer:
def __init__ (self, source: str, filename: str = "<stdin>"):
self.source = source
self.filename = filename
self.pos = 0
self.line = 1
self.column = 1
self.tokens: List[Token] = []
self.errors: List[CompileError] = []
def current (self) -> Optional[str]:
if self.pos >= len (self.source):
return None
return self.source[self.pos]
def peek (self, offset: int = 1) -> Optional[str]:
pos = self.pos + offset
if pos >= len (self.source):
return None
return self.source[pos]
def advance (self) -> Optional[str]:
ch = self.current ()
if ch is None:
return None
self.pos += 1
if ch == '\n':
self.line += 1
self.column = 1
else:
self.column += 1
return ch
def skip_whitespace (self):
while self.current () in (' ', '\t', '\r'):
self.advance ()
def add_token (self, type: TokenType, value=None, line=None, column=None):
self.tokens.append (Token (
type=type,
value=value,
line=line or self.line,
column=column or self.column
))
def error (self, message: str):
self.errors.append (CompileError (
message=message,
filename=self.filename,
line=self.line,
column=self.column
))
def read_string (self) -> str:
start_line = self.line
start_col = self.column
quote = self.advance ()
result = []
while True:
ch = self.current ()
if ch is None:
self.error ("Unterminated string")
break
if ch == quote:
self.advance ()
break
if ch == '\\':
self.advance ()
escaped = self.current ()
if escaped is None:
self.error ("Unterminated escape sequence")
break
escape_map = {
'n': '\n',
't': '\t',
'r': '\r',
'\\': '\\',
'{': '\x00LBRACE\x00',
'}': '\x00RBRACE\x00',
'$': '\x00DOLLAR\x00',
'"': '"',
"'": "'",
}
result.append (escape_map.get (escaped, escaped))
self.advance ()
else:
result.append (ch)
self.advance ()
return ''.join (result)
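The escape map above resolves backslash escapes and replaces `\{`, `\}`, and `\$` with NUL-delimited sentinels so a later interpolation pass can distinguish escaped braces from real interpolation delimiters. A standalone sketch of the same technique (hypothetical `unescape` helper, same mapping as `read_string`):

```python
def unescape(raw: str) -> str:
    """Resolve backslash escapes the way read_string does (illustration)."""
    escape_map = {
        'n': '\n', 't': '\t', 'r': '\r', '\\': '\\',
        '{': '\x00LBRACE\x00', '}': '\x00RBRACE\x00',
        '"': '"', "'": "'",
    }
    out, i = [], 0
    while i < len(raw):
        if raw[i] == '\\' and i + 1 < len(raw):
            # Unknown escapes fall through to the literal character
            out.append(escape_map.get(raw[i + 1], raw[i + 1]))
            i += 2
        else:
            out.append(raw[i])
            i += 1
    return ''.join(out)
```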
def read_number (self) -> Token:
start_col = self.column
result = []
is_float = False
while True:
ch = self.current ()
if ch is None:
break
if ch.isdigit ():
result.append (ch)
self.advance ()
elif ch == '.' and not is_float:
if self.peek () and self.peek ().isdigit ():
is_float = True
result.append (ch)
self.advance ()
else:
break
else:
break
value = ''.join (result)
if is_float:
return Token (TokenType.FLOAT, float (value), self.line, start_col)
return Token (TokenType.INTEGER, int (value), self.line, start_col)
def read_identifier (self) -> str:
result = []
while True:
ch = self.current ()
if ch is None:
break
if ch.isalnum () or ch == '_':
result.append (ch)
self.advance ()
elif ch == '-' and self.peek () and (self.peek ().isalpha () or self.peek () == '_'):
result.append (ch)
self.advance ()
else:
break
return ''.join (result)
def tokenize (self) -> List[Token]:
while True:
self.skip_whitespace ()
ch = self.current ()
if ch is None:
self.add_token (TokenType.EOF)
break
start_line = self.line
start_col = self.column
if ch == '#':
while self.current () and self.current () != '\n':
self.advance ()
continue
if ch == '\n':
self.add_token (TokenType.NEWLINE, line=start_line, column=start_col)
self.advance ()
continue
if ch in ('"', "'"):
value = self.read_string ()
self.add_token (TokenType.STRING, value, start_line, start_col)
continue
if ch.isdigit ():
token = self.read_number ()
self.tokens.append (token)
continue
if ch.isalpha () or ch == '_':
value = self.read_identifier ()
token_type = KEYWORDS.get (value, TokenType.IDENTIFIER)
self.add_token (token_type, value, start_line, start_col)
continue
if ch == '=' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.EQ, '==', start_line, start_col)
continue
if ch == '!' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.NEQ, '!=', start_line, start_col)
continue
if ch == '<' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.LTE, '<=', start_line, start_col)
continue
if ch == '>' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.GTE, '>=', start_line, start_col)
continue
if ch == '&' and self.peek () == '&':
self.advance ()
self.advance ()
self.add_token (TokenType.AND, '&&', start_line, start_col)
continue
if ch == '|' and self.peek () == '|':
self.advance ()
self.advance ()
self.add_token (TokenType.OR, '||', start_line, start_col)
continue
if ch == '=' and self.peek () == '>':
self.advance ()
self.advance ()
self.add_token (TokenType.ARROW, '=>', start_line, start_col)
continue
if ch == '+' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.PLUS_ASSIGN, '+=', start_line, start_col)
continue
if ch == '-' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.MINUS_ASSIGN, '-=', start_line, start_col)
continue
if ch == '*' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.STAR_ASSIGN, '*=', start_line, start_col)
continue
if ch == '/' and self.peek () == '=':
self.advance ()
self.advance ()
self.add_token (TokenType.SLASH_ASSIGN, '/=', start_line, start_col)
continue
if ch == '.' and self.peek () == '.' and self.peek (2) == '.':
self.advance ()
self.advance ()
self.advance ()
self.add_token (TokenType.DOTDOTDOT, '...', start_line, start_col)
continue
if ch == '.' and self.peek () == '.':
self.advance ()
self.advance ()
self.add_token (TokenType.DOTDOT, '..', start_line, start_col)
continue
single_char_tokens = {
'+': TokenType.PLUS,
'-': TokenType.MINUS,
'*': TokenType.STAR,
'/': TokenType.SLASH,
'%': TokenType.PERCENT,
'=': TokenType.ASSIGN,
'<': TokenType.LT,
'>': TokenType.GT,
'!': TokenType.NOT,
'.': TokenType.DOT,
':': TokenType.COLON,
'(': TokenType.LPAREN,
')': TokenType.RPAREN,
'{': TokenType.LBRACE,
'}': TokenType.RBRACE,
'[': TokenType.LBRACKET,
']': TokenType.RBRACKET,
',': TokenType.COMMA,
'@': TokenType.AT,
'|': TokenType.PIPE,
}
if ch in single_char_tokens:
self.advance ()
self.add_token (single_char_tokens[ch], ch, start_line, start_col)
continue
if ch == ';':
self.advance ()
continue
self.error (f"Unexpected character: '{ch}'")
self.advance ()
return self.tokens
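Note the ordering discipline in `tokenize`: multi-character operators are tried before their single-character prefixes (`==` before `=`, `...` before `..` before `.`), i.e. maximal munch. A minimal standalone illustration of that longest-match-first scan (toy `scan_ops` helper, not the real lexer):

```python
# Operators sorted longest-first so '==' wins over '=' and '...' over '..'.
OPS = ['...', '..', '==', '!=', '<=', '>=', '&&', '||', '=>',
       '+=', '-=', '*=', '/=',
       '+', '-', '*', '/', '%', '=', '<', '>', '!', '.', ':']

def scan_ops(src: str) -> list:
    toks, i = [], 0
    while i < len(src):
        if src[i] in ' \t':
            i += 1
            continue
        for op in OPS:  # first (longest) match wins
            if src.startswith(op, i):
                toks.append(op)
                i += len(op)
                break
        else:
            i += 1  # skip non-operator characters in this toy scanner
    return toks
```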
conf = configuration_data()
conf.set('PYTHON', py.full_path())
conf.set('pkgdatadir', pkgdatadir)
configure_file(
input: '../content.in',
output: 'content',
configuration: conf,
install: true,
install_dir: get_option('bindir'),
install_mode: 'r-xr-xr-x',
)
bootstrap_sources = [
'__init__.py',
'ast_nodes.py',
'awk_codegen.py',
'class_codegen.py',
'codegen.py',
'constants.py',
'cse_codegen.py',
'dce.py',
'decorator_codegen.py',
'dispatch_codegen.py',
'errors.py',
'expr_codegen.py',
'lexer.py',
'main.py',
'parser.py',
'stdlib.py',
'stmt_codegen.py',
'tokens.py',
]
install_data(bootstrap_sources,
install_dir: pkgdatadir / 'bootstrap')
methods_sources = [
'methods/__init__.py',
'methods/args.py',
'methods/array.py',
'methods/base.py',
'methods/core.py',
'methods/dict.py',
'methods/file_handle.py',
'methods/fs.py',
'methods/http.py',
'methods/json.py',
'methods/logger.py',
'methods/math.py',
'methods/process_handle.py',
'methods/reflect.py',
'methods/regex.py',
'methods/string.py',
'methods/time.py',
]
install_data(methods_sources,
install_dir: pkgdatadir / 'bootstrap' / 'methods')
from enum import Enum, auto
from dataclasses import dataclass
from typing import Any
class TokenType (Enum):
INTEGER = auto ()
FLOAT = auto ()
STRING = auto ()
TRUE = auto ()
FALSE = auto ()
NIL = auto ()
IDENTIFIER = auto ()
FUNC = auto ()
CLASS = auto ()
CONSTRUCT = auto ()
THIS = auto ()
BASE = auto ()
RETURN = auto ()
IF = auto ()
ELSE = auto ()
FOREACH = auto ()
FOR = auto ()
IN = auto ()
WHILE = auto ()
BREAK = auto ()
CONTINUE = auto ()
TRY = auto ()
EXCEPT = auto ()
FINALLY = auto ()
THROW = auto ()
DEFER = auto ()
RANGE = auto ()
WHEN = auto ()
WITH = auto ()
NEW = auto ()
ASYNC = auto ()
AWAIT = auto ()
ON = auto ()
NAMESPACE = auto ()
USING = auto ()
BUSING = auto ()
PLUS = auto ()
MINUS = auto ()
STAR = auto ()
SLASH = auto ()
PERCENT = auto ()
ASSIGN = auto ()
EQ = auto ()
NEQ = auto ()
LT = auto ()
GT = auto ()
LTE = auto ()
GTE = auto ()
AND = auto ()
OR = auto ()
PIPE = auto ()
NOT = auto ()
ARROW = auto ()
PLUS_ASSIGN = auto ()
MINUS_ASSIGN = auto ()
STAR_ASSIGN = auto ()
SLASH_ASSIGN = auto ()
DOT = auto ()
DOTDOT = auto ()
DOTDOTDOT = auto ()
COLON = auto ()
LPAREN = auto ()
RPAREN = auto ()
LBRACE = auto ()
RBRACE = auto ()
LBRACKET = auto ()
RBRACKET = auto ()
COMMA = auto ()
NEWLINE = auto ()
AT = auto ()
COMMENT = auto ()
EOF = auto ()
KEYWORDS = {
'func': TokenType.FUNC,
'class': TokenType.CLASS,
'construct': TokenType.CONSTRUCT,
'this': TokenType.THIS,
'base': TokenType.BASE,
'return': TokenType.RETURN,
'if': TokenType.IF,
'else': TokenType.ELSE,
'foreach': TokenType.FOREACH,
'for': TokenType.FOR,
'in': TokenType.IN,
'while': TokenType.WHILE,
'break': TokenType.BREAK,
'continue': TokenType.CONTINUE,
'try': TokenType.TRY,
'except': TokenType.EXCEPT,
'finally': TokenType.FINALLY,
'throw': TokenType.THROW,
'defer': TokenType.DEFER,
'range': TokenType.RANGE,
'when': TokenType.WHEN,
'with': TokenType.WITH,
'new': TokenType.NEW,
'async': TokenType.ASYNC,
'await': TokenType.AWAIT,
'on': TokenType.ON,
'namespace': TokenType.NAMESPACE,
'using': TokenType.USING,
'busing': TokenType.BUSING,
'true': TokenType.TRUE,
'false': TokenType.FALSE,
'nil': TokenType.NIL,
}
@dataclass
class Token:
type: TokenType
value: Any
line: int
column: int
def __repr__ (self):
return f"Token({self.type.name}, {self.value!r}, {self.line}:{self.column})"
import sys
from .cli import main
sys.exit(main())
from .codegen import emit_awk_function
from .backend import BashBackend, compile_to_bash
__all__ = ['BashBackend', 'compile_to_bash']
"""
BashBackend — main coordinator.
Usage:
from compiler.backend.bash.backend import BashBackend
backend = BashBackend()
bash_code = backend.emit(ir, used_stdlib_categories)
"""
from __future__ import annotations
import contextlib
from typing import Iterator
from ...ir.nodes import (
IRProgram, IRFunction, IRClass, IRBlock, IRStmt, IRExpr,
IRLambda, IRParam,
)
from ...semantics.scope import Symbol
from .constants import RET_VAR, LAMBDA_PREFIX, COPROC_PREFIX, TMP_PREFIX
from .stdlib import emit_stdlib
from .expr import emit_expr, emit_expr_as_stmt
from .stmt import emit_stmt, emit_block
from .classes import emit_class, emit_function
# ---------------------------------------------------------------------------
# EmitContext
# ---------------------------------------------------------------------------
class EmitContext:
"""
Carries state needed during code generation:
- current output buffer
- indentation level
- in_function flag + local_vars set
- counters for fresh names
"""
def __init__(
self,
output: list[str],
indent_level: int = 0,
lambda_defs: list[str] | None = None,
in_function: bool = False,
array_vars: 'set[str] | None' = None,
dict_vars: 'set[str] | None' = None,
process_handles: 'dict[str, str] | None' = None,
) -> None:
self._output = output
self._indent = indent_level
self._lambda_defs = lambda_defs if lambda_defs is not None else []
self._locals: set[str] = set()
self._tmp_counter = 0
self._coproc_counter = 0
self._lambda_counter = 0
self.in_function = in_function
self.array_vars: set[str] = array_vars if array_vars is not None else set()
self.dict_vars: set[str] = dict_vars if dict_vars is not None else set()
# var_name → coproc_name (e.g. "proc" → "__ct_cp1")
self.process_handles: dict[str, str] = process_handles if process_handles is not None else {}
# param_array/dict_vars: function params holding an array/dict name (pass "${param}")
self.param_array_vars: set[str] = set()
self.param_dict_vars: set[str] = set()
# ------------------------------------------------------------------ emit
def emit(self, line: str = '') -> None:
if line:
self._output.append(' ' * self._indent + line)
else:
self._output.append('')
@contextlib.contextmanager
def indented(self) -> Iterator[None]:
self._indent += 1
try:
yield
finally:
self._indent -= 1
# ------------------------------------------------------------------ names
def fresh_tmp(self) -> str:
self._tmp_counter += 1
return f'{TMP_PREFIX}{self._tmp_counter}'
def emit_tmp(self, name: str, val: str) -> None:
"""Emit tmp variable assignment — local inside functions, global outside."""
if self.in_function:
self.emit(f'local {name}={val}')
else:
self.emit(f'{name}={val}')
def fresh_coproc(self) -> str:
self._coproc_counter += 1
return f'{COPROC_PREFIX}{self._coproc_counter}'
# ------------------------------------------------------------------ locals
def declare_local(self, name: str) -> None:
self._locals.add(name)
def is_declared(self, name: str) -> bool:
return name in self._locals
# ------------------------------------------------------------------ lambda
def emit_lambda(self, node: IRLambda) -> str:
"""Emit a lambda as a named bash function; return its name."""
self._lambda_counter += 1
name = f'{LAMBDA_PREFIX}{self._lambda_counter}'
# Build lambda body in a separate context
lam_lines: list[str] = []
lam_ctx = EmitContext(lam_lines, indent_level=1, lambda_defs=self._lambda_defs)
lam_lines.append(f'{name} () {{')
for i, p in enumerate(node.params):
lam_lines.append(f' local {p}="${{{i + 1}}}"')
if node.body_expr is not None:
val = emit_expr(node.body_expr, lam_ctx)
lam_lines.append(f' {RET_VAR}={val}')
elif node.body is not None:
emit_block(node.body, lam_ctx)
lam_lines.append('}')
self._lambda_defs.extend(lam_lines)
return name
# ------------------------------------------------------------------ child contexts
def function_context(self) -> '_FunctionContext':
return _FunctionContext(self._indent, self._lambda_defs)
def child_context(self, output: list[str]) -> 'EmitContext':
child = EmitContext(output, self._indent, self._lambda_defs, self.in_function,
self.array_vars, self.dict_vars, self.process_handles)
return child
def lines(self) -> list[str]:
return self._output
class _FunctionContext(EmitContext):
"""Context inside a function body — tracks local vars."""
def __init__(self, indent: int, lambda_defs: list[str],
array_vars: 'set[str] | None' = None,
dict_vars: 'set[str] | None' = None) -> None:
super().__init__([], indent, lambda_defs, in_function=True, array_vars=array_vars, dict_vars=dict_vars)
# ---------------------------------------------------------------------------
# BashBackend
# ---------------------------------------------------------------------------
class BashBackend:
"""Emits a complete bash script from an IRProgram."""
def emit(self, ir: IRProgram, used_categories: set[str]) -> str:
lines: list[str] = []
lambda_defs: list[str] = []
# Shebang + safety flags
lines.append('#!/usr/bin/env bash')
lines.append('set -euo pipefail')
lines.append('')
# busing imports
for bus in getattr(ir, 'busing', []):
lines.append(f'source "{bus.path}"')
if getattr(ir, 'busing', []):
lines.append('')
# Standard library
emit_stdlib(lines, used_categories)
# Lambda definitions collected during codegen
top_ctx = EmitContext(lines, lambda_defs=lambda_defs)
# DCE comments for skipped @test functions
for name in getattr(ir, 'skipped_tests', []):
lines.append(f'# DCE: skipped @test function {name}')
if getattr(ir, 'skipped_tests', []):
lines.append('')
# Classes
all_classes = {cls.name: cls for cls in ir.classes}
for cls in ir.classes:
emit_class(cls, top_ctx, all_classes)
# Functions (non-method, non-awk)
for fn in ir.functions:
if not fn.is_method:
emit_function(fn, top_ctx)
# Lambda defs from function bodies — emit before top-level stmts
fn_lambda_defs = list(lambda_defs)
lambda_defs.clear()
# Top-level statements into a temporary buffer
top_stmt_lines: list[str] = []
if ir.top_stmts:
stmt_ctx = EmitContext(top_stmt_lines, lambda_defs=lambda_defs)
for stmt in ir.top_stmts:
emit_stmt(stmt, stmt_ctx)
# Now emit: function-body lambdas, then top-stmt lambdas, then top stmts
all_lambdas = fn_lambda_defs + lambda_defs
if all_lambdas:
lines.append('# === Lambdas ===')
lines.extend(all_lambdas)
lines.append('')
if top_stmt_lines:
lines.extend(top_stmt_lines)
lines.append('')
# Call main() if defined

has_main = any(fn.name == 'main' and not fn.is_method for fn in ir.functions)
if has_main:
lines.append('main "$@"')
# Test runner: call @test functions and print summary
test_fns = [fn for fn in ir.functions if fn.is_test and not fn.is_method]
if test_fns:
lines.append('')
lines.append('# === Test Runner ===')
for fn in test_fns:
desc = ''
for d in fn.decorators:
if d.name == 'test' and d.args:
desc_ctx = EmitContext([], lambda_defs=[])
desc = emit_expr(d.args[0][1], desc_ctx)
break
bash_name = fn.symbol.bash_name() if fn.symbol else fn.name
if desc:
lines.append(f'__ct_test_current={desc}')
else:
lines.append(f'__ct_test_current="{bash_name}"')
lines.append(f'echo -n " $__ct_test_current ... "')
lines.append(f'__ct_before=$__ct_test_failed')
lines.append(f'{bash_name} || __ct_test_failed=$((__ct_test_failed+1))')
lines.append(f'[[ $__ct_test_failed -eq $__ct_before ]] && echo "PASS" || echo "FAIL"')
lines.append('echo ""')
lines.append('echo "$__ct_test_passed tests passed, $__ct_test_failed failed"')
lines.append('[[ $__ct_test_failed -eq 0 ]] || exit 1')
return '\n'.join(lines)
# ---------------------------------------------------------------------------
# Public convenience function
# ---------------------------------------------------------------------------
def compile_to_bash(ir: IRProgram, used_categories: set[str]) -> str:
"""Convenience wrapper."""
return BashBackend().emit(ir, used_categories)
from ...constants import * # noqa: F401,F403
RET_VAR = "__CT_RET"
RET_ARR = "__CT_RET_ARR"
TMP_PREFIX = "__ct_tmp_"
CLASS_FUNC_PREFIX = "__ct_class_"
LAMBDA_PREFIX = "__ct_lambda_"
OBJ_STORE = "__CT_OBJ"
THIS_VAR = "__ct_this"
ARR_PREFIX = "__ct_arr_"
DICT_PREFIX = "__ct_dict_"
STR_PREFIX = "__ct_str_"
FH_PREFIX = "__ct_fh_"
HTTP_PREFIX = "__ct_http_"
FS_PREFIX = "__ct_fs_"
JSON_PREFIX = "__ct_json_"
REGEX_PREFIX = "__ct_regex_"
MATH_PREFIX = "__ct_math_"
COPROC_PREFIX = "__ct_cp"
from enum import Enum, auto
from dataclasses import dataclass, field
from typing import Any, List, Optional, Tuple
class TokenType(Enum):
# Literals
INTEGER = auto()
FLOAT = auto()
STRING = auto() # value is List[StringPart] (see below)
TRUE = auto()
FALSE = auto()
NIL = auto()
IDENTIFIER = auto()
# Keywords
FUNC = auto()
CLASS = auto()
CONSTRUCT = auto()
THIS = auto()
BASE = auto()
RETURN = auto()
IF = auto()
ELSE = auto()
FOREACH = auto()
FOR = auto()
IN = auto()
WHILE = auto()
BREAK = auto()
CONTINUE = auto()
TRY = auto()
EXCEPT = auto()
FINALLY = auto()
THROW = auto()
DEFER = auto()
RANGE = auto()
WHEN = auto()
WITH = auto()
NEW = auto()
ASYNC = auto()
AWAIT = auto()
ON = auto()
NAMESPACE = auto()
USING = auto()
BUSING = auto()
# Operators
PLUS = auto()
MINUS = auto()
STAR = auto()
SLASH = auto()
PERCENT = auto()
ASSIGN = auto()
EQ = auto()
NEQ = auto()
LT = auto()
GT = auto()
LTE = auto()
GTE = auto()
AND = auto()
OR = auto()
PIPE = auto()
NOT = auto()
ARROW = auto()
PLUS_ASSIGN = auto()
MINUS_ASSIGN = auto()
STAR_ASSIGN = auto()
SLASH_ASSIGN = auto()
DOT = auto()
DOTDOT = auto()
DOTDOTDOT = auto()
COLON = auto()
# Delimiters
LPAREN = auto()
RPAREN = auto()
LBRACE = auto()
RBRACE = auto()
LBRACKET = auto()
RBRACKET = auto()
COMMA = auto()
# Special
NEWLINE = auto() # significant (statement separator); suppressed inside ()
AT = auto()
COMMENT = auto() # stored as trivia, not emitted to main stream
EOF = auto()
KEYWORDS: 'dict[str, TokenType]' = {  # annotation quoted: bare dict[...] fails before Python 3.9
'func': TokenType.FUNC,
'class': TokenType.CLASS,
'construct': TokenType.CONSTRUCT,
'this': TokenType.THIS,
'base': TokenType.BASE,
'return': TokenType.RETURN,
'if': TokenType.IF,
'else': TokenType.ELSE,
'foreach': TokenType.FOREACH,
'for': TokenType.FOR,
'in': TokenType.IN,
'while': TokenType.WHILE,
'break': TokenType.BREAK,
'continue': TokenType.CONTINUE,
'try': TokenType.TRY,
'except': TokenType.EXCEPT,
'finally': TokenType.FINALLY,
'throw': TokenType.THROW,
'defer': TokenType.DEFER,
'range': TokenType.RANGE,
'when': TokenType.WHEN,
'with': TokenType.WITH,
'new': TokenType.NEW,
'async': TokenType.ASYNC,
'await': TokenType.AWAIT,
'on': TokenType.ON,
'namespace': TokenType.NAMESPACE,
'using': TokenType.USING,
'busing': TokenType.BUSING,
'true': TokenType.TRUE,
'false': TokenType.FALSE,
'nil': TokenType.NIL,
}
# ---------------------------------------------------------------------------
# String interpolation parts
# StringPart is stored as the token value for STRING tokens.
# ---------------------------------------------------------------------------
@dataclass
class RawText:
"""Plain text segment of a string."""
text: str
@dataclass
class RawInterp:
"""
Interpolated expression segment: {expr}.
`source` is the raw source text of the expression inside {}.
The parser will re-lex + re-parse it into a proper expression node.
`line` and `col` are the position of the opening { in the source file.
"""
source: str
line: int
col: int
from typing import Union
StringPart = Union[RawText, RawInterp]  # Union form keeps Python 3.8 compatibility
# ---------------------------------------------------------------------------
# Token
# ---------------------------------------------------------------------------
@dataclass
class Token:
type: TokenType
value: Any # for STRING: List[StringPart]; others: raw Python value
line: int
column: int
trivia: str = '' # leading comment/whitespace (for LSP)
def __repr__(self) -> str:
return f'Token({self.type.name}, {self.value!r}, {self.line}:{self.column})'
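The STRING token value is a list of `RawText`/`RawInterp` parts. For example, `"hi {name}!"` lexes into one STRING token holding three parts. A self-contained sketch (the dataclasses are redeclared here so the example stands alone; they mirror the definitions above):

```python
from dataclasses import dataclass

@dataclass
class RawText:
    text: str

@dataclass
class RawInterp:
    source: str  # raw source of the expression inside {}
    line: int
    col: int

# "hi {name}!" would be stored as a STRING token whose value is:
parts = [RawText('hi '), RawInterp('name', line=1, col=4), RawText('!')]
```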
install_data(
'__init__.py',
'__main__.py',
'cli.py',
'constants.py',
install_dir: pkgdatadir / 'compiler',
)
install_subdir('lexer', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_subdir('syntax', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_subdir('semantics', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_subdir('ir', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_subdir('optimizer', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_data(
'backend/__init__.py',
install_dir: pkgdatadir / 'compiler' / 'backend',
)
install_subdir('backend/bash', install_dir: pkgdatadir / 'compiler' / 'backend', exclude_directories: ['__pycache__'])
install_subdir('backend/awk', install_dir: pkgdatadir / 'compiler' / 'backend', exclude_directories: ['__pycache__'])
install_subdir('methods', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
install_subdir('symbols', install_dir: pkgdatadir / 'compiler', exclude_directories: ['__pycache__'])
"""
Common Subexpression Elimination (CSE).
Key improvement over bootstrap: uses stable node_id, NOT Python id().
Any IR transformation preserves node_id, so CSE mappings are stable.
Scope: within a single function body (basic block CSE).
Targets: method calls and function calls that are side-effect-free.
Strategy:
1. Before if/while conditions: pre-compute expensive sub-expressions into
temp variables (__ct_cse_N).
2. Within a basic block: deduplicate identical calls.
A call is considered side-effect-free (CSE-eligible) if:
- It's an IRMethodCall on a known pure type (string/array methods)
- It's an IRCall to a known pure function (math.*, len)
"""
from __future__ import annotations
from typing import Optional
import itertools
from ..ir.nodes import (
IRProgram, IRFunction, IRClass, IRBlock, IRStmt, IRExpr,
IRMethodCall, IRCall, IRAssign, IRIf, IRWhile, IRExprStmt,
IRIdentifier, IRInt,
)
from ..semantics.types import T_ANY
_cse_counter = itertools.count(1)
def _cse_tmp() -> str:
return f'__ct_cse_{next(_cse_counter)}'
# Methods considered pure (no side effects, safe to hoist)
_PURE_METHODS = frozenset({
'upper', 'lower', 'trim', 'len', 'contains', 'starts', 'ends',
'index', 'charAt', 'substr', 'urlencode',
'join', 'slice', 'get',
'abs', 'min', 'max',
})
# Builtin functions considered pure
_PURE_FUNCS = frozenset({'len', 'range'})
def _is_pure_call(node: IRExpr) -> bool:
if isinstance(node, IRMethodCall):
return node.method_name in _PURE_METHODS
if isinstance(node, IRCall) and not node.is_shell_cmd:
return node.callee_name in _PURE_FUNCS
return False
def run_cse(ir: IRProgram) -> IRProgram:
"""Apply CSE to all functions and class methods."""
for fn in ir.functions:
if fn.body:
fn.body = _cse_block(fn.body)
for cl in ir.classes:
if cl.constructor and cl.constructor.body:
cl.constructor.body = _cse_block(cl.constructor.body)
for m in cl.methods:
if m.body:
m.body = _cse_block(m.body)
return ir
def _cse_block(block: IRBlock) -> IRBlock:
"""
Apply CSE within a block:
- Before each if/while: hoist condition sub-expressions to temps.
- Track already-computed expressions within the block.
"""
new_stmts: list[IRStmt] = []
# Map: expression key (see _expr_key) → temp_var_name
# Only valid within the current straight-line block segment
computed: dict[tuple, str] = {}
for stmt in block.stmts:
if isinstance(stmt, (IRIf, IRWhile)):
# Hoist pure calls in condition to temps
hoisted, new_cond = _hoist_condition(stmt.condition, computed)
new_stmts.extend(hoisted)
stmt.condition = new_cond
if isinstance(stmt, IRIf):
if stmt.then_block:
stmt.then_block = _cse_block(stmt.then_block)
if stmt.else_block:
stmt.else_block = _cse_block(stmt.else_block)
stmt.elif_branches = [
(c, _cse_block(b)) for c, b in stmt.elif_branches
]
# After a branch: clear computed (control flow merges)
computed.clear()
elif isinstance(stmt, IRWhile):
if stmt.body:
# Re-emit hoisted computations inside loop body too
stmt.body = _cse_block(stmt.body)
computed.clear() # loop may repeat
new_stmts.append(stmt)
block.stmts = new_stmts
return block
def _hoist_condition(
cond: Optional[IRExpr],
computed: dict[tuple, str],
) -> tuple[list[IRStmt], Optional[IRExpr]]:
"""
Find pure sub-calls in `cond`, assign them to temp vars, return
(list_of_assign_stmts, rewritten_condition).
"""
if cond is None:
return [], cond
hoisted: list[IRStmt] = []
new_cond = _rewrite_expr(cond, hoisted, computed)
return hoisted, new_cond
def _rewrite_expr(
node: IRExpr,
hoisted: list[IRStmt],
computed: dict[tuple, str],
) -> IRExpr:
"""Recursively rewrite pure calls to temp variables."""
if _is_pure_call(node):
key = _expr_key(node)
if key:
if key in computed:
# Already computed: replace with existing temp
tmp = computed[key]
else:
# New computation: hoist it
tmp = _cse_tmp()
computed[key] = tmp
hoisted.append(IRAssign(
target=tmp, value=node,
is_local=True,
source=node.source,
))
return IRIdentifier(name=tmp, type=node.type, source=node.source)
# Recurse into children
_rewrite_children(node, hoisted, computed)
return node
def _rewrite_children(
node: IRExpr,
hoisted: list[IRStmt],
computed: dict[tuple, str],
) -> None:
from dataclasses import fields
for f in fields(node):
val = getattr(node, f.name)
if hasattr(val, '__dataclass_fields__'):
new_val = _rewrite_expr(val, hoisted, computed)
setattr(node, f.name, new_val)
elif isinstance(val, list):
new_list = []
for item in val:
if hasattr(item, '__dataclass_fields__'):
new_list.append(_rewrite_expr(item, hoisted, computed))
else:
new_list.append(item)
setattr(node, f.name, new_list)
def _expr_key(node: IRExpr) -> Optional[tuple]:
"""
Return a hashable key for a pure expression, or None if not hashable.
The key must uniquely identify the computation, so argument values are
part of the key: s.substr(0, 3) and s.substr(1, 2) must not collide.
"""
def _args_key(n) -> Optional[tuple]:
# Assumes call nodes carry their argument expressions in `.args`;
# bail out (no CSE) if any argument has no stable key.
keys = tuple(_receiver_key(a) for a in getattr(n, 'args', []))
return None if any(k is None for k in keys) else keys
if isinstance(node, IRMethodCall):
recv_key = _receiver_key(node.receiver)
args_key = _args_key(node)
if recv_key is None or args_key is None:
return None
return ('method', recv_key, node.method_name, args_key)
if isinstance(node, IRCall):
args_key = _args_key(node)
if args_key is None:
return None
return ('func', node.callee_name, args_key)
return None
def _receiver_key(node: IRExpr) -> Optional[str]:
"""Return a string key for a receiver expression, if stable."""
if isinstance(node, IRIdentifier):
return node.name
if isinstance(node, IRInt):
return str(node.value)
return None
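The `computed`-map strategy above can be shown in miniature: pure calls are keyed, each distinct key is evaluated once into a temp, and repeats reuse the existing temp. A self-contained sketch (toy hashable keys standing in for the real IR nodes, hypothetical `cse_hoist` helper):

```python
def cse_hoist(calls):
    """Deduplicate repeated pure calls: return (assignments, rewritten refs).

    `calls` is a list of hashable keys such as ('method', 's', 'upper').
    Each distinct key gets one temp; repeats reuse the existing temp,
    mirroring how _rewrite_expr consults the `computed` map.
    """
    computed = {}            # key -> temp variable name
    assigns, refs = [], []
    counter = 0
    for key in calls:
        if key not in computed:
            counter += 1
            tmp = f'__ct_cse_{counter}'
            computed[key] = tmp
            assigns.append((tmp, key))   # hoisted: tmp = <call>
        refs.append(computed[key])       # call site now reads the temp
    return assigns, refs
```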
"""
Constant folding optimizer.
Walks the IR and replaces:
- BinaryOp(IntLit, op, IntLit) → IntLit
- BinaryOp(FloatLit, op, FloatLit) → FloatLit
- BinaryOp(BoolLit, '&&'/'||', BoolLit) → BoolLit
- UnaryOp('!', BoolLit) → BoolLit
- UnaryOp('-', IntLit/FloatLit) → literal
- IfStmt(BoolLit(True)) → then_block only (dead branch elim)
- IfStmt(BoolLit(False)) → else_block only
- StringLiteral with all-text parts → single IRStringText
Transforms are in-place where possible.
"""
from __future__ import annotations
from typing import Optional
from ..ir.nodes import (
IRNode, IRProgram, IRFunction, IRClass,
IRInt, IRFloat, IRBool, IRNil, IRString, IRStringText,
IRBinaryOp, IRUnaryOp, IRIf, IRBlock,
IRExpr, IRStmt,
)
from ..semantics.types import T_INT, T_FLOAT, T_BOOL, T_STRING
def run_fold(ir: IRProgram) -> IRProgram:
"""Apply constant folding to the entire IR. Returns ir (modified in place)."""
for fn in ir.functions:
if fn.body:
fn.body = _fold_block(fn.body)
for cl in ir.classes:
if cl.constructor and cl.constructor.body:
cl.constructor.body = _fold_block(cl.constructor.body)
for m in cl.methods:
if m.body:
m.body = _fold_block(m.body)
ir.top_stmts = [_fold_stmt(s) for s in ir.top_stmts]
return ir
def _fold_block(block: IRBlock) -> IRBlock:
new_stmts = []
for stmt in block.stmts:
folded = _fold_stmt(stmt)
if folded is not None:
new_stmts.append(folded)
block.stmts = new_stmts
return block
def _fold_stmt(node: IRStmt) -> Optional[IRStmt]:
from ..ir.nodes import (
IRAssign, IRFieldAssign, IRIndexAssign, IRExprStmt,
IRReturn, IRWhile, IRFor, IRForeach, IRWith,
IRTry, IRThrow, IRDefer, IRAwait, IROnSignal, IRWhen,
)
if isinstance(node, IRIf):
return _fold_if(node)
if isinstance(node, IRWhile):
cond = _fold_expr(node.condition)
# if condition is always False: eliminate whole loop
if isinstance(cond, IRBool) and not cond.value:
return None
node.condition = cond
if node.body:
node.body = _fold_block(node.body)
return node
if isinstance(node, (IRFor, IRForeach)):
if hasattr(node, 'iterable') and node.iterable:
node.iterable = _fold_expr(node.iterable)
if node.body:
node.body = _fold_block(node.body)
return node
if isinstance(node, IRAssign):
if node.value:
node.value = _fold_expr(node.value)
return node
if isinstance(node, (IRFieldAssign, IRIndexAssign)):
if node.value:
node.value = _fold_expr(node.value)
return node
if isinstance(node, IRExprStmt):
if node.expr:
node.expr = _fold_expr(node.expr)
return node
if isinstance(node, IRReturn):
if node.value:
node.value = _fold_expr(node.value)
return node
if isinstance(node, IRBlock):
return _fold_block(node)
if isinstance(node, IRWith):
node.resources = [_fold_expr(r) for r in node.resources]
if node.body:
node.body = _fold_block(node.body)
return node
if isinstance(node, IRTry):
if node.try_block:
node.try_block = _fold_block(node.try_block)
node.except_clauses = [
(et, ev, _fold_block(b)) for et, ev, b in node.except_clauses
]
if node.finally_block:
node.finally_block = _fold_block(node.finally_block)
return node
if isinstance(node, (IRThrow, IRDefer, IRAwait)):
if hasattr(node, 'expr') and node.expr:
node.expr = _fold_expr(node.expr)
return node
if isinstance(node, IROnSignal):
if node.body:
node.body = _fold_block(node.body)
return node
if isinstance(node, IRWhen):
if node.value:
node.value = _fold_expr(node.value)
for branch in node.branches:
branch.patterns = [_fold_expr(p) for p in branch.patterns]
if branch.body:
branch.body = _fold_block(branch.body)
return node
return node
def _fold_if(node: IRIf) -> Optional[IRStmt]:
cond = _fold_expr(node.condition) if node.condition else node.condition
node.condition = cond
if node.then_block:
node.then_block = _fold_block(node.then_block)
if node.else_block:
node.else_block = _fold_block(node.else_block)
node.elif_branches = [
(_fold_expr(c), _fold_block(b)) for c, b in node.elif_branches
]
# Dead branch elimination
if isinstance(cond, IRBool):
if cond.value:
# Always true → return then_block as a plain block
return node.then_block or IRBlock(source=node.source)
else:
# Always false → return else_block (or nothing)
if node.elif_branches:
# First elif becomes the new condition
new_cond, new_then = node.elif_branches[0]
node.condition = new_cond
node.then_block = new_then
node.elif_branches = node.elif_branches[1:]
return node
return node.else_block # may be None → removed
return node
def _fold_expr(node: IRExpr) -> IRExpr:
if isinstance(node, IRBinaryOp):
return _fold_binary(node)
if isinstance(node, IRUnaryOp):
return _fold_unary(node)
if isinstance(node, IRString):
return _fold_string(node)
# Recurse into compound expressions
_fold_expr_children(node)
return node
def _fold_binary(node: IRBinaryOp) -> IRExpr:
left = _fold_expr(node.left)
right = _fold_expr(node.right)
op = node.operator
# Int + Int
if isinstance(left, IRInt) and isinstance(right, IRInt):
v = _eval_int(left.value, op, right.value)
if v is not None:
return IRInt(value=v, type=T_INT, source=node.source)
# Float arithmetic
if isinstance(left, (IRInt, IRFloat)) and isinstance(right, (IRInt, IRFloat)):
lv = float(left.value)
rv = float(right.value)
v = _eval_float(lv, op, rv)
if v is not None:
return IRFloat(value=v, type=T_FLOAT, source=node.source)
# Bool logic
if isinstance(left, IRBool) and isinstance(right, IRBool):
if op == '&&': return IRBool(value=left.value and right.value,
type=T_BOOL, source=node.source)
if op == '||': return IRBool(value=left.value or right.value,
type=T_BOOL, source=node.source)
# String concat
if isinstance(left, IRString) and left.is_plain and isinstance(right, IRString) and right.is_plain:
if op == '+':
combined = left.plain_value + right.plain_value
return IRString(
parts=[IRStringText(text=combined, type=T_STRING, source=node.source)],
type=T_STRING, source=node.source,
)
node.left = left
node.right = right
return node
def _fold_unary(node: IRUnaryOp) -> IRExpr:
operand = _fold_expr(node.operand)
if node.operator == '!' and isinstance(operand, IRBool):
return IRBool(value=not operand.value, type=T_BOOL, source=node.source)
if node.operator == '-':
if isinstance(operand, IRInt):
return IRInt(value=-operand.value, type=T_INT, source=node.source)
if isinstance(operand, IRFloat):
return IRFloat(value=-operand.value, type=T_FLOAT, source=node.source)
node.operand = operand
return node
def _fold_string(node: IRString) -> IRExpr:
if node.is_plain:
return node # already plain, nothing to fold
# Try to merge adjacent text parts
new_parts = []
for part in node.parts:
if (isinstance(part, IRStringText) and new_parts and
isinstance(new_parts[-1], IRStringText)):
new_parts[-1].text += part.text
else:
new_parts.append(part)
node.parts = new_parts
return node
def _fold_expr_children(node: IRExpr) -> None:
"""Recursively fold children that are IR nodes (not Symbol/Scope/Type/Location)."""
from ..ir.nodes import IRNode as _IRNode
from dataclasses import fields as _dc_fields
for f in _dc_fields(node):
val = getattr(node, f.name)
if isinstance(val, _IRNode):
setattr(node, f.name, _fold_expr(val))
elif isinstance(val, list):
new_list = []
for item in val:
if isinstance(item, _IRNode):
new_list.append(_fold_expr(item))
elif isinstance(item, tuple):
new_list.append(tuple(
_fold_expr(sub) if isinstance(sub, _IRNode) else sub
for sub in item
))
else:
new_list.append(item)
setattr(node, f.name, new_list)
def _eval_int(a: int, op: str, b: int) -> Optional[int]:
    try:
        if op == '+': return a + b
        if op == '-': return a - b
        if op == '*': return a * b
        # Match bash $(( )) semantics: division truncates toward zero and
        # '%' takes the sign of the dividend. Python's // and % floor
        # instead, which would change behavior for negative operands.
        if op in ('/', '%'):
            if b == 0:
                return None
            q = abs(a) // abs(b)
            if (a < 0) != (b < 0):
                q = -q
            return q if op == '/' else a - q * b
    except Exception:
        pass
    return None
def _eval_float(a: float, op: str, b: float) -> Optional[float]:
try:
if op == '+': return a + b
if op == '-': return a - b
if op == '*': return a * b
if op == '/': return a / b if b != 0 else None
except Exception:
pass
return None
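The folded constants must agree with what the bash backend's `$(( ))` arithmetic will compute at runtime, and bash differs from Python exactly on negative division and remainder. A standalone sketch of the intended semantics (plain functions, not an import of the optimizer module):

```python
# bash $(( )) truncates division toward zero and gives '%' the sign of the
# dividend (C semantics); Python's // and % floor toward negative infinity.

def bash_div(a: int, b: int) -> int:
    q = abs(a) // abs(b)
    return -q if (a < 0) != (b < 0) else q

def bash_mod(a: int, b: int) -> int:
    # Remainder consistent with truncating division: a == q*b + r.
    return a - bash_div(a, b) * b

print(bash_div(-7, 2))   # -3 in bash; Python's -7 // 2 is -4
print(bash_mod(-7, 2))   # -1 in bash; Python's -7 % 2 is 1
```

A folder that used Python's `//` and `%` directly would silently change the meaning of expressions like `-7 / 2` between interpreted and folded code.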
from __future__ import annotations
from dataclasses import dataclass
from typing import List
from .types import ContenTType
from .scope import Scope, Symbol
from ..ir.nodes import IRProgram
@dataclass
class TypeDiagnostic:
message: str
filename: str = '<stdin>'
line: int = 0
column: int = 0
def __str__(self) -> str:
loc = f"{self.filename}:{self.line}:{self.column}" if self.line else self.filename
return f"{loc}: {self.message}"
def _type_name(t: ContenTType) -> str:
if t.kind == 'array':
return 'array'
if t.kind == 'dict':
return 'dict'
return t.kind
def _compatible(expected: ContenTType, got: ContenTType) -> bool:
if expected.kind == 'any' or got.kind == 'any':
return True
if expected.kind == 'error' or got.kind == 'error':
return True
if expected.kind == got.kind:
return True
if expected.kind == 'float' and got.kind == 'int':
return True
return False
def check_types(ir: IRProgram, root_scope: Scope) -> List[TypeDiagnostic]:
diags: List[TypeDiagnostic] = []
for sym in root_scope.all_symbols():
if sym.kind in ('var', 'field') and sym.type and sym.type.kind != 'any':
if sym.decl is None:
continue
val_type = _infer_value_type(sym)
if val_type and val_type.kind != 'any' and not _compatible(sym.type, val_type):
loc = sym.defined_at
diags.append(TypeDiagnostic(
message=f"Type mismatch: expected '{_type_name(sym.type)}', got '{_type_name(val_type)}'",
filename=loc.filename if loc else '<stdin>',
line=loc.line if loc else 0,
column=loc.column if loc else 0,
))
return diags
def _infer_value_type(sym: Symbol) -> ContenTType | None:
from ..syntax.nodes import Assignment, ClassField
decl = sym.decl
if isinstance(decl, Assignment) and decl.value:
return _expr_type(decl.value)
if isinstance(decl, ClassField) and decl.default:
return _expr_type(decl.default)
return None
def _expr_type(node) -> ContenTType | None:
from ..syntax.nodes import (
StringLiteral, IntegerLiteral, FloatLiteral, BoolLiteral,
ArrayLiteral, DictLiteral,
)
from .types import T_STRING, T_INT, T_FLOAT, T_BOOL, array_of, dict_of
if isinstance(node, StringLiteral):
return T_STRING
if isinstance(node, IntegerLiteral):
return T_INT
if isinstance(node, FloatLiteral):
return T_FLOAT
if isinstance(node, BoolLiteral):
return T_BOOL
if isinstance(node, ArrayLiteral):
return array_of(T_ANY)
if isinstance(node, DictLiteral):
return dict_of(T_ANY, T_ANY)
return None
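The compatibility rules in `_compatible` form a small lattice: `any` and `error` are wildcards that suppress diagnostics, identical kinds match, and `int` widens to `float` in one direction only. A self-contained sketch of the same rules (plain strings stand in for `ContenTType` kinds):

```python
# Mirror of the type-compatibility check used by check_types (a sketch,
# not an import of the compiler package).

def compatible(expected: str, got: str) -> bool:
    if 'any' in (expected, got) or 'error' in (expected, got):
        return True  # wildcards never produce diagnostics
    if expected == got:
        return True
    return expected == 'float' and got == 'int'  # one-way int → float widening

print(compatible('float', 'int'))   # True: widening is silent
print(compatible('int', 'float'))   # False: narrowing is reported
```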
"""
Scope tree and Symbol definitions.
Every named entity in a ContenT program is represented as a Symbol.
Symbols live in Scopes which form a tree mirroring the program structure.
Resolver builds this tree; Codegen reads it.
LSP uses it for hover/go-to-def/find-references.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Literal, TYPE_CHECKING
from .types import ContenTType, T_ANY
if TYPE_CHECKING:
from ..syntax.nodes import SyntaxNode, SourceLocation, FunctionDecl
SymbolKind = Literal[
'func', # top-level or namespace function
'method', # class method
'class', # class declaration
'var', # variable (local or module-level)
'field', # class field
'param', # function parameter
'namespace', # namespace declaration
'busing', # bash library imported via busing
]
ScopeKind = Literal[
'global',
'namespace',
'class',
'function',
'block',
]
@dataclass
class Symbol:
name: str
kind: SymbolKind
type: ContenTType
decl: Optional['SyntaxNode'] # back-reference for go-to-def
defined_at: Optional['SourceLocation']
scope: Optional['Scope'] = None # owning scope (set after creation)
# Flags
is_awk: bool = False # @awk decorated
is_shell: bool = False # shell command (not CT func)
# LSP: all usage sites
usages: List['SourceLocation'] = field(default_factory=list)
def fully_qualified(self) -> str:
"""Return dot-separated fully qualified name, e.g. 'utils.greet'."""
if self.scope and self.scope.kind == 'namespace' and self.scope.name:
return f'{self.scope.name}.{self.name}'
if self.scope and self.scope.kind == 'class' and self.scope.name:
return f'{self.scope.name}.{self.name}'
return self.name
def bash_name(self) -> str:
"""
Return the bash identifier for this symbol.
Namespace prefixing lives ONLY here — IR and Resolver use fully_qualified().
"""
fq = self.fully_qualified()
# utils.greet → utils__greet
# User.getName → __ct_class_User_getName (handled by BashBackend)
return fq.replace('.', '__')
def __hash__(self) -> int:
return id(self)
def __eq__(self, other: object) -> bool:
return self is other
@dataclass
class Scope:
kind: ScopeKind
name: Optional[str] # name for namespace/class/function scopes
parent: Optional['Scope'] = None
symbols: Dict[str, Symbol] = field(default_factory=dict)
children: List['Scope'] = field(default_factory=list)
# For function scopes: reference to the function symbol
owner: Optional[Symbol] = None
def define(self, sym: Symbol) -> None:
"""Add a symbol to this scope."""
sym.scope = self
self.symbols[sym.name] = sym
def lookup(self, name: str, *, local_only: bool = False) -> Optional[Symbol]:
"""Look up a name in this scope and parent scopes."""
if name in self.symbols:
return self.symbols[name]
if local_only or self.parent is None:
return None
return self.parent.lookup(name)
def lookup_qualified(self, ns: str, name: str) -> Optional[Symbol]:
"""Look up ns.name from any scope."""
root = self._root()
ns_scope = root._find_namespace(ns)
if ns_scope:
return ns_scope.symbols.get(name)
return None
def _root(self) -> 'Scope':
s = self
while s.parent:
s = s.parent
return s
def _find_namespace(self, name: str) -> Optional['Scope']:
if self.kind == 'namespace' and self.name == name:
return self
for child in self.children:
found = child._find_namespace(name)
if found:
return found
return None
def child(self, kind: ScopeKind, name: Optional[str] = None,
owner: Optional[Symbol] = None) -> 'Scope':
"""Create and register a child scope."""
s = Scope(kind=kind, name=name, parent=self, owner=owner)
self.children.append(s)
return s
def all_symbols(self) -> List[Symbol]:
"""Collect all symbols in this scope and descendants."""
result = list(self.symbols.values())
for child in self.children:
result.extend(child.all_symbols())
return result
def __repr__(self) -> str:
return f'Scope({self.kind}, {self.name!r}, {list(self.symbols)})'
@dataclass
class CallGraph:
"""
Directed call graph: Symbol → set of Symbols it calls.
Used for DCE (BFS reachability) and optimization.
"""
edges: Dict[Symbol, set[Symbol]] = field(default_factory=dict)
def add_call(self, caller: Symbol, callee: Symbol) -> None:
self.edges.setdefault(caller, set()).add(callee)
def reachable_from(self, roots: set[Symbol]) -> set[Symbol]:
    """BFS: all symbols reachable from roots."""
    from collections import deque
    visited: set[Symbol] = set()
    queue = deque(roots)
    while queue:
        sym = queue.popleft()  # FIFO, so the traversal really is breadth-first
        if sym in visited:
            continue
        visited.add(sym)
        for callee in self.edges.get(sym, ()):
            if callee not in visited:
                queue.append(callee)
    return visited
def callees_of(self, sym: Symbol) -> set[Symbol]:
return self.edges.get(sym, set())
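The DCE strategy the commit message describes rests entirely on this reachability query: start from the root symbols (entry points and top-level statements), walk call edges breadth-first, and anything never visited is dead. A self-contained sketch with plain strings standing in for `Symbol`:

```python
from collections import deque

# Miniature of CallGraph.reachable_from: edges map caller → callees.
def reachable(edges: dict[str, set[str]], roots: set[str]) -> set[str]:
    visited: set[str] = set()
    queue = deque(roots)
    while queue:
        sym = queue.popleft()          # FIFO queue → breadth-first order
        if sym in visited:
            continue
        visited.add(sym)
        queue.extend(edges.get(sym, set()) - visited)
    return visited

calls = {'main': {'greet'}, 'greet': {'upper'}, 'unused': {'upper'}}
live = reachable(calls, {'main'})
print(sorted(live))                    # ['greet', 'main', 'upper']
print('unused' in live)                # False → eliminated by DCE
```

Note that `unused` calls a live function but is itself unreachable, so it is dropped while `upper` survives; a fixed-point formulation needs many iterations to discover the same fact.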
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Optional, Literal
Kind = Literal[
'string', 'int', 'float', 'bool', 'void', 'any',
'array', 'dict', 'class', 'func', 'awk', 'error',
]
@dataclass(frozen=True)
class ContenTType:
kind: Kind
element_type: Optional[ContenTType] = None # array<T>
key_type: Optional[ContenTType] = None # dict<K, V>
value_type: Optional[ContenTType] = None # dict<K, V>
class_name: Optional[str] = None # class instance
param_types: tuple[ContenTType, ...] = () # func params
return_type: Optional[ContenTType] = None # func return
def is_scalar(self) -> bool:
return self.kind in ('string', 'int', 'float', 'bool', 'any')
def is_collection(self) -> bool:
return self.kind in ('array', 'dict')
def is_callable(self) -> bool:
return self.kind in ('func', 'awk')
def is_object(self) -> bool:
return self.kind == 'class'
def __repr__(self) -> str:
if self.kind == 'array':
inner = repr(self.element_type) if self.element_type else 'any'
return f'{inner}[]'
if self.kind == 'dict':
k = repr(self.key_type) if self.key_type else 'any'
v = repr(self.value_type) if self.value_type else 'any'
return f'dict[{k}, {v}]'
if self.kind == 'class':
return self.class_name or 'object'
if self.kind == 'func':
params = ', '.join(repr(p) for p in self.param_types)
ret = repr(self.return_type) if self.return_type else 'void'
return f'({params}) => {ret}'
return self.kind
# Prebuilt singletons for common types
T_STRING = ContenTType('string')
T_INT = ContenTType('int')
T_FLOAT = ContenTType('float')
T_BOOL = ContenTType('bool')
T_VOID = ContenTType('void')
T_ANY = ContenTType('any')
T_ERROR = ContenTType('error') # placeholder for unresolved/error types
PRIMITIVE_TYPES: dict[str, ContenTType] = {
'string': T_STRING,
'int': T_INT,
'float': T_FLOAT,
'bool': T_BOOL,
'void': T_VOID,
'any': T_ANY,
}
def array_of(elem: ContenTType) -> ContenTType:
return ContenTType('array', element_type=elem)
def dict_of(key: ContenTType, value: ContenTType) -> ContenTType:
return ContenTType('dict', key_type=key, value_type=value)
def class_type(name: str) -> ContenTType:
return ContenTType('class', class_name=name)
def func_type(params: list[ContenTType], ret: ContenTType) -> ContenTType:
return ContenTType('func', param_types=tuple(params), return_type=ret)
def from_annotation(name: str, is_array: bool = False,
elem: Optional[ContenTType] = None,
key: Optional[ContenTType] = None,
val: Optional[ContenTType] = None) -> ContenTType:
"""Convert a type annotation string to ContenTType."""
if is_array:
inner = PRIMITIVE_TYPES.get(name, class_type(name))
return array_of(inner)
if name == 'array':
return array_of(elem or T_ANY)
if name == 'dict':
k = key or T_STRING
v = val or T_ANY
return dict_of(k, v)
return PRIMITIVE_TYPES.get(name, class_type(name))
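The `__repr__` above defines the human-readable type notation used in diagnostics: arrays render as `T[]`, dicts as `dict[K, V]`, and function types as `(P, …) => R`. A standalone mirror of that formatting (plain strings instead of `ContenTType` instances, so it runs without the compiler package):

```python
# Sketch of the ContenTType repr notation, reimplemented on strings.
def render(kind: str, *args: str) -> str:
    if kind == 'array':
        return f'{args[0]}[]'                      # element type then []
    if kind == 'dict':
        return f'dict[{args[0]}, {args[1]}]'       # key, value
    if kind == 'func':
        *params, ret = args                        # last arg is the return type
        return f'({", ".join(params)}) => {ret}'
    return kind                                    # primitives print their kind

print(render('array', 'string'))            # string[]
print(render('dict', 'string', 'any'))      # dict[string, any]
print(render('func', 'int', 'int', 'int'))  # (int, int) => int
```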
from .table import SymbolTable
from .serialize import save, load
"""
JSON serialization / deserialization for SymbolTable.
Saves the full scope tree, symbols, and call graph to a JSON file
so that an LSP server can load it without re-compiling.
Handles circular references (Symbol ↔ Scope) by assigning stable integer IDs.
"""
from __future__ import annotations
import json
from typing import Any, Dict, List, Optional
from ..semantics.scope import Scope, Symbol, CallGraph
from ..semantics.types import ContenTType, PRIMITIVE_TYPES, class_type
from ..syntax.nodes import SourceLocation
from .table import SymbolTable
def save(table: SymbolTable, path: str) -> None:
data = _encode_table(table)
with open(path, 'w') as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def load(path: str) -> SymbolTable:
with open(path, 'r') as f:
data = json.load(f)
return _decode_table(data)
def _encode_table(table: SymbolTable) -> dict:
ctx = _EncodeContext()
root_id = ctx.encode_scope(table.root_scope)
cg = ctx.encode_call_graph(table.call_graph)
return {
'version': 1,
'filename': table.filename,
'root_scope': root_id,
'scopes': ctx.scopes,
'symbols': ctx.symbols,
'call_graph': cg,
}
class _EncodeContext:
def __init__(self) -> None:
self.scopes: list[dict] = []
self.symbols: list[dict] = []
self._scope_ids: dict[int, int] = {}
self._sym_ids: dict[int, int] = {}
def _sym_id(self, sym: Symbol) -> int:
oid = id(sym)
if oid not in self._sym_ids:
idx = len(self.symbols)
self._sym_ids[oid] = idx
self.symbols.append(self._encode_symbol(sym))
return self._sym_ids[oid]
def _scope_id(self, scope: Scope) -> int:
    oid = id(scope)
    if oid not in self._scope_ids:
        idx = len(self.scopes)
        self._scope_ids[oid] = idx
        # Reserve the slot before recursing: _encode_scope re-enters
        # _scope_id for parents/children, which must see this id.
        self.scopes.append({})
        self.scopes[idx] = self._encode_scope(scope)
    return self._scope_ids[oid]
def encode_scope(self, scope: Scope) -> int:
return self._scope_id(scope)
def _encode_scope(self, scope: Scope) -> dict:
return {
'kind': scope.kind,
'name': scope.name,
'parent': self._scope_id(scope.parent) if scope.parent else None,
'symbols': {name: self._sym_id(sym) for name, sym in scope.symbols.items()},
'children': [self._scope_id(c) for c in scope.children],
'owner': self._sym_id(scope.owner) if scope.owner else None,
}
def _encode_symbol(self, sym: Symbol) -> dict:
return {
'name': sym.name,
'kind': sym.kind,
'type': _encode_type(sym.type),
'defined_at': _encode_loc(sym.defined_at),
'scope': self._scope_id(sym.scope) if sym.scope else None,
'is_awk': sym.is_awk,
'is_shell': sym.is_shell,
'usages': [_encode_loc(u) for u in sym.usages],
}
def encode_call_graph(self, cg: CallGraph) -> list[list[int]]:
result = []
for caller, callees in cg.edges.items():
caller_id = self._sym_id(caller)
for callee in callees:
callee_id = self._sym_id(callee)
result.append([caller_id, callee_id])
return result
def _encode_type(t: ContenTType) -> dict:
d: dict[str, Any] = {'kind': t.kind}
if t.element_type:
d['element_type'] = _encode_type(t.element_type)
if t.key_type:
d['key_type'] = _encode_type(t.key_type)
if t.value_type:
d['value_type'] = _encode_type(t.value_type)
if t.class_name:
d['class_name'] = t.class_name
if t.param_types:
d['param_types'] = [_encode_type(p) for p in t.param_types]
if t.return_type:
d['return_type'] = _encode_type(t.return_type)
return d
def _encode_loc(loc: Optional[SourceLocation]) -> Optional[dict]:
if loc is None:
return None
return {
'line': loc.line,
'column': loc.column,
'filename': loc.filename,
'end_line': loc.end_line,
'end_column': loc.end_column,
}
def _decode_table(data: dict) -> SymbolTable:
ctx = _DecodeContext(data['scopes'], data['symbols'])
root = ctx.decode_scope(data['root_scope'])
cg = ctx.decode_call_graph(data.get('call_graph', []))
return SymbolTable(
root_scope=root,
call_graph=cg,
filename=data.get('filename', '<stdin>'),
)
class _DecodeContext:
def __init__(self, raw_scopes: list[dict], raw_symbols: list[dict]) -> None:
self._raw_scopes = raw_scopes
self._raw_symbols = raw_symbols
self._scopes: dict[int, Scope] = {}
self._symbols: dict[int, Symbol] = {}
def decode_scope(self, idx: int) -> Scope:
if idx in self._scopes:
return self._scopes[idx]
raw = self._raw_scopes[idx]
scope = Scope(kind=raw['kind'], name=raw.get('name'))
self._scopes[idx] = scope
if raw.get('parent') is not None:
scope.parent = self.decode_scope(raw['parent'])
for name, sym_idx in raw.get('symbols', {}).items():
sym = self.decode_symbol(sym_idx)
sym.scope = scope
scope.symbols[name] = sym
for child_idx in raw.get('children', []):
child = self.decode_scope(child_idx)
child.parent = scope
scope.children.append(child)
if raw.get('owner') is not None:
scope.owner = self.decode_symbol(raw['owner'])
return scope
def decode_symbol(self, idx: int) -> Symbol:
if idx in self._symbols:
return self._symbols[idx]
raw = self._raw_symbols[idx]
sym = Symbol(
name=raw['name'],
kind=raw['kind'],
type=_decode_type(raw['type']),
decl=None,
defined_at=_decode_loc(raw.get('defined_at')),
is_awk=raw.get('is_awk', False),
is_shell=raw.get('is_shell', False),
usages=[_decode_loc(u) for u in raw.get('usages', []) if u],
)
self._symbols[idx] = sym
return sym
def decode_call_graph(self, edges: list[list[int]]) -> CallGraph:
cg = CallGraph()
for caller_idx, callee_idx in edges:
caller = self.decode_symbol(caller_idx)
callee = self.decode_symbol(callee_idx)
cg.add_call(caller, callee)
return cg
def _decode_type(d: dict) -> ContenTType:
kind = d['kind']
if kind in PRIMITIVE_TYPES:
return PRIMITIVE_TYPES[kind]
if kind == 'error':
from ..semantics.types import T_ERROR
return T_ERROR
if kind == 'array':
elem = _decode_type(d['element_type']) if d.get('element_type') else None
return ContenTType('array', element_type=elem)
if kind == 'dict':
k = _decode_type(d['key_type']) if d.get('key_type') else None
v = _decode_type(d['value_type']) if d.get('value_type') else None
return ContenTType('dict', key_type=k, value_type=v)
if kind == 'class':
return class_type(d.get('class_name', ''))
if kind == 'func' or kind == 'awk':
params = tuple(_decode_type(p) for p in d.get('param_types', []))
ret = _decode_type(d['return_type']) if d.get('return_type') else None
return ContenTType(kind, param_types=params, return_type=ret)
return ContenTType(kind)
def _decode_loc(d: Optional[dict]) -> Optional[SourceLocation]:
if d is None:
return None
return SourceLocation(
line=d['line'],
column=d['column'],
filename=d.get('filename', '<stdin>'),
end_line=d.get('end_line', 0),
end_column=d.get('end_column', 0),
)
"""
SymbolTable — unified query interface over Scope tree for LSP.
Built from Resolver output (Scope + CallGraph), provides:
- lookup_at(loc) → find symbol at a source position
- find_references(sym) → all usage sites
- completions_at(loc) → symbols visible at a position
- all_symbols() → flat list of every defined symbol
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, List, Optional
from ..semantics.scope import Scope, Symbol, CallGraph
from ..semantics.types import ContenTType
from ..syntax.nodes import SourceLocation
@dataclass
class SymbolTable:
root_scope: Scope
call_graph: CallGraph
filename: str = '<stdin>'
_by_loc: Dict[tuple, Symbol] = field(default_factory=dict, repr=False)
_by_name: Dict[str, list[Symbol]] = field(default_factory=dict, repr=False)
def __post_init__(self) -> None:
self._index()
def _index(self) -> None:
for sym in self.root_scope.all_symbols():
if sym.defined_at:
key = (sym.defined_at.filename, sym.defined_at.line, sym.defined_at.column)
self._by_loc[key] = sym
self._by_name.setdefault(sym.name, []).append(sym)
def all_symbols(self) -> List[Symbol]:
return self.root_scope.all_symbols()
def lookup_at(self, loc: SourceLocation) -> Optional[Symbol]:
"""Find the symbol defined or referenced at a source position."""
key = (loc.filename, loc.line, loc.column)
sym = self._by_loc.get(key)
if sym:
return sym
for sym in self.root_scope.all_symbols():
for usage in sym.usages:
if (usage.filename == loc.filename
and usage.line == loc.line
and usage.column <= loc.column
and (usage.end_column or usage.column + len(sym.name)) >= loc.column):
return sym
return None
def lookup_name(self, name: str) -> List[Symbol]:
"""Find all symbols with a given name."""
return self._by_name.get(name, [])
def find_references(self, sym: Symbol) -> List[SourceLocation]:
"""All locations where sym is referenced (including definition)."""
refs: List[SourceLocation] = []
if sym.defined_at:
refs.append(sym.defined_at)
refs.extend(sym.usages)
return refs
def completions_at(self, loc: SourceLocation) -> List[Symbol]:
"""Symbols visible at a given source position (for autocomplete)."""
scope = self._scope_at(loc)
if not scope:
scope = self.root_scope
result: list[Symbol] = []
s: Optional[Scope] = scope
while s:
result.extend(s.symbols.values())
s = s.parent
return result
def callers_of(self, sym: Symbol) -> List[Symbol]:
"""Symbols that call sym."""
callers = []
for caller, callees in self.call_graph.edges.items():
if sym in callees:
callers.append(caller)
return callers
def callees_of(self, sym: Symbol) -> List[Symbol]:
"""Symbols that sym calls."""
return list(self.call_graph.callees_of(sym))
def save(self, path: str) -> None:
from .serialize import save
save(self, path)
@classmethod
def load(cls, path: str) -> 'SymbolTable':
from .serialize import load
return load(path)
def _scope_at(self, loc: SourceLocation) -> Optional[Scope]:
"""Find the innermost scope containing loc (heuristic: by defined_at)."""
best: Optional[Scope] = None
best_line = -1
for sym in self.root_scope.all_symbols():
if (sym.scope and sym.defined_at
and sym.defined_at.filename == loc.filename
and sym.defined_at.line <= loc.line
and sym.defined_at.line > best_line):
if sym.kind in ('func', 'method', 'class', 'namespace'):
for child in (sym.scope.children if sym.scope else []):
if child.owner is sym:
best = child
best_line = sym.defined_at.line
return best
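`lookup_at` above combines two strategies: an exact `(file, line, column)` index for definition sites, with a fallback scan over usage spans that cover the requested column. The same two-tier lookup in a self-contained sketch (tuples stand in for `Symbol` and `SourceLocation`):

```python
# Miniature of SymbolTable.lookup_at's indexing scheme.
defs = {('a.ct', 3, 0): 'greet'}                 # exact definition positions
usages = {'greet': [('a.ct', 10, 4, 9)]}         # (file, line, col, end_col)

def lookup_at(fname: str, line: int, col: int):
    hit = defs.get((fname, line, col))           # fast path: exact match
    if hit:
        return hit
    for name, spans in usages.items():           # slow path: span containment
        for f, l, start, end in spans:
            if f == fname and l == line and start <= col <= end:
                return name
    return None

print(lookup_at('a.ct', 3, 0))    # 'greet' (definition hit)
print(lookup_at('a.ct', 10, 6))   # 'greet' (cursor inside a usage span)
print(lookup_at('a.ct', 10, 20))  # None
```

The span fallback is what makes hover and go-to-definition work when the cursor sits in the middle of an identifier rather than at its first character.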
#!/usr/bin/env python3
"""
ContenT Compiler Entry Point
"""
import sys
import os
# Add bootstrap to path
sys.path.insert (0, os.path.dirname (os.path.abspath (__file__)))
from bootstrap.main import main
from compiler.cli import main
if __name__ == "__main__":
sys.exit (main ())
#!@PYTHON@
import sys
sys.path.insert(0, '@pkgdatadir@')
from bootstrap.main import main
from compiler.cli import main
if __name__ == "__main__":
sys.exit(main())
......@@ -8,7 +8,7 @@ py = python.find_installation('python3')
pkgdatadir = get_option('prefix') / get_option('datadir') / meson.project_name()
subdir('bootstrap')
subdir('compiler')
subdir('lib')
test('pytest', py,
......
......@@ -5,9 +5,9 @@ sys.path.insert(0, str(Path(__file__).parent.parent))
sys.path.insert(0, str(Path(__file__).parent))
import pytest
from bootstrap.lexer import Lexer
from bootstrap.parser import Parser
from bootstrap.codegen import CodeGenerator
from compiler.lexer.lexer import Lexer
from compiler.syntax.parser import parse as _parse_source
from compiler.cli import compile_source
@pytest.fixture
......@@ -21,20 +21,14 @@ def lex():
@pytest.fixture
def parse():
def _parse(source: str):
lexer = Lexer(source)
tokens = lexer.tokenize()
parser = Parser(tokens)
return parser.parse()
ast, errors = _parse_source(source, '<test>')
return ast
return _parse
@pytest.fixture
def compile_ct():
def _compile(source: str) -> str:
lexer = Lexer(source)
tokens = lexer.tokenize()
parser = Parser(tokens)
ast = parser.parse()
gen = CodeGenerator()
return gen.generate(ast)
ok, script, errs = compile_source(source, '<test>')
return script
return _compile
import subprocess
import tempfile
import os
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
def run_ct(source: str) -> tuple[int, str, str]:
with tempfile.NamedTemporaryFile(mode='w', suffix='.ct', delete=False) as f:
f.write(source)
f.flush()
ct_file = f.name
from compiler.cli import compile_source
def run_ct(source: str, timeout: int = 10) -> tuple[int, str, str]:
rc, script, errs = _compile(source)
if rc != 0:
return rc, '', errs
with tempfile.NamedTemporaryFile('w', suffix='.sh', delete=False) as f:
f.write(script)
fname = f.name
try:
result = subprocess.run(
['python3', 'content', 'run', ct_file],
capture_output=True,
text=True,
timeout=10
['bash', fname], capture_output=True, text=True, timeout=timeout
)
return result.returncode, result.stdout, result.stderr
finally:
os.unlink(ct_file)
os.unlink(fname)
def compile_ct(source: str) -> tuple[int, str, str]:
with tempfile.NamedTemporaryFile(mode='w', suffix='.ct', delete=False) as f:
f.write(source)
f.flush()
ct_file = f.name
sh_file = ct_file.replace('.ct', '.sh')
try:
result = subprocess.run(
['python3', 'content', 'build', ct_file, '-o', sh_file],
capture_output=True,
text=True,
timeout=10
)
compiled_output = ""
if os.path.exists(sh_file):
with open(sh_file, 'r') as sf:
compiled_output = sf.read()
os.unlink(sh_file)
return result.returncode, compiled_output, result.stderr
finally:
os.unlink(ct_file)
return _compile(source)
def compile_ct_with_flags(source: str, flags: list = None) -> tuple[int, str, str]:
......@@ -72,25 +56,8 @@ def compile_ct_with_flags(source: str, flags: list = None) -> tuple[int, str, st
def compile_ct_check(source: str) -> tuple[int, str, str]:
with tempfile.NamedTemporaryFile(mode='w', suffix='.ct', delete=False) as f:
f.write(source)
f.flush()
ct_file = f.name
sh_file = ct_file.replace('.ct', '.sh')
try:
result = subprocess.run(
['python3', 'content', 'build', ct_file, '-o', sh_file],
capture_output=True,
text=True,
timeout=10
)
if os.path.exists(sh_file):
os.unlink(sh_file)
return result.returncode, result.stdout, result.stderr
finally:
os.unlink(ct_file)
rc, _, err = _compile(source)
return rc, '', err
def run_ct_test(source: str) -> tuple[int, str, str]:
......@@ -109,3 +76,13 @@ def run_ct_test(source: str) -> tuple[int, str, str]:
return result.returncode, result.stdout, result.stderr
finally:
os.unlink(ct_file)
def _compile(source: str) -> tuple[int, str, str]:
try:
ok, script, errs = compile_source(source, '<test>')
if not ok:
return 1, '', '\n'.join(str(e) for e in errs)
return 0, script, ''
except Exception as e:
return 1, '', str(e)
import os
import tempfile
import shutil
from bootstrap.main import find_ct_files, compile_files
from compiler.cli import find_ct_files, compile_files
from helpers import run_ct
......
......@@ -14,7 +14,7 @@ class TestBuildLib:
sh_path = ct_path.replace(".ct", ".sh")
try:
result = subprocess.run(
[sys.executable, "-m", "bootstrap.main", "build-lib", ct_path],
[sys.executable, "-m", "compiler", "build-lib", ct_path],
capture_output=True, text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
......@@ -42,7 +42,7 @@ class TestBuildLib:
try:
result = subprocess.run(
[sys.executable, "-m", "bootstrap.main", "build-lib", ct_path, "-o", out_path],
[sys.executable, "-m", "compiler", "build-lib", ct_path, "-o", out_path],
capture_output=True, text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
......@@ -67,7 +67,7 @@ class TestBuildLib:
out_path = os.path.join(d, "out.sh")
result = subprocess.run(
[sys.executable, "-m", "bootstrap.main", "build-lib", d, "-o", out_path],
[sys.executable, "-m", "compiler", "build-lib", d, "-o", out_path],
capture_output=True, text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
......@@ -87,7 +87,7 @@ class TestBuildLib:
sh_path = ct_path.replace(".ct", ".sh")
try:
result = subprocess.run(
[sys.executable, "-m", "bootstrap.main", "build-lib", ct_path],
[sys.executable, "-m", "compiler", "build-lib", ct_path],
capture_output=True, text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
......@@ -119,7 +119,7 @@ class TestBuildLib:
try:
result = subprocess.run(
[sys.executable, "-m", "bootstrap.main", "build-lib", ct_path, "--install"],
[sys.executable, "-m", "compiler", "build-lib", ct_path, "--install"],
capture_output=True, text=True,
cwd=os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
)
......
......@@ -2,19 +2,13 @@ import pytest
import os
import tempfile
from helpers import run_ct, compile_ct
from bootstrap.lexer import Lexer
from bootstrap.parser import Parser
from bootstrap.codegen import CodeGenerator
from bootstrap.ast_nodes import BusingStmt
from compiler.syntax.nodes import BusingStmt
from compiler.cli import compile_source as _compile_src
def compile_source(source):
lexer = Lexer(source)
tokens = lexer.tokenize()
parser = Parser(tokens)
ast = parser.parse()
gen = CodeGenerator()
return gen.generate(ast)
ok, script, errs = _compile_src(source, '<test>')
return script
class TestBusingParser:
......
import pytest
from helpers import run_ct, compile_ct, compile_ct_with_flags, run_ct_test
......
import pytest
from bootstrap.lexer import Lexer
from bootstrap.tokens import TokenType
from compiler.lexer.lexer import Lexer
from compiler.lexer.tokens import TokenType, RawText, RawInterp
class TestLexerBasics:
......@@ -43,25 +43,28 @@ class TestLexerStrings:
def test_simple_string(self, lex):
tokens = lex('"hello"')
assert tokens[0].type == TokenType.STRING
assert tokens[0].value == "hello"
assert tokens[0].value == [RawText("hello")]
def test_single_quotes(self, lex):
tokens = lex("'world'")
assert tokens[0].type == TokenType.STRING
assert tokens[0].value == "world"
assert tokens[0].value == [RawText("world")]
def test_escape_newline(self, lex):
tokens = lex(r'"line1\nline2"')
assert tokens[0].value == "line1\nline2"
assert tokens[0].value == [RawText("line1\nline2")]
def test_escape_tab(self, lex):
tokens = lex(r'"col1\tcol2"')
assert tokens[0].value == "col1\tcol2"
assert tokens[0].value == [RawText("col1\tcol2")]
def test_escape_braces(self, lex):
tokens = lex(r'"literal \{ brace \}"')
assert "\x00LBRACE\x00" in tokens[0].value
assert "\x00RBRACE\x00" in tokens[0].value
parts = tokens[0].value
assert len(parts) == 1
assert isinstance(parts[0], RawText)
assert "{" in parts[0].text
assert "}" in parts[0].text
class TestLexerIdentifiers:
......
import pytest
from helpers import run_ct, compile_ct
from bootstrap.lexer import Lexer
from bootstrap.parser import Parser
from bootstrap.codegen import CodeGenerator
from bootstrap.ast_nodes import NamespaceDecl, UsingStmt, FunctionDecl, ClassDecl
from compiler.syntax.nodes import NamespaceDecl, UsingStmt, FunctionDecl, ClassDecl
from compiler.cli import compile_source as _compile_src
def compile_source(source):
lexer = Lexer(source)
tokens = lexer.tokenize()
parser = Parser(tokens)
ast = parser.parse()
gen = CodeGenerator()
return gen.generate(ast)
ok, script, errs = _compile_src(source, '<test>')
return script
class TestNamespaceParser:
......@@ -83,8 +77,8 @@ namespace models {
u = models.User("Alice")
'''
code = compile_source(source)
assert 'models__User ()' in code
assert '__ct_class_models__User_construct' in code
assert 'models__User' in code
assert '__ct_class_User_construct' in code
def test_using_direct_access(self):
source = '''
......@@ -160,22 +154,9 @@ using utils
upper("test")
lower("test")
'''
lexer1 = Lexer(source1)
tokens1 = lexer1.tokenize()
parser1 = Parser(tokens1)
ast1 = parser1.parse()
lexer2 = Lexer(source2)
tokens2 = lexer2.tokenize()
parser2 = Parser(tokens2)
ast2 = parser2.parse()
gen = CodeGenerator()
code = gen.generate_multi([ast1, ast2])
assert 'utils__upper ()' in code
assert 'utils__lower ()' in code
assert 'utils__upper "test"' in code
assert 'utils__lower "test"' in code
code = compile_source(source1 + "\n" + source2)
assert 'utils__upper' in code
assert 'utils__lower' in code
class TestNamespaceIntegration:
......
import pytest
from bootstrap.ast_nodes import (
from compiler.syntax.nodes import (
Program, FunctionDecl, ClassDecl, ClassField, ConstructorDecl,
IntegerLiteral, FloatLiteral, StringLiteral, BoolLiteral, NilLiteral,
Identifier, ArrayLiteral, DictLiteral, BinaryOp, UnaryOp,
......@@ -29,7 +29,8 @@ class TestParserLiterals:
ast = parse('"hello"')
stmt = ast.statements[0]
assert isinstance(stmt.expression, StringLiteral)
assert stmt.expression.value == "hello"
assert len(stmt.expression.parts) == 1
assert stmt.expression.parts[0].value == "hello"
def test_bool_true(self, parse):
ast = parse("true")
......@@ -204,13 +205,13 @@ class TestParserLambdas:
stmt = ast.statements[0]
assert isinstance(stmt, Assignment)
assert isinstance(stmt.value, Lambda)
assert stmt.value.params == ["x"]
assert [p.name for p in stmt.value.params] == ["x"]
def test_multi_param_lambda(self, parse):
ast = parse("add = (a, b) => a + b")
stmt = ast.statements[0]
assert isinstance(stmt.value, Lambda)
assert stmt.value.params == ["a", "b"]
assert [p.name for p in stmt.value.params] == ["a", "b"]
def test_block_lambda(self, parse):
ast = parse("fn = x => { return x * 2 }")
......