Sound#


A few code snippets for the 2022 hackathon.

from jyquickhelper import add_notebook_menu

Download from youtube#

The video is downloaded with pytube.

from pytube import YouTube
# example from the documentation:
# YouTube('https://youtu.be/9bZkp7q19f0').streams.first().download()
yt = YouTube('https://www.youtube.com/watch?v=X-4UPGVxKgc')
# download the first available stream into the current directory
down = yt.streams.first().download()

If this does not work, see pytube.exceptions.RegexMatchError: get_throttling_function_name: could not find match for multiple.
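If the error shows up anyway, here is a sketch of a guard around the download, assuming an upgrade of pytube is enough to fix the parsing issue:

from pytube import YouTube
from pytube.exceptions import RegexMatchError

try:
    down = YouTube('https://www.youtube.com/watch?v=X-4UPGVxKgc').streams.first().download()
except RegexMatchError as e:
    # pytube parses javascript generated by youtube, which changes regularly;
    # a more recent release (pip install --upgrade pytube) usually fixes it
    print("pytube probably needs an upgrade:", e)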

down
import moviepy.editor as me
# extract the audio track from the downloaded video
dat = me.AudioFileClip(down)
# audio as a numpy array (n_samples, n_channels)
wav = dat.to_soundarray()
wav.shape
# write a wav file: 44100 Hz, 2 bytes per sample, buffer size 2000, codec pcm_s32le
dat.write_audiofile("sound.wav", 44100, 2, 2000, "pcm_s32le")
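To check the conversion, the file can be read back, for instance with scipy (an assumption, it is not used anywhere else in this notebook):

from scipy.io import wavfile

rate, data = wavfile.read("sound.wav")
rate, data.shape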

pyannote.audio#

The github version must be installed: pip install git+https://github.com/pyannote/pyannote-audio.git@develop#egg=pyannote-audio.

from pyannote.audio import Pipeline
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")

# apply pretrained pipeline
diarization = pipeline("sound.wav")

# print the result
for turn, _, speaker in diarization.itertracks(yield_label=True):
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")
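The same segments can be gathered into a dataframe for further processing, assuming pandas is installed:

import pandas as pd

segments = pd.DataFrame([
    dict(start=turn.start, end=turn.end, speaker=speaker)
    for turn, _, speaker in diarization.itertracks(yield_label=True)
])
segments.head()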

ONNX#

See Speaker Diarization using GRU in PyTorch.

import onnxruntime as ort
# load the exported model and display its expected inputs
sess = ort.InferenceSession("speaker_diarization.onnx")
for i in sess.get_inputs():
    print(i)
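To make sure the exported model runs, it can be fed a random tensor shaped like its first input; this sketch assumes a single float input and picks an arbitrary size of 100 for any dynamic dimension:

import numpy as np

inp = sess.get_inputs()[0]
# dynamic dimensions appear as None or as a symbolic name, replace them
shape = [d if isinstance(d, int) else 100 for d in inp.shape]
dummy = np.random.rand(*shape).astype(np.float32)
outputs = sess.run(None, {inp.name: dummy})
[o.shape for o in outputs]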
import torch
import librosa
import numpy as np
from itertools import groupby
from scipy.ndimage import gaussian_filter1d


def zcr_vad(y, shift=0.025, win_len=2048, hop_len=1024, threshold=0.005):
    # simple voice activity detection based on the zero crossing rate
    if isinstance(y, torch.Tensor):
        y = y.cpu().numpy()
    if y.ndim == 2:
        y = y[0]
    zcr = librosa.feature.zero_crossing_rate(
        y + shift, frame_length=win_len, hop_length=hop_len)[0]
    # smooth the curve and threshold it to get a boolean mask per frame
    activity = gaussian_filter1d(zcr, 1) > threshold
    # expand the mask back to one value per sample
    activity = np.repeat(activity, len(y) // len(activity) + 1)
    activity = activity[:len(y)]
    return activity


def get_timestamp(activity):
    # value (True/False) of each consecutive run of the mask
    mask = [k for k, _ in groupby(activity)]
    # indices where the mask switches between active and inactive
    change = np.argwhere(activity[:-1] != activity[1:]).flatten()
    span = np.concatenate([[0], change, [len(activity)]])
    span = list(zip(span[:-1], span[1:]))
    # keep only the active spans
    span = np.array(span)[mask]
    return span
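A quick check on a synthetic signal, one second of silence followed by one second of noise (arbitrary values):

sr = 16000
fake = np.concatenate([np.zeros(sr), np.random.randn(sr) * 0.1])
activity = zcr_vad(fake)
get_timestamp(activity)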
import torchaudio
from torchaudio.transforms import MFCC, Resample

sr = 16000
# load the soundtrack and average the channels to get a mono signal
waveform, ori_sr = torchaudio.load("sound.wav")
waveform = waveform.mean(0, keepdim=True)
# resample to 16 kHz, the rate used by the rest of the pipeline
_resample = Resample(ori_sr, sr)
audio = _resample(waveform)
# voice activity detection and active spans
activity = zcr_vad(audio)
spans = get_timestamp(activity)
# the next steps need the encoder and the clustering class defined below
# (see PyTorchPredictor.predict):
# embed = [self._encode_segment(audio, span) for span in spans]
# embed = torch.cat(embed).cpu().numpy()
# speakers = OptimizedAgglomerativeClustering().fit_predict(embed)
audio.shape
import torch
import torchaudio
import numpy as np
import matplotlib.pyplot as plt
from torchaudio.transforms import MFCC, Resample
from torch.utils.data import Dataset, DataLoader


class BaseLoad:
    def __init__(self, sr=16000, n_mfcc=40):
        self.sr = sr
        self.n_mfcc = n_mfcc
        self._mfcc = MFCC(sr, n_mfcc=n_mfcc, log_mels=True)

    def _load(self, path, mfcc=True):
        # load an audio file, average the channels, resample to self.sr
        try:
            waveform, ori_sr = torchaudio.load(path)
            waveform = waveform.mean(0, keepdim=True)
        except RuntimeError:
            raise Exception(f"Error loading {path}")
        _resample = Resample(ori_sr, self.sr)
        audio = _resample(waveform)

        if mfcc:
            audio = self._mfcc(audio)
        return audio


class BasePredictor(BaseLoad):
    def __init__(self, config_path, max_frame, hop):
        config = torch.load(config_path)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        super().__init__(config.get('sr', 16000), config.get('n_mfcc', 40))
        self.ndim = config.get('ndim', 3)
        self.max_frame = max_frame
        self.hop = hop

    @staticmethod
    def _plot_diarization(y, spans, speakers):
        # paint each active span with the index of its speaker
        c = y[0].cpu().numpy().copy()
        for (start, end), speaker in zip(spans, speakers):
            c[start:end] = speaker

        plt.figure(figsize=(15, 2))
        plt.plot(y[0], "k-")
        for idx, speaker in enumerate(set(speakers)):
            plt.fill_between(range(len(c)), -1, 1, where=(c == speaker),
                             alpha=0.5, label=f"speaker_{speaker}")
        plt.legend(loc="upper center", ncol=idx + 1, bbox_to_anchor=(0.5, -0.25))


class PyTorchPredictor(BasePredictor):
    def __init__(self, config_path, model_path, max_frame=45, hop=3):
        super().__init__(config_path, max_frame, hop)

        # Encoder is the GRU model defined in the article mentioned above
        weight = torch.load(model_path, map_location="cpu")
        self.model = Encoder(self.ndim).to(self.device)
        self.model.load_state_dict(weight)
        self.model.eval()

    def predict(self, path, plot=False):
        y = self._load(path, mfcc=False)
        activity = zcr_vad(y)
        spans = get_timestamp(activity)

        # one embedding per active span, then clustering into speakers
        # (OptimizedAgglomerativeClustering also comes from the referenced article)
        embed = [self._encode_segment(y, span) for span in spans]
        embed = torch.cat(embed).cpu().numpy()
        speakers = OptimizedAgglomerativeClustering().fit_predict(embed)

        if plot:
            self._plot_diarization(y, spans, speakers)

        # convert sample indices into seconds
        timestamp = np.array(spans) / self.sr
        return timestamp, speakers

    def _encode_segment(self, y, span):
        start, end = span
        mfcc = self._mfcc(y[:, start:end]).to(self.device)
        # sliding windows of max_frame frames with a hop of hop frames
        mfcc = mfcc.unfold(2, self.max_frame, self.hop).permute(2, 0, 1, 3)
        with torch.no_grad():
            # average the embeddings of all windows in the segment
            embed = self.model(mfcc).mean(0, keepdim=True)
        return embed


p = PyTorchPredictor("configs.pth", "weights_best.pth")
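Assuming the weight and configuration files from the referenced article are present, the predictor can then be applied to the extracted audio:

timestamp, speakers = p.predict("sound.wav", plot=True)
for (start, end), speaker in zip(timestamp, speakers):
    print(f"start={start:.1f}s stop={end:.1f}s speaker_{speaker}")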