.. _traitementdusonrst:

=====
Sound
=====

.. only:: html

    **Links:** :download:`notebook `, :downloadlink:`html `,
    :download:`PDF `, :download:`python `, :downloadlink:`slides `,
    :githublink:`GitHub|_doc/notebooks/hackathon_2022/traitement_du_son.ipynb|*`

A few pieces of code for the 2022 hackathon.

.. code:: ipython3

    from jyquickhelper import add_notebook_menu

Download from youtube
---------------------

`pytube `__

.. code:: ipython3

    from pytube import YouTube

    # YouTube('https://youtu.be/9bZkp7q19f0').streams.first().download()
    yt = YouTube('https://www.youtube.com/watch?v=X-4UPGVxKgc')

.. code:: ipython3

    # yt.streams.first().download()
    down = yt.streams.first().download()

If this does not work, see `pytube.exceptions.RegexMatchError:
get_throttling_function_name: could not find match for multiple `__.

.. code:: ipython3

    down

.. code:: ipython3

    import moviepy.editor as me

    dat = me.AudioFileClip(down)

.. code:: ipython3

    wav = dat.to_soundarray()

.. code:: ipython3

    wav.shape

.. code:: ipython3

    dat.write_audiofile("sound.wav", 44100, 2, 2000, "pcm_s32le")

pyannote.audio
--------------

The github version is required:
``pip install git+https://github.com/pyannote/pyannote-audio.git@develop#egg=pyannote-audio``.

.. code:: ipython3

    from pyannote.audio import Pipeline

    pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

    # apply pretrained pipeline
    diarization = pipeline("sound.wav")

    # print the result
    for turn, _, speaker in diarization.itertracks(yield_label=True):
        print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")

ONNX
----

See `Speaker Diarization using GRU in PyTorch `__.

.. code:: ipython3

    import onnxruntime as ort

    sess = ort.InferenceSession("speaker_diarization.onnx")

.. code:: ipython3

    for i in sess.get_inputs():
        print(i)

.. code:: ipython3

    import torch
    import librosa
    import numpy as np
    from itertools import groupby
    from scipy.ndimage import gaussian_filter1d


    def zcr_vad(y, shift=0.025, win_len=2048, hop_len=1024, threshold=0.005):
        # voice activity detection based on the zero crossing rate
        if isinstance(y, torch.Tensor):
            y = y.cpu().numpy()
        if y.ndim == 2:
            y = y[0]
        zcr = librosa.feature.zero_crossing_rate(
            y + shift, frame_length=win_len, hop_length=hop_len)[0]
        activity = gaussian_filter1d(zcr, 1) > threshold
        activity = np.repeat(activity, len(y) // len(activity) + 1)
        activity = activity[:len(y)]
        return activity


    def get_timestamp(activity):
        # converts the boolean activity mask into (start, end) sample spans
        mask = [k for k, _ in groupby(activity)]
        change = np.argwhere(activity[:-1] != activity[1:]).flatten()
        span = np.concatenate([[0], change, [len(activity)]])
        span = list(zip(span[:-1], span[1:]))
        span = np.array(span)[mask]
        return span

.. code:: ipython3

    import torchaudio
    from torchaudio.transforms import MFCC, Resample

    sr = 16000
    waveform, ori_sr = torchaudio.load("sound.wav")
    waveform = waveform.mean(0, keepdims=True)
    _resample = Resample(ori_sr, sr)
    audio = _resample(waveform)

.. code:: ipython3

    # the resampled waveform computed above
    y = audio
    activity = zcr_vad(y)
    spans = get_timestamp(activity)

.. code:: ipython3

    # excerpt of the predict method of the predictor defined below:
    # _encode_segment and OptimizedAgglomerativeClustering are not
    # defined at this point, so this cell does not run on its own
    embed = [self._encode_segment(y, span) for span in spans]
    embed = torch.cat(embed).cpu().numpy()
    speakers = OptimizedAgglomerativeClustering().fit_predict(embed)

.. code:: ipython3

    audio.shape
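The two previous cells will not run as written: ``_encode_segment`` belongs
to the predictor class defined in the next cell and
``OptimizedAgglomerativeClustering`` only exists in the linked notebook.
A rough, hypothetical stand-in could cluster the segment embeddings with
scikit-learn's ``AgglomerativeClustering``, choosing the number of speakers
with a silhouette score:

.. code:: ipython3

    # hypothetical replacement for OptimizedAgglomerativeClustering:
    # agglomerative clustering of the segment embeddings, the number of
    # clusters being picked by the best silhouette score
    import numpy as np
    from sklearn.cluster import AgglomerativeClustering
    from sklearn.metrics import silhouette_score


    def cluster_speakers(embed, max_speakers=5):
        best_labels, best_score = None, -1.0
        for n in range(2, min(max_speakers, len(embed) - 1) + 1):
            labels = AgglomerativeClustering(n_clusters=n).fit_predict(embed)
            score = silhouette_score(embed, labels)
            if score > best_score:
                best_labels, best_score = labels, score
        return best_labels

``cluster_speakers(embed)`` would then play the role of
``OptimizedAgglomerativeClustering().fit_predict(embed)`` in the cells above
and below.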
.. code:: ipython3

    import numpy as np
    import matplotlib.pyplot as plt
    import torch
    import torchaudio
    from torchaudio.transforms import MFCC, Resample
    from torch.utils.data import Dataset, DataLoader


    class BaseLoad:
        "Loads an audio file and optionally converts it to MFCC features."

        def __init__(self, sr=16000, n_mfcc=40):
            self.sr = sr
            self.n_mfcc = n_mfcc
            self._mfcc = MFCC(sr, n_mfcc=n_mfcc, log_mels=True)

        def _load(self, path, mfcc=True):
            try:
                waveform, ori_sr = torchaudio.load(path)
                waveform = waveform.mean(0, keepdims=True)
            except RuntimeError:
                raise Exception(f"Error loading {path}")
            _resample = Resample(ori_sr, self.sr)
            audio = _resample(waveform)
            if mfcc:
                audio = self._mfcc(audio)
            return audio


    class BasePredictor(BaseLoad):
        "Common machinery shared by the predictors."

        def __init__(self, config_path, max_frame, hop):
            config = torch.load(config_path)
            self.device = torch.device(
                "cuda:0" if torch.cuda.is_available() else "cpu")
            super().__init__(config.get('sr', 16000), config.get('n_mfcc', 40))
            self.ndim = config.get('ndim', 3)
            self.max_frame = max_frame
            self.hop = hop

        @staticmethod
        def _plot_diarization(y, spans, speakers):
            c = y[0].cpu().numpy().copy()
            for (start, end), speaker in zip(spans, speakers):
                c[start:end] = speaker
            plt.figure(figsize=(15, 2))
            plt.plot(y[0], "k-")
            for idx, speaker in enumerate(set(speakers)):
                plt.fill_between(range(len(c)), -1, 1, where=(c == speaker),
                                 alpha=0.5, label=f"speaker_{speaker}")
            plt.legend(loc="upper center", ncol=idx + 1,
                       bbox_to_anchor=(0.5, -0.25))


    class PyTorchPredictor(BasePredictor):
        """
        Speaker diarization with the PyTorch model.
        Encoder and OptimizedAgglomerativeClustering come from the
        notebook linked above and are not defined here.
        """

        def __init__(self, config_path, model_path, max_frame=45, hop=3):
            super().__init__(config_path, max_frame, hop)
            weight = torch.load(model_path, map_location="cpu")
            self.model = Encoder(self.ndim).to(self.device)
            self.model.load_state_dict(weight)
            self.model.eval()

        def predict(self, path, plot=False):
            y = self._load(path, mfcc=False)
            activity = zcr_vad(y)
            spans = get_timestamp(activity)
            embed = [self._encode_segment(y, span) for span in spans]
            embed = torch.cat(embed).cpu().numpy()
            speakers = OptimizedAgglomerativeClustering().fit_predict(embed)
            if plot:
                self._plot_diarization(y, spans, speakers)
            timestamp = np.array(spans) / self.sr
            return timestamp, speakers

        def _encode_segment(self, y, span):
            start, end = span
            mfcc = self._mfcc(y[:, start:end]).to(self.device)
            mfcc = mfcc.unfold(2, self.max_frame, self.hop).permute(2, 0, 1, 3)
            with torch.no_grad():
                embed = self.model(mfcc).mean(0, keepdims=True)
            return embed


    # the configuration file comes first, the trained weights second
    p = PyTorchPredictor("configs.pth", "weights_best.pth")
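Assuming ``configs.pth`` and ``weights_best.pth`` are the configuration and
trained weights produced by the notebook linked above (they are not shipped
with this one), the predictor can then be applied to the extracted audio,
printing the segments in the same format as the pyannote pipeline:

.. code:: ipython3

    # hypothetical usage: requires configs.pth, weights_best.pth and the
    # sound.wav file written by moviepy earlier in this notebook
    timestamp, speakers = p.predict("sound.wav", plot=True)
    for (start, end), speaker in zip(timestamp, speakers):
        print(f"start={start:.1f}s stop={end:.1f}s speaker_{speaker}")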
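The ONNX section above loads ``speaker_diarization.onnx`` without showing
where it comes from. A minimal sketch, assuming the encoder consumes batches
of shape ``(batch, 1, n_mfcc, max_frame)`` as built in ``_encode_segment``,
would export the trained encoder with ``torch.onnx.export``:

.. code:: ipython3

    # hypothetical export of the trained encoder to ONNX; the dummy input
    # shape (batch=1, channel=1, 40 MFCC coefficients, 45 frames) mirrors
    # the segments built in _encode_segment and is an assumption
    import torch

    dummy = torch.randn(1, 1, 40, 45)
    torch.onnx.export(
        p.model, dummy, "speaker_diarization.onnx",
        input_names=["mfcc"], output_names=["embedding"],
        dynamic_axes={"mfcc": {0: "batch"}},
        opset_version=13)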