| |
| import os |
| import librosa |
| import numpy as np |
| from concurrent.futures import ThreadPoolExecutor |
| import sys |
|
|
| from huggingface_hub import hf_hub_download |
| sys.path.append(os.path.join(os.path.dirname(__file__), '../utils')) |
| from ddsp.vocoder import F0_Extractor, Volume_Extractor |
|
|
| import torch |
| from typing import Union |
| from torch.nn import functional as F |
| from slicer import Slicer |
| from transformers import AutoTokenizer, AutoModel |
| |
|
|
def edge_padding(f0):
    """Extend every voiced run of an F0 contour by one frame on each side.

    A frame counts as voiced when its value is non-zero. For each voiced
    interior frame, any immediately adjacent frame that is zero in the
    *input* receives that frame's value in the output. The input itself is
    not modified; a copy is returned.

    NOTE(review): only indices 1 .. len(f0) - 2 are inspected as sources, so
    a voiced first or last frame never pads its neighbour -- confirm that
    this is intended.

    Args:
        f0: sequence of pitch values supporting ``copy()``, ``len()`` and
            integer indexing (e.g. a 1-D numpy array).

    Returns:
        A copy of ``f0`` with the padded values filled in.
    """
    padded = f0.copy()

    for idx in range(1, len(f0) - 1):
        pitch = f0[idx]
        if pitch == 0:
            continue
        # Left neighbour first, then right: when a zero frame sits between
        # two voiced frames, the later (right-hand) voiced frame wins.
        for neighbour in (idx - 1, idx + 1):
            if f0[neighbour] == 0:
                padded[neighbour] = pitch

    return padded
|
|
def split(audio, sample_rate, hop_size, db_thresh=-40, min_len=5000):
    """Cut ``audio`` into chunks with ``Slicer`` and align them to hop frames.

    The slicer result is read as a mapping whose values carry a
    ``"split_time"`` string of the form ``"<start>,<end>"``. Both numbers are
    converted to frame indices by integer division with ``hop_size``, and the
    audio is re-sliced on those frame boundaries.
    (presumably start/end are sample offsets -- confirm against ``Slicer``.)

    Args:
        audio: waveform that supports slicing.
        sample_rate: forwarded to ``Slicer`` as ``sr``.
        hop_size: number of samples per frame.
        db_thresh: forwarded to ``Slicer`` as ``threshold``.
        min_len: forwarded to ``Slicer`` as ``min_length``.

    Returns:
        List of ``(start_frame, audio_chunk)`` tuples; chunks that are empty
        after frame alignment are skipped.
    """
    # Fix: the instance used to be bound to a misspelled name (`slnpicer`),
    # so the `slicer.slice(...)` call below raised NameError on every call.
    slicer = Slicer(
        sr=sample_rate,
        threshold=db_thresh,
        min_length=min_len)
    chunks = dict(slicer.slice(audio))

    result = []
    for chunk in chunks.values():
        tag = chunk["split_time"].split(",")
        if tag[0] == tag[1]:
            # Zero-length segment.
            continue
        start_frame = int(int(tag[0]) // hop_size)
        end_frame = int(int(tag[1]) // hop_size)
        if end_frame > start_frame:
            result.append((
                start_frame,
                audio[int(start_frame * hop_size): int(end_frame * hop_size)]))
    return result
|
|
def wav_pad(wav, multiple=200):
    """Lengthen ``wav`` so that its first dimension is a multiple of ``multiple``.

    The target length is ``wav.shape[0]`` rounded up to the next multiple.
    The extension is delegated to ``repeat_expand`` (interpolation to the
    target length), not done by appending zeros.

    Args:
        wav: waveform exposing ``shape``; its first dimension is the length.
        multiple: block size the output length must be divisible by.

    Returns:
        Whatever ``repeat_expand`` returns for the rounded-up length.
    """
    n_samples = wav.shape[0]
    # Ceiling division: number of whole blocks needed to hold all samples.
    n_blocks = (n_samples + (multiple - 1)) // multiple
    return repeat_expand(wav, n_blocks * multiple)
|
|
def repeat_expand(
    content: Union[torch.Tensor, np.ndarray], target_len: int, mode: str = "nearest"
):
    """Resize the last dimension of ``content`` to ``target_len``.

    This is a wrapper around ``torch.nn.functional.interpolate``. Inputs with
    1 or 2 dimensions are temporarily given leading singleton dimensions so
    that ``interpolate`` sees a 3-D tensor; the result is returned with the
    same number of dimensions as the input.

    Args:
        content: 1-D, 2-D or 3-D torch tensor or numpy array.
        target_len: length of the last dimension after resizing.
        mode: interpolation mode passed to ``interpolate``.
            Defaults to "nearest".

    Returns:
        The resized data, as a numpy array if ``content`` was a numpy array
        and as a torch tensor otherwise.

    Raises:
        AssertionError: if ``content`` has more than 3 dimensions.
    """
    ndim = content.ndim

    # Bring the input to the 3-D layout expected by interpolate.
    if ndim == 1:
        content = content[None, None]
    elif ndim == 2:
        content = content[None]

    assert content.ndim == 3, "content must have 1, 2 or 3 dimensions"

    is_np = isinstance(content, np.ndarray)
    if is_np:
        content = torch.from_numpy(content)

    results = torch.nn.functional.interpolate(content, size=target_len, mode=mode)

    if is_np:
        results = results.numpy()

    # Strip the singleton dimensions that were added above.
    if ndim == 1:
        return results[0, 0]
    if ndim == 2:
        return results[0]
    # Fix: a 3-D input used to fall off the end of the function and yield None.
    return results
|
|
def repeat_expand_2d(content, target_len, mode='left'):
    """Resize the last dimension of a 2-D tensor to ``target_len``.

    Args:
        content: 2-D tensor to resize.
        target_len: desired size of the last dimension.
        mode: ``'left'`` selects ``repeat_expand_2d_left``; any other value
            is forwarded to ``repeat_expand_2d_other`` as its interpolation
            mode.

    Returns:
        The resized tensor produced by the selected helper.
    """
    if mode == 'left':
        return repeat_expand_2d_left(content, target_len)
    return repeat_expand_2d_other(content, target_len, mode)
|
|
|
|
def repeat_expand_2d_left(content, target_len):
    """Stretch the last dimension of a 2-D tensor by repeating columns.

    Source column ``k`` is assigned the output range that starts at
    ``k * target_len / src_len``. The output is filled left to right, and the
    source index advances by at most one column per output position.

    Args:
        content: 2-D tensor; the last dimension is the one being resized.
        target_len: number of columns in the output.

    Returns:
        A float tensor of shape ``(content.shape[0], target_len)`` on the
        same device as ``content``.
    """
    src_len = content.shape[-1]
    expanded = torch.zeros([content.shape[0], target_len], dtype=torch.float).to(content.device)
    # boundaries[k] is the output position at which source column k begins.
    boundaries = torch.arange(src_len + 1) * target_len / src_len

    src_idx = 0
    for dst_idx in range(target_len):
        # Step to the next source column once its start boundary is reached.
        if not (dst_idx < boundaries[src_idx + 1]):
            src_idx += 1
        expanded[:, dst_idx] = content[:, src_idx]

    return expanded
|
|
|
|
| |
def repeat_expand_2d_other(content, target_len, mode='nearest'):
    """Resize the last dimension of a 2-D tensor with ``F.interpolate``.

    Args:
        content: 2-D tensor; the last dimension is the one being resized.
        target_len: desired size of the last dimension.
        mode: interpolation mode handed to ``F.interpolate``.

    Returns:
        The resized 2-D tensor.
    """
    # interpolate expects a leading batch dimension; add it, then strip it.
    batched = content[None, :, :]
    resized = F.interpolate(batched, size=target_len, mode=mode)
    return resized[0]
|
|
def align_data(data, max_len):
    """Force the last dimension of ``data`` to be exactly ``max_len`` long.

    Shorter inputs are right-padded with ``F.pad`` (zeros by default);
    longer inputs are truncated.

    Args:
        data: torch tensor whose last dimension is the one being aligned.
        max_len: required size of the last dimension.

    Returns:
        The padded, truncated, or unchanged tensor.
    """
    data_len = data.shape[-1]
    if data_len < max_len:
        data = F.pad(data, (0, max_len - data_len))
    elif data_len > max_len:
        # Fix: truncate along the last axis, the same axis that is measured
        # and padded. `data[:max_len]` cut the first axis, which is only
        # equivalent for 1-D input.
        data = data[..., :max_len]
    return data
|
|
def adjust_length(feature, target_len):
    """Linearly resample a 2-D feature along its first dimension.

    Args:
        feature: 2-D tensor; the first dimension is the one being resized.
        target_len: desired size of the first dimension.

    Returns:
        ``feature`` itself when its first dimension already equals
        ``target_len``; otherwise a new tensor with first dimension
        ``target_len`` and the second dimension unchanged.
    """
    if feature.shape[0] == target_len:
        return feature

    # F.interpolate resizes the last dimension of a batched 3-D tensor, so
    # swap the two axes and add a batch axis, then undo both afterwards.
    batched = feature.t().unsqueeze(0)
    resized = F.interpolate(batched, size=target_len, mode='linear', align_corners=False)
    return resized.squeeze(0).t()
|
|
def load_bert_model(model_name, device):
    """Load a pretrained tokenizer/model pair through ``transformers``.

    Args:
        model_name: identifier passed to ``from_pretrained`` for both the
            tokenizer and the model.
        device: device the model is moved to.

    Returns:
        Tuple ``(tokenizer, model)``.
    """
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModel.from_pretrained(model_name)
    model = model.to(device)
    return tokenizer, model
|
|
def get_style_embed(style_prompt, tokenizer, model):
    """Encode a style prompt and return the last element of the model output.

    Args:
        style_prompt: text handed to ``tokenizer``.
        tokenizer: callable returning a mapping of model inputs that
            supports ``.to(device)``.
        model: callable model exposing a ``device`` attribute.

    Returns:
        ``outputs[-1]`` of the model call.
        (presumably the pooled sentence embedding for BERT-style models --
        TODO confirm for the model in use.)
    """
    encoded = tokenizer(style_prompt, return_tensors="pt")
    # Tokenized inputs must live on the same device as the model.
    encoded = encoded.to(model.device)
    model_out = model(**encoded)
    return model_out[-1]
|
|
def load_facodec(device):
    """Build the FACodec encoder/decoder pair and load their checkpoints.

    Weights are read from fixed paths under ``utils/pretrain/``. These are
    relative paths, so they resolve against the current working directory.
    Both networks are moved to ``device`` and put in eval mode.

    Args:
        device: device the two networks are moved to.

    Returns:
        Tuple ``(fa_encoder, fa_decoder)``.
    """
    from Amphion.models.codec.ns3_codec import FACodecEncoderV2, FACodecDecoderV2

    fa_encoder = FACodecEncoderV2(
        ngf=32,
        up_ratios=[2, 4, 5, 5],
        out_channels=256,
    )

    fa_decoder = FACodecDecoderV2(
        in_channels=256,
        upsample_initial_channel=1024,
        ngf=32,
        up_ratios=[5, 5, 4, 2],
        vq_num_q_c=2,
        vq_num_q_p=1,
        vq_num_q_r=3,
        vq_dim=256,
        codebook_dim=8,
        codebook_size_prosody=10,
        codebook_size_content=10,
        codebook_size_residual=10,
        use_gr_x_timbre=True,
        use_gr_residual_f0=True,
        use_gr_residual_phone=True,
    )

    encoder_ckpt = "utils/pretrain/ns3_facodec_encoder_v2.bin"
    decoder_ckpt = "utils/pretrain/ns3_facodec_decoder_v2.bin"

    # Fix: deserialize onto the CPU first. Without map_location, a checkpoint
    # that was saved from a CUDA device cannot be loaded on a host where that
    # device is unavailable. The modules are moved to `device` right after.
    fa_encoder.load_state_dict(torch.load(encoder_ckpt, map_location="cpu"))
    fa_decoder.load_state_dict(torch.load(decoder_ckpt, map_location="cpu"))

    fa_encoder = fa_encoder.to(device).eval()
    fa_decoder = fa_decoder.to(device).eval()

    return fa_encoder, fa_decoder
|
|
def load_f0_extractor(args):
    """Create an ``F0_Extractor`` from ``args``, filling in defaults.

    Every attribute that is ``None`` is replaced by its default:
    ``f0_extractor='rmvpe'``, ``sr=44100``, ``block_size=512``,
    ``f0_min=60``, ``f0_max=1200``.

    Args:
        args: object exposing the attributes ``f0_extractor``, ``sr``,
            ``block_size``, ``f0_min`` and ``f0_max``.

    Returns:
        The constructed ``F0_Extractor``.
    """
    def _or_default(value, default):
        return default if value is None else value

    return F0_Extractor(
        _or_default(args.f0_extractor, 'rmvpe'),
        _or_default(args.sr, 44100),
        _or_default(args.block_size, 512),
        _or_default(args.f0_min, 60),
        _or_default(args.f0_max, 1200),
    )
|
|
def load_volume_extractor(args):
    """Create a ``Volume_Extractor`` from ``args.block_size``.

    Args:
        args: object exposing ``block_size``; ``None`` selects the default
            of 512.

    Returns:
        The constructed ``Volume_Extractor``.
    """
    block_size = args.block_size
    if block_size is None:
        block_size = 512
    return Volume_Extractor(block_size)
|
|
def load_audio(input_path, sr):
    """Load an audio file with librosa at the requested sample rate.

    Args:
        input_path: path of the audio file.
        sr: sample rate passed to ``librosa.load``.

    Returns:
        The loaded waveform; it is passed through ``librosa.to_mono`` if
        the loaded array has more than one dimension.
    """
    waveform = librosa.load(input_path, sr=sr)[0]
    if len(waveform.shape) > 1:
        waveform = librosa.to_mono(waveform)
    return waveform
|
|
def resample_and_normalize(audio, max_gain=0.6):
    """Peak-normalize ``audio`` and convert it to 16-bit integer samples.

    Despite the name, no resampling takes place. The waveform is scaled so
    that its peak magnitude becomes ``max_gain``, then rescaled to the int16
    range with ``max_gain`` applied once more, and finally cast to
    ``np.int16``.

    Args:
        audio: numpy array of samples.
        max_gain: target peak as a fraction of full scale.

    Returns:
        ``np.int16`` array with the same shape as ``audio``. Empty or
        all-zero input yields an array of zeros.
    """
    audio = np.asarray(audio)
    if audio.size == 0:
        # `.max()` raises on an empty array; there is nothing to scale.
        return audio.astype(np.int16)

    peak = np.abs(audio).max()
    if peak == 0:
        # Fix: silent input used to be divided by a zero peak, producing NaNs
        # whose cast to int16 is undefined. Silence stays silence.
        return np.zeros(audio.shape, dtype=np.int16)

    audio = audio / peak * max_gain
    audio = audio / max(0.01, np.max(np.abs(audio))) * 32767 * max_gain
    return audio.astype(np.int16)
|
|
def get_processed_file(input_path, sr, encoder_sr, mel_extractor, volume_extractor, f0_extractor,
                       fa_encoder=None, fa_decoder=None, content_encoder=None, spk_encoder=None,
                       device='cuda', max_sec=None, f0_interpolate_mode='full'):
    """Load one audio file and extract the features computed by this module.

    The file is loaded twice: once at ``sr`` (used for F0, volume and mel)
    and once at ``encoder_sr`` (used for the FACodec encoder). The four
    extraction steps are submitted to a thread pool. The volume and F0
    tracks are then padded or truncated to the mel frame count
    (``mel_t.shape[0]``), and the content embedding is repeat-expanded to
    that same count.

    Args:
        input_path: path of the audio file.
        sr: sample rate for the F0 / volume / mel branch.
        encoder_sr: sample rate for the FACodec branch.
        mel_extractor: object with ``extract(audio_tensor, sr)``.
        volume_extractor: object with ``extract(audio)``.
        f0_extractor: object with ``extract(audio, uv_interp=False)``.
        fa_encoder: optional FACodec encoder.
        fa_decoder: optional FACodec decoder. The content and speaker
            embeddings are only computed when both ``fa_encoder`` and
            ``fa_decoder`` are given.
        content_encoder: unused by this function.
        spk_encoder: unused by this function.
        device: device for the tensors fed to the extractors.
        max_sec: optional maximum duration in seconds; longer audio is
            truncated. May be fractional.
        f0_interpolate_mode: ``'full'`` linearly interpolates F0 over all
            unvoiced (zero) frames, ``'part'`` applies ``edge_padding``;
            any other value leaves F0 untouched.

    Returns:
        ``None`` if the file is missing, loading fails, an extractor returns
        ``None``, or (in ``'full'`` mode) no frame is voiced. Otherwise a
        dict with keys ``vq_post``, ``spk``, ``f0``, ``f0_origin``, ``vol``,
        ``name`` and ``mel``. ``vq_post`` and ``spk`` are ``None`` when the
        FACodec models were not supplied.
    """
    if max_sec is not None:
        # Fix: cast to int so that a fractional `max_sec` does not produce
        # float slice bounds (which made the load step fail below).
        max_audio_44k_len = int(sr * max_sec)
        max_audio_len = int(encoder_sr * max_sec)

    if not os.path.exists(input_path):
        print(f'\n[Error] {input_path} does not exist!')
        return None

    try:
        name = input_path.split('/')[-1].split('.')[0]
        audio_44k = load_audio(input_path, sr)
        audio = load_audio(input_path, encoder_sr)

        if max_sec is not None and max_audio_44k_len > 0:
            audio_44k = audio_44k[:min(len(audio_44k), max_audio_44k_len)]
            audio = audio[:min(len(audio), max_audio_len)]

        audio_44k_t = torch.from_numpy(audio_44k).float().to(device).unsqueeze(0)
    except Exception as e:
        print(f'\n[Error] Failed to load audio. Error: {e}')
        return None

    def task_f0():
        return f0_extractor.extract(audio_44k, uv_interp=False)

    def task_volume():
        return volume_extractor.extract(audio_44k)

    def task_mel():
        return mel_extractor.extract(audio_44k_t, sr).squeeze()

    def task_encoder():
        with torch.no_grad():
            if fa_encoder is not None and fa_decoder is not None:
                audio_t = torch.from_numpy(wav_pad(audio)).unsqueeze(0).unsqueeze(0).to(device)
                enc_out = fa_encoder(audio_t)
                prosody = fa_encoder.get_prosody_feature(audio_t)
                content_emb_t, _, _, _, spk_emb_t = fa_decoder(enc_out, prosody, eval_vq=False, vq=True)
                return content_emb_t.squeeze(0), spk_emb_t
            return None, None

    # Run the four independent extraction steps concurrently.
    with ThreadPoolExecutor(max_workers=4) as executor:
        future_f0 = executor.submit(task_f0)
        future_vol = executor.submit(task_volume)
        future_mel = executor.submit(task_mel)
        future_enc = executor.submit(task_encoder)

    f0 = future_f0.result()
    volume = future_vol.result()
    mel_t = future_mel.result()
    content_emb_t, spk_emb_t = future_enc.result()

    if f0 is None or volume is None or mel_t is None:
        return None

    seq_len = mel_t.shape[0]
    volume_t = align_data(torch.from_numpy(volume).float(), seq_len)

    # Fix: the embedding only exists when both FACodec models were supplied.
    # Previously a None embedding was always passed on to a resize helper,
    # which crashed -- including with the default `fa_encoder=None`.
    if content_emb_t is not None:
        content_emb_t = repeat_expand_2d(content_emb_t, seq_len).T

    f0_origin = f0.copy()
    if f0_interpolate_mode == 'full':
        uv = (f0 == 0)
        if len(f0[~uv]) > 0:
            # Fill unvoiced frames by interpolating between voiced ones.
            f0[uv] = np.interp(np.where(uv)[0], np.where(~uv)[0], f0[~uv])
        else:
            # No voiced frame at all: nothing to interpolate from.
            return None
    elif f0_interpolate_mode == 'part':
        f0 = edge_padding(f0)

    f0_t = align_data(torch.from_numpy(f0).float(), seq_len)

    return dict(
        vq_post=content_emb_t,
        spk=spk_emb_t,
        f0=f0_t,
        f0_origin=f0_origin,
        vol=volume_t,
        name=name,
        mel=mel_t
    )
|
|