Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add 1f analyzer #5

Merged
merged 2 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,7 @@
*.csv
*.xls
*.png
*.mp3
__pycache__
flagged
*.DS_Store
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
FROM python:3.10.15-slim-bullseye

RUN apt-get update && apt-get install -y git curl
RUN apt-get update && apt-get install -y git curl ffmpeg
RUN git config --global --add safe.directory /app
RUN python3 -m pip install --upgrade pip
RUN python3 -m pip install poetry \
Expand Down
111 changes: 19 additions & 92 deletions lab_tool_webui.py
Original file line number Diff line number Diff line change
@@ -1,98 +1,11 @@
import sys
import numpy as np
import matplotlib.pyplot as plt
import math
import pandas as pd
import gradio as gr
import tempfile


# CSVファイルから信号データを読み込む
def load_signal(file_path, column_name):
try:
df = pd.read_csv(file_path)
signal = df[column_name].values
return signal
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
return []
except KeyError as e:
print(f"Column '{column_name}' not found in the file. ({e})", file=sys.stderr)
return []


# モルレーウェーブレット関数
def morlet(x, f, width):
sf = f / width
st = 1 / (2 * math.pi * sf)
A = 1 / (st * math.sqrt(2 * math.pi))
h = -np.power(x, 2) / (2 * st**2)
co1 = 1j * 2 * math.pi * f * x
return A * np.exp(co1) * np.exp(h)


# 連続ウェーブレット変換
def continuous_wavelet_transform(Fs, data, fmax, width=48, wavelet_R=0.5):
Ts = 1 / Fs
wavelet_length = np.arange(-wavelet_R, wavelet_R, Ts)
data_length = len(data)
cwt_result = np.zeros([fmax, data_length])

for i in range(fmax):
conv_result = np.convolve(data, morlet(wavelet_length, i + 1, width), mode='same')
cwt_result[i, :] = (2 * np.abs(conv_result) / Fs) ** 2

return cwt_result


# 連続ウェーブレット変換結果をカラーマップとしてプロット
def plot_cwt(cwt_result, time_data, fmax):
plt.imshow(cwt_result, cmap='jet', aspect='auto',
extent=[time_data[0], time_data[-1], 0, fmax],
vmax=abs(cwt_result).max(), vmin=-abs(cwt_result).max())
plt.xlabel("Time [sec]")
plt.ylabel("Frequency [Hz]")
plt.colorbar(label="Power")
plt.clim(-5, 5)


# グラフ描画とCWTの処理を行う関数
def wavelet_ui(uploaded_file, Fs, fmax, column_name, start_time, end_time):
filepath = uploaded_file.name
signal = load_signal(filepath, column_name)

if len(signal) == 0:
return None, None

# 時間データを計算
t_data = np.arange(0, len(signal) / Fs, 1 / Fs)

# スライダーの範囲に基づいてデータをフィルタリング
start_idx = int(start_time * Fs)
end_idx = int(end_time * Fs)
signal = signal[start_idx:end_idx]
t_data = t_data[start_idx:end_idx]

signal_filename = tempfile.NamedTemporaryFile(delete=False, suffix='.png').name
plt.figure(dpi=200)
plt.title("Signal")
plt.plot(t_data, signal)
plt.xlim(start_time, end_time)
plt.xlabel("Time [sec]")
plt.ylabel("Voltage [uV]")
plt.savefig(signal_filename)

cwt_signal_filename = tempfile.NamedTemporaryFile(delete=False, suffix='.png').name
cwt_signal = continuous_wavelet_transform(Fs=Fs, data=signal, fmax=fmax)
plt.figure(dpi=200)
plot_cwt(cwt_signal, t_data, fmax)
plt.savefig(cwt_signal_filename)

return cwt_signal_filename, signal_filename
from lab_tools import wavelet
from lab_tools import labutils
from lab_tools import analyze1f


def update_slider_range(filepath):
timestamp = load_signal(filepath, "Timestamp")
timestamp = labutils.load_signal(filepath, "Timestamp")
max_value = float(timestamp[len(timestamp) - 1])
min_value = float(timestamp[0])

Expand Down Expand Up @@ -121,6 +34,20 @@ def update_slider_range(filepath):
wavelet_image = gr.Image(type="filepath", label="Wavelet")
signal_image = gr.Image(type="filepath", label="Signal")

submit_button.click(wavelet_ui, inputs=[file_input, fs_slider, fmax_slider, column_dropdown, start_time, end_time], outputs=[wavelet_image, signal_image])
submit_button.click(wavelet.wavelet_ui, inputs=[file_input, fs_slider, fmax_slider, column_dropdown, start_time, end_time], outputs=[wavelet_image, signal_image])

with gr.Tab("1f Noise Search"):
with gr.Row():
with gr.Column():
file_input = gr.Text(label="YouTubeのリンクを貼り付けてください。")
submit_button = gr.Button("計算開始")

with gr.Column():
caption = gr.Text(label="動画タイトル")
result = gr.Image(type="filepath", label="Wavelet")

submit_button.click(analyze1f.analyze_1f_noise, inputs=[file_input], outputs=[caption, result])


if __name__ == "__main__":
main_ui.queue().launch(server_name="0.0.0.0")
69 changes: 69 additions & 0 deletions lab_tools/analyze1f.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import numpy as np
import matplotlib.pyplot as plt
from pydub import AudioSegment
from scipy.fftpack import fft
import yt_dlp
import os
import tempfile


def download_youtube(youtube_url: str) -> str:
ydl_opts = {
'postprocessors': [
{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '128',
}
],
'outtmpl': '%(title)s.%(ext)s'
}

with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info_dict = ydl.extract_info(youtube_url, download=True)
file_path = ydl.prepare_filename(info_dict)
filename, _ = os.path.splitext(file_path)
filename += ".mp3"
print(f"Downloaded file path: {filename}")

return filename


def analyze_1f_noise(youtube_url: str):
filename = download_youtube(youtube_url)
audio = AudioSegment.from_mp3(filename)
os.remove(filename)
data = np.array(audio.get_array_of_samples())
sample_rate = audio.frame_rate

if audio.channels > 1:
data = data.reshape((-1, audio.channels)).mean(axis=1)

N = len(data)
T = 1.0 / sample_rate
yf = fft(data)
xf = np.fft.fftfreq(N, T)[:N//2]

power_spectrum = 2.0/N * np.abs(yf[:N//2])

xf_log = xf[1:]
power_spectrum_log = power_spectrum[1:]

graphfile_path = tempfile.NamedTemporaryFile(delete=False, suffix='.png').name

# グラフの描画
plt.figure(figsize=(10, 6))
plt.plot(xf_log, power_spectrum_log)
plt.xscale('log')
plt.yscale('log')

plt.title('Power Spectrum (Log Scale)')
plt.xlabel('Frequency (Hz)')
plt.ylabel('Power')
plt.grid(True, which="both", ls="--")
plt.xlim([1, sample_rate // 2])
plt.savefig(graphfile_path)

filename, _ = os.path.splitext(filename)

return filename, graphfile_path
62 changes: 62 additions & 0 deletions lab_tools/highpass.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import numpy as np
from scipy import signal
from scipy import fftpack
from typing import Tuple


def highpass_filter(signal_data: np.ndarray, samplerate: float, fp: float, fs: float, gpass: float, gstop: float) -> np.ndarray:
fn = samplerate / 2
wp = fp / fn
ws = fs / fn

N, Wn = signal.buttord(wp, ws, gpass, gstop)
b, a = signal.butter(N, Wn, "high")

filtered_signal = signal.filtfilt(b, a, signal_data)

return filtered_signal


def overlap_frames(signal_data: np.ndarray, samplerate: float, frame_size: int, overlap: float) -> Tuple[np.ndarray, int]:
total_duration = len(signal_data) / samplerate
frame_duration = frame_size / samplerate
step_size = frame_size * (1 - overlap / 100)

num_frames = int((total_duration - (frame_duration * overlap / 100)) / (frame_duration * (1 - overlap / 100)))

frames = []

for i in range(num_frames):
start_idx = int(step_size * i)
frames.append(signal_data[start_idx:start_idx + frame_size])

return np.array(frames), num_frames


def hanning(signal_data: np.ndarray, frame_size: int, num_frames: int) -> Tuple[np.ndarray, float]:
han = signal.get_window('hann', frame_size)
acf = 1 / (sum(han) / frame_size)

for i in range(num_frames):
signal_data[i] *= han

return signal_data, acf


def fft_ave(signal_data: np.ndarray, samplerate: float, frame_size: int, num_frames: int, acf: float):
fft_array = []
for i in range(num_frames):
fft_result = fftpack.fft(signal_data[i]) / frame_size
fft_array.append(acf * np.abs(fft_result))

fft_axis = np.linspace(0, samplerate / 2, frame_size // 2)
fft_array = np.array(fft_array)[:, :frame_size // 2]
fft_mean = np.mean(fft_array, axis=0)

return fft_array, fft_mean, fft_axis


def linear_to_db(x: float, y: float) -> float:
if y == 0:
raise ValueError("y cannot be zero in logarithmic conversion")
return 20 * np.log10(x / y)
24 changes: 24 additions & 0 deletions lab_tools/labutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import sys
import pandas as pd


# CSVファイルから信号データを読み込む
def load_signal(file_path, column_name):
try:
with open(file_path, 'r') as file:
# データ部分が始まる行を見つける
for i, line in enumerate(file):
if 'Timestamp' in line:
header_line = i
break

# 見つけたヘッダー行からデータを読み込む
df = pd.read_csv(file_path, skiprows=header_line)
signal = df[column_name].values
return signal
except FileNotFoundError as e:
print(f"Error: {e}", file=sys.stderr)
return []
except KeyError as e:
print(f"Column '{column_name}' not found in the file. ({e})", file=sys.stderr)
return []
76 changes: 76 additions & 0 deletions lab_tools/wavelet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import numpy as np
import matplotlib.pyplot as plt
import math
import tempfile

from lab_tools import labutils


# モルレーウェーブレット関数
def morlet(x, f, width):
sf = f / width
st = 1 / (2 * math.pi * sf)
A = 1 / (st * math.sqrt(2 * math.pi))
h = -np.power(x, 2) / (2 * st**2)
co1 = 1j * 2 * math.pi * f * x
return A * np.exp(co1) * np.exp(h)


# 連続ウェーブレット変換
def continuous_wavelet_transform(Fs, data, fmax, width=48, wavelet_R=0.5):
Ts = 1 / Fs
wavelet_length = np.arange(-wavelet_R, wavelet_R, Ts)
data_length = len(data)
cwt_result = np.zeros([fmax, data_length])

for i in range(fmax):
conv_result = np.convolve(data, morlet(wavelet_length, i + 1, width), mode='same')
cwt_result[i, :] = (2 * np.abs(conv_result) / Fs) ** 2

return cwt_result


# 連続ウェーブレット変換結果をカラーマップとしてプロット
def plot_cwt(cwt_result, time_data, fmax):
plt.imshow(cwt_result, cmap='jet', aspect='auto',
extent=[time_data[0], time_data[-1], 0, fmax],
vmax=abs(cwt_result).max(), vmin=-abs(cwt_result).max())
plt.xlabel("Time [sec]")
plt.ylabel("Frequency [Hz]")
plt.colorbar(label="Power")
plt.clim(-5, 5)


# グラフ描画とCWTの処理を行う関数
def wavelet_ui(uploaded_file, Fs, fmax, column_name, start_time, end_time):
filepath = uploaded_file.name
signal = labutils.load_signal(filepath, column_name)

if len(signal) == 0:
return None, None

# 時間データを計算
t_data = np.arange(0, len(signal) / Fs, 1 / Fs)

# スライダーの範囲に基づいてデータをフィルタリング
start_idx = int(start_time * Fs)
end_idx = int(end_time * Fs)
signal = signal[start_idx:end_idx]
t_data = t_data[start_idx:end_idx]

signal_filename = tempfile.NamedTemporaryFile(delete=False, suffix='.png').name
plt.figure(dpi=200)
plt.title("Signal")
plt.plot(t_data, signal)
plt.xlim(start_time, end_time)
plt.xlabel("Time [sec]")
plt.ylabel("Voltage [uV]")
plt.savefig(signal_filename)

cwt_signal_filename = tempfile.NamedTemporaryFile(delete=False, suffix='.png').name
cwt_signal = continuous_wavelet_transform(Fs=Fs, data=signal, fmax=fmax)
plt.figure(dpi=200)
plot_cwt(cwt_signal, t_data, fmax)
plt.savefig(cwt_signal_filename)

return cwt_signal_filename, signal_filename
Loading
Loading