-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathfractalic.py
executable file
·195 lines (158 loc) · 8.22 KB
/
fractalic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import os
import sys
import io
import builtins
import argparse
import traceback
import toml
from pathlib import Path
from core.git import commit_changes, ensure_git_repo
from core.ast_md.parser import print_parsed_structure
from core.utils import parse_file, load_settings
from core.config import Config
from core.ast_md.ast import AST
from core.utils import read_file
from core.operations.runner import run
from core.operations.call_tree import CallTreeNode
from core.errors import BlockNotFoundError, UnknownOperationError
from core.render.render_ast import render_ast_to_markdown
from rich.console import Console
from rich.panel import Panel
# Re-wrap the standard streams so all console I/O is UTF-8 regardless of the
# platform/locale default — required for the Unicode glyphs (✓/✗) printed by
# the provider-configuration report below.
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stdin = io.TextIOWrapper(sys.stdin.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
# Keep a reference to the builtin open() — presumably so code elsewhere that
# monkey-patches open can restore/use the original; not used in this file.
# NOTE(review): confirm against the rest of the package.
original_open = open
def setup_provider_config(args, settings):
    """Resolve the LLM provider and API key and report where the key came from.

    API-key precedence: 1) the ``--api_key`` command-line argument,
    2) ``settings.toml`` (``[settings.<provider>].apiKey``), 3) the
    provider's environment variable.

    Args:
        args: Parsed argparse namespace providing ``provider`` and ``api_key``.
        settings: Contents of settings.toml as a dict.

    Returns:
        Tuple ``(provider, api_key, provider_settings)``.

    Raises:
        ValueError: If the provider is unsupported, or no API key was found
            in any of the three sources.
    """
    # Map each supported provider to the environment variable holding its key.
    PROVIDER_API_KEYS = {
        'openai': 'OPENAI_API_KEY',
        'anthropic': 'ANTHROPIC_API_KEY',
        'groq': 'GROQ_API_KEY'
    }
    provider = args.provider.lower()
    # Validate provider before touching any of its settings.
    if provider not in PROVIDER_API_KEYS:
        raise ValueError(f"Unsupported provider: {provider}")
    api_key_env_var = PROVIDER_API_KEYS[provider]
    provider_settings = settings.get('settings', {}).get(provider, {})
    # API key precedence: 1) command line arg, 2) settings.toml, 3) environment variable
    api_key = (args.api_key or
               provider_settings.get('apiKey') or
               os.getenv(api_key_env_var))

    # Log API key sources (without showing the actual keys).
    console = Console(force_terminal=True, color_system="auto")

    def mask_key(key):
        # Show only a short prefix/suffix so logs never leak a full key.
        if key and len(key) > 7:
            return f"{key[:5]}........{key[-2:]}"
        return key if key else None

    console.print(f"\n[bold] Provider: [light_green]{provider}[/light_green][/bold]")
    # Bug fix: mask each source's OWN value. Previously every line printed the
    # masked *resolved* key, so e.g. the settings.toml line could display the
    # CLI-supplied key when both were present.
    if args.api_key:
        console.print(f"[light_green]✓[/light_green] API key from arguments: [light_green]{mask_key(args.api_key)}[/light_green]")
    else:
        console.print(f"[dim][red]✗[/red] API key from arguments[/dim]")
    if provider_settings.get('apiKey'):
        console.print(f"[light_green]✓[/light_green] API key from settings.toml: [light_green]{mask_key(provider_settings.get('apiKey'))}[/light_green]")
    else:
        console.print(f"[dim][red]✗[/red] API key from settings.toml[/dim]")
    if os.getenv(api_key_env_var):
        console.print(f"[light_green]✓[/light_green] API key from environment variable {api_key_env_var}: [light_green]{mask_key(os.getenv(api_key_env_var))}[/light_green]")
    else:
        console.print(f"[dim][red]✗[/red] API key from environment variable {api_key_env_var}[/dim]")

    if not api_key:
        raise ValueError(f"API key for {provider} must be provided either as an argument, in settings.toml, or through the {api_key_env_var} environment variable.")
    return provider, api_key, provider_settings
def main():
    """CLI entry point: parse arguments, configure the LLM provider, run the
    workflow on the input markdown file, then persist and commit the call tree.

    Exits with status 1 on any error; known error types are reported briefly,
    unexpected ones with their originating file/line and a full traceback.
    """
    settings = load_settings()  # Load settings.toml once
    default_provider = settings.get('defaultProvider', 'openai')
    default_operation = settings.get('defaultOperation', 'append')

    parser = argparse.ArgumentParser(description="Process and run operations on a markdown file.")
    parser.add_argument('input_file', type=str, help='Path to the input markdown file.')
    parser.add_argument('--task_file', type=str, help='Path to the task markdown file.')
    parser.add_argument('--api_key', type=str, help='LLM API key', default=None)
    parser.add_argument('--provider', type=str, help='LLM provider (e.g., openai, anthropic, groq)',
                        default=default_provider)
    parser.add_argument('--operation', type=str, help='Default operation to perform',
                        default=default_operation)
    parser.add_argument('--param_input_user_request', type=str,
                        help='Part path for ParamInput-UserRequest', default=None)
    parser.add_argument('-v', '--show-operations', action='store_true',
                        help='Make operations visible to LLM (overrides TOML setting)')
    args = parser.parse_args()

    try:
        provider, api_key, provider_settings = setup_provider_config(args, settings)

        # Update TOML settings if show-operations flag is explicitly set.
        if args.show_operations:
            if 'settings' not in settings:
                settings['settings'] = {}
            settings['settings']['enableOperationsVisibility'] = True

        # Publish the resolved configuration process-wide.
        Config.TOML_SETTINGS = settings
        Config.LLM_PROVIDER = provider
        Config.API_KEY = api_key
        Config.DEFAULT_OPERATION = args.operation
        # Downstream libraries read the key from the environment as well.
        os.environ[f"{provider.upper()}_API_KEY"] = api_key

        if not os.path.exists(args.input_file):
            raise FileNotFoundError(f"Input file not found: {args.input_file}")

        if args.task_file and args.param_input_user_request:
            # Task mode: extract the requested part from the task file and
            # pass it into the run as a parameter node.
            if not os.path.exists(args.task_file):
                raise FileNotFoundError(f"Task file not found: {args.task_file}")
            temp_ast = parse_file(args.task_file)
            param_node = temp_ast.get_part_by_path(args.param_input_user_request, True)
            result_nodes, call_tree_root, ctx_file, ctx_hash, trc_file, trc_hash, branch_name, explicit_return = run(
                args.input_file,
                param_node,
                p_call_tree_node=None
            )
        else:
            result_nodes, call_tree_root, ctx_file, ctx_hash, trc_file, trc_hash, branch_name, explicit_return = run(
                args.input_file,
                p_call_tree_node=None
            )

        # Save the call tree next to the input file.
        abs_path = os.path.abspath(args.input_file)
        file_dir = os.path.dirname(abs_path)
        call_tree_path = os.path.join(file_dir, 'call_tree.json')
        with open(call_tree_path, 'w', encoding='utf-8') as json_file:
            call_tree_root.ctx_file = ctx_file
            call_tree_root.ctx_hash = ctx_hash
            call_tree_root.trc_file = trc_file
            call_tree_root.trc_hash = trc_hash
            json_file.write(call_tree_root.to_json())

        # Commit the call tree (return value intentionally unused).
        commit_changes(
            file_dir,
            "Saving call_tree.json",
            [call_tree_path],
            None,
            None
        )

        # Send message to UI for branch information.
        print(f"[EventMessage: Root-Context-Saved] ID: {branch_name}, {ctx_hash}")

        # Log information about how the workflow completed.
        if explicit_return:
            print(f"[EventMessage: Execution-Mode] Explicit @return operation")
            print(f"[EventMessage: Return-Nodes-Count] {len(result_nodes.parser.nodes)}")
            # Print the content of the returned AST, node by node.
            print("\n[EventMessage: Return-Content-Start]")
            current_node = result_nodes.first()
            while current_node:
                print(current_node.content)
                current_node = current_node.next
            print("[EventMessage: Return-Content-End]\n")
        else:
            print(f"[EventMessage: Execution-Mode] Natural workflow completion")
            # No need to print the full AST for natural completion as it's already in the .ctx file

    except (BlockNotFoundError, UnknownOperationError, FileNotFoundError, ValueError) as e:
        print(f"[ERROR fractalic.py] {str(e)}")
        sys.exit(1)
    except Exception as e:
        exc_type, exc_value, exc_traceback = sys.exc_info()
        tb = traceback.extract_tb(exc_traceback)
        if tb:
            # Last frame is where the error originated.
            filename, line_no, func_name, text = tb[-1]
            # Bug fix: report the originating file instead of the literal
            # "(unknown)" the old message always printed.
            print(f"[ERROR][Unexpected] {exc_type.__name__} in {filename}, line {line_no}: {str(e)}")
        else:
            # Bug fix: an empty traceback no longer raises IndexError here.
            print(f"[ERROR][Unexpected] {exc_type.__name__}: {str(e)}")
        traceback.print_exc()
        sys.exit(1)
# Script entry point: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()