-
Notifications
You must be signed in to change notification settings - Fork 1
/
extract_posedge.py
111 lines (101 loc) · 3.65 KB
/
extract_posedge.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import csv,sys
import queue
from statistics import mean, median, variance, stdev
from scipy.stats import skew, kurtosis
from functools import reduce
from math import sqrt,floor,ceil
import numpy as np
def rms(xs):
return sqrt(reduce(lambda a, x: a + x * x, xs, 0) / len(xs))
def en(xs):
return reduce(lambda a, x: a + x * x, xs, 0) / len(xs)
idx = 0
posedge_term = 0
threshold = 2.97
can_dom_bit_idx = 0 # 0 to 2000
can_res_bit_idx = 0 # 0 to 2000
can_signal = []
SOF = False
POSEDGE = False
posedge_q = queue.Queue()
posedge_list = []
prev_can_signal_len = -1
sampling_rate = int(sys.argv[2])
#print(sampling_rate, 'MS/s')
#print(floor(float(1/sampling_rate)*1000), 'ns resolution')
skip_duration = floor(float(1/sampling_rate)*1000/2)
#print(skip_duration, 'skip duration')
queue_length = ceil(sampling_rate/2.0)+1
#print(queue_length, 'queue length')
buffering_term = floor((sampling_rate/2.0+1)/2)
#print(buffering_term, 'buffering term')
with open(sys.argv[1]) as f:
while True:
idx += 1
row = f.readline()
if idx == 1 or idx == 2 or idx == 3:
continue
#print(row, end='')
try :
v_value = float(row.split(',')[1])
except IndexError:
break
# estimate can bit signal
if v_value >= threshold :
#print(v_value)
SOF = True
can_dom_bit_idx += 2
elif SOF == True and v_value < threshold :
can_res_bit_idx += 2
if can_dom_bit_idx == 1000:
can_signal.append('0')
elif can_res_bit_idx == 1000:
can_signal.append('1')
elif can_res_bit_idx >= 2000:
can_res_bit_idx = 0
elif can_dom_bit_idx >= 2000:
can_dom_bit_idx = 0
if idx % skip_duration != 0:
continue
posedge_q.put(v_value)
if posedge_q.qsize() > queue_length:
posedge_q.get()
# extract posedge edge
try :
if posedge_q.queue[-1] - posedge_q.queue[0] >= 0.5 and prev_can_signal_len != len(can_signal) :
POSEDGE = True
#print(len(can_signal), posedge_q.queue[0], posedge_q.queue[-1])
prev_can_signal_len = len(can_signal)
if POSEDGE == True:
posedge_term += 1
if posedge_term >= buffering_term and POSEDGE == True:
for q_item in posedge_q.queue:
posedge_list.append(q_item)
#print("Posedge Edge: ", q_item, len(can_signal))
POSEDGE = False
posedge_term = 0
posedge_q.empty()
#print("=================================")
except IndexError :
continue
# feature extraction
fft_posedge_list = abs(np.fft.fft(posedge_list))
print('{:.4f}'.format(mean(posedge_list)),\
'{:.4f}'.format(stdev(posedge_list)),\
'{:.4f}'.format(variance(posedge_list)),\
'{:.4f}'.format(skew(posedge_list)),\
'{:.4f}'.format(kurtosis(posedge_list)),\
'{:.4f}'.format(max(posedge_list)),\
'{:.4f}'.format(min(posedge_list)),\
'{:.4f}'.format(rms(posedge_list)),\
'{:.4f}'.format(en(posedge_list)),\
'{:.4f}'.format(mean(fft_posedge_list)),\
'{:.4f}'.format(stdev(fft_posedge_list)),\
'{:.4f}'.format(variance(fft_posedge_list)),\
'{:.4f}'.format(skew(fft_posedge_list)),\
'{:.4f}'.format(kurtosis(fft_posedge_list)),\
'{:.4f}'.format(max(fft_posedge_list)),\
'{:.4f}'.format(min(fft_posedge_list)),\
'{:.4f}'.format(rms(fft_posedge_list)),\
'{:.4f}'.format(en(fft_posedge_list))
)