import mermaid from ''; mermaid.initialize({ startOnLoad: true });
Quelques éléments de code pour le hackathon 2022.
from jyquickhelper import add_notebook_menu
from pytube import YouTube
# Fetch the sample video with pytube. Another clip that was tried earlier:
# https://youtu.be/9bZkp7q19f0
yt = YouTube(
    'https://www.youtube.com/watch?v=X-4UPGVxKgc'
)
# Take the first stream pytube lists and download it; `down` keeps whatever
# `download()` returns and is handed to moviepy further down.
first_stream = yt.streams.first()
down = first_stream.download()
Si cela ne fonctionne pas, voir pytube.exceptions.RegexMatchError: get_throttling_function_name: could not find match for multiple.
down  # notebook cell output: show the value returned by the download
import moviepy.editor as me

# Open the downloaded file as an audio clip and pull its samples into an array.
dat = me.AudioFileClip(down)
wav = dat.to_soundarray()
wav.shape  # notebook cell output: dimensions of the sample array
# Export the audio track to "sound.wav", the file the later cells read.
dat.write_audiofile(
    "sound.wav",
    fps=44100,
    nbytes=2,
    buffersize=2000,
    codec="pcm_s32le",
)
Il faut utiliser la version de développement disponible sur GitHub :
pip install git+https://github.com/pyannote/pyannote-audio.git@develop#egg=pyannote-audio
.
from pyannote.audio import Pipeline
# Load the pretrained pyannote diarization pipeline and run it on "sound.wav".
pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization")
diarization = pipeline("sound.wav")

# Print one line per track: start/end with one decimal, then the label.
labelled_tracks = diarization.itertracks(yield_label=True)
for turn, _, speaker in labelled_tracks:
    print(f"start={turn.start:.1f}s stop={turn.end:.1f}s speaker_{speaker}")
import onnxruntime as ort
# Open the ONNX model file and print every input the session declares.
sess = ort.InferenceSession("speaker_diarization.onnx")
declared_inputs = sess.get_inputs()
for i in declared_inputs:
    print(i)
import torch
import librosa
import numpy as np
from itertools import groupby
from scipy.ndimage import gaussian_filter1d
def zcr_vad(y, shift=0.025, win_len=2048, hop_len=1024, threshold=0.005):
    """Return a per-sample boolean activity mask from the zero-crossing rate.

    Args:
        y: 1-D signal, or a 2-D array/tensor whose first row is the signal.
            A ``torch.Tensor`` is converted to a NumPy array first.
        shift: Constant added to the signal before counting zero crossings.
            (Presumably keeps low-amplitude noise around zero from
            registering as crossings -- confirm with the author.)
        win_len: Frame length, in samples, for the zero-crossing rate.
        hop_len: Hop length, in samples, between frames.
        threshold: A frame is active when its smoothed zero-crossing rate is
            strictly greater than this value.

    Returns:
        Boolean NumPy array with ``len(y)`` entries.
    """
    if isinstance(y, torch.Tensor):
        # detach() so tensors that track gradients can be converted too.
        y = y.detach().cpu().numpy()
    if y.ndim == 2:
        y = y[0]
    if len(y) == 0:
        # Nothing to analyse; avoid calling librosa on an empty signal.
        return np.zeros(0, dtype=bool)

    # frame_length / hop_length are keyword-only in recent librosa releases,
    # so they must not be passed positionally.
    zcr = librosa.feature.zero_crossing_rate(
        y + shift, frame_length=win_len, hop_length=hop_len
    )[0]
    # Smooth the frame-level rate (sigma = 1 frame) before thresholding.
    activity = gaussian_filter1d(zcr, 1) > threshold
    # Stretch the frame-level decisions to sample resolution: repeat each one
    # often enough to cover the signal, then cut to the exact length.
    activity = np.repeat(activity, len(y) // len(activity) + 1)
    return activity[:len(y)]
def get_timestamp(activity):
    """Return the ``[start, end)`` index pairs of the active runs in a mask.

    Args:
        activity: 1-D boolean sequence (one entry per sample).

    Returns:
        Integer array of shape ``(n_runs, 2)``; each row holds the index of
        the first element of a run of truthy values and the index one past
        its last element. The shape is ``(0, 2)`` when there is no such run.
    """
    activity = np.asarray(activity)
    if activity.size == 0:
        return np.empty((0, 2), dtype=int)

    # Value of each run of identical consecutive entries, in order.
    mask = np.array([bool(value) for value, _ in groupby(activity)])
    # activity[i] != activity[i + 1] means a new run starts at i + 1, hence
    # the +1 (without it every interior boundary is one sample too early).
    change = np.flatnonzero(activity[:-1] != activity[1:]) + 1
    bounds = np.concatenate([[0], change, [len(activity)]])
    # Consecutive boundaries delimit the runs; keep only the truthy ones.
    span = np.stack([bounds[:-1], bounds[1:]], axis=1)
    return span[mask]
import torchaudio
from torchaudio.transforms import MFCC, Resample
# Scratch cell: the steps of the predictor's `predict` method run by hand on
# "sound.wav". The print("A") calls are progress markers between the steps.
sr = 16000
print("A")
waveform, ori_sr = torchaudio.load("sound.wav")
print("A")
# Average over dimension 0 and keep that dimension, so the result has a
# single row.
waveform = waveform.mean(0, keepdims=True)
print("A")
_resample = Resample(ori_sr, sr)
print("A")
audio = _resample(waveform)
print("A")
# The resampled signal is named `audio` in this cell; `y` was never defined.
activity = zcr_vad(audio)
spans = get_timestamp(activity)
# NOTE(review): `self` does not exist at module level and
# `OptimizedAgglomerativeClustering` is not defined or imported in the visible
# file, so the next three statements only work inside a predictor instance.
embed = [self._encode_segment(audio, span) for span in spans]
embed = torch.cat(embed).cpu().numpy()
speakers = OptimizedAgglomerativeClustering().fit_predict(embed)
audio.shape
import torch
import torchaudio
from torchaudio.transforms import MFCC, Resample
from torch.utils.data import Dataset, DataLoader
class BaseLoad:
    """Load audio files as single-row waveforms at a fixed sample rate.

    Args:
        sr: Target sample rate the audio is resampled to.
        n_mfcc: Number of MFCC coefficients produced by the MFCC transform.
    """

    def __init__(self, sr=16000, n_mfcc=40):
        self.sr = sr
        self.n_mfcc = n_mfcc
        # Use the requested number of coefficients (previously hard-coded to
        # 40, which silently ignored the `n_mfcc` argument).
        self._mfcc = MFCC(sr, n_mfcc=n_mfcc, log_mels=True)

    def _load(self, path, mfcc=True):
        """Load `path`, average over dimension 0, resample to `self.sr`.

        Args:
            path: Audio file passed to `torchaudio.load`.
            mfcc: When true, return the MFCC features of the resampled audio
                instead of the waveform itself.

        Returns:
            The resampled waveform, or its MFCC features when `mfcc` is true.

        Raises:
            Exception: If loading or averaging raises a RuntimeError; the
                original error is attached as the cause.
        """
        try:
            waveform, ori_sr = torchaudio.load(path)
            waveform = waveform.mean(0, keepdims=True)
        except RuntimeError as err:
            # Chain the cause so the underlying decoding error stays visible.
            raise Exception(f"Error loading {path}") from err
        _resample = Resample(ori_sr, self.sr)
        audio = _resample(waveform)
        if mfcc:
            audio = self._mfcc(audio)
        return audio
class BasePredictor(BaseLoad):
    """Shared setup and plotting for diarization predictors.

    Args:
        config_path: File read with `torch.load`; expected to yield a mapping
            with optional keys 'sr', 'n_mfcc' and 'ndim'.
        max_frame: Stored as `self.max_frame` for subclasses.
        hop: Stored as `self.hop` for subclasses.
    """

    def __init__(self, config_path, max_frame, hop):
        # NOTE: torch.load unpickles the file -- only load trusted configs.
        # map_location keeps the load working on machines without CUDA.
        config = torch.load(config_path, map_location="cpu")
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        super().__init__(config.get('sr', 16000), config.get('n_mfcc', 40))
        self.ndim = config.get('ndim', 3)
        self.max_frame = max_frame
        self.hop = hop

    @staticmethod
    def _plot_diarization(y, spans, speakers):
        """Plot the waveform with one shaded band per speaker.

        Args:
            y: Tensor whose first row is the waveform.
            spans: Iterable of (start, end) sample indices.
            speakers: One label per span, in the same order as `spans`.
        """
        # Local import: `plt` is not imported at module level in this file.
        import matplotlib.pyplot as plt

        signal = y[0].detach().cpu().numpy()
        samples = np.arange(len(signal))

        plt.figure(figsize=(15, 2))
        # Plot the NumPy copy so tensors living on a GPU can be drawn too.
        plt.plot(signal, "k-")

        unique_speakers = sorted(set(speakers))
        for speaker in unique_speakers:
            # Build the shading mask from the spans themselves. Writing the
            # label into a copy of the waveform made any sample whose value
            # equals a label (e.g. exact silence vs. speaker 0) look spoken.
            where = np.zeros(len(signal), dtype=bool)
            for (start, end), label in zip(spans, speakers):
                if label == speaker:
                    where[start:end] = True
            plt.fill_between(samples, -1, 1, where=where, alpha=0.5, label=f"speaker_{speaker}")
        # One legend column per speaker; at least one so an empty result does
        # not fail.
        plt.legend(loc="upper center", ncol=max(1, len(unique_speakers)), bbox_to_anchor=(0.5, -0.25))
class PyTorchPredictor(BasePredictor):
    """Diarization predictor backed by a PyTorch `Encoder` model.

    Args:
        config_path: Configuration file forwarded to `BasePredictor`.
        model_path: File holding the state dict loaded into the encoder.
        max_frame: Number of MFCC frames per window fed to the model.
        hop: Step, in MFCC frames, between consecutive windows.
    """

    def __init__(self, config_path, model_path, max_frame=45, hop=3):
        super().__init__(config_path, max_frame, hop)
        # NOTE: torch.load unpickles the file -- only load trusted weights.
        weight = torch.load(model_path, map_location="cpu")
        self.model = Encoder(self.ndim).to(self.device)
        self.model.load_state_dict(weight)
        self.model.eval()

    def predict(self, path, plot=False):
        """Diarize the audio file at `path`.

        Args:
            path: Audio file to analyse.
            plot: When true, also draw the waveform with the speaker bands.

        Returns:
            Tuple `(timestamp, speakers)`: the span boundaries divided by
            `self.sr`, and the labels returned by the clustering step. Both
            are empty arrays when no active span is detected.
        """
        y = self._load(path, mfcc=False)
        activity = zcr_vad(y)
        spans = get_timestamp(activity)
        if len(spans) == 0:
            # No active region: torch.cat would fail on an empty list.
            return np.empty((0, 2), dtype=float), np.empty(0, dtype=int)

        segment_embeddings = [self._encode_segment(y, span) for span in spans]
        embed = torch.cat(segment_embeddings).cpu().numpy()
        speakers = OptimizedAgglomerativeClustering().fit_predict(embed)
        if plot:
            self._plot_diarization(y, spans, speakers)
        timestamp = np.array(spans) / self.sr
        return timestamp, speakers

    def _encode_segment(self, y, span):
        """Return one embedding (leading dimension 1) for `y[:, start:end]`.

        NOTE(review): `unfold` raises when the segment yields fewer than
        `self.max_frame` MFCC frames; very short spans are not handled here.
        """
        start, end = span
        mfcc = self._mfcc(y[:, start:end]).to(self.device)
        # Slide a window of `max_frame` frames (step `hop`) along dimension 2
        # and move the window index to the front so it acts as the batch axis.
        mfcc = mfcc.unfold(2, self.max_frame, self.hop).permute(2, 0, 1, 3)
        with torch.no_grad():
            # Average the per-window outputs into a single embedding.
            embed = self.model(mfcc).mean(0, keepdims=True)
        return embed
# The constructor takes the configuration file first and the weights second;
# keyword arguments keep the two from being swapped.
p = PyTorchPredictor(config_path="configs.pth", model_path="weights_best.pth")