forked from fabriziop/bitis
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbitis.py
1778 lines (1425 loc) · 61.7 KB
/
bitis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/python
# .+
# .context : Binary Timed Signal Processing Processing Library
# .title : Bitis main module
# .kind : python source
# .author : Fabrizio Pollastri
# .site : Torino - Italy
# .creation : 26-Sep-2013
# .copyright : (c) 2013-2014 Fabrizio Pollastri
# .license : GNU General Public License (see below)
#
# This file is part of "Bitis, Binary Timed Signal Processig Library".
#
# Bitis is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Bitis is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this software. If not, see <http://www.gnu.org/licenses/>.
#
# .-
### import required modules
import copy # object copy support
import math # mathematical support
import random # random generation
import sys # sys constants
# define global variables
__version__ = '0.12.3'
__author__ = 'Fabrizio Pollastri <[email protected]>'
#### classes
# unicode box drawing characters
LIGHT_HORIZONTAL = u'\u2500'
LIGHT_DOWN_AND_RIGHT = u'\u250c'
DOWN_HEAVY_AND_RIGHT_LIGHT = u'\u250e'
LIGHT_DOWN_AND_LEFT = u'\u2510'
DOWN_HEAVY_AND_LEFT_LIGHT = u'\u2512'
LIGHT_UP_AND_RIGHT = u'\u2514'
UP_HEAVY_AND_RIGHT_LIGHT = u'\u2516'
LIGHT_UP_AND_LEFT = u'\u2518'
UP_HEAVY_AND_LEFT_LIGHT = u'\u251a'
DOWN_HEAVY_HORIZONTAL_LIGHT = u'\u2530'
UP_HEAVY_AND_HORIZONTAL_LIGHT = u'\u2538'
HEAVY_UP = u'\u2579'
HEAVY_DOWN = u'\u257b'
class Signal:
"""
Implements the concept of "Binary Timed Signal": a memory
representation of a binary signal as sequence of the times of
signal edges (signal changes).
*start* sets the signal start time.
*edges* can be used to initialize the signal edges sequence, it must
be a list of times (integers or floats). May be empty.
The signal level before the first change is specified by *slevel*.
Also a time scale factor can be specified by *tscale*, at present
not used.
"""
def __init__(self,start=None,edges=None,end=None,slevel=0,tscale=1.):
# signal start time
self.start = start
# sequence of signal change times
if edges is None:
self.edges = []
else:
self.edges = edges
# signal end time
self.end = end
# signal level before the first change
self.slevel = slevel
# time scale of edges time (1=1s)
self.tscale = tscale
self.validate()
def validate(self):
""" Validate signal attributes. Complete type and value checking of
signal object attributes. If a check fails, an exception is raised. """
# type checking
if not type(self.start) in (float,int,type(None)):
raise TypeError('signal start time must be float or int.'
+ '\n found start type: %s' % type(self.start))
if not type(self.edges) in (list,):
raise TypeError('signal edges times must be a list.'
+ '\n found edges type: %s' % type(self.edges))
if len(self.edges) > 0:
edge_type = type(self.edges[0])
if not edge_type in (float,int):
raise TypeError('signal edge time must be float or int.'
+ '\n found edge type: %s' % type(self.edges[0]))
if len(self.edges) > 1:
for i in range(1,len(self.edges)):
if not type(self.edges[i]) == edge_type:
raise TypeError(
'signal edges times must be all float or int'
+ '\n expected edge type: %s' % edge_type
+ '\n found edges[%d] type: %s' %
(i,type(self.edges[i])))
if not type(self.end) in (float,int,type(None)):
raise TypeError('signal end time must be float or int.'
+ '\n found end type: %s' % type(self.end))
if not type(self.slevel) in (bool,int):
raise TypeError('signal start level must be boolean or int.'
+ '\n found start level type: %s' % type(self.slevel))
if not type(self.tscale) in (float,int):
raise TypeError('signal time scale must be float or int.'
+ '\n found time scale type: %s' % type(self.tscale))
# value checking
if self.start and not self.start < self.end:
raise ValueError('signal start time must be < then end time.'
+ '\n start time: %s' % repr(self.start)
+ '\n end time: %s' % repr(self.end))
if len(self.edges) > 0:
if not self.start <= self.edges[0]:
raise ValueError(
'signal start time must be <= than first edge time.'
+ '\n start time: %s' % repr(self.start)
+ '\n first edge time: %s' % repr(self.edges[0]))
if not self.edges[-1] <= self.end:
raise ValueError(
'signal last edge time must be <= than end time.'
+ '\n last edge time: %s' % repr(self.edges[-1])
+ '\n end time: %s' % repr(self.end))
if len(self.edges) > 1:
for i in range(1,len(self.edges)):
if not self.edges[i-1] < self.edges[i]:
raise ValueError(
'signal edges times must be ascending.'
+'\n found edges[%d]: %s'%(i-1,repr(self.edges[i-1]))
+'\n found edges[%d]: %s' %(i,repr(self.edges[i])))
def __str__(self):
descr = '%s\n' % object.__str__(self)
if not self:
descr += ' The Void Signal'
else:
descr += ' start: %s\n' % self.start
descr += ' edges: %s\n' % self.edges
descr += ' end: %s\n' % self.end
descr += ' start level: %d\n' % self.slevel
descr += ' time scale: %d' % self.tscale
return descr
def level(self,time,tpos=0):
""" Return the logic level and the number of edges of a signal object at
a given time (*time*) and a given edge position where to start searching
(*tpos*). The edge position is the number of signal edges before *time*.
When *time* is equal to an edge time that edge is considered before
*time*.
*time* must be in the signal time domain, otherwise None is returned.
*tpos* must point to an edge before *time*. """
try:
while time > self.edges[tpos]:
tpos += 1
except:
if time < self.start or self.end < time:
return None, len(self)
return len(self) & 1 ^ self.slevel, len(self)
if time < self.start:
return None, 0
return tpos & 1 ^ self.slevel, tpos
def end_level(self):
""" Return the logic level at the end of a signal object. """
return len(self) & 1 ^ self.slevel
def clone(self):
""" Return a deep copy with the same attributes/values of signal
object. """
return copy.deepcopy(self)
def clone_into(self,other):
""" Return a full copy of signal object into *other*. Each other
attribute is assigned a deep copy of the value of the same attribute
in signal object. Return *other*. """
# copy all self attributes into other
other.start = self.start
other.edges = self.edges[:]
other.end = self.end
other.slevel = self.slevel
other.tscale = self.tscale
return other
def elapse(self):
""" Ruturn the signal elapse time: end time - start time.
If *self* is void, return zero."""
if not self:
return 0.
return self.end - self.start
def shift(self,offset,inplace=False):
""" Add *offset* to signal start and end times and to each signal
change time. If *inplace* is false, return the result as a new signal
object. Otherwise, return the result as *self*. """
# set where to return result
if inplace:
sig = self
else:
sig = self.clone()
# void signal are shift invariant
if not sig:
return sig
# if nonzero offset, add it.
if offset:
sig.start += offset
for i in range(len(self)):
sig.edges[i] += offset
sig.end += offset
return sig
def reverse(self,inplace=False):
""" Reverse the signal change times sequence: last change becomes
the first and viceversa. Time intervals between edges are preserved.
If *inplace* is false, return the result as a new signal object.
Otherwise, return the result as *self*. """
# set where to return result
if inplace:
sig = self
else:
sig = self.clone()
# return if nothing to reverse
if not sig or len(sig) == 0:
return sig
# set start level: if times number is odd, must be inverted.
if len(sig) & 1:
sig.slevel = not sig.slevel
# reverse change times, not first and last times (start and end).
for i in range(len(sig)):
sig.edges[i] = sig.start + sig.end - sig.edges[i]
# edges sequence needs to have ascending times
sig.edges.sort()
return sig
def append(self,other):
""" Return the *self* signal modified by appending *other* signal
to it. If there is a time gap between the signals, fill it.
The start time of *other* must be greater or equal to end time of
*self*. The end level of *self* must be equal to the start level of
*other*. Otherwise, no append is done. """
return self.join(other,inplace=True)
def split(self,split,inplace=False):
""" Split *self* into two signals at time *split*.
Return pattern **(** *older, newer* **)**
**older**: signal, the part of *self* from start time to *split* time.
**newer**: signal, the part of *self* from *split* time to signal end.
If *split* is equal to a signal change time, the change is put into
the *newer* part.
If *split* is at or before signal start, *older* is the void signal and
*newer* is *self*.
If *split* is at or after signal end, *older* is *self* and *newer* is
the void signal.
If *inplace* is false, *newer* is returned as a new signal object.
If *inplace* is true, *self* is changed to *newer* and returned as
*newer*.
If *self* is void, both *older* and *newer* are the void signal. """
# void is split invariant
if not self:
if inplace:
return self, self
else:
return Signal(), Signal()
# split before signal start
if split <= self.start:
if inplace:
return Signal(), self
else:
return Signal(), self.clone()
# split after signal end
if self.end <= split:
if inplace:
older = Signal(self.start,self.edges,self.end,self.slevel,
self.tscale)
self.start = None
self.edges = []
self.end = None
return older, self
else:
return self.clone(), Signal()
# search split point
level, split_pos = self.level(split)
# older signal part: pre split time.
older = Signal(self.start,self.edges[0:split_pos],split,
slevel=self.slevel,tscale=self.tscale)
# newer signal part: post split time.
if inplace:
self.start = split
self.edges = self.edges[split_pos:]
self.slevel = level
newer = self
else:
newer = Signal(split,self.edges[split_pos:],self.end,
slevel=level,tscale=self.tscale)
return older, newer
def older(self,split,inplace=False):
""" Split *self* into two signals at time *split* and return the part
before split time.
If *split* is equal to a signal change time, the change is not put into
the return signal.
If *split* is at or before signal start, return the void signal.
If *split* is at or after signal end, return *self*.
If *inplace* is false, a new signal object is returned.
If *inplace* is true, *self* is changed to the older part and returned.
If *self* is void, the void signal is returned. """
# void is split invariant
if not self:
if inplace:
return self
else:
return Signal()
# split before signal start
if split <= self.start:
if inplace:
Signal().clone_into(self)
return self
else:
return Signal()
# split after signal end
if self.end <= split:
if inplace:
return self
else:
return self.clone()
# search split point
level, split_pos = self.level(split)
# older signal part: pre split time.
if inplace:
self.edges = self.edges[0:split_pos]
self.end = split
older = self
else:
older = Signal(self.start,self.edges[0:split_pos],split,
slevel=self.slevel,tscale=self.tscale)
return older
def newer(self,split,inplace=False):
""" Split *self* into two signals at time *split* and return the part
after split time.
If *split* is equal to a signal change time, the change is put into
the return signal.
If *split* is at or before signal start, return *self*.
If *split* is at or after signal end, return the void signal.
If *inplace* is false, a new signal object is returned.
If *inplace* is true, *self* is changed to the newer part and returned.
If *self* is void, the void signal is returned. """
# void is split invariant
if not self:
if inplace:
return self
else:
return Signal()
# split before signal start
if split <= self.start:
if inplace:
return self
else:
return self.clone()
# split after signal end
if self.end <= split:
if inplace:
Signal().clone_into(self)
return self
else:
return Signal()
# search split point
level, split_pos = self.level(split)
# the newer signal part: post split time.
if inplace:
self.start = split
self.edges = self.edges[split_pos:]
self.slevel = level
newer = self
else:
newer = Signal(split,self.edges[split_pos:],self.end,
slevel=level,tscale=self.tscale)
return newer
def join(self,other,inplace=False):
""" Join two signals (*self* and *other*) in one signal.
End time of *self* must be less or equal to start time of *other*.
If there is a time gap between the joining signals, fill it.
Return a signal object with the join result. If *inplace* is false,
a new signal object is returned. If *inplace* is true, the join
result is put into *self* and *self* is returned. Signals with
different levels at *self* end and at *other* start cannot be joined
(join return a void signal). """
# if other is void, no append.
if not other:
if inplace:
return self
else:
return self.clone()
# if self is void, return other or copy of it.
if not self:
if inplace:
return other.clone_into(self)
else:
return other.clone()
# check for non overlap
assert self.end <= other.start, \
'self and other overlaps in time.\n' \
+ 'self end = ' + repr(self.end) \
+ ' , other start = ' + repr(other.start)
# check for same end-start level
assert len(self) & 1 ^ self.slevel == other.slevel, \
'self end level differ from other start level.\n' \
+ 'self end level = ' + str(len(self) & 1 ^ self.slevel) \
+ ' , other start level = ' + str(other.slevel)
# join
if inplace:
self.edges += other.edges
self.end = other.end
return self
else:
return Signal(self.start,self.edges + other.edges,other.end,
slevel=self.slevel)
def chop(self,period,origin=None,max_chops=1000):
""" Divide the signal into several time contiguous signals with the
same elapse time equal to *period*. The dividing times sequence
starts at *origin* and has an element every *period* time, except for
the last element. It has end time = period * max_chops, if max_chops
is reached. Otherwise, it has the end time of the chopped signal.
Return a list with the chopped signals.
If *origin* is before the signal start, it is moved forward by an
integer times of period, until it falls into the signal domain.
If *origin* is none, it is set to self start time by default.
If *origin* is after the signal end, no chop occours,
an empty list is returned.
If *self* is void, return an empty chop list. """
# if self is void, return an empty chop list.
if not self:
return []
# if not defined, set origin default.
org = origin
if origin is None:
org = self.start
# if origin after signal end, return signal copy.
if self.end <= org:
return [self.clone()]
# preserve original signal from internal manipulations
signal = self.clone()
# if origin before signal domain, set it to the first split in
# signal domain
if org <= self.start:
split = self.start + period - (self.start - org) % float(period)
# if origin inside signal domain, discard signal part before origin.
else:
discard = signal.split(origin,inplace=True)
split = org + period
# init loop vars
chops = []
# for each chop time, chop signal until max_chops is reached or the
# signal end is reached.
for c in range(1,max_chops):
older, newer = signal.split(split,inplace=True)
chops.append(older)
if self.end <= split:
break
split += period
return chops
def jitter(self,stddev=0):
""" Add a gaussian jitter to the change times of *self* signal object
with the given standard deviation *stddev* and zero mean.
Signal start and end times are unchanged. """
# void is jitter invariant
if not self:
return
# if not edges, return
if not self.edges:
return
# first jittered change must fall between start and second change or end
new_time = self.edges[0] + random.gauss(0.0,stddev)
if len(self) > 1:
top_limit = self.edges[1]
else:
top_limit = self.end
if self.start < new_time and new_time < top_limit:
self.edges[0] = new_time
# if only one change, jittering is terminated.
if len(self) == 1:
return
# add jitter to signal all change times except first and last
for i in range(1,len(self)-1):
new_time = self.edges[i] + random.gauss(0.0,stddev)
# if current edge has room to be moved forward or back, add jitter.
if self.edges[i - 1] < new_time and new_time < self.edges[i + 1]:
self.edges[i] = new_time
# last jittered change must fall between the last but one change or end
new_time = self.edges[-1] + random.gauss(0.0,stddev)
if self.edges[-2] < new_time and new_time < self.end:
self.edges[-1] = new_time
def __add__(self,other):
""" Concatenate (join) other to self. """
return self.join(other,inplace=False)
def __len__(self):
""" Return the length of the change times sequence. """
return len(self.edges)
def __nonzero__(self):
""" Return true if the signal is not void, return false otherwise. """
return not self.start is None
def __eq__(self,other):
""" Equality test between two signals. Return *True* if the
two signals are equal. Otherwise, return *False*. Can be used
as the equality operator as in the following example (signal
a,b are instances of the Signal class)::
if signal_a == signal_b:
print 'signal a and b are equal' """
if self and other:
return self.__dict__ == other.__dict__
else:
return not self and not other
def __ne__(self,other):
""" Inequality test between two signals. Return *True* if the
two signals are not equal. Otherwise, return *False*. Can be used
as the inequality operator as in the following example (signal a,b
are instances of the Signal class)::
if signal_a != signal_b:
print 'signal a and b are different'"""
if self and other:
return self.__dict__ != other.__dict__
else:
return self or other
def _intersect(self,other):
""" Compute the time intersection of two signals, if exists.
Return the start and end time of intersection, for each signal,
return the indexes of the first and last signal changes inside
the intersection. """
# check for empty signal
if not self or not other:
return None
# start and end times of signal A and B intersection
start = max(self.start,other.start)
end = min(self.end,other.end)
# if no intersection, return none
if start >= end:
return None
# find index of first edge after start and index of last edge before
# end. If no edges in start-end range, set indexes to 0.
for ia_start in range(len(self)):
if self.edges[ia_start] < start:
continue
for ia_end in range(-1,-len(self)-1,-1):
if self.edges[ia_end] <= end:
ia_end += len(self) + 1
break
else:
ia_start = ia_end = 0
break
else:
ia_start = ia_end = len(self)
for ib_start in range(len(other)):
if other.edges[ib_start] < start:
continue
for ib_end in range(-1,-len(other)-1,-1):
if other.edges[ib_end] <= end:
ib_end += len(other) + 1
break
else:
ib_start = ib_end = 0
break
else:
ib_start = ib_end = len(other)
# compute level before first change after start
slevel_a = self.slevel ^ (ia_start & 1)
slevel_b = other.slevel ^ (ib_start & 1)
return start,end,ia_start,ia_end,slevel_a,ib_start,ib_end,slevel_b
def _bioper(self,other,operator):
""" Compute the logic operator of two given signal object: *self* and
*signal*. Return a signal object with the operator applyed to the two
input signals. """
# if one or both operand is none, return none as result.
if not self or not other:
return Signal()
# compute A and B time intersection paramenters.
# If no intersection, return none.
intersection = self._intersect(other)
if not intersection:
return Signal()
start,end,ia_start,ia_end,slevel_a,ib_start,ib_end,slevel_b = \
intersection
# optimize result computation when both self and other are constant
# if self and other are constant, return a constant signal.
if len(self) < 1 and len(other) < 1:
return Signal(start,[],end,slevel=operator(self.slevel,other.slevel))
# optimize result computation when self or other is constant
if len(self) < 1 and not (operator(1,0) ^ self.slevel):
return Signal(start,[],end,slevel=self.slevel)
if len(other) < 1 and not (operator(1,0) ^ other.slevel):
return Signal(start,[],end,slevel=other.slevel)
# create output signal object
out_sig = Signal(start,[],end)
# initial status vars of a two input logic: inputs a and b, output.
in_a = slevel_a
in_b = slevel_b
out_sig.slevel = operator(in_a,in_b)
out = out_sig.slevel
# get all edges, one at a time, from the two lists sorted by
# ascending time, do it until the end of one of the two lists is
# reached.
ia = ia_start
ib = ib_start
while ia < ia_end and ib < ib_end:
# get the next edge in time. If the next edge makes a change to the
# and output, append it to the output anded pulses and update and
# logic output (a_and_b).
# Always update logic inputs (a,b) and list pointers (i,j)
if self.edges[ia] < other.edges[ib]:
in_a = not in_a
if out != operator(in_a,in_b):
out_sig.edges.append(self.edges[ia])
out = not out
ia = ia + 1
elif self.edges[ia] > other.edges[ib]:
in_b = not in_b
if out != operator(in_a,in_b):
out_sig.edges.append(other.edges[ib])
out = not out
ib = ib + 1
else:
in_a = not in_a
in_b = not in_b
if out != operator(in_a,in_b):
out_sig.edges.append(self.edges[ia])
out = not out
ia = ia + 1
ib = ib + 1
# if one of A or B is exausted, the requested logic operation
# is applied to the remaining part of the other and to the
# terminating level of the first. The output is appended to result.
if ia == ia_end and ib < ib_end:
in_a = self.slevel ^ (ia & 1)
if operator(in_a,0) != operator(in_a,1):
out_sig.edges.extend(other.edges[ib:ib_end])
elif ia < ia_end and ib == ib_end:
in_b == other.slevel ^ (ib & 1)
if operator(in_b,0) != operator(in_b,1):
out_sig.edges.extend(self.edges[ia:ia_end])
return out_sig
def __and__(self,other):
""" Compute the logic *and* of two given signal objects: *self* and
*other*. Return a signal object with the and of the two input
signals. Can be used as the bitwise and operator as in the following
example (signal a,b,c are instances of the Signal class)::
signal_c = signal_a & signal_b
"""
return self._bioper(other,lambda a,b: a and b)
def __or__(self,other):
""" Compute the logic *or* of two given signal objects: *self* and
*other*. Return a signal object with the or of the two input
signals. Can be used as the bitwise or operator as in the following
example (signal a,b,c are instances of the Signal class)::
signal_c = signal_a | signal-b
"""
return self._bioper(other,lambda a,b: a or b)
def __xor__(self,other):
""" Compute the logic *xor* of two given signal objects: *self* and
*signal*. Return a signal object with the xor of the two input
signals. Can be used as the bitwise xor operator as in the
following example (signal a,b,c are instances of the Signal class)::
signal_c = signal_a ^ signal_b
"""
# if one or both operand is none, return none as result.
if not self or not other:
return Signal()
# compute A and B time intersection paramenters.
# If no intersection, return none.
intersection = self._intersect(other)
if not intersection:
return Signal()
start,end,ia_start,ia_end,slevel_a,ib_start,ib_end,slevel_b = \
intersection
# create signal object for xor storage
xor_sig = Signal(start,[],end)
# set start level
xor_sig.slevel = slevel_a ^ slevel_b
# xor is the union of pulse edges sorted by time
xor_sig.edges = self.edges[ia_start:ia_end]+other.edges[ib_start:ib_end]
xor_sig.edges.sort()
# simultaneous edges cancel each other, so, if any, remove them.
i = 0
while i < len(xor_sig) - 1:
if not xor_sig.edges[i] - xor_sig.edges[i + 1]:
del xor_sig.edges[i]
del xor_sig.edges[i]
else:
i += 1
return xor_sig
def __invert__(self,inplace=False):
""" Compute the logic *not* of the given signal object: *self*.
If *inplace* is false, return the result as a new signal object.
Otherwise, return the result as *self*.
Can be used as the bitwise not operator as in the following example
(signal a,b are instances of the Signal class)::
signal_b = ~ signal_a
"""
# set where to return result
if inplace:
sig = self
else:
sig = self.clone()
# void is invert invariant
if not self:
return sig
# apply inversion
sig.slevel = not sig.slevel
return sig
def integral(self,level=1,normalize=False):
""" Return the integral of a signal object: the elapsed time of all
periods in which the signal is at the level specified by *level*.
Output can be absolute (*normalize=False*) or can be normalized
(*normalize=True*): absolute integral averaged over the whole signal
domain. The summation is operated on the signal domain only.
If *self* is void, return none. """
# if self is void, return none.
if not self:
return None
# do summation between first and last signal edges
edges_int = 0
for i in range(0,len(self),2):
try:
edges_int += self.edges[i + 1] - self.edges[i]
except:
edges_int += self.end - self.edges[-1]
# return summation of level=0 or =1 as requested by level argument
if level ^ self.slevel:
integral = edges_int
else:
integral = self.end - self.start - edges_int
# return normalized if requested by normalized argument
if normalize:
integral = float(integral) / (self.end - self.start)
return integral
def correlation(self,other,mask=None,step_size=1.,
skip=0,width=None,normalize=False):
""" Return the correlation function of two given signal objects:
*self* and *other*.
**mask**: signal or None, same elapse of *other*. Compute
correlation only where *mask* == 1. If None, compute correlation
on the whole intersection of *self* and *other*.
**step_size**: positive float, the time pitch of the correlation
function.
**skip**: positive float, the time elapse at the start of the
correlation function not to be computed.
**width**: is the time elapse where to compute the correlation
function. If None, compute the correlation function for each time
shift of *self* that has an intersection with *other*, having an
elapse time >= *step_size*.
**normalize**: boolean, controls the values of the correlation
function. If True, values are normalized in the range -1 +1.
If False, values are absolute: the integral of xor between shifted
*self* and *other* signals.
Return pattern **(** *corr, shift* **)**
**corr**: list of floats. The values of the correlation function.
**shift**: list of floats. The time shift applied to *self* to slide
it over *other* signal for each value of *corr*.
"""
# if at least one signal is void, return empty lists.
if not self or not other:
return [],[]
# correlation width
def_width = max(0,self.elapse() + other.elapse() - skip - step_size)
if width:
width = max(0,min(width,def_width))
else:
width = def_width
# if not at least one step, do empty return.
if width < step_size:
return [],[]
# simplify variables access
sig_a = self.clone()
# time shift that, applied to A, align A end with B start.
align_shift = other.start - self.end
# apply shift to slide A to the leftmost position
sig_a.shift(align_shift + skip + step_size,inplace=True)
# compute correlation step by step
corr = []
shift = []
while sig_a.start <= self.start + align_shift + skip + width:
# correlation among signal A and B
if mask:
xor = (sig_a ^ other) | ~mask
else:
xor = sig_a ^ other
if normalize:
corr += [xor.integral(0,normalize) * 2 - 1]
else:
corr += [xor.integral(0,normalize=False)]
shift += [sig_a.start - self.start]
# shift right A by step unit
sig_a.shift(step_size,inplace=True)
return corr, shift
def phase(self,other,mask,resolutions,period=None):
""" Find the phase between *self* and *other*. Phase is the time shift
that applied to *self* gives the maximum correlation:
*self* (t + phase) * *other* (t) is maximum (* means correlation).
For faster computation, the phase can be computed by progressive smaller
resolutions.
**mask**: signal or None, same elapse of *other*. Compute
correlation only where *mask* == 1. If None, compute correlation
on the whole intersection of *self* and *other*.
**resolutions**: tuple or list of positive float, at least one
element, sequence of resolutions from coarser to finest, the time
step used in the computation of correlation.
**period**: None or positive float. If None, phase is computed as
absolute time shift. If float, phase is the time shift with respect to