import requests
import json
import tiktoken
import os
import time
import argparse
import sys
from rich.console import Console
from rich.markdown import Markdown
from rich.panel import Panel
from rich.prompt import Prompt
from rich.progress import Progress
from rich.syntax import Syntax
import configparser
from dotenv import load_dotenv
import colorama
from colorama import Fore, Style
import datetime
import tempfile
import subprocess
import re
import importlib.util
import base64
import webbrowser
import urllib.request
from packaging import version
# Initialize colorama for cross-platform colored terminal output
colorama.init()
# Initialize Rich console
console = Console()
# Constants for the application
APP_NAME = "OrChat"
APP_VERSION = "1.0.0"
REPO_URL = "https://github.com/oop7/OrChat"
API_URL = "https://api.github.com/repos/oop7/OrChat/releases/latest"
# Holds the most recent <thinking> content so the /thinking command can display it
last_thinking_content = ""
def load_config():
"""Load configuration from .env file and/or config.ini"""
# First try to load from .env file
load_dotenv()
api_key = os.getenv("OPENROUTER_API_KEY")
# Then try config.ini (overrides .env if both exist)
config = configparser.ConfigParser()
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
if os.path.exists(config_file):
config.read(config_file)
if 'API' in config and 'OPENROUTER_API_KEY' in config['API']:
api_key = config['API']['OPENROUTER_API_KEY']
# Load other settings if available
if 'SETTINGS' in config:
settings = config['SETTINGS']
return {
'api_key': api_key,
'model': settings.get('MODEL', ""),
'temperature': settings.getfloat('TEMPERATURE', 0.7),
'system_instructions': settings.get('SYSTEM_INSTRUCTIONS', ""),
'theme': settings.get('THEME', 'default'),
'max_tokens': settings.getint('MAX_TOKENS', 8000),
'autosave_interval': settings.getint('AUTOSAVE_INTERVAL', 300),
'streaming': settings.getboolean('STREAMING', True),
                'thinking_mode': settings.getboolean('THINKING_MODE', True)
}
# Return defaults if no config file
return {
'api_key': api_key,
'model': "",
'temperature': 0.7,
'system_instructions': "",
'theme': 'default',
'max_tokens': 8000,
'autosave_interval': 300,
'streaming': True,
        'thinking_mode': True  # default preference when no config file exists
}
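# A config.ini in the layout load_config/save_config expect would look like
# this (sketch; the key value is illustrative):
#   [API]
#   OPENROUTER_API_KEY = sk-or-...
#   [SETTINGS]
#   MODEL = anthropic/claude-3-opus
#   TEMPERATURE = 0.7
#   SYSTEM_INSTRUCTIONS = You are a helpful assistant.
#   THEME = default
#   MAX_TOKENS = 8000
#   AUTOSAVE_INTERVAL = 300
#   STREAMING = True
#   THINKING_MODE = True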
def save_config(config_data):
"""Save configuration to config.ini file"""
config = configparser.ConfigParser()
config['API'] = {'OPENROUTER_API_KEY': config_data['api_key']}
config['SETTINGS'] = {
'MODEL': config_data['model'],
'TEMPERATURE': str(config_data['temperature']),
'SYSTEM_INSTRUCTIONS': config_data['system_instructions'],
'THEME': config_data['theme'],
'MAX_TOKENS': str(config_data['max_tokens']),
'AUTOSAVE_INTERVAL': str(config_data['autosave_interval']),
'STREAMING': str(config_data['streaming']),
        'THINKING_MODE': str(config_data['thinking_mode'])
}
config_file = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'config.ini')
with open(config_file, 'w') as f:
config.write(f)
console.print("[green]Configuration saved successfully![/green]")
def count_tokens(text, model_name="cl100k_base"):
    """Count the tokens in a string using tiktoken; model_name names a tiktoken encoding."""
    if not isinstance(text, str):
        # Multimodal messages store content as a list of parts; serialize it so counting still works
        text = json.dumps(text)
    try:
encoding = tiktoken.get_encoding(model_name)
except KeyError:
console.print(f"[yellow]Warning: Model encoding {model_name} not found. Using cl100k_base as default[/yellow]")
encoding = tiktoken.get_encoding("cl100k_base")
tokens = encoding.encode(text)
return len(tokens)
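# Usage sketch: with the default cl100k_base encoding,
# count_tokens("Hello, world!") should return 4 ("Hello", ",", " world", "!").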
def get_available_models():
"""Fetch available models from OpenRouter API"""
try:
config = load_config()
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json",
}
with console.status("[bold green]Fetching available models..."):
            response = requests.get("https://openrouter.ai/api/v1/models", headers=headers, timeout=30)
if response.status_code == 200:
models_data = response.json()
return models_data["data"]
else:
console.print(f"[red]Error fetching models: {response.status_code}[/red]")
return []
except Exception as e:
console.print(f"[red]Error fetching models: {str(e)}[/red]")
return []
def select_model(config):
"""Simplified model selection interface"""
all_models = get_available_models()
if not all_models:
console.print("[red]No models available. Please check your API key and internet connection.[/red]")
return None
# Option to directly enter a model name
console.print("[bold green]Model Selection[/bold green]")
console.print("\n[bold magenta]Options:[/bold magenta]")
console.print("[bold]1[/bold] - View all available models")
console.print("[bold]2[/bold] - Show free models only")
console.print("[bold]3[/bold] - Enter model name directly")
console.print("[bold]q[/bold] - Cancel selection")
choice = Prompt.ask("Select an option", choices=["1", "2", "3", "q"], default="1")
if choice == "q":
return None
elif choice == "3":
# Direct model name entry
console.print("[yellow]Enter the exact model name (e.g., 'anthropic/claude-3-opus')[/yellow]")
model_name = Prompt.ask("Model name")
# Validate the model name
model_exists = any(model["id"] == model_name for model in all_models)
if model_exists:
return model_name
else:
console.print("[yellow]Warning: Model not found in available models. Using anyway.[/yellow]")
confirm = Prompt.ask("Continue with this model name? (y/n)", default="y")
if confirm.lower() == "y":
return model_name
else:
return select_model(config) # Start over
elif choice == "1":
# All models, simple numbered list
console.print("[bold green]All Available Models:[/bold green]")
for i, model in enumerate(all_models, 1):
# Highlight free models
if model['id'].endswith(":free"):
console.print(f"[bold]{i}.[/bold] {model['id']} [green](FREE)[/green]")
else:
console.print(f"[bold]{i}.[/bold] {model['id']}")
model_choice = Prompt.ask("Enter model number or 'b' to go back", default="1")
if model_choice.lower() == 'b':
return select_model(config)
try:
index = int(model_choice) - 1
if 0 <= index < len(all_models):
return all_models[index]['id']
else:
console.print("[red]Invalid selection[/red]")
return select_model(config)
except ValueError:
console.print("[red]Please enter a valid number[/red]")
return select_model(config)
elif choice == "2":
# Show only free models
free_models = [model for model in all_models if model['id'].endswith(":free")]
if not free_models:
console.print("[yellow]No free models found.[/yellow]")
Prompt.ask("Press Enter to continue")
return select_model(config)
console.print("[bold green]Free Models:[/bold green]")
for i, model in enumerate(free_models, 1):
console.print(f"[bold]{i}.[/bold] {model['id']} [green](FREE)[/green]")
model_choice = Prompt.ask("Enter model number or 'b' to go back", default="1")
if model_choice.lower() == 'b':
return select_model(config)
try:
index = int(model_choice) - 1
if 0 <= index < len(free_models):
return free_models[index]['id']
else:
console.print("[red]Invalid selection[/red]")
Prompt.ask("Press Enter to continue")
return select_model(config)
except ValueError:
console.print("[red]Please enter a valid number[/red]")
Prompt.ask("Press Enter to continue")
return select_model(config)
def setup_wizard():
"""Interactive setup wizard for first-time users"""
console.print(Panel.fit(
"[bold blue]Welcome to the OrChat Setup Wizard![/bold blue]\n"
"Let's configure your chat settings.",
title="Setup Wizard"
))
api_key = Prompt.ask("Enter your OpenRouter API key")
# Save API key temporarily to allow model fetching
temp_config = {'api_key': api_key}
# Use the simplified model selection
console.print("[bold]Select an AI model to use:[/bold]")
model = ""
try:
with console.status("[bold green]Connecting to OpenRouter...[/bold green]"):
# Small delay to ensure the API key is registered
time.sleep(1)
selected_model = select_model(temp_config)
if selected_model:
model = selected_model
else:
console.print("[yellow]Model selection cancelled. You can set a model later.[/yellow]")
except Exception as e:
console.print(f"[yellow]Error during model selection: {str(e)}. You can set a model later.[/yellow]")
temperature = float(Prompt.ask("Set temperature (0.0-2.0)", default="0.7"))
if temperature > 1.0:
console.print("[yellow]Warning: High temperature values (>1.0) may cause erratic or nonsensical responses.[/yellow]")
confirm = Prompt.ask("Are you sure you want to use this high temperature? (y/n)", default="n")
if confirm.lower() != 'y':
temperature = float(Prompt.ask("Enter a new temperature value (0.0-1.0)", default="0.7"))
console.print("[bold]Enter system instructions (guide the AI's behavior)[/bold]")
console.print("[dim]Press Enter twice to finish[/dim]")
lines = []
empty_line_count = 0
while True:
line = input()
if not line:
empty_line_count += 1
if empty_line_count >= 2: # Exit after two consecutive empty lines
break
else:
empty_line_count = 0 # Reset counter if non-empty line
lines.append(line)
system_instructions = "\n".join(lines) if lines else ""
# Add theme selection
available_themes = ['default', 'dark', 'light', 'hacker']
console.print("[green]Available themes:[/green]")
for theme in available_themes:
console.print(f"- {theme}")
theme_choice = Prompt.ask("Select theme", choices=available_themes, default="default")
    # Ask for the thinking mode preference (off by default)
thinking_mode = Prompt.ask(
"Enable thinking mode? (Shows AI reasoning process)",
choices=["y", "n"],
default="n" # Changed from "y" to "n"
).lower() == "y"
config_data = {
'api_key': api_key,
'model': model,
'temperature': temperature,
'system_instructions': system_instructions,
'theme': theme_choice,
'max_tokens': 8000,
'autosave_interval': 300,
'streaming': True,
        'thinking_mode': thinking_mode
}
save_config(config_data)
return config_data
def format_time_delta(delta_seconds):
"""Format time delta in a human-readable format"""
if delta_seconds < 1:
return f"{delta_seconds*1000:.0f}ms"
elif delta_seconds < 60:
return f"{delta_seconds:.2f}s"
else:
minutes = int(delta_seconds // 60)
seconds = delta_seconds % 60
return f"{minutes}m {seconds:.2f}s"
def format_file_size(size_bytes):
"""Format file size in a human-readable way"""
if size_bytes < 1024:
return f"{size_bytes} bytes"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
elif size_bytes < 1024 * 1024 * 1024:
return f"{size_bytes / (1024 * 1024):.1f} MB"
else:
return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB"
def stream_response(response, start_time, thinking_mode=True):
    """Stream the response from the API with proper text formatting.

    thinking_mode defaults to True so existing call sites that pass only
    (response, start_time) keep their old behavior.
    """
    console.print("\n[bold green]Assistant[/bold green]")
    # Full content accumulates everything, including any <thinking> sections
    full_content = ""
    # Thinking content collected incrementally while streaming
    thinking_content = ""
    in_thinking = False
# Create a temporary file to collect all content
# This avoids terminal display issues
collected_content = []
# For debugging purposes
global last_thinking_content
try:
for chunk in response.iter_lines():
if not chunk:
continue
chunk_text = chunk.decode('utf-8', errors='replace')
if "OPENROUTER PROCESSING" in chunk_text:
continue
if chunk_text.startswith('data:'):
chunk_text = chunk_text[5:].strip()
if chunk_text == "[DONE]":
continue
try:
chunk_data = json.loads(chunk_text)
if 'choices' in chunk_data and chunk_data['choices']:
delta = chunk_data['choices'][0].get('delta', {})
content = delta.get('content', delta.get('text', ''))
if content:
# Add to full content
full_content += content
# Only process thinking tags if thinking mode is enabled
if thinking_mode:
# Check for thinking tags
if "<thinking>" in content:
in_thinking = True
# Extract content after the tag
thinking_part = content.split("<thinking>", 1)[1]
thinking_content += thinking_part
# Skip this chunk - don't display the <thinking> tag
continue
if "</thinking>" in content:
in_thinking = False
# Extract content before the tag
thinking_part = content.split("</thinking>", 1)[0]
thinking_content += thinking_part
# Skip this chunk - don't display the </thinking> tag
continue
if in_thinking:
thinking_content += content
continue
# Not in thinking mode or model doesn't support thinking, collect for display
collected_content.append(content)
except json.JSONDecodeError:
# For non-JSON chunks, quietly ignore
pass
except Exception as e:
console.print(f"\n[red]Error during streaming: {str(e)}[/red]")
# Now display the collected content all at once
# This avoids the vertical text issue
if collected_content:
print("".join(collected_content))
else:
# If we only got thinking content, display a default response
print("Hello! I'm here to help you.")
# More robust thinking extraction - uses regex pattern to look for any thinking tags in the full content
thinking_section = ""
thinking_pattern = re.compile(r'<thinking>(.*?)</thinking>', re.DOTALL)
thinking_matches = thinking_pattern.findall(full_content)
if thinking_mode and thinking_matches:
thinking_section = "\n".join(thinking_matches)
# Update the global thinking content variable
last_thinking_content = thinking_section
else:
# Also check if thinking_content has any content from our incremental collection
if thinking_content.strip():
last_thinking_content = thinking_content
# Clean the full content - only if model supports thinking
cleaned_content = full_content
if thinking_mode and "<thinking>" in full_content:
# Remove the thinking sections with a more robust pattern
try:
# Use a non-greedy match to handle multiple thinking sections
cleaned_content = re.sub(r'<thinking>.*?</thinking>', '', full_content, flags=re.DOTALL)
cleaned_content = cleaned_content.strip()
        except re.error:
# Fallback to simpler method
parts = full_content.split("</thinking>")
if len(parts) > 1:
cleaned_content = parts[-1].strip()
# If after cleaning we have nothing, use a default response
if not cleaned_content.strip():
cleaned_content = "Hello! I'm here to help you."
response_time = time.time() - start_time
return cleaned_content, response_time
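# For reference, the stream parsed above is server-sent events; a typical data
# line looks roughly like this (fields abbreviated, values illustrative):
#   data: {"choices": [{"delta": {"content": "Hello"}}]}
#   data: [DONE]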
def save_conversation(conversation_history, filename, format="markdown"):
"""Save conversation to file in various formats"""
if format == "markdown":
with open(filename, 'w') as f:
f.write("# OrChat Conversation\n\n")
f.write(f"Date: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
for msg in conversation_history:
if msg['role'] == 'system':
f.write(f"## System Instructions\n\n{msg['content']}\n\n")
else:
f.write(f"## {msg['role'].capitalize()}\n\n{msg['content']}\n\n")
elif format == "json":
with open(filename, 'w') as f:
json.dump(conversation_history, f, indent=2)
elif format == "html":
with open(filename, 'w') as f:
f.write("<!DOCTYPE html>\n<html>\n<head>\n")
f.write("<title>OrChat Conversation</title>\n")
f.write("<style>\n")
f.write("body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; }\n")
f.write(".system { background-color: #f0f0f0; padding: 10px; border-radius: 5px; }\n")
f.write(".user { background-color: #e1f5fe; padding: 10px; border-radius: 5px; margin: 10px 0; }\n")
f.write(".assistant { background-color: #f1f8e9; padding: 10px; border-radius: 5px; margin: 10px 0; }\n")
f.write("</style>\n</head>\n<body>\n")
f.write(f"<h1>OrChat Conversation</h1>\n")
f.write(f"<p>Date: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>\n")
for msg in conversation_history:
f.write(f"<div class='{msg['role']}'>\n")
f.write(f"<h2>{msg['role'].capitalize()}</h2>\n")
f.write(f"<p>{msg['content'].replace('\n', '<br>')}</p>\n")
f.write("</div>\n")
f.write("</body>\n</html>")
return filename
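# Usage sketch (hypothetical history; "markdown" is the default format):
#   history = [{"role": "system", "content": "Be concise."},
#              {"role": "user", "content": "Hi"},
#              {"role": "assistant", "content": "Hello!"}]
#   save_conversation(history, "chat.md")
#   save_conversation(history, "chat.json", format="json")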
def manage_context_window(conversation_history, max_tokens=8000, model_name="cl100k_base"):
"""Manage the context window to prevent exceeding token limits"""
# Always keep the system message
system_message = conversation_history[0]
# Count total tokens in the conversation
total_tokens = sum(count_tokens(msg["content"], model_name) for msg in conversation_history)
# If we're under the limit, no need to trim
if total_tokens <= max_tokens:
return conversation_history, 0
# We need to trim the conversation
# Start with just the system message
trimmed_history = [system_message]
current_tokens = count_tokens(system_message["content"], model_name)
# Add messages from the end (most recent) until we approach the limit
# Leave room for the next user message
messages_to_consider = conversation_history[1:]
trimmed_count = 0
for msg in reversed(messages_to_consider):
msg_tokens = count_tokens(msg["content"], model_name)
if current_tokens + msg_tokens < max_tokens - 1000: # Leave 1000 tokens buffer
trimmed_history.insert(1, msg) # Insert after system message
current_tokens += msg_tokens
else:
trimmed_count += 1
# Add a note about trimmed messages if any were removed
if trimmed_count > 0:
note = {"role": "system", "content": f"Note: {trimmed_count} earlier messages have been removed to stay within the context window."}
trimmed_history.insert(1, note)
return trimmed_history, trimmed_count
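# Usage sketch: trim before each request and report what was dropped.
#   history, dropped = manage_context_window(history, max_tokens=8000)
#   if dropped:
#       console.print(f"[yellow]{dropped} old messages trimmed[/yellow]")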
def process_file_upload(file_path, conversation_history):
"""Process a file upload and add its contents to the conversation"""
try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
file_ext = os.path.splitext(file_path)[1].lower()
file_name = os.path.basename(file_path)
# Determine file type and create appropriate message
if file_ext in ['.py', '.js', '.java', '.cpp', '.c', '.cs', '.go', '.rb', '.php', '.ts', '.swift']:
file_type = "code"
message = f"I'm uploading a code file named '{file_name}'. Please analyze it:\n\n```{file_ext[1:]}\n{content}\n```"
elif file_ext in ['.txt', '.md', '.csv', '.json', '.xml', '.html', '.css']:
file_type = "text"
message = f"I'm uploading a text file named '{file_name}'. Here are its contents:\n\n{content}"
else:
file_type = "unknown"
message = f"I'm uploading a file named '{file_name}'. Here are its contents:\n\n{content}"
# Add to conversation history
conversation_history.append({"role": "user", "content": message})
return True, f"File '{file_name}' uploaded successfully as {file_type}."
except Exception as e:
return False, f"Error processing file: {str(e)}"
def handle_attachment(file_path, conversation_history):
"""Enhanced file attachment handling with preview and metadata"""
try:
# Get file information
file_name = os.path.basename(file_path)
file_ext = os.path.splitext(file_path)[1].lower()
file_size = os.path.getsize(file_path)
file_size_formatted = format_file_size(file_size)
# Determine file type and create appropriate message
file_type, content = extract_file_content(file_path, file_ext)
# Create a message that includes metadata about the attachment
message = f"I'm sharing a file: **{file_name}** ({file_type}, {file_size_formatted})\n\n"
if file_type == "image":
# For images, we'll use the multimodal API format
with open(file_path, 'rb') as img_file:
base64_image = base64.b64encode(img_file.read()).decode('utf-8')
# Add to messages with proper format for multimodal models
conversation_history.append({
"role": "user",
"content": [
{"type": "text", "text": message},
{"type": "image_url", "image_url": {"url": f"data:image/{file_ext[1:]};base64,{base64_image}"}}
]
})
return True, f"Image '{file_name}' attached successfully."
else:
# For other file types, add content to the message
message += content
conversation_history.append({"role": "user", "content": message})
return True, f"File '{file_name}' attached successfully as {file_type}."
except Exception as e:
return False, f"Error processing attachment: {str(e)}"
def extract_file_content(file_path, file_ext):
"""Extract and format content from different file types"""
# Determine file type based on extension
if file_ext in ['.jpg', '.jpeg', '.png', '.gif', '.webp', '.bmp']:
return "image", ""
    elif file_ext in ['.pdf']:
        # PDFs are not parsed here; only a short placeholder reaches the model
        return "PDF document", "[PDF content not extracted; a placeholder was shared instead]"
elif file_ext in ['.py', '.js', '.java', '.cpp', '.c', '.cs', '.go', '.rb', '.php', '.ts', '.swift']:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return "code", f"```{file_ext[1:]}\n{content}\n```"
elif file_ext in ['.txt', '.md', '.csv']:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return "text", content
elif file_ext in ['.json', '.xml']:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return "data", f"```{file_ext[1:]}\n{content}\n```"
elif file_ext in ['.html', '.css']:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return "web", f"```{file_ext[1:]}\n{content}\n```"
elif file_ext in ['.zip', '.tar', '.gz', '.rar']:
return "archive", "[Archive content not displayed in chat]"
else:
# Try to read as text, but handle binary files
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
return "unknown", content
        except (OSError, UnicodeDecodeError):
            return "binary", "[Binary content not displayed in chat]"
def execute_code(code_block, language):
"""Execute a code block and return the result"""
if language not in ['python']:
return False, f"Code execution not supported for {language}"
    temp_name = None
    try:
        # Write the code to a temporary .py file
        with tempfile.NamedTemporaryFile(suffix='.py', delete=False) as temp:
            temp_name = temp.name
            temp.write(code_block.encode('utf-8'))
        # Execute the code with the current interpreter
        result = subprocess.run(
            [sys.executable, temp_name],
            capture_output=True,
            text=True,
            timeout=10  # 10 second timeout for safety
        )
        if result.returncode == 0:
            return True, f"Code executed successfully:\n\n```\n{result.stdout}\n```"
        else:
            return False, f"Code execution failed:\n\n```\n{result.stderr}\n```"
    except Exception as e:
        return False, f"Error executing code: {str(e)}"
    finally:
        # Remove the temporary file even if execution timed out or raised
        if temp_name and os.path.exists(temp_name):
            os.unlink(temp_name)
def apply_theme(theme_name):
"""Apply a color theme to the console"""
themes = {
'default': {
'user_color': 'blue',
'assistant_color': 'green',
'system_color': 'yellow',
'error_color': 'red',
'panel_border': 'green',
'panel_title': 'white'
},
'dark': {
'user_color': 'cyan',
'assistant_color': 'magenta',
'system_color': 'yellow',
'error_color': 'red',
'panel_border': 'cyan',
'panel_title': 'white'
},
'light': {
'user_color': 'blue',
'assistant_color': 'green',
'system_color': 'yellow',
'error_color': 'red',
'panel_border': 'blue',
'panel_title': 'black'
},
'hacker': {
'user_color': 'green',
'assistant_color': 'green',
'system_color': 'green',
'error_color': 'red',
'panel_border': 'green',
'panel_title': 'green'
}
}
return themes.get(theme_name, themes['default'])
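# Usage sketch: fetch a palette and use its colors in Rich markup.
#   theme = apply_theme(config['theme'])
#   console.print(f"[{theme['user_color']}]You[/{theme['user_color']}]")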
class Plugin:
"""Base class for plugins"""
def __init__(self, name, description):
self.name = name
self.description = description
def on_load(self):
"""Called when the plugin is loaded"""
pass
def on_message(self, message, role):
"""Called when a message is sent or received"""
return message
def on_command(self, command, args):
"""Called when a command is executed"""
return False, "Command not handled by plugin"
def get_commands(self):
"""Return a list of commands this plugin provides"""
return []
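# A minimal plugin sketch (hypothetical file plugins/echo.py). load_plugins below
# discovers Plugin subclasses in that directory; this assumes main.py is importable
# as the module `main`, which may need adjusting depending on how OrChat is run.
#
#   from main import Plugin
#
#   class EchoPlugin(Plugin):
#       def __init__(self):
#           super().__init__("echo", "Repeats the /echo argument")
#
#       def get_commands(self):
#           return ["/echo"]
#
#       def on_command(self, command, args):
#           if command == "/echo":
#               return True, " ".join(args)
#           return False, "Command not handled by plugin"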
def load_plugins():
"""Load plugins from the plugins directory"""
plugins = []
plugins_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "plugins")
if not os.path.exists(plugins_dir):
os.makedirs(plugins_dir, exist_ok=True)
return plugins
for filename in os.listdir(plugins_dir):
if filename.endswith(".py") and not filename.startswith("_"):
try:
module_name = filename[:-3]
spec = importlib.util.spec_from_file_location(
module_name,
os.path.join(plugins_dir, filename)
)
module = importlib.util.module_from_spec(spec)
spec.loader.exec_module(module)
# Look for Plugin subclasses in the module
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (isinstance(attr, type) and
issubclass(attr, Plugin) and
attr is not Plugin):
plugin = attr()
plugin.on_load()
plugins.append(plugin)
console.print(f"[green]Loaded plugin: {plugin.name}[/green]")
except Exception as e:
console.print(f"[red]Error loading plugin {filename}: {str(e)}[/red]")
return plugins
def optimize_streaming():
"""Configure optimized streaming for better performance"""
# Set chunk size optimization
chunk_size = 1024
# Configure adaptive timeouts
base_timeout = 30
per_token_timeout = 0.01
# Enable resilient reconnection
max_retries = 3
retry_delay = 1
return {
"chunk_size": chunk_size,
"timeout": {
"base": base_timeout,
"per_token": per_token_timeout,
"max": 120
},
"retry": {
"max": max_retries,
"delay": retry_delay
}
}
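# These settings are not wired up inside this function itself; a caller could
# feed them into requests, e.g. (hypothetical wiring):
#   opts = optimize_streaming()
#   response = requests.post(url, json=payload, stream=True,
#                            timeout=opts["timeout"]["base"])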
def show_about():
"""Display information about OrChat"""
console.print(Panel.fit(
f"[bold blue]Or[/bold blue][bold green]Chat[/bold green] [dim]v{APP_VERSION}[/dim]\n\n"
"A powerful CLI for chatting with AI models through OpenRouter.\n\n"
f"[link={REPO_URL}]{REPO_URL}[/link]\n\n"
"Created by OOP7\n"
"Licensed under MIT License",
title="ℹ️ About OrChat",
border_style="blue"
))
def check_for_updates():
"""Check GitHub for newer versions of OrChat"""
console.print("[bold cyan]Checking for updates...[/bold cyan]")
try:
        with urllib.request.urlopen(API_URL, timeout=10) as response:
if response.getcode() == 200:
data = json.loads(response.read().decode('utf-8'))
latest_version = data.get('tag_name', 'v0.0.0').lstrip('v')
if version.parse(latest_version) > version.parse(APP_VERSION):
console.print(Panel.fit(
f"[yellow]A new version of OrChat is available![/yellow]\n"
f"Current version: [cyan]{APP_VERSION}[/cyan]\n"
f"Latest version: [green]{latest_version}[/green]\n\n"
f"Update at: {REPO_URL}/releases",
title="📢 Update Available",
border_style="yellow"
))
open_browser = Prompt.ask("Open release page in browser?", choices=["y", "n"], default="n")
if open_browser.lower() == "y":
webbrowser.open(f"{REPO_URL}/releases")
else:
console.print("[green]You are using the latest version of OrChat![/green]")
else:
console.print("[yellow]Could not check for updates. Server returned status "
f"code {response.getcode()}[/yellow]")
except Exception as e:
console.print(f"[yellow]Could not check for updates: {str(e)}[/yellow]")
def model_supports_thinking(model_id):
"""Determine if a model properly supports the thinking tag format"""
# List of models known to support thinking tags correctly
thinking_compatible_models = [
"anthropic/claude-3.7-sonnet:thinking",
"google/gemini-2.0-flash-thinking-exp:free",
"google/gemini-2.0-flash-thinking-exp-1219:free",
"deepseek/deepseek-r1-zero:free",
"deepseek/deepseek-r1:free",
"deepseek/deepseek-r1",
"openai/o1",
"openai/o1-mini-2024-09-12",
"openai/o1-preview",
"openai/o1-mini",
"openai/o3-mini-high",
"openai/o3-mini",
"google/gemini-2.0-flash-thinking-exp:free",
"qwen/qwq-32b",
"qwen/qwq-32b:free"
]
# Check if the model ID starts with any of the compatible prefixes
return any(model_id.startswith(prefix) for prefix in thinking_compatible_models)
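# Examples against the list above (note that startswith means prefix matching):
#   model_supports_thinking("openai/o1")            -> True
#   model_supports_thinking("openai/o1-pro")        -> True (prefix of "openai/o1")
#   model_supports_thinking("mistralai/mistral-7b") -> False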
def chat_with_model(config, conversation_history=None, plugins=None):
if plugins is None:
plugins = []
# Add plugin commands to help
plugin_commands = []
for plugin in plugins:
plugin_commands.extend(plugin.get_commands())
if conversation_history is None:
# Use user's thinking mode preference instead of model detection
if config['thinking_mode']:
# Make the thinking instruction more explicit and mandatory
thinking_instruction = (
f"{config['system_instructions']}\n\n"
"CRITICAL INSTRUCTION: For EVERY response without exception, you MUST first explain your "
"thinking process between <thinking> and </thinking> tags, even for simple greetings or short "
"responses. This thinking section should explain your reasoning and approach. "
"After the thinking section, provide your final response. Example format:\n"
"<thinking>Here I analyze what to say, considering context and appropriate responses...</thinking>\n"
"This is my actual response to the user."
)
else:
# Use standard instructions without thinking tags
thinking_instruction = config['system_instructions']
conversation_history = [
{"role": "system", "content": thinking_instruction}
]
        # Record the thinking preference in the conversation history so other components can inspect it
conversation_history.append({"role": "system", "name": "config", "content":
f"thinking_mode: {config['thinking_mode']}"})
headers = {
"Authorization": f"Bearer {config['api_key']}",
"Content-Type": "application/json",
}
# Check if temperature is too high and warn the user
if config['temperature'] > 1.0:
console.print(Panel.fit(
f"[yellow]Warning: High temperature setting ({config['temperature']}) may cause erratic responses.[/yellow]\n"
f"Consider using a value between 0.0 and 1.0 for more coherent outputs.",
title="⚠️ High Temperature Warning",
border_style="yellow"
))
console.print(Panel.fit(
f"[bold blue]Or[/bold blue][bold green]Chat[/bold green] [dim]v{APP_VERSION}[/dim]\n"
f"[cyan]Model:[/cyan] {config['model']}\n"
f"[cyan]Temperature:[/cyan] {config['temperature']}\n"
f"[cyan]Thinking mode:[/cyan] {'[green]✓ Enabled[/green]' if config['thinking_mode'] else '[yellow]✗ Not supported by this model[/yellow]'}\n"
f"[cyan]Session started:[/cyan] {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
f"Type your message or use commands: /help for available commands",
title="🤖 Chat Session Active",
border_style="green"
))
# Add session tracking
session_start_time = time.time()
total_tokens_used = 0
response_times = []
message_count = 0
# Create a session directory for saving files
session_id = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
session_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), "sessions", session_id)
os.makedirs(session_dir, exist_ok=True)
# Auto-save conversation periodically
last_autosave = time.time()
autosave_interval = config['autosave_interval']
# Check if we need to trim the conversation history
conversation_history, trimmed_count = manage_context_window(conversation_history)
if trimmed_count > 0:
console.print(f"[yellow]Note: Removed {trimmed_count} earlier messages to stay within the context window.[/yellow]")
while True:
try:
user_input = Prompt.ask("\n[bold blue]You[/bold blue]")
# Handle special commands
if user_input.startswith('/'):
command = user_input.lower()
if command == '/exit':
console.print("[yellow]Exiting chat...[/yellow]")
break
elif command == '/help':
console.print(Panel.fit(
"/exit - Exit the chat\n"
"/new - Start a new conversation\n"
"/clear - Clear conversation history\n"
"/cls or /clear-screen - Clear terminal screen\n" # Add this line
"/save - Save conversation to file\n"
"/settings - Adjust model settings\n"
"/tokens - Show token usage statistics\n"
"/model - Change the AI model\n"
"/temperature <0.0-2.0> - Adjust temperature\n"
"/system - View or change system instructions\n"
"/speed - Show response time statistics\n"
"/theme <theme> - Change the color theme\n"
"/about - Show information about OrChat\n"
"/update - Check for updates\n"
"/thinking - Show last AI thinking process\n"
"/thinking-mode - Toggle thinking mode on/off\n"
"/attach or /upload - Share a file with the AI",
title="Available Commands"
))
continue
elif command == '/clear':
conversation_history = [{"role": "system", "content": config['system_instructions']}]
console.print("[green]Conversation history cleared![/green]")
continue
elif command == '/new':
# Check if there's any actual conversation to save
if len(conversation_history) > 1:
save_prompt = Prompt.ask(
"Would you like to save the current conversation before starting a new one?",
choices=["y", "n"],
default="n"
)
if save_prompt.lower() == "y":
# Auto-generate a filename with timestamp
filename = f"conversation_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.md"
filepath = os.path.join(session_dir, filename)
save_conversation(conversation_history, filepath, "markdown")
console.print(f"[green]Conversation saved to {filepath}[/green]")
# Reset conversation
conversation_history = [{"role": "system", "content": config['system_instructions']}]
# Reset session tracking variables
total_tokens_used = 0
response_times = []
message_count = 0
last_autosave = time.time()