-
Notifications
You must be signed in to change notification settings - Fork 0
/
tcli
executable file
·715 lines (596 loc) · 23.8 KB
/
tcli
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# -*- mode: python -*-
import sys
import re
import datetime
import time
import random
import string
#import heapq
import bisect
import os.path
sys.path.append( os.path.join( os.getenv( 'HOME' ), 'local', 'lib', 'python' ) )
import trie
import sys
from ConfigParser import ConfigParser
from logging import *
import logging
# Known bug: the index used for searching is never cleaned up.
# Threshold of the root logger: only ERROR and above is reported.  Swap in
# one of the commented alternatives below for more verbose output.
logging.getLogger().setLevel( logging.ERROR )
#logging.getLogger().setLevel( logging.WARNING )
#logging.getLogger().setLevel( logging.INFO )
#logging.getLogger().setLevel( logging.DEBUG )
class holder:
    """One-slot container that allows assign-and-test in a single expression.

    ``h.hold(expr)`` stores the value of ``expr`` in ``h.var`` and returns
    it, so the value can be tested in an ``if``/``elif`` condition and
    still be read back afterwards.
    """

    def hold(self, stmt):
        """Remember ``stmt`` as ``self.var`` and return it unchanged."""
        self.var = stmt
        return stmt
def this_year():
    """Return the current year (UTC) as an int."""
    utc_now = time.gmtime()
    # Index 0 of a struct_time is the year (tm_year).
    return utc_now[0]
def today_as_ordinal():
    """Return today's date (local time) as a proleptic Gregorian ordinal."""
    year, month, day = time.localtime()[:3]
    return datetime.date(year, month, day).toordinal()
def base_ordinal():
    """Return the ordinal of 1970-01-01, the day task dates are counted from."""
    return datetime.date(1970, 1, 1).toordinal()
def sanity_check():
    """Refuse to run when the system clock is obviously wrong.

    Task priorities depend on the current date, so a badly set clock would
    silently reorder every task.  The current time must lie strictly
    between a fixed reference moment in the past and a far-away horizon.

    Raises:
        AssertionError: if the clock is outside that window.  The error is
            logged first.  It is raised explicitly instead of through an
            ``assert`` statement so the check also works under ``python -O``.
    """
    past = 1236861140  # 2009-03-12 (UTC): not long before this was written
    # The horizon used to be ten years after `past`; that moment passed in
    # March 2019 and made every later run fail.  A century still catches a
    # wildly wrong clock without expiring in practice.
    future = past + 60 * 60 * 24 * 365 * 100
    now = time.time()
    if not past < now < future:
        error("there is something wrong with the time, check the computers date settings")
        raise AssertionError("system clock is outside the plausible range")
def setup( config ):
    """Create the working directories named in the configuration.

    The top directory (option 'top_dir' in section 'basic') is handled
    first.  Only when it was created just now are the remaining '*_dir'
    options of the section walked and offered for creation as well; in that
    case 'top_dir' is removed from the configuration before the walk.
    When the top directory already exists, or the user declines to create
    it, nothing else happens.
    """
    root = config.get( 'basic', 'top_dir' )
    if not _ask_create_dir( root, 0o744 ):
        return
    config.remove_option( 'basic', 'top_dir' )
    dir_options = [ option for option in config.options( 'basic' )
                    if option.endswith( '_dir' ) ]
    for option in dir_options:
        _ask_create_dir( config.get( 'basic', option ) )
def _ask_create_dir( name, mode=0740 ):
"""Runs an interactive session with the user for each folder that
is passed in, trying to construct it, if it doesn't exist
"""
if not os.path.exists( name ):
try:
inp = raw_input( "%s does not exist, should I create it? ( y/n )\n> "\
% ( name ) )
if( inp.lower() == 'y' ):
os.mkdir( name )
os.chmod( name, mode )
return True
else:
print "as requested, I did nothing on %s"%( name )
return False
except IOError, ioe:
"Could not create %s. Please create it manually before running task.py"\
% ( name )
return False
# priorities
# there are 3 kinds of priorities:
# 1) prio: the user-given priority, ranging from 0-9 (default is 9)
# 2) intermediate priority: equal to the user-given priority unless the
#    task's date has been reached; it is then min(prio, 1 + prio/10),
#    which lies in the range 0-1.9
# 3) realprio: the priority used for sorting the tasks; it is calculated
#    as 10^6 * (intermediate prio + 1) + the date (converted to days since
#    1970-1-1, or 999999 if the task has no date); if the task is closed,
#    the result is multiplied by 10.
# class configuration:
# """simple container to set and hold configuration values"""
# def __init__(self):
# self.d = dict()
# # d["filter"] = undef
# def __getitem__(self, key):
# return self.d[key]
# def __setitem__(self, key, value):
# self.d[key] = value
# return value
# config = configuration()
# config["show_tasks"] = "20"
# config["top_dir"] = os.path.join( os.getenv( 'HOME' ), "task-projekt" )
# config["task_dir"] = config["top_dir"] + "tasks/"
# config["trash_dir"] = config["top_dir"] + "thrash/"
# config["closed_dir"] = config["top_dir"] + "closed/"
# config["task_no_file"] = config["top_dir"] + "+taskno"
# config["editor"] = "semacs"
# Module-wide configuration.  Everything lives in the 'basic' section; the
# directories are all placed below ~/.tcli and the editor command can be
# overridden through the TASKEDITOR environment variable.
config = ConfigParser()
config.add_section( 'basic' )
_top_dir = os.path.join( os.getenv("HOME"), ".tcli" )
for _option, _value in (
        ( 'show_tasks', '20' ),
        ( 'top_dir', _top_dir ),
        ( 'task_dir', os.path.join( _top_dir, 'tasks' ) ),
        ( 'thrash_dir', os.path.join( _top_dir, 'thrash' ) ),
        ( 'closed_dir', os.path.join( _top_dir, 'closed' ) ),
        ( 'task_no_file', os.path.join( _top_dir, '+taskno' ) ),
        ( 'editor', os.getenv( 'TASKEDITOR', 'emacs -nw -q' ) ),
):
    config.set( 'basic', _option, _value )
# Offer to create the directories on first use (may prompt the user).
setup( config )
class task( object ):
    """A single task, backed by one file on disk.

    The file starts with header lines of the form ``name: content``; the
    recognised names are date, prio, stat, subj and tags.  Parsing stops at
    the first whitespace-only line; whatever follows is the free-form body
    and is ignored here.  Updates are done by editing the file and
    recreating the object.

    Attributes:
        filename: path of the backing file.
        date: datetime.date from the 'date' header, or None.
        prio: priority from the 'prio' header (0-9); 9 if omitted/invalid.
        realprio: derived integer used for sorting (see init_realprio).
        subj: subject line, "[no subject]" by default.
        open: True while the task is open, False when closed or when the
            file does not exist.
        rtime: modification time of the file when the object was created,
            or None if the file did not exist.
        tags: dict whose keys are the task's tags (subject words included).
    """

    def __init__(self, filename_):
        """Read and parse the task stored in the file `filename_`."""
        self.filename = filename_
        # defaults
        self.date = None
        self.prio = 9  # prio written in task (or 9 if omitted or invalid)
        self.realprio = 90  # sortprio
        self.subj = "[no subject]"
        self.open = True  # True = open, False = closed
        self.rtime = self.mtime()  # modification time of file when object created
        self.tags = dict()
        self.read_header()
        self.init_realprio()

    def __cmp__(self, other):
        """Order tasks by realprio (Python 2 comparison protocol)."""
        return self.realprio.__cmp__(other.realprio)

    def __hash__(self):
        """Hash on the filename."""
        return hash( self.filename )

    def mtime(self):
        """Return the file's current modification time, or None if it is gone.

        os.path.getmtime raises OSError for a missing file.  Translating
        that to None lets a task be constructed for a vanished file, so
        read_header can mark it as closed instead of the constructor
        crashing.
        """
        try:
            return os.path.getmtime(self.filename)
        except OSError:
            return None

    def string_debug(self):
        """Return a verbose one-line description including the filename."""
        return self.filename + ": [" + str(self.prio) + "] "\
            + self.subj + "[" + str(self.date) + "]" + str(self.realprio)

    def string(self):
        """Return the line shown in listings; closed tasks get a '-' prefix."""
        subj = self.subj
        if not self.open:
            subj = '-' + subj
        return "%-40s [%s, %s]" % (subj, self.date, self.realprio)

    def init_stat(self, s):
        """Mark the task closed when the stat content starts with 'c'."""
        if re.search(r'^(c)', s):
            self.open = False
            debug("stat: False")

    def init_date(self, s):
        """Set self.date from a string that starts with day-month-year.

        Years more than ten years ahead are replaced by the current year;
        month and day are capped at 12 and 31.  A combination that still
        is not a valid date leaves self.date unchanged.
        """
        date_m = re.search(r'^(\d{1,2})-(\d{1,2})-(\d{4})', s)
        if date_m:
            (d, m, y) = [int(part) for part in date_m.groups()]
            if y > (this_year() + 10):
                # this_year must be *called*; assigning the function itself
                # made datetime.date() raise a TypeError below.
                y = this_year()
            m = min(m, 12)
            d = min(d, 31)
            try:
                self.date = datetime.date(y, m, d)
            except ValueError:
                pass
        debug(self.date)

    def init_subj(self, s):
        """Set the subject; every word of it also becomes a tag."""
        self.subj = s
        self.add_tags(s)

    def init_prio(self, s):
        """Set self.prio from the first digit in `s`.

        Priorities 0-9 are accepted; anything else leaves the current
        value (9 by default) and logs a warning.
        """
        prio_m = re.search(r'^\s*(\d)', s)
        if prio_m:
            self.prio = int(prio_m.group(1))
        else:
            warning( "not a valid prio: %s", s)
        debug("prio is %s"% self.prio)

    def init_realprio(self):
        """Compute self.realprio, the integer the task list is sorted by.

        realprio = 1000000 * (base + 1) + day, where day is the number of
        days between 1970-1-1 and the task's date (999999 without a date)
        and base is the priority, lowered to min(prio, 1 + prio/10) once
        the date has been reached.  Closed tasks are multiplied by 10 so
        they sort after all open ones.
        """
        base = self.prio
        day = 999999  # highest possible day
        if self.date:
            day = self.date.toordinal() - base_ordinal()  # days since 1970-1-1
            if self.date <= datetime.date.today():
                info("date reached, change prio")
                reprio = 1 + self.prio / 10.0
                debug("reprio: %s", reprio)
                base = min(self.prio, reprio)
        debug("day: %s"% day)
        debug("base: %s"% base)
        self.realprio = int((1000000 * (base + 1) ) + day)
        if not self.open:
            self.realprio = self.realprio * 10
        debug("realprio: %s"% self.realprio)

    def add_tags(self, s):
        """Add every whitespace-separated word of `s` to the tags dict."""
        for tag in s.split():
            self.tags[tag] = 1

    def tag_str(self):
        """Return all tags joined by single spaces."""
        return " ".join(self.tags.keys())

    def read_header(self):
        """Parse the header lines of the file into the attributes.

        A missing file marks the task as closed and logs a warning.
        Reading stops at the first whitespace-only line.
        """
        if not os.path.isfile(self.filename):
            self.open = False
            warning("no-such-file: %s", self.filename)
            return
        empty_line = re.compile(r'^\s+$')
        header_line = re.compile(r'^(\w+):\s*(.*)\s*$')
        # `with` closes the file again; it used to be left open.
        with open(self.filename) as f:
            for line in f:
                debug("matching: %s"% line)
                if empty_line.search(line):
                    debug("empty line, break")
                    break
                header_m = header_line.search(line)
                if header_m:
                    # what if content is empty??
                    (name, content) = header_m.groups()
                    debug("name: >%s< content: >%s<"%( name, content ) )
                    if name == "date": self.init_date(content)
                    elif name == "prio": self.init_prio(content)
                    elif name == "stat": self.init_stat(content)
                    elif name == "subj": self.init_subj(content)
                    elif name == "tags": self.add_tags(content)
                else:
                    warning("no match")
class sorted_dict( object ):
    """Collection of objects with priorities that sorts lazily.

    Objects are kept in a plain dict (object -> prio); the ordering is
    only computed when sorted() is called.
    """

    def __init__( self ):
        self.dict = {}  # key -> prio

    def insert( self, obj, prio ):
        """Add `obj` with priority `prio`, replacing any earlier priority."""
        self.dict[obj] = prio

    def sorted(self):
        """Return the stored objects ordered by ascending priority."""
        info("return sorted")
        prio_of = self.dict
        return sorted(prio_of, key=lambda obj: prio_of[obj])
class task_cache( object ):
    """Cache of already read tasks plus a tag index for searching.

    This is a simple cache:
    * it returns the in-memory task object if one exists and the file's
      modification time is unchanged
    * it creates a new in-memory object if the file has been changed or
      is not in the cache
    * it contains a searchable index (tag -> filenames)

    Attributes:
        cache: dict filename -> task.
        index: trie.Trie fed with the tags of every inserted task.
        sorted_tasks: sorted_dict of filename -> realprio.
        toc: dict mapping the numbers printed by show() to filenames.
        tasks_to_show: highest number shown by show() (counting from 0).
        last_prio_day: ordinal of the day priorities were last computed.
        filter: tag used by dshow() to restrict the listing; '' = no filter.
    """
    # Priorities depend on today's date, so they have to be recomputed
    # once a day has passed; see reprio().

    def __init__(self):
        self.cache = dict()
        self.index = trie.Trie()  # search index
        self.sorted_tasks = sorted_dict()  # all tasks (filenames) by prio
        # short task number -> filename; a dict from the start so it has
        # the same type show() gives it later
        self.toc = dict()
        self.tasks_to_show = 19
        self.last_prio_day = today_as_ordinal()
        self.filter = ''

    def reprio(self):
        """Recompute the priority of every cached task on a new day."""
        today = today_as_ordinal()
        info("reprio: last update: %s today is: %s" % (self.last_prio_day, today))
        if self.last_prio_day < today:
            info("reprio: new day, reprio all tasks")
            self.last_prio_day = today
            for cached in self.cache.values():
                cached.init_realprio()  # update realprio
                self.sorted_tasks.insert( cached.filename, cached.realprio )  # re-insert task

    def insert(self, filename):
        """(Re)read the task in `filename`, store and index it, return it.

        Only open tasks are entered into the sorted list.
        """
        info("insert:" + filename)
        t = task(filename)
        self.cache[filename] = t
        # put task into trie by tags
        for tag in t.tags.keys():
            debug( "putting %s:%s in index"%( tag, filename ) )
            self.index.insert( list( tag ), filename )
        # put into sorted_tasks
        if t.open:
            self.sorted_tasks.insert( t.filename, t.realprio )
        return t

    def get(self, filename):
        """Fetch the task from the cache, rereading the file if it changed.

        TODO: what if the file has been removed?
        """
        debug("get: " + filename)
        if filename in self.cache:
            t = self.cache[filename]
            if t.mtime() == t.rtime:
                debug("using cache for: %s"% filename)
                return t
        return self.insert(filename)

    def find(self, tag):
        """Look `tag` up in the index and return what the index yields."""
        # One lookup serves both the debug output and the return value;
        # it used to be performed twice.
        files = self.index.retrieve( list( tag ) )
        debug( "files found for %s: %s"%( tag, files ) )
        return files

    def update(self):
        """Update cache and sorted list.

        Checks every file in the task directory, skipping names that end
        in '~', then lets reprio() refresh the priorities if the day has
        changed.
        """
        info("update cache")
        task_dir = config.get( 'basic', "task_dir" )
        for entry in os.listdir( task_dir ):
            # endswith replaces a regex that was recompiled per entry
            if not entry.endswith( '~' ):
                self.get( os.path.join( task_dir, entry ) )
        self.reprio()

    def dshow(self):
        """Default show: refresh the cache, then list the tasks.

        With a filter set, only the tasks the index returns for it are
        listed (sorted by priority); otherwise all tasks in the sorted
        list are.
        """
        self.update()
        if self.filter:
            matching_filenames = self.find( self.filter )  # filenames
            matching_tasks = [self.get(filename) for filename in matching_filenames]
            matching_tasks.sort()
            self.show( matching_tasks )
        else:
            self.show( [ self.get(filename) for filename in self.sorted_tasks.sorted() ] )

    def show(self, tasks ):
        """Print the first open tasks in `tasks` and rebuild the toc.

        Every printed task is numbered with its position in `tasks` and
        recorded in self.toc under that number.  Closed tasks are skipped
        but still take up a position.  Printing stops after the open task
        at position tasks_to_show or beyond has been printed.
        """
        self.toc = dict()
        for n, t in enumerate(tasks):
            if t.open:
                nstr = "(%d)" % n
                print( "%4s %s" % (nstr, t.string() ) )
                self.toc[n] = t.filename
                if n >= self.tasks_to_show:
                    break
            else:
                info( "Not showing closed task %d %s" % (n, t.string() ) )
# TODO: take into account that cached entries are never cleaned up
## operations
# n       : create a new task
# /       : set the filter used for listing
# c       : close a task
# <number>: open the task with that number
# q       : quit
# default : list tasks
def random_new_filename( length = 6 ):
    """Return a random name of `length` characters from 0-9 and a-z."""
    alphabet = string.digits+string.ascii_lowercase
    picked = []
    for _ in range( length ):
        picked.append( random.choice( alphabet ) )
    return "".join( picked )
# Length of one day, and of 29 days, in seconds.
seconds_on_day = 24 * 60 * 60
seconds_on_29_days = 29 * seconds_on_day
def create_new_task( path=None, task_args=None ):
    """Create a new task file with a random, unused name inside `path`.

    The file gets the headers subj, prio, date, stat and tags followed by
    the body.  The date is always set to 29 days from now (local time).

    Args:
        path: directory in which the task file is created.
        task_args: initial values, either a list of exactly five items
            (subject, priority, status, tags, body) or a string of the form
            ``subject priority status "tags" "body"``.  Anything else
            yields a task with empty fields.

    Returns:
        The path of the newly written file.

    Raises:
        TypeError: if `path` is None.
    """
    if path is None:
        # Same exception type the former `path + "/"` produced, but with a
        # message that says what is wrong.
        raise TypeError( "create_new_task() needs the directory to create the task in" )
    while True:
        filename = os.path.join( path, random_new_filename() )
        if not os.path.exists( filename ):
            break
    year, month, day = time.localtime( time.time() + seconds_on_29_days )[:3]
    task_list = [ 'subj', 'prio', 'stat', 'tags', 'body' ]
    #ok, this is a piss-poor shot at parsing unknown and probably
    #non-conforming textual input
    split_me_args = re.compile( r'(\w+) (\d+) (\w+) \"((?:\w+\s?)+)\" \"((?:\w+\s?)+)\"' )
    debug( "task_args=%s"%( task_args ) )
    args = dict()
    if isinstance( task_args, str ):
        re_args = split_me_args.search( task_args )
        if re_args:
            args = dict( zip( task_list, re_args.groups() ) )
    elif isinstance( task_args, list ) and len( task_args ) == 5:
        args = dict( zip( task_list, task_args ) )
    # Close (and thereby flush) the file before returning: the caller
    # usually opens it in an editor right away.
    with open( filename, 'w' ) as taskfile:
        taskfile.write("subj: %s\n"%( args.get( "subj", "" ) ) )
        taskfile.write("prio: %s\n"%( args.get( "prio", "" ) ) )
        taskfile.write('date: %d-%d-%d\n' % (day, month, year))
        taskfile.write("stat: %s\n"%( args.get( "stat", "" ) ) )
        taskfile.write("tags: %s\n"%( args.get( "tags", "" ) ) )
        taskfile.write("%s\n"%( args.get( "body", "" ) ) )
    return filename
def edit_task( filename ):
    """Open `filename` in the configured editor and wait for it to exit.

    The editor command is read from option 'editor' in section 'basic'
    and run through the shell with the filename appended.
    """
    editor = config.get( 'basic', 'editor' )
    os.system( "%s %s" % ( editor, filename ) )
class Taskmanager( object ):
    """ The ui class.
    Methods that should be invokable by the user from the command line
    or the program ui must be prefixed with 'handle_'

    The docstrings of the handle_ methods are printed verbatim by the
    help command, so they are written for the end user; notes for
    maintainers are kept in comments.
    """

    def __init__( self, action="help" ):
        self.tasks = task_cache()
        self.tasks.update()
        self._action = action
        # TODO: there is some redundancy between _commands and _aliases
        # ( list of command names, dict command name -> help text )
        self._commands = self._get_command_list()
        # shorthand pattern -> [ command name ]
        self._aliases = { re.compile( r'^n' ): [ 'new' ],
                          re.compile( r'^/(.*)' ): [ 'search' ],
                          re.compile( r'^\s*(\d+)' ): [ 'open' ],
                          re.compile( r'^(c)' ): [ 'close' ],
                          re.compile( r'^\s*u' ): [ 'update' ],
                          re.compile( r'^\s*\?' ): [ 'help' ],
                          re.compile( r'^\s*q' ): [ 'quit' ] }

    def _pick_task( self, token, verb ):
        # Resolve a task number to a filename through the current toc.
        # `token` is the argument list of the command; when it is not a
        # non-empty list the user is asked for the number.  Returns None
        # (after logging an error) when the input is not a number or no
        # task is listed under it, instead of raising.
        if isinstance( token, list ) and len( token ) > 0:
            token = token[0]
        else:
            token = raw_input( 'Please enter no of task to %s: ' % verb )
        debug( "Trying to %s task %s"%( verb, token ) )
        debug( "From list=%s"%( self.tasks.toc ) )
        try:
            return self.tasks.toc[ int( token ) ]
        except ( ValueError, KeyError, IndexError ):
            error( 'no task for number {}'.format( token ) )
            return None

    @staticmethod
    def _mark_closed( filename ):
        # Rewrite the task file so that its header says 'stat: closed'.
        # Only header lines (everything before the first whitespace-only
        # line) are considered.  If the header has no stat line, one is
        # added at the top, so closing always takes effect.
        stat_line = re.compile( r'^(stat):\s*(.*)\s*$' )
        empty_line = re.compile( r'^\s+$' )
        output = list()
        in_header = True
        closed = False
        with open( filename ) as f:
            for line in f:
                debug("matching: %s"% line)
                if in_header and empty_line.search( line ):
                    in_header = False
                elif in_header and stat_line.search( line ):
                    # The newline matters: without it the next header line
                    # ends up glued to this one.
                    line = "stat: closed\n"
                    closed = True
                output.append( line )
        if not closed:
            output.insert( 0, "stat: closed\n" )
        with open( filename, 'w' ) as f:
            f.writelines( output )

    def handle_new( self, token ):
        """(or n) Adds a new task to the system
        use new with arguments to quickly create a new task:
        (new | n) {subject} {priority} {status} {tags} {body}
        """
        debug( "got input token=%s"%( token ) )
        filename = create_new_task( config.get( 'basic', 'task_dir' ), token )
        edit_task( filename )

    def handle_open( self, token ):
        """(using open or 'taskname' ) Opens an existing task in the
        system"""
        self.tasks.dshow()
        filename = self._pick_task( token, 'open' )
        if filename is not None:
            edit_task( filename )

    def handle_close( self, token ):
        """(using 'close' or 'close tasknumber' ) Closes an existing
        task in the system"""
        self.tasks.dshow()
        filename = self._pick_task( token, 'close' )
        if filename is None:
            return
        debug( "closing task in %s"%( filename ) )
        self._mark_closed( filename )
        debug( 'closed task in file %s'%( filename ) )
        self.tasks.dshow()

    def handle_update(self, str):
        """Updates the cache of tasks in the system"""
        self.tasks.update()

    def handle_search( self, token ):
        """Searches tasks in the system"""
        #task_cache.filter only heeds the first item, so no use in
        #splitting the list here, sans avoiding TypeErrors from the re
        #module
        self.tasks.filter = ' '.join( token )
        self.tasks.dshow()

    def handle_present( self, token=None ):
        """Shows tasks in the system"""
        self.tasks.dshow()
        return True

    def handle_quit( self, token ):
        """(q) Quits the program"""
        sys.exit()

    def handle_help( self, token ):
        """Prints help text"""
        self._print_help()

    def execute(self, token):
        """Run one command line through do_cli.

        `token` is either the raw input string or an already split list
        of words; the first word is the command, the rest its arguments.
        Empty input lists the tasks.
        """
        # This used to call every handler in turn and then an undefined
        # global do_cli(); it now simply dispatches like the main loop.
        if isinstance( token, str ):
            words = token.split()
        else:
            words = list( token or [] )
        if words:
            self.do_cli( words[0], words[1:] )
        else:
            self.handle_present()

    def run_once( self, args ):
        """If the arguments were given directly to the program, it is
        run in non-interactive mode and exits thereafter"""
        create_new_task( config.get( 'basic', 'task_dir' ), args )
        sys.exit( 'created task with with subject \'%s\''%( args[0] ) )

    def do_cli( self, cmd, args ):
        """ The logic of the user interface.
        Given a command `cmd` and arguments `args` this method will
        try to do what was requested from the user. Among other
        things, this involves checking whether the issued command is
        defined as a method on the TaskManager class and 'registered'
        as an interactive method.

        Dispatch order: a plain number opens that task; 'help' prints
        the help; a known command name calls its handle_ method; then
        the alias patterns are tried; anything else lists the tasks.
        All handlers run on this instance, so the task cache is reused
        and a search filter stays active until the next search.
        """
        try:
            cmd = int( cmd )
        except ValueError:
            debug( "%s is not a task number"%( cmd ) )
        if isinstance( cmd, int ):
            # One argument holding the whole number; list( str( cmd ) )
            # split it into digits, so "12" used to open task 1.
            args = [ str( cmd ) ]
            debug( "calling %s( args=%s )"%( self.handle_open, args ) )
            self.handle_open( args )
            return
        if cmd.lower() in self._commands[0] and cmd.lower() == 'help':
            self._print_help()
            return
        if cmd in self._commands[0]:
            method = getattr( self, 'handle_%s'%( cmd ), None )
            if method is None:
                raise ValueError( 'Command "%s" not found.' % cmd)
            debug( "calling %s( args=%s )"%( method, args ) )
            method( args )
            return
        for pattern, targets in self._aliases.items():
            here = pattern.search( cmd )
            if not here:
                continue
            info( "groups="+str( here.groups() )+", args="+str( args ) )
            new_args = list( args )
            target = targets[0]
            if target == 'search' and here.group( 1 ):
                # "/text": the text glued to the slash is the first
                # search word
                new_args.insert( 0, here.group( 1 ) )
            info( "METHOD: %s( %s )"%( target, new_args ) )
            method = getattr( self, 'handle_%s'%( target ), None )
            if method is None:
                raise ValueError( 'Command "%s" not found.' % target )
            if len( new_args ) == 0:
                # handlers take '' to mean "no arguments given"
                new_args = ''
            method( new_args )
            return
        self.handle_present()
        return

    def _get_command_list( self ):
        # Collect the names of all handle_ methods (without the prefix,
        # in dir() order) and their docstrings for the help text.
        command_list = list()
        command_help = dict()
        for key in dir( Taskmanager ):
            if key.startswith( 'handle_' ):
                pname = key.split('_', 1)[-1]
                command_list.append( pname )
                command_help[ pname ] = getattr( Taskmanager, key ).__doc__
        return command_list, command_help

    def _print_help( self ):
        # Print every command name, padded to ten columns, with its help.
        print( "\nAvailable commands:\n(The command alias is shown in paratheses)\n" )
        for command in self._commands[0]:
            print( """%s:%s%s"""%( command, " "*( 10-len( command ) ), self._commands[1].get( command ) ) )
# Script entry point: with five positional arguments a task is created
# non-interactively; without arguments the interactive prompt is started.
if __name__ == "__main__":
    sanity_check()
    from optparse import OptionParser
    usage_text = '''%prog [options] subject priority {open|closed} "tags" "description"
If all arguments are given, task.py simply creates a task and exits.
If more or less than the specified arguments are given, task.py exists
with an error code.
Currently, no checks are made on the order of the arguments, so if
priority are given as the argument in position one, and the number of
arguments is correct, a task will be created with "priority" as
subject.
'''
    parser = OptionParser( usage=usage_text )
    parser.add_option( "-t", dest="test",
                       action="store", help="Runs testsuite on task.py" )
    (options, args) = parser.parse_args()
    status = [ 'open', 'closed' ]
    tm = Taskmanager()
    if options.test:
        import doctest
        doctest.testmod()
    if len( args ) == 5 and args[2] in status:
        tm.run_once( args )
    elif len( args ) > 2 and args[2] not in status:
        # The status can only be checked when a third argument exists;
        # indexing args[2] with one or two arguments raised an IndexError.
        sys.exit( 'status must be one of: %s'%( ', '.join( status ) ) )
    elif len( args ) > 0:
        print( "length of arguments = %s"%( len( args ) ) )
        sys.exit( 'if any arguments are given, please specify all five' )
    tm.handle_present()
    try:
        while True:
            words = raw_input( 'tcli% ' ).split()
            if not words:
                # Just pressing enter is the default action: list tasks.
                tm.handle_present()
                continue
            cmd = words[0]
            args = words[1:]
            debug( "cmd=%s"%( cmd ) )
            debug( "args=%s"%( args ) )
            tm.do_cli( cmd, args )
    except ( KeyboardInterrupt, EOFError ):
        # Ctrl-C and end of input (Ctrl-D) both leave the program quietly.
        sys.exit()