mirror of https://github.com/coqui-ai/TTS.git

Remove TensorFlow requirement (#1225)

* Remove TF modules
* Remove TF unit tests
* Remove TF vocoder modules
* Remove TF convert scripts
* Remove TF requirement
* Remove the Docs TF instructions
* Remove TF inference support

pull/1227/head
parent 44c7d1a826
commit 0860d73cf8

Makefile (1 changed line)
@@ -41,7 +41,6 @@ system-deps: ## install linux system deps
 dev-deps:  ## install development deps
 	pip install -r requirements.dev.txt
-	pip install -r requirements.tf.txt
 
 doc-deps:  ## install docs dependencies
 	pip install -r docs/requirements.txt
README.md (11 changed lines)
@@ -61,7 +61,6 @@ Underlined "TTS*" and "Judy*" are 🐸TTS models
 - Detailed training logs on the terminal and Tensorboard.
 - Support for Multi-speaker TTS.
 - Efficient, flexible, lightweight but feature complete `Trainer API`.
-- Ability to convert PyTorch models to Tensorflow 2.0 and TFLite for inference.
 - Released and ready-to-use models.
 - Tools to curate Text2Speech datasets under ```dataset_analysis```.
 - Utilities to use and test your models.
@@ -113,17 +112,11 @@ If you are only interested in [synthesizing speech](https://tts.readthedocs.io/e
 pip install TTS
 ```
 
-By default, this only installs the requirements for PyTorch. To install the TensorFlow dependencies as well, use the `tf` extra.
-
-```bash
-pip install TTS[tf]
-```
-
 If you plan to code or train models, clone 🐸TTS and install it locally.
 
 ```bash
 git clone https://github.com/coqui-ai/TTS
-pip install -e .[all,dev,notebooks,tf]  # Select the relevant extras
+pip install -e .[all,dev,notebooks]  # Select the relevant extras
 ```
 
 If you are on Ubuntu (Debian), you can also run the following commands for installation.
@@ -204,12 +197,10 @@ If you are on Windows, 👑@GuyPaddock wrote installation instructions [here](ht
 |- train*.py                  (train your target model.)
 |- distribute.py              (train your TTS model using Multiple GPUs.)
 |- compute_statistics.py      (compute dataset statistics for normalization.)
-|- convert*.py                (convert target torch model to TF.)
 |- ...
 |- tts/                (text to speech models)
     |- layers/         (model layer definitions)
     |- models/         (model definitions)
-    |- tf/             (Tensorflow 2 utilities and model implementations)
     |- utils/          (model specific utilities.)
 |- speaker_encoder/    (Speaker Encoder models.)
     |- (same)
TTS/bin/convert_melgan_tflite.py (deleted file)
@@ -1,25 +0,0 @@
# Convert a TensorFlow MelGAN model to a TF-Lite binary

import argparse

from TTS.utils.io import load_config
from TTS.vocoder.tf.utils.generic_utils import setup_generator
from TTS.vocoder.tf.utils.io import load_checkpoint
from TTS.vocoder.tf.utils.tflite import convert_melgan_to_tflite

parser = argparse.ArgumentParser()
parser.add_argument("--tf_model", type=str, help="Path to the target TF model to be converted to TF-Lite.")
parser.add_argument("--config_path", type=str, help="Path to the config file of the TF model.")
parser.add_argument("--output_path", type=str, help="Path to the TF-Lite output binary.")
args = parser.parse_args()

# Set constants
CONFIG = load_config(args.config_path)

# load the model
model = setup_generator(CONFIG)
model.build_inference()
model = load_checkpoint(model, args.tf_model)

# create tflite model
tflite_model = convert_melgan_to_tflite(model, output_path=args.output_path)
TTS/bin/convert_melgan_torch_to_tf.py (deleted file)
@@ -1,105 +0,0 @@
import argparse
import os
from difflib import SequenceMatcher

import numpy as np
import tensorflow as tf
import torch

from TTS.utils.io import load_config, load_fsspec
from TTS.vocoder.tf.utils.convert_torch_to_tf_utils import (
    compare_torch_tf,
    convert_tf_name,
    transfer_weights_torch_to_tf,
)
from TTS.vocoder.tf.utils.generic_utils import setup_generator as setup_tf_generator
from TTS.vocoder.tf.utils.io import save_checkpoint
from TTS.vocoder.utils.generic_utils import setup_generator

# prevent GPU use
os.environ["CUDA_VISIBLE_DEVICES"] = ""

# define args
parser = argparse.ArgumentParser()
parser.add_argument("--torch_model_path", type=str, help="Path to target torch model to be converted to TF.")
parser.add_argument("--config_path", type=str, help="Path to config file of torch model.")
parser.add_argument("--output_path", type=str, help="path to output file including file name to save TF model.")
args = parser.parse_args()

# load model config
config_path = args.config_path
c = load_config(config_path)
num_speakers = 0

# init torch model
model = setup_generator(c)
checkpoint = load_fsspec(args.torch_model_path, map_location=torch.device("cpu"))
state_dict = checkpoint["model"]
model.load_state_dict(state_dict)
model.remove_weight_norm()
state_dict = model.state_dict()

# init tf model
model_tf = setup_tf_generator(c)

common_sufix = "/.ATTRIBUTES/VARIABLE_VALUE"
# get tf_model graph by passing an input
# B x D x T
dummy_input = tf.random.uniform((7, 80, 64), dtype=tf.float32)
mel_pred = model_tf(dummy_input, training=False)

# get tf variables
tf_vars = model_tf.weights

# match variable names with fuzzy logic
torch_var_names = list(state_dict.keys())
tf_var_names = [we.name for we in model_tf.weights]
var_map = []
for tf_name in tf_var_names:
    # skip re-mapped layer names
    if tf_name in [name[0] for name in var_map]:
        continue
    tf_name_edited = convert_tf_name(tf_name)
    ratios = [SequenceMatcher(None, torch_name, tf_name_edited).ratio() for torch_name in torch_var_names]
    max_idx = np.argmax(ratios)
    matching_name = torch_var_names[max_idx]
    del torch_var_names[max_idx]
    var_map.append((tf_name, matching_name))

# pass weights
tf_vars = transfer_weights_torch_to_tf(tf_vars, dict(var_map), state_dict)

# Compare TF and TORCH models
# check embedding outputs
model.eval()
dummy_input_torch = torch.ones((1, 80, 10))
dummy_input_tf = tf.convert_to_tensor(dummy_input_torch.numpy())
dummy_input_tf = tf.transpose(dummy_input_tf, perm=[0, 2, 1])
dummy_input_tf = tf.expand_dims(dummy_input_tf, 2)

out_torch = model.layers[0](dummy_input_torch)
out_tf = model_tf.model_layers[0](dummy_input_tf)
out_tf_ = tf.transpose(out_tf, perm=[0, 3, 2, 1])[:, :, 0, :]

assert compare_torch_tf(out_torch, out_tf_) < 1e-5

for i in range(1, len(model.layers)):
    print(f"{i} -> {model.layers[i]} vs {model_tf.model_layers[i]}")
    out_torch = model.layers[i](out_torch)
    out_tf = model_tf.model_layers[i](out_tf)
    out_tf_ = tf.transpose(out_tf, perm=[0, 3, 2, 1])[:, :, 0, :]
    diff = compare_torch_tf(out_torch, out_tf_)
    assert diff < 1e-5, diff

torch.manual_seed(0)
dummy_input_torch = torch.rand((1, 80, 100))
dummy_input_tf = tf.convert_to_tensor(dummy_input_torch.numpy())
model.inference_padding = 0
model_tf.inference_padding = 0
output_torch = model.inference(dummy_input_torch)
output_tf = model_tf(dummy_input_tf, training=False)
assert compare_torch_tf(output_torch, output_tf) < 1e-5, compare_torch_tf(output_torch, output_tf)

# save tf model
save_checkpoint(model_tf, checkpoint["step"], checkpoint["epoch"], args.output_path)
print(" > Model conversion is successfully completed :).")
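As an aside, the fuzzy name matching in the script above pairs each TF variable with the most string-similar remaining Torch key, consuming each key once. A standalone toy illustration of that heuristic (the variable names here are invented for the example):

import numpy as np
from difflib import SequenceMatcher

# Toy version of the matching loop above: pick the closest Torch key per
# TF name by SequenceMatcher ratio, removing each matched key from the pool.
torch_var_names = ["encoder.conv.weight", "encoder.conv.bias", "decoder.linear.weight"]
tf_var_names = ["encoder.conv.weight", "decoder.linear.weight", "encoder.conv.bias"]

var_map = []
for tf_name in tf_var_names:
    ratios = [SequenceMatcher(None, torch_name, tf_name).ratio() for torch_name in torch_var_names]
    max_idx = int(np.argmax(ratios))
    var_map.append((tf_name, torch_var_names[max_idx]))
    del torch_var_names[max_idx]  # each Torch key is consumed once

print(var_map)  # each TF name paired with its closest Torch key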
TTS/bin/convert_tacotron2_tflite.py (deleted file)
@@ -1,30 +0,0 @@
# Convert a TensorFlow Tacotron2 model to a TF-Lite binary

import argparse

from TTS.tts.tf.utils.generic_utils import setup_model
from TTS.tts.tf.utils.io import load_checkpoint
from TTS.tts.tf.utils.tflite import convert_tacotron2_to_tflite
from TTS.tts.utils.text.symbols import phonemes, symbols
from TTS.utils.io import load_config

parser = argparse.ArgumentParser()
parser.add_argument("--tf_model", type=str, help="Path to the target TF model to be converted to TF-Lite.")
parser.add_argument("--config_path", type=str, help="Path to the config file of the TF model.")
parser.add_argument("--output_path", type=str, help="Path to the TF-Lite output binary.")
args = parser.parse_args()

# Set constants
CONFIG = load_config(args.config_path)

# load the model
c = CONFIG
num_speakers = 0
num_chars = len(phonemes) if c.use_phonemes else len(symbols)
model = setup_model(num_chars, num_speakers, c, enable_tflite=True)
model.build_inference()
model = load_checkpoint(model, args.tf_model)
model.decoder.set_max_decoder_steps(1000)

# create tflite model
tflite_model = convert_tacotron2_to_tflite(model, output_path=args.output_path)
TTS/bin/convert_tacotron2_torch_to_tf.py (deleted file)
@@ -1,187 +0,0 @@
import argparse
import os
import sys
from difflib import SequenceMatcher
from pprint import pprint

import numpy as np
import tensorflow as tf
import torch

from TTS.tts.models import setup_model
from TTS.tts.tf.models.tacotron2 import Tacotron2
from TTS.tts.tf.utils.convert_torch_to_tf_utils import compare_torch_tf, convert_tf_name, transfer_weights_torch_to_tf
from TTS.tts.tf.utils.generic_utils import save_checkpoint
from TTS.tts.utils.text.symbols import phonemes, symbols
from TTS.utils.io import load_config, load_fsspec

sys.path.append("/home/erogol/Projects")
os.environ["CUDA_VISIBLE_DEVICES"] = ""


parser = argparse.ArgumentParser()
parser.add_argument("--torch_model_path", type=str, help="Path to target torch model to be converted to TF.")
parser.add_argument("--config_path", type=str, help="Path to config file of torch model.")
parser.add_argument("--output_path", type=str, help="path to output file including file name to save TF model.")
args = parser.parse_args()

# load model config
config_path = args.config_path
c = load_config(config_path)
num_speakers = 0

# init torch model
model = setup_model(c)
checkpoint = load_fsspec(args.torch_model_path, map_location=torch.device("cpu"))
state_dict = checkpoint["model"]
model.load_state_dict(state_dict)

# init tf model
num_chars = len(phonemes) if c.use_phonemes else len(symbols)
model_tf = Tacotron2(
    num_chars=num_chars,
    num_speakers=num_speakers,
    r=model.decoder.r,
    out_channels=c.audio["num_mels"],
    decoder_output_dim=c.audio["num_mels"],
    attn_type=c.attention_type,
    attn_win=c.windowing,
    attn_norm=c.attention_norm,
    prenet_type=c.prenet_type,
    prenet_dropout=c.prenet_dropout,
    forward_attn=c.use_forward_attn,
    trans_agent=c.transition_agent,
    forward_attn_mask=c.forward_attn_mask,
    location_attn=c.location_attn,
    attn_K=c.attention_heads,
    separate_stopnet=c.separate_stopnet,
    bidirectional_decoder=c.bidirectional_decoder,
)

# set initial layer mapping - these are not captured by the below heuristic approach
# TODO: set layer names so that we can remove this manual matching
common_sufix = "/.ATTRIBUTES/VARIABLE_VALUE"
var_map = [
    ("embedding/embeddings:0", "embedding.weight"),
    ("encoder/lstm/forward_lstm/lstm_cell_1/kernel:0", "encoder.lstm.weight_ih_l0"),
    ("encoder/lstm/forward_lstm/lstm_cell_1/recurrent_kernel:0", "encoder.lstm.weight_hh_l0"),
    ("encoder/lstm/backward_lstm/lstm_cell_2/kernel:0", "encoder.lstm.weight_ih_l0_reverse"),
    ("encoder/lstm/backward_lstm/lstm_cell_2/recurrent_kernel:0", "encoder.lstm.weight_hh_l0_reverse"),
    ("encoder/lstm/forward_lstm/lstm_cell_1/bias:0", ("encoder.lstm.bias_ih_l0", "encoder.lstm.bias_hh_l0")),
    (
        "encoder/lstm/backward_lstm/lstm_cell_2/bias:0",
        ("encoder.lstm.bias_ih_l0_reverse", "encoder.lstm.bias_hh_l0_reverse"),
    ),
    ("attention/v/kernel:0", "decoder.attention.v.linear_layer.weight"),
    ("decoder/linear_projection/kernel:0", "decoder.linear_projection.linear_layer.weight"),
    ("decoder/stopnet/kernel:0", "decoder.stopnet.1.linear_layer.weight"),
]

# %%
# get tf_model graph
model_tf.build_inference()

# get tf variables
tf_vars = model_tf.weights

# match variable names with fuzzy logic
torch_var_names = list(state_dict.keys())
tf_var_names = [we.name for we in model_tf.weights]
for tf_name in tf_var_names:
    # skip re-mapped layer names
    if tf_name in [name[0] for name in var_map]:
        continue
    tf_name_edited = convert_tf_name(tf_name)
    ratios = [SequenceMatcher(None, torch_name, tf_name_edited).ratio() for torch_name in torch_var_names]
    max_idx = np.argmax(ratios)
    matching_name = torch_var_names[max_idx]
    del torch_var_names[max_idx]
    var_map.append((tf_name, matching_name))

pprint(var_map)
pprint(torch_var_names)

# pass weights
tf_vars = transfer_weights_torch_to_tf(tf_vars, dict(var_map), state_dict)

# Compare TF and TORCH models
# %%
# check embedding outputs
model.eval()
input_ids = torch.randint(0, 24, (1, 128)).long()

o_t = model.embedding(input_ids)
o_tf = model_tf.embedding(input_ids.detach().numpy())
assert abs(o_t.detach().numpy() - o_tf.numpy()).sum() < 1e-5, abs(o_t.detach().numpy() - o_tf.numpy()).sum()

# compare encoder outputs
oo_en = model.encoder.inference(o_t.transpose(1, 2))
ooo_en = model_tf.encoder(o_t.detach().numpy(), training=False)
assert compare_torch_tf(oo_en, ooo_en) < 1e-5

# pylint: disable=redefined-builtin
# compare decoder.attention_rnn
inp = torch.rand([1, 768])
inp_tf = inp.numpy()
model.decoder._init_states(oo_en, mask=None)  # pylint: disable=protected-access
output, cell_state = model.decoder.attention_rnn(inp)
states = model_tf.decoder.build_decoder_initial_states(1, 512, 128)
output_tf, memory_state = model_tf.decoder.attention_rnn(inp_tf, states[2], training=False)
assert compare_torch_tf(output, output_tf).mean() < 1e-5

query = output
inputs = torch.rand([1, 128, 512])
query_tf = query.detach().numpy()
inputs_tf = inputs.numpy()

# compare decoder.attention
model.decoder.attention.init_states(inputs)
processes_inputs = model.decoder.attention.preprocess_inputs(inputs)
loc_attn, proc_query = model.decoder.attention.get_location_attention(query, processes_inputs)
context = model.decoder.attention(query, inputs, processes_inputs, None)

attention_states = model_tf.decoder.build_decoder_initial_states(1, 512, 128)[-1]
model_tf.decoder.attention.process_values(tf.convert_to_tensor(inputs_tf))
loc_attn_tf, proc_query_tf = model_tf.decoder.attention.get_loc_attn(query_tf, attention_states)
context_tf, attention, attention_states = model_tf.decoder.attention(query_tf, attention_states, training=False)

assert compare_torch_tf(loc_attn, loc_attn_tf).mean() < 1e-5
assert compare_torch_tf(proc_query, proc_query_tf).mean() < 1e-5
assert compare_torch_tf(context, context_tf) < 1e-5

# compare decoder.decoder_rnn
input = torch.rand([1, 1536])
input_tf = input.numpy()
model.decoder._init_states(oo_en, mask=None)  # pylint: disable=protected-access
output, cell_state = model.decoder.decoder_rnn(input, [model.decoder.decoder_hidden, model.decoder.decoder_cell])
states = model_tf.decoder.build_decoder_initial_states(1, 512, 128)
output_tf, memory_state = model_tf.decoder.decoder_rnn(input_tf, states[3], training=False)
assert abs(input - input_tf).mean() < 1e-5
assert compare_torch_tf(output, output_tf).mean() < 1e-5

# compare decoder.linear_projection
input = torch.rand([1, 1536])
input_tf = input.numpy()
output = model.decoder.linear_projection(input)
output_tf = model_tf.decoder.linear_projection(input_tf, training=False)
assert compare_torch_tf(output, output_tf) < 1e-5

# compare decoder outputs
model.decoder.max_decoder_steps = 100
model_tf.decoder.set_max_decoder_steps(100)
output, align, stop = model.decoder.inference(oo_en)
states = model_tf.decoder.build_decoder_initial_states(1, 512, 128)
output_tf, align_tf, stop_tf = model_tf.decoder(ooo_en, states, training=False)
assert compare_torch_tf(output.transpose(1, 2), output_tf) < 1e-4

# compare the whole model output
outputs_torch = model.inference(input_ids)
outputs_tf = model_tf(tf.convert_to_tensor(input_ids.numpy()))
print(abs(outputs_torch[0].numpy()[:, 0] - outputs_tf[0].numpy()[:, 0]).mean())
assert compare_torch_tf(outputs_torch[2][:, 50, :], outputs_tf[2][:, 50, :]) < 1e-5
assert compare_torch_tf(outputs_torch[0], outputs_tf[0]) < 1e-4

# %%
# save tf model
save_checkpoint(model_tf, None, checkpoint["step"], checkpoint["epoch"], checkpoint["r"], args.output_path)
print(" > Model conversion is successfully completed :).")
TTS/tts/layers/tacotron/tacotron2.py
@@ -6,7 +6,6 @@ from .attentions import init_attn
 from .common_layers import Linear, Prenet
 
 
-# NOTE: linter has a problem with the current TF release
 # pylint: disable=no-value-for-parameter
 # pylint: disable=unexpected-keyword-arg
 class ConvBNBlock(nn.Module):
TTS/tts/tf/README.md (deleted file)
@@ -1,20 +0,0 @@
## Utilities to Convert Models to TensorFlow 2
Here are experimental utilities for converting trained Torch models to TensorFlow (>=2.2).

Converting Torch models to TF makes the whole TF toolkit available for deployment and device-specific optimizations.

Note that we do not plan to share training scripts for TensorFlow in the near future, but any contribution in that direction would be more than welcome.

To see how you can use a TF model at inference, check the notebook.
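As a rough illustration, a minimal inference sketch assembled from the modules removed in this commit might look as follows; "config.json" and "tf_model.pkl" are placeholder paths, and the random input_ids merely stand in for the output of the real text-to-ID pipeline:

import tensorflow as tf

from TTS.tts.tf.utils.generic_utils import setup_model
from TTS.tts.tf.utils.io import load_checkpoint
from TTS.tts.utils.text.symbols import phonemes, symbols
from TTS.utils.io import load_config

# build the TF Tacotron2 model from a (placeholder) config and checkpoint
c = load_config("config.json")
num_chars = len(phonemes) if c.use_phonemes else len(symbols)
model = setup_model(num_chars, num_speakers=0, c=c)
model.build_inference()
model = load_checkpoint(model, "tf_model.pkl")

# stand-in character IDs; a real run would use the regular text processing
input_ids = tf.random.uniform(shape=[1, 16], maxval=num_chars, dtype=tf.int32)
decoder_frames, output_frames, attentions, stop_tokens = model(input_ids, training=False)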
This is an experimental release. If you encounter an error, please open an issue or, better yet, send a PR; you are mostly on your own.

### Converting a Model
- Run ```convert_tacotron2_torch_to_tf.py --torch_model_path /path/to/torch/model.pth.tar --config_path /path/to/model/config.json --output_path /path/to/output/tf/model``` with the right arguments.

### Known issues and limitations
- We use a custom model load/save mechanism which enables us to store model-related information together with the model weights, similar to Torch. However, it is prone to random errors (see the checkpoint sketch after this list).
- The current TF model implementation is slightly slower than the Torch model. Hopefully, it'll get better with improving TF support for eager mode and ```tf.function```.
- The TF implementation of Tacotron2 only supports regular Tacotron2 as in the paper.
- You can only convert models trained after the TF model implementation, since the model layers have been updated in the Torch model.
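For reference, the custom save mechanism mentioned above (shown in full in TTS/tts/tf/utils/io.py further down this diff) pickles a plain dict, so a checkpoint can be inspected directly; "tf_model.pkl" is a placeholder path:

import pickle

import fsspec

with fsspec.open("tf_model.pkl", "rb") as f:
    checkpoint = pickle.load(f)

# keys written by save_checkpoint: model weights, optimizer state,
# step/epoch counters, save date, and the reduction factor r
print(checkpoint.keys())  # dict_keys(['model', 'optimizer', 'step', 'epoch', 'date', 'r'])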
TTS/tts/tf/layers/tacotron/common_layers.py (deleted file)
@@ -1,301 +0,0 @@
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.ops import math_ops

# from tensorflow_addons.seq2seq import BahdanauAttention

# NOTE: linter has a problem with the current TF release
# pylint: disable=no-value-for-parameter
# pylint: disable=unexpected-keyword-arg


class Linear(keras.layers.Layer):
    def __init__(self, units, use_bias, **kwargs):
        super().__init__(**kwargs)
        self.linear_layer = keras.layers.Dense(units, use_bias=use_bias, name="linear_layer")
        self.activation = keras.layers.ReLU()

    def call(self, x):
        """
        shapes:
            x: B x T x C
        """
        return self.activation(self.linear_layer(x))


class LinearBN(keras.layers.Layer):
    def __init__(self, units, use_bias, **kwargs):
        super().__init__(**kwargs)
        self.linear_layer = keras.layers.Dense(units, use_bias=use_bias, name="linear_layer")
        self.batch_normalization = keras.layers.BatchNormalization(
            axis=-1, momentum=0.90, epsilon=1e-5, name="batch_normalization"
        )
        self.activation = keras.layers.ReLU()

    def call(self, x, training=None):
        """
        shapes:
            x: B x T x C
        """
        out = self.linear_layer(x)
        out = self.batch_normalization(out, training=training)
        return self.activation(out)


class Prenet(keras.layers.Layer):
    def __init__(self, prenet_type, prenet_dropout, units, bias, **kwargs):
        super().__init__(**kwargs)
        self.prenet_type = prenet_type
        self.prenet_dropout = prenet_dropout
        self.linear_layers = []
        if prenet_type == "bn":
            self.linear_layers += [
                LinearBN(unit, use_bias=bias, name=f"linear_layer_{idx}") for idx, unit in enumerate(units)
            ]
        elif prenet_type == "original":
            self.linear_layers += [
                Linear(unit, use_bias=bias, name=f"linear_layer_{idx}") for idx, unit in enumerate(units)
            ]
        else:
            raise RuntimeError(" [!] Unknown prenet type.")
        if prenet_dropout:
            self.dropout = keras.layers.Dropout(rate=0.5)

    def call(self, x, training=None):
        """
        shapes:
            x: B x T x C
        """
        for linear in self.linear_layers:
            if self.prenet_dropout:
                x = self.dropout(linear(x), training=training)
            else:
                x = linear(x)
        return x


def _sigmoid_norm(score):
    attn_weights = tf.nn.sigmoid(score)
    attn_weights = attn_weights / tf.reduce_sum(attn_weights, axis=1, keepdims=True)
    return attn_weights


class Attention(keras.layers.Layer):
    """TODO: implement forward_attention
    TODO: location sensitive attention
    TODO: implement attention windowing"""

    def __init__(
        self,
        attn_dim,
        use_loc_attn,
        loc_attn_n_filters,
        loc_attn_kernel_size,
        use_windowing,
        norm,
        use_forward_attn,
        use_trans_agent,
        use_forward_attn_mask,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.use_loc_attn = use_loc_attn
        self.loc_attn_n_filters = loc_attn_n_filters
        self.loc_attn_kernel_size = loc_attn_kernel_size
        self.use_windowing = use_windowing
        self.norm = norm
        self.use_forward_attn = use_forward_attn
        self.use_trans_agent = use_trans_agent
        self.use_forward_attn_mask = use_forward_attn_mask
        self.query_layer = tf.keras.layers.Dense(attn_dim, use_bias=False, name="query_layer/linear_layer")
        self.inputs_layer = tf.keras.layers.Dense(
            attn_dim, use_bias=False, name=f"{self.name}/inputs_layer/linear_layer"
        )
        self.v = tf.keras.layers.Dense(1, use_bias=True, name="v/linear_layer")
        if use_loc_attn:
            self.location_conv1d = keras.layers.Conv1D(
                filters=loc_attn_n_filters,
                kernel_size=loc_attn_kernel_size,
                padding="same",
                use_bias=False,
                name="location_layer/location_conv1d",
            )
            self.location_dense = keras.layers.Dense(attn_dim, use_bias=False, name="location_layer/location_dense")
        if norm == "softmax":
            self.norm_func = tf.nn.softmax
        elif norm == "sigmoid":
            self.norm_func = _sigmoid_norm
        else:
            raise ValueError("Unknown value for attention norm type")

    def init_states(self, batch_size, value_length):
        states = []
        if self.use_loc_attn:
            attention_cum = tf.zeros([batch_size, value_length])
            attention_old = tf.zeros([batch_size, value_length])
            states = [attention_cum, attention_old]
        if self.use_forward_attn:
            alpha = tf.concat([tf.ones([batch_size, 1]), tf.zeros([batch_size, value_length])[:, :-1] + 1e-7], 1)
            states.append(alpha)
        return tuple(states)

    def process_values(self, values):
        """cache values for decoder iterations"""
        # pylint: disable=attribute-defined-outside-init
        self.processed_values = self.inputs_layer(values)
        self.values = values

    def get_loc_attn(self, query, states):
        """compute location attention, query layer and
        unnorm. attention weights"""
        attention_cum, attention_old = states[:2]
        attn_cat = tf.stack([attention_old, attention_cum], axis=2)

        processed_query = self.query_layer(tf.expand_dims(query, 1))
        processed_attn = self.location_dense(self.location_conv1d(attn_cat))
        score = self.v(tf.nn.tanh(self.processed_values + processed_query + processed_attn))
        score = tf.squeeze(score, axis=2)
        return score, processed_query

    def get_attn(self, query):
        """compute query layer and unnormalized attention weights"""
        processed_query = self.query_layer(tf.expand_dims(query, 1))
        score = self.v(tf.nn.tanh(self.processed_values + processed_query))
        score = tf.squeeze(score, axis=2)
        return score, processed_query

    def apply_score_masking(self, score, mask):  # pylint: disable=no-self-use
        """ignore sequence paddings"""
        padding_mask = tf.expand_dims(math_ops.logical_not(mask), 2)
        # Bias so padding positions do not contribute to attention distribution.
        score -= 1.0e9 * math_ops.cast(padding_mask, dtype=tf.float32)
        return score

    def apply_forward_attention(self, alignment, alpha):  # pylint: disable=no-self-use
        # forward attention
        fwd_shifted_alpha = tf.pad(alpha[:, :-1], ((0, 0), (1, 0)), constant_values=0.0)
        # compute transition potentials
        new_alpha = ((1 - 0.5) * alpha + 0.5 * fwd_shifted_alpha + 1e-8) * alignment
        # renormalize attention weights
        new_alpha = new_alpha / tf.reduce_sum(new_alpha, axis=1, keepdims=True)
        return new_alpha

    def update_states(self, old_states, scores_norm, attn_weights, new_alpha=None):
        states = []
        if self.use_loc_attn:
            states = [old_states[0] + scores_norm, attn_weights]
        if self.use_forward_attn:
            states.append(new_alpha)
        return tuple(states)

    def call(self, query, states):
        """
        shapes:
            query: B x D
        """
        if self.use_loc_attn:
            score, _ = self.get_loc_attn(query, states)
        else:
            score, _ = self.get_attn(query)

        # TODO: masking
        # if mask is not None:
        #     self.apply_score_masking(score, mask)
        # attn_weights shape == (batch_size, max_length, 1)

        # normalize attention scores
        scores_norm = self.norm_func(score)
        attn_weights = scores_norm

        # apply forward attention
        new_alpha = None
        if self.use_forward_attn:
            new_alpha = self.apply_forward_attention(attn_weights, states[-1])
            attn_weights = new_alpha

        # update states tuple
        # states = (cum_attn_weights, attn_weights, new_alpha)
        states = self.update_states(states, scores_norm, attn_weights, new_alpha)

        # context_vector shape after sum == (batch_size, hidden_size)
        context_vector = tf.matmul(
            tf.expand_dims(attn_weights, axis=2), self.values, transpose_a=True, transpose_b=False
        )
        context_vector = tf.squeeze(context_vector, axis=1)
        return context_vector, attn_weights, states


# def _location_sensitive_score(processed_query, keys, processed_loc, attention_v, attention_b):
#     dtype = processed_query.dtype
#     num_units = keys.shape[-1].value or array_ops.shape(keys)[-1]
#     return tf.reduce_sum(attention_v * tf.tanh(keys + processed_query + processed_loc + attention_b), [2])


# class LocationSensitiveAttention(BahdanauAttention):
#     def __init__(self,
#                  units,
#                  memory=None,
#                  memory_sequence_length=None,
#                  normalize=False,
#                  probability_fn="softmax",
#                  kernel_initializer="glorot_uniform",
#                  dtype=None,
#                  name="LocationSensitiveAttention",
#                  location_attention_filters=32,
#                  location_attention_kernel_size=31):

#         super( self).__init__(units=units,
#                               memory=memory,
#                               memory_sequence_length=memory_sequence_length,
#                               normalize=normalize,
#                               probability_fn='softmax',  ## parent module default
#                               kernel_initializer=kernel_initializer,
#                               dtype=dtype,
#                               name=name)
#         if probability_fn == 'sigmoid':
#             self.probability_fn = lambda score, _: self._sigmoid_normalization(score)
#         self.location_conv = keras.layers.Conv1D(filters=location_attention_filters, kernel_size=location_attention_kernel_size, padding='same', use_bias=False)
#         self.location_dense = keras.layers.Dense(units, use_bias=False)
#         # self.v = keras.layers.Dense(1, use_bias=True)

#     def _location_sensitive_score(self, processed_query, keys, processed_loc):
#         processed_query = tf.expand_dims(processed_query, 1)
#         return tf.reduce_sum(self.attention_v * tf.tanh(keys + processed_query + processed_loc), [2])

#     def _location_sensitive(self, alignment_cum, alignment_old):
#         alignment_cat = tf.stack([alignment_cum, alignment_old], axis=2)
#         return self.location_dense(self.location_conv(alignment_cat))

#     def _sigmoid_normalization(self, score):
#         return tf.nn.sigmoid(score) / tf.reduce_sum(tf.nn.sigmoid(score), axis=-1, keepdims=True)

#     # def _apply_masking(self, score, mask):
#     #     padding_mask = tf.expand_dims(math_ops.logical_not(mask), 2)
#     #     # Bias so padding positions do not contribute to attention distribution.
#     #     score -= 1.e9 * math_ops.cast(padding_mask, dtype=tf.float32)
#     #     return score

#     def _calculate_attention(self, query, state):
#         alignment_cum, alignment_old = state[:2]
#         processed_query = self.query_layer(
#             query) if self.query_layer else query
#         processed_loc = self._location_sensitive(alignment_cum, alignment_old)
#         score = self._location_sensitive_score(
#             processed_query,
#             self.keys,
#             processed_loc)
#         alignment = self.probability_fn(score, state)
#         alignment_cum = alignment_cum + alignment
#         state[0] = alignment_cum
#         state[1] = alignment
#         return alignment, state

#     def compute_context(self, alignments):
#         expanded_alignments = tf.expand_dims(alignments, 1)
#         context = tf.matmul(expanded_alignments, self.values)
#         context = tf.squeeze(context, [1])
#         return context

#     # def call(self, query, state):
#     #     alignment, next_state = self._calculate_attention(query, state)
#     #     return alignment, next_state
TTS/tts/tf/layers/tacotron/tacotron2.py (deleted file)
@@ -1,322 +0,0 @@
import tensorflow as tf
from tensorflow import keras

from TTS.tts.tf.layers.tacotron.common_layers import Attention, Prenet
from TTS.tts.tf.utils.tf_utils import shape_list


# NOTE: linter has a problem with the current TF release
# pylint: disable=no-value-for-parameter
# pylint: disable=unexpected-keyword-arg
class ConvBNBlock(keras.layers.Layer):
    def __init__(self, filters, kernel_size, activation, **kwargs):
        super().__init__(**kwargs)
        self.convolution1d = keras.layers.Conv1D(filters, kernel_size, padding="same", name="convolution1d")
        self.batch_normalization = keras.layers.BatchNormalization(
            axis=2, momentum=0.90, epsilon=1e-5, name="batch_normalization"
        )
        self.dropout = keras.layers.Dropout(rate=0.5, name="dropout")
        self.activation = keras.layers.Activation(activation, name="activation")

    def call(self, x, training=None):
        o = self.convolution1d(x)
        o = self.batch_normalization(o, training=training)
        o = self.activation(o)
        o = self.dropout(o, training=training)
        return o


class Postnet(keras.layers.Layer):
    def __init__(self, output_filters, num_convs, **kwargs):
        super().__init__(**kwargs)
        self.convolutions = []
        self.convolutions.append(ConvBNBlock(512, 5, "tanh", name="convolutions_0"))
        for idx in range(1, num_convs - 1):
            self.convolutions.append(ConvBNBlock(512, 5, "tanh", name=f"convolutions_{idx}"))
        self.convolutions.append(ConvBNBlock(output_filters, 5, "linear", name=f"convolutions_{idx+1}"))

    def call(self, x, training=None):
        o = x
        for layer in self.convolutions:
            o = layer(o, training=training)
        return o


class Encoder(keras.layers.Layer):
    def __init__(self, output_input_dim, **kwargs):
        super().__init__(**kwargs)
        self.convolutions = []
        for idx in range(3):
            self.convolutions.append(ConvBNBlock(output_input_dim, 5, "relu", name=f"convolutions_{idx}"))
        self.lstm = keras.layers.Bidirectional(
            keras.layers.LSTM(output_input_dim // 2, return_sequences=True, use_bias=True), name="lstm"
        )

    def call(self, x, training=None):
        o = x
        for layer in self.convolutions:
            o = layer(o, training=training)
        o = self.lstm(o)
        return o


class Decoder(keras.layers.Layer):
    # pylint: disable=unused-argument
    def __init__(
        self,
        frame_dim,
        r,
        attn_type,
        use_attn_win,
        attn_norm,
        prenet_type,
        prenet_dropout,
        use_forward_attn,
        use_trans_agent,
        use_forward_attn_mask,
        use_location_attn,
        attn_K,
        separate_stopnet,
        speaker_emb_dim,
        enable_tflite,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.frame_dim = frame_dim
        self.r_init = tf.constant(r, dtype=tf.int32)
        self.r = tf.constant(r, dtype=tf.int32)
        self.output_dim = r * self.frame_dim
        self.separate_stopnet = separate_stopnet
        self.enable_tflite = enable_tflite

        # layer constants
        self.max_decoder_steps = tf.constant(1000, dtype=tf.int32)
        self.stop_thresh = tf.constant(0.5, dtype=tf.float32)

        # model dimensions
        self.query_dim = 1024
        self.decoder_rnn_dim = 1024
        self.prenet_dim = 256
        self.attn_dim = 128
        self.p_attention_dropout = 0.1
        self.p_decoder_dropout = 0.1

        self.prenet = Prenet(prenet_type, prenet_dropout, [self.prenet_dim, self.prenet_dim], bias=False, name="prenet")
        self.attention_rnn = keras.layers.LSTMCell(
            self.query_dim,
            use_bias=True,
            name="attention_rnn",
        )
        self.attention_rnn_dropout = keras.layers.Dropout(0.5)

        # TODO: implement other attn options
        self.attention = Attention(
            attn_dim=self.attn_dim,
            use_loc_attn=True,
            loc_attn_n_filters=32,
            loc_attn_kernel_size=31,
            use_windowing=False,
            norm=attn_norm,
            use_forward_attn=use_forward_attn,
            use_trans_agent=use_trans_agent,
            use_forward_attn_mask=use_forward_attn_mask,
            name="attention",
        )
        self.decoder_rnn = keras.layers.LSTMCell(self.decoder_rnn_dim, use_bias=True, name="decoder_rnn")
        self.decoder_rnn_dropout = keras.layers.Dropout(0.5)
        self.linear_projection = keras.layers.Dense(self.frame_dim * r, name="linear_projection/linear_layer")
        self.stopnet = keras.layers.Dense(1, name="stopnet/linear_layer")

    def set_max_decoder_steps(self, new_max_steps):
        self.max_decoder_steps = tf.constant(new_max_steps, dtype=tf.int32)

    def set_r(self, new_r):
        self.r = tf.constant(new_r, dtype=tf.int32)
        self.output_dim = self.frame_dim * new_r

    def build_decoder_initial_states(self, batch_size, memory_dim, memory_length):
        zero_frame = tf.zeros([batch_size, self.frame_dim])
        zero_context = tf.zeros([batch_size, memory_dim])
        attention_rnn_state = self.attention_rnn.get_initial_state(batch_size=batch_size, dtype=tf.float32)
        decoder_rnn_state = self.decoder_rnn.get_initial_state(batch_size=batch_size, dtype=tf.float32)
        attention_states = self.attention.init_states(batch_size, memory_length)
        return zero_frame, zero_context, attention_rnn_state, decoder_rnn_state, attention_states

    def step(self, prenet_next, states, memory_seq_length=None, training=None):
        _, context_next, attention_rnn_state, decoder_rnn_state, attention_states = states
        attention_rnn_input = tf.concat([prenet_next, context_next], -1)
        attention_rnn_output, attention_rnn_state = self.attention_rnn(
            attention_rnn_input, attention_rnn_state, training=training
        )
        attention_rnn_output = self.attention_rnn_dropout(attention_rnn_output, training=training)
        context, attention, attention_states = self.attention(attention_rnn_output, attention_states, training=training)
        decoder_rnn_input = tf.concat([attention_rnn_output, context], -1)
        decoder_rnn_output, decoder_rnn_state = self.decoder_rnn(
            decoder_rnn_input, decoder_rnn_state, training=training
        )
        decoder_rnn_output = self.decoder_rnn_dropout(decoder_rnn_output, training=training)
        linear_projection_input = tf.concat([decoder_rnn_output, context], -1)
        output_frame = self.linear_projection(linear_projection_input, training=training)
        stopnet_input = tf.concat([decoder_rnn_output, output_frame], -1)
        stopnet_output = self.stopnet(stopnet_input, training=training)
        output_frame = output_frame[:, : self.r * self.frame_dim]
        states = (
            output_frame[:, self.frame_dim * (self.r - 1) :],
            context,
            attention_rnn_state,
            decoder_rnn_state,
            attention_states,
        )
        return output_frame, stopnet_output, states, attention

    def decode(self, memory, states, frames, memory_seq_length=None):
        B, _, _ = shape_list(memory)
        num_iter = shape_list(frames)[1] // self.r
        # init states
        frame_zero = tf.expand_dims(states[0], 1)
        frames = tf.concat([frame_zero, frames], axis=1)
        outputs = tf.TensorArray(dtype=tf.float32, size=num_iter)
        attentions = tf.TensorArray(dtype=tf.float32, size=num_iter)
        stop_tokens = tf.TensorArray(dtype=tf.float32, size=num_iter)
        # pre-computes
        self.attention.process_values(memory)
        prenet_output = self.prenet(frames, training=True)
        step_count = tf.constant(0, dtype=tf.int32)

        def _body(step, memory, prenet_output, states, outputs, stop_tokens, attentions):
            prenet_next = prenet_output[:, step]
            output, stop_token, states, attention = self.step(prenet_next, states, memory_seq_length)
            outputs = outputs.write(step, output)
            attentions = attentions.write(step, attention)
            stop_tokens = stop_tokens.write(step, stop_token)
            return step + 1, memory, prenet_output, states, outputs, stop_tokens, attentions

        _, memory, _, states, outputs, stop_tokens, attentions = tf.while_loop(
            lambda *arg: True,
            _body,
            loop_vars=(step_count, memory, prenet_output, states, outputs, stop_tokens, attentions),
            parallel_iterations=32,
            swap_memory=True,
            maximum_iterations=num_iter,
        )

        outputs = outputs.stack()
        attentions = attentions.stack()
        stop_tokens = stop_tokens.stack()
        outputs = tf.transpose(outputs, [1, 0, 2])
        attentions = tf.transpose(attentions, [1, 0, 2])
        stop_tokens = tf.transpose(stop_tokens, [1, 0, 2])
        stop_tokens = tf.squeeze(stop_tokens, axis=2)
        outputs = tf.reshape(outputs, [B, -1, self.frame_dim])
        return outputs, stop_tokens, attentions

    def decode_inference(self, memory, states):
        B, _, _ = shape_list(memory)
        # init states
        outputs = tf.TensorArray(dtype=tf.float32, size=0, clear_after_read=False, dynamic_size=True)
        attentions = tf.TensorArray(dtype=tf.float32, size=0, clear_after_read=False, dynamic_size=True)
        stop_tokens = tf.TensorArray(dtype=tf.float32, size=0, clear_after_read=False, dynamic_size=True)

        # pre-computes
        self.attention.process_values(memory)

        # iter vars
        stop_flag = tf.constant(False, dtype=tf.bool)
        step_count = tf.constant(0, dtype=tf.int32)

        def _body(step, memory, states, outputs, stop_tokens, attentions, stop_flag):
            frame_next = states[0]
            prenet_next = self.prenet(frame_next, training=False)
            output, stop_token, states, attention = self.step(prenet_next, states, None, training=False)
            stop_token = tf.math.sigmoid(stop_token)
            outputs = outputs.write(step, output)
            attentions = attentions.write(step, attention)
            stop_tokens = stop_tokens.write(step, stop_token)
            stop_flag = tf.greater(stop_token, self.stop_thresh)
            stop_flag = tf.reduce_all(stop_flag)
            return step + 1, memory, states, outputs, stop_tokens, attentions, stop_flag

        cond = lambda step, m, s, o, st, a, stop_flag: tf.equal(stop_flag, tf.constant(False, dtype=tf.bool))
        _, memory, states, outputs, stop_tokens, attentions, stop_flag = tf.while_loop(
            cond,
            _body,
            loop_vars=(step_count, memory, states, outputs, stop_tokens, attentions, stop_flag),
            parallel_iterations=32,
            swap_memory=True,
            maximum_iterations=self.max_decoder_steps,
        )

        outputs = outputs.stack()
        attentions = attentions.stack()
        stop_tokens = stop_tokens.stack()

        outputs = tf.transpose(outputs, [1, 0, 2])
        attentions = tf.transpose(attentions, [1, 0, 2])
        stop_tokens = tf.transpose(stop_tokens, [1, 0, 2])
        stop_tokens = tf.squeeze(stop_tokens, axis=2)
        outputs = tf.reshape(outputs, [B, -1, self.frame_dim])
        return outputs, stop_tokens, attentions

    def decode_inference_tflite(self, memory, states):
        """Inference with TF-Lite compatibility. It assumes
        batch_size is 1"""
        # init states
        # dynamic_shape is not supported in TFLite
        outputs = tf.TensorArray(
            dtype=tf.float32,
            size=self.max_decoder_steps,
            element_shape=tf.TensorShape([self.output_dim]),
            clear_after_read=False,
            dynamic_size=False,
        )
        # stop_flags = tf.TensorArray(dtype=tf.bool,
        #                             size=self.max_decoder_steps,
        #                             element_shape=tf.TensorShape([]),
        #                             clear_after_read=False,
        #                             dynamic_size=False)
        attentions = ()
        stop_tokens = ()

        # pre-computes
        self.attention.process_values(memory)

        # iter vars
        stop_flag = tf.constant(False, dtype=tf.bool)
        step_count = tf.constant(0, dtype=tf.int32)

        def _body(step, memory, states, outputs, stop_flag):
            frame_next = states[0]
            prenet_next = self.prenet(frame_next, training=False)
            output, stop_token, states, _ = self.step(prenet_next, states, None, training=False)
            stop_token = tf.math.sigmoid(stop_token)
            stop_flag = tf.greater(stop_token, self.stop_thresh)
            stop_flag = tf.reduce_all(stop_flag)
            # stop_flags = stop_flags.write(step, tf.logical_not(stop_flag))

            outputs = outputs.write(step, tf.reshape(output, [-1]))
            return step + 1, memory, states, outputs, stop_flag

        cond = lambda step, m, s, o, stop_flag: tf.equal(stop_flag, tf.constant(False, dtype=tf.bool))
        step_count, memory, states, outputs, stop_flag = tf.while_loop(
            cond,
            _body,
            loop_vars=(step_count, memory, states, outputs, stop_flag),
            parallel_iterations=32,
            swap_memory=True,
            maximum_iterations=self.max_decoder_steps,
        )

        outputs = outputs.stack()
        outputs = tf.gather(outputs, tf.range(step_count))  # pylint: disable=no-value-for-parameter
        outputs = tf.expand_dims(outputs, axis=[0])
        outputs = tf.transpose(outputs, [1, 0, 2])
        outputs = tf.reshape(outputs, [1, -1, self.frame_dim])
        return outputs, stop_tokens, attentions

    def call(self, memory, states, frames=None, memory_seq_length=None, training=False):
        if training:
            return self.decode(memory, states, frames, memory_seq_length)
        if self.enable_tflite:
            return self.decode_inference_tflite(memory, states)
        return self.decode_inference(memory, states)
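All three decode methods above follow the same TF2 pattern: write each step's output into a tf.TensorArray inside tf.while_loop, then stack the array into a dense tensor. A minimal, self-contained sketch of that pattern with toy values (not the model's real shapes or stopping condition):

import tensorflow as tf

# Accumulate per-step "frames" in a TensorArray inside tf.while_loop,
# then stack them densely, as the decoder loops above do.
outputs = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True)
step = tf.constant(0, dtype=tf.int32)

def _body(step, outputs):
    frame = tf.fill([2], tf.cast(step, tf.float32))  # placeholder frame of width 2
    outputs = outputs.write(step, frame)
    return step + 1, outputs

step, outputs = tf.while_loop(
    lambda step, outputs: step < 5,  # the real loops stop on the stopnet output
    _body,
    loop_vars=(step, outputs),
)
print(outputs.stack().shape)  # (5, 2)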
TTS/tts/tf/models/tacotron2.py (deleted file)
@@ -1,116 +0,0 @@
import tensorflow as tf
from tensorflow import keras

from TTS.tts.tf.layers.tacotron.tacotron2 import Decoder, Encoder, Postnet
from TTS.tts.tf.utils.tf_utils import shape_list


# pylint: disable=too-many-ancestors, abstract-method
class Tacotron2(keras.models.Model):
    def __init__(
        self,
        num_chars,
        num_speakers,
        r,
        out_channels=80,
        decoder_output_dim=80,
        attn_type="original",
        attn_win=False,
        attn_norm="softmax",
        attn_K=4,
        prenet_type="original",
        prenet_dropout=True,
        forward_attn=False,
        trans_agent=False,
        forward_attn_mask=False,
        location_attn=True,
        separate_stopnet=True,
        bidirectional_decoder=False,
        enable_tflite=False,
    ):
        super().__init__()
        self.r = r
        self.decoder_output_dim = decoder_output_dim
        self.out_channels = out_channels
        self.bidirectional_decoder = bidirectional_decoder
        self.num_speakers = num_speakers
        self.speaker_embed_dim = 256
        self.enable_tflite = enable_tflite

        self.embedding = keras.layers.Embedding(num_chars, 512, name="embedding")
        self.encoder = Encoder(512, name="encoder")
        # TODO: most of the decoder args have no use at the moment
        self.decoder = Decoder(
            decoder_output_dim,
            r,
            attn_type=attn_type,
            use_attn_win=attn_win,
            attn_norm=attn_norm,
            prenet_type=prenet_type,
            prenet_dropout=prenet_dropout,
            use_forward_attn=forward_attn,
            use_trans_agent=trans_agent,
            use_forward_attn_mask=forward_attn_mask,
            use_location_attn=location_attn,
            attn_K=attn_K,
            separate_stopnet=separate_stopnet,
            speaker_emb_dim=self.speaker_embed_dim,
            name="decoder",
            enable_tflite=enable_tflite,
        )
        self.postnet = Postnet(out_channels, 5, name="postnet")

    @tf.function(experimental_relax_shapes=True)
    def call(self, characters, text_lengths=None, frames=None, training=None):
        if training:
            return self.training(characters, text_lengths, frames)
        if not training:
            return self.inference(characters)
        raise RuntimeError(" [!] Set model training mode True or False")

    def training(self, characters, text_lengths, frames):
        B, T = shape_list(characters)
        embedding_vectors = self.embedding(characters, training=True)
        encoder_output = self.encoder(embedding_vectors, training=True)
        decoder_states = self.decoder.build_decoder_initial_states(B, 512, T)
        decoder_frames, stop_tokens, attentions = self.decoder(
            encoder_output, decoder_states, frames, text_lengths, training=True
        )
        postnet_frames = self.postnet(decoder_frames, training=True)
        output_frames = decoder_frames + postnet_frames
        return decoder_frames, output_frames, attentions, stop_tokens

    def inference(self, characters):
        B, T = shape_list(characters)
        embedding_vectors = self.embedding(characters, training=False)
        encoder_output = self.encoder(embedding_vectors, training=False)
        decoder_states = self.decoder.build_decoder_initial_states(B, 512, T)
        decoder_frames, stop_tokens, attentions = self.decoder(encoder_output, decoder_states, training=False)
        postnet_frames = self.postnet(decoder_frames, training=False)
        output_frames = decoder_frames + postnet_frames
        print(output_frames.shape)
        return decoder_frames, output_frames, attentions, stop_tokens

    @tf.function(
        experimental_relax_shapes=True,
        input_signature=[
            tf.TensorSpec([1, None], dtype=tf.int32),
        ],
    )
    def inference_tflite(self, characters):
        B, T = shape_list(characters)
        embedding_vectors = self.embedding(characters, training=False)
        encoder_output = self.encoder(embedding_vectors, training=False)
        decoder_states = self.decoder.build_decoder_initial_states(B, 512, T)
        decoder_frames, stop_tokens, attentions = self.decoder(encoder_output, decoder_states, training=False)
        postnet_frames = self.postnet(decoder_frames, training=False)
        output_frames = decoder_frames + postnet_frames
        print(output_frames.shape)
        return decoder_frames, output_frames, attentions, stop_tokens

    def build_inference(self):
        # TODO: issue https://github.com/PyCQA/pylint/issues/3613
        input_ids = tf.random.uniform(shape=[1, 4], maxval=10, dtype=tf.int32)  # pylint: disable=unexpected-keyword-arg
        self(input_ids)
TTS/tts/tf/utils/convert_torch_to_tf_utils.py (deleted file)
@@ -1,87 +0,0 @@
import numpy as np
import tensorflow as tf

# NOTE: linter has a problem with the current TF release
# pylint: disable=no-value-for-parameter
# pylint: disable=unexpected-keyword-arg


def tf_create_dummy_inputs():
    """Create dummy inputs for TF Tacotron2 model"""
    batch_size = 4
    max_input_length = 32
    max_mel_length = 128
    pad = 1
    n_chars = 24
    input_ids = tf.random.uniform([batch_size, max_input_length + pad], maxval=n_chars, dtype=tf.int32)
    input_lengths = np.random.randint(0, high=max_input_length + 1 + pad, size=[batch_size])
    input_lengths[-1] = max_input_length
    input_lengths = tf.convert_to_tensor(input_lengths, dtype=tf.int32)
    mel_outputs = tf.random.uniform(shape=[batch_size, max_mel_length + pad, 80])
    mel_lengths = np.random.randint(0, high=max_mel_length + 1 + pad, size=[batch_size])
    mel_lengths[-1] = max_mel_length
    mel_lengths = tf.convert_to_tensor(mel_lengths, dtype=tf.int32)
    return input_ids, input_lengths, mel_outputs, mel_lengths


def compare_torch_tf(torch_tensor, tf_tensor):
    """Compute the average absolute difference b/w torch and tf tensors"""
    return abs(torch_tensor.detach().numpy() - tf_tensor.numpy()).mean()


def convert_tf_name(tf_name):
    """Convert certain patterns in TF layer names to Torch patterns"""
    tf_name_tmp = tf_name
    tf_name_tmp = tf_name_tmp.replace(":0", "")
    tf_name_tmp = tf_name_tmp.replace("/forward_lstm/lstm_cell_1/recurrent_kernel", "/weight_hh_l0")
    tf_name_tmp = tf_name_tmp.replace("/forward_lstm/lstm_cell_2/kernel", "/weight_ih_l1")
    tf_name_tmp = tf_name_tmp.replace("/recurrent_kernel", "/weight_hh")
    tf_name_tmp = tf_name_tmp.replace("/kernel", "/weight")
    tf_name_tmp = tf_name_tmp.replace("/gamma", "/weight")
    tf_name_tmp = tf_name_tmp.replace("/beta", "/bias")
    tf_name_tmp = tf_name_tmp.replace("/", ".")
    return tf_name_tmp


def transfer_weights_torch_to_tf(tf_vars, var_map_dict, state_dict):
    """Transfer weights from torch state_dict to TF variables"""
    print(" > Passing weights from Torch to TF ...")
    for tf_var in tf_vars:
        torch_var_name = var_map_dict[tf_var.name]
        print(f" | > {tf_var.name} <-- {torch_var_name}")
        # if tuple, it is a bias variable
        if not isinstance(torch_var_name, tuple):
            torch_layer_name = ".".join(torch_var_name.split(".")[-2:])
            torch_weight = state_dict[torch_var_name]
        if "convolution1d/kernel" in tf_var.name or "conv1d/kernel" in tf_var.name:
            # out_dim, in_dim, filter -> filter, in_dim, out_dim
            numpy_weight = torch_weight.permute([2, 1, 0]).detach().cpu().numpy()
        elif "lstm_cell" in tf_var.name and "kernel" in tf_var.name:
            numpy_weight = torch_weight.transpose(0, 1).detach().cpu().numpy()
        # if the variable is for a bidirectional lstm and it is a bias vector, there
        # need to be two pre-defined matching torch bias vectors
        elif "_lstm/lstm_cell_" in tf_var.name and "bias" in tf_var.name:
            bias_vectors = [value for key, value in state_dict.items() if key in torch_var_name]
            assert len(bias_vectors) == 2
            numpy_weight = bias_vectors[0] + bias_vectors[1]
        elif "rnn" in tf_var.name and "kernel" in tf_var.name:
            numpy_weight = torch_weight.transpose(0, 1).detach().cpu().numpy()
        elif "rnn" in tf_var.name and "bias" in tf_var.name:
            bias_vectors = [value for key, value in state_dict.items() if torch_var_name[:-2] in key]
            assert len(bias_vectors) == 2
            numpy_weight = bias_vectors[0] + bias_vectors[1]
        elif "linear_layer" in torch_layer_name and "weight" in torch_var_name:
            numpy_weight = torch_weight.transpose(0, 1).detach().cpu().numpy()
        else:
            numpy_weight = torch_weight.detach().cpu().numpy()
        assert np.all(
            tf_var.shape == numpy_weight.shape
        ), f" [!] weight shapes do not match: {tf_var.name} vs {torch_var_name} --> {tf_var.shape} vs {numpy_weight.shape}"
        tf.keras.backend.set_value(tf_var, numpy_weight)
    return tf_vars


def load_tf_vars(model_tf, tf_vars):
    for tf_var in tf_vars:
        model_tf.get_layer(tf_var.name).set_weights(tf_var)
    return model_tf
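As a usage note, the replacement rules in convert_tf_name above map names deterministically before any fuzzy matching happens. For example:

# Example of convert_tf_name (defined above): ":0" is stripped,
# "/kernel" becomes "/weight", and "/" becomes "." before fuzzy matching.
print(convert_tf_name("decoder/linear_projection/kernel:0"))
# -> decoder.linear_projection.weight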
@ -1,105 +0,0 @@
import datetime
import importlib
import pickle

import fsspec
import numpy as np
import tensorflow as tf


def save_checkpoint(model, optimizer, current_step, epoch, r, output_path, **kwargs):
    state = {
        "model": model.weights,
        "optimizer": optimizer,
        "step": current_step,
        "epoch": epoch,
        "date": datetime.date.today().strftime("%B %d, %Y"),
        "r": r,
    }
    state.update(kwargs)
    with fsspec.open(output_path, "wb") as f:
        pickle.dump(state, f)


def load_checkpoint(model, checkpoint_path):
    with fsspec.open(checkpoint_path, "rb") as f:
        checkpoint = pickle.load(f)
    chkp_var_dict = {var.name: var.numpy() for var in checkpoint["model"]}
    tf_vars = model.weights
    for tf_var in tf_vars:
        layer_name = tf_var.name
        try:
            chkp_var_value = chkp_var_dict[layer_name]
        except KeyError:
            # checkpoints saved from a wrapped model prefix names with the class name
            class_name = list(chkp_var_dict.keys())[0].split("/")[0]
            layer_name = f"{class_name}/{layer_name}"
            chkp_var_value = chkp_var_dict[layer_name]

        tf.keras.backend.set_value(tf_var, chkp_var_value)
    if "r" in checkpoint.keys():
        model.decoder.set_r(checkpoint["r"])
    return model


def sequence_mask(sequence_length, max_len=None):
    if max_len is None:
        max_len = sequence_length.max()
    batch_size = sequence_length.shape[0]
    # build a [B, T_max] grid of positions and compare against each length
    seq_range_expand = np.tile(np.arange(max_len), (batch_size, 1))
    seq_length_expand = np.expand_dims(sequence_length, 1)
    # B x T_max
    return seq_range_expand < seq_length_expand


# @tf.custom_gradient
def check_gradient(x, grad_clip):
    x_normed = tf.clip_by_norm(x, grad_clip)
    grad_norm = tf.norm(x)
    return x_normed, grad_norm


def count_parameters(model, c):
    try:
        return model.count_params()
    except RuntimeError:
        # variables are not built yet; run a dummy forward pass first
        input_dummy = tf.convert_to_tensor(np.random.rand(8, 128).astype("int32"))
        input_lengths = np.random.randint(100, 129, (8,))
        input_lengths[-1] = 128
        input_lengths = tf.convert_to_tensor(input_lengths.astype("int32"))
        mel_spec = np.random.rand(8, 2 * c.r, c.audio["num_mels"]).astype("float32")
        mel_spec = tf.convert_to_tensor(mel_spec)
        speaker_ids = np.random.randint(0, 5, (8,)) if c.use_speaker_embedding else None
        _ = model(input_dummy, input_lengths, mel_spec, speaker_ids=speaker_ids)
        return model.count_params()


def setup_model(num_chars, num_speakers, c, enable_tflite=False):
    print(" > Using model: {}".format(c.model))
    MyModel = importlib.import_module("TTS.tts.tf.models." + c.model.lower())
    MyModel = getattr(MyModel, c.model)
    if c.model.lower() == "tacotron":
        raise NotImplementedError(" [!] Tacotron model is not ready.")
    # tacotron2
    model = MyModel(
        num_chars=num_chars,
        num_speakers=num_speakers,
        r=c.r,
        out_channels=c.audio["num_mels"],
        decoder_output_dim=c.audio["num_mels"],
        attn_type=c.attention_type,
        attn_win=c.windowing,
        attn_norm=c.attention_norm,
        prenet_type=c.prenet_type,
        prenet_dropout=c.prenet_dropout,
        forward_attn=c.use_forward_attn,
        trans_agent=c.transition_agent,
        forward_attn_mask=c.forward_attn_mask,
        location_attn=c.location_attn,
        attn_K=c.attention_heads,
        separate_stopnet=c.separate_stopnet,
        bidirectional_decoder=c.bidirectional_decoder,
        enable_tflite=enable_tflite,
    )
    return model
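Because this checkpoint format is a plain pickle of `model.weights` rather than a native TF checkpoint, a round trip through the two functions above looks roughly like this (paths and step counts are illustrative):

```python
# assumes `model` and `optimizer` were created as in the training scripts
save_checkpoint(model, optimizer, current_step=10000, epoch=42, r=2, output_path="checkpoint_10000.pkl")
model = load_checkpoint(model, "checkpoint_10000.pkl")  # also restores the reduction factor `r`
```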
@ -1,45 +0,0 @@
import datetime
import pickle

import fsspec
import tensorflow as tf


def save_checkpoint(model, optimizer, current_step, epoch, r, output_path, **kwargs):
    state = {
        "model": model.weights,
        "optimizer": optimizer,
        "step": current_step,
        "epoch": epoch,
        "date": datetime.date.today().strftime("%B %d, %Y"),
        "r": r,
    }
    state.update(kwargs)
    with fsspec.open(output_path, "wb") as f:
        pickle.dump(state, f)


def load_checkpoint(model, checkpoint_path):
    with fsspec.open(checkpoint_path, "rb") as f:
        checkpoint = pickle.load(f)
    chkp_var_dict = {var.name: var.numpy() for var in checkpoint["model"]}
    tf_vars = model.weights
    for tf_var in tf_vars:
        layer_name = tf_var.name
        try:
            chkp_var_value = chkp_var_dict[layer_name]
        except KeyError:
            class_name = list(chkp_var_dict.keys())[0].split("/")[0]
            layer_name = f"{class_name}/{layer_name}"
            chkp_var_value = chkp_var_dict[layer_name]

        tf.keras.backend.set_value(tf_var, chkp_var_value)
    if "r" in checkpoint.keys():
        model.decoder.set_r(checkpoint["r"])
    return model


def load_tflite_model(tflite_path):
    tflite_model = tf.lite.Interpreter(model_path=tflite_path)
    tflite_model.allocate_tensors()
    return tflite_model
@ -1,8 +0,0 @@
import tensorflow as tf


def shape_list(x):
    """Deal with dynamic shape in tensorflow cleanly."""
    static = x.shape.as_list()
    dynamic = tf.shape(x)
    return [dynamic[i] if s is None else s for i, s in enumerate(static)]
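`shape_list` returns static dimensions as Python ints and dynamic ones as scalar tensors, which keeps reshapes working under `tf.function`. A quick sketch:

```python
import tensorflow as tf

@tf.function(input_signature=[tf.TensorSpec([None, 80], tf.float32)])
def add_channel(x):
    batch, num_mels = shape_list(x)  # batch is a Tensor, num_mels is the int 80
    return tf.reshape(x, [batch, num_mels, 1])
```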
@ -1,27 +0,0 @@
import fsspec
import tensorflow as tf


def convert_tacotron2_to_tflite(model, output_path=None, experimental_converter=True):
    """Convert Tensorflow Tacotron2 model to TFLite. Save a binary file if output_path is
    provided, else return the TFLite model."""

    concrete_function = model.inference_tflite.get_concrete_function()
    converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete_function])
    converter.experimental_new_converter = experimental_converter
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
    tflite_model = converter.convert()
    print(f"Tflite Model size is {len(tflite_model) / (1024.0 * 1024.0)} MBs.")
    if output_path is not None:
        # save the model binary if output_path is provided
        with fsspec.open(output_path, "wb") as f:
            f.write(tflite_model)
        return None
    return tflite_model


def load_tflite_model(tflite_path):
    tflite_model = tf.lite.Interpreter(model_path=tflite_path)
    tflite_model.allocate_tensors()
    return tflite_model
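End to end, the two helpers above were combined roughly as follows; the model is assumed to be a TF Tacotron2 built with `enable_tflite=True`, and the output path is a placeholder:

```python
model.build_inference()  # run a dummy forward pass so the concrete function exists
convert_tacotron2_to_tflite(model, output_path="tts_model.tflite")
tflite_model = load_tflite_model("tts_model.tflite")
```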
@ -1,19 +1,11 @@
-import os
 from typing import Dict
 
 import numpy as np
-import pkg_resources
 import torch
 from torch import nn
 
 from .text import phoneme_to_sequence, text_to_sequence
 
-os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
-
-installed = {pkg.key for pkg in pkg_resources.working_set}  # pylint: disable=not-an-iterable
-if "tensorflow" in installed or "tensorflow-gpu" in installed:
-    import tensorflow as tf
-
 
 def text_to_seq(text, CONFIG, custom_symbols=None, language=None):
     text_cleaner = [CONFIG.text_cleaner]
@ -51,13 +43,6 @@ def numpy_to_torch(np_array, dtype, cuda=False):
     return tensor
 
 
-def numpy_to_tf(np_array, dtype):
-    if np_array is None:
-        return None
-    tensor = tf.convert_to_tensor(np_array, dtype=dtype)
-    return tensor
-
-
 def compute_style_mel(style_wav, ap, cuda=False):
     style_mel = torch.FloatTensor(ap.melspectrogram(ap.load_wav(style_wav, sr=ap.sample_rate))).unsqueeze(0)
     if cuda:
@ -103,53 +88,6 @@ def run_model_torch(
     return outputs
 
 
-def run_model_tf(model, inputs, CONFIG, speaker_id=None, style_mel=None):
-    if CONFIG.gst and style_mel is not None:
-        raise NotImplementedError(" [!] GST inference not implemented for TF")
-    if speaker_id is not None:
-        raise NotImplementedError(" [!] Multi-Speaker not implemented for TF")
-    # TODO: handle multispeaker case
-    decoder_output, postnet_output, alignments, stop_tokens = model(inputs, training=False)
-    return decoder_output, postnet_output, alignments, stop_tokens
-
-
-def run_model_tflite(model, inputs, CONFIG, speaker_id=None, style_mel=None):
-    if CONFIG.gst and style_mel is not None:
-        raise NotImplementedError(" [!] GST inference not implemented for TfLite")
-    if speaker_id is not None:
-        raise NotImplementedError(" [!] Multi-Speaker not implemented for TfLite")
-    # get input and output details
-    input_details = model.get_input_details()
-    output_details = model.get_output_details()
-    # reshape input tensor for the new input shape
-    model.resize_tensor_input(input_details[0]["index"], inputs.shape)
-    model.allocate_tensors()
-    detail = input_details[0]
-    # input_shape = detail['shape']
-    model.set_tensor(detail["index"], inputs)
-    # run the model
-    model.invoke()
-    # collect outputs
-    decoder_output = model.get_tensor(output_details[0]["index"])
-    postnet_output = model.get_tensor(output_details[1]["index"])
-    # tflite model only returns feature frames
-    return decoder_output, postnet_output, None, None
-
-
-def parse_outputs_tf(postnet_output, decoder_output, alignments, stop_tokens):
-    postnet_output = postnet_output[0].numpy()
-    decoder_output = decoder_output[0].numpy()
-    alignment = alignments[0].numpy()
-    stop_tokens = stop_tokens[0].numpy()
-    return postnet_output, decoder_output, alignment, stop_tokens
-
-
-def parse_outputs_tflite(postnet_output, decoder_output):
-    postnet_output = postnet_output[0]
-    decoder_output = decoder_output[0]
-    return postnet_output, decoder_output
-
-
 def trim_silence(wav, ap):
     return wav[: ap.find_endpoint(wav)]
 
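For the record, the removed TFLite branch was driven roughly like this; the interpreter comes from `load_tflite_model` and the input is a batch of int32 character ids (the values below are made up):

```python
import numpy as np

inputs = np.asarray([[12, 5, 9, 2]], dtype=np.int32)  # hypothetical id sequence
decoder_output, postnet_output, _, _ = run_model_tflite(tflite_model, inputs, CONFIG)
postnet_output, decoder_output = parse_outputs_tflite(postnet_output, decoder_output)
```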
@ -213,7 +151,6 @@ def synthesis(
     d_vector=None,
     language_id=None,
     language_name=None,
-    backend="torch",
 ):
     """Synthesize voice for the given text using Griffin-Lim vocoder or just compute output features to be passed to
     the vocoder model.
@ -254,9 +191,6 @@ def synthesis(
 
         language_name (str):
             Language name corresponding to the language code used by the phonemizer. Defaults to None.
-
-        backend (str):
-            tf or torch. Defaults to "torch".
     """
     # GST processing
     style_mel = None
@ -270,44 +204,27 @@ def synthesis(
         custom_symbols = model.make_symbols(CONFIG)
     # preprocess the given text
     text_inputs = text_to_seq(text, CONFIG, custom_symbols=custom_symbols, language=language_name)
     # pass tensors to backend
-    if backend == "torch":
-        if speaker_id is not None:
-            speaker_id = id_to_torch(speaker_id, cuda=use_cuda)
-
-        if d_vector is not None:
-            d_vector = embedding_to_torch(d_vector, cuda=use_cuda)
-
-        if language_id is not None:
-            language_id = id_to_torch(language_id, cuda=use_cuda)
-
-        if not isinstance(style_mel, dict):
-            style_mel = numpy_to_torch(style_mel, torch.float, cuda=use_cuda)
-        text_inputs = numpy_to_torch(text_inputs, torch.long, cuda=use_cuda)
-        text_inputs = text_inputs.unsqueeze(0)
-    elif backend in ["tf", "tflite"]:
-        # TODO: handle speaker id for tf model
-        style_mel = numpy_to_tf(style_mel, tf.float32)
-        text_inputs = numpy_to_tf(text_inputs, tf.int32)
-        text_inputs = tf.expand_dims(text_inputs, 0)
+    if speaker_id is not None:
+        speaker_id = id_to_torch(speaker_id, cuda=use_cuda)
+
+    if d_vector is not None:
+        d_vector = embedding_to_torch(d_vector, cuda=use_cuda)
+
+    if language_id is not None:
+        language_id = id_to_torch(language_id, cuda=use_cuda)
+
+    if not isinstance(style_mel, dict):
+        style_mel = numpy_to_torch(style_mel, torch.float, cuda=use_cuda)
+    text_inputs = numpy_to_torch(text_inputs, torch.long, cuda=use_cuda)
+    text_inputs = text_inputs.unsqueeze(0)
     # synthesize voice
-    if backend == "torch":
-        outputs = run_model_torch(model, text_inputs, speaker_id, style_mel, d_vector=d_vector, language_id=language_id)
-        model_outputs = outputs["model_outputs"]
-        model_outputs = model_outputs[0].data.cpu().numpy()
-        alignments = outputs["alignments"]
-    elif backend == "tf":
-        decoder_output, postnet_output, alignments, stop_tokens = run_model_tf(
-            model, text_inputs, CONFIG, speaker_id, style_mel
-        )
-        model_outputs, decoder_output, alignments, stop_tokens = parse_outputs_tf(
-            postnet_output, decoder_output, alignments, stop_tokens
-        )
-    elif backend == "tflite":
-        decoder_output, postnet_output, alignments, stop_tokens = run_model_tflite(
-            model, text_inputs, CONFIG, speaker_id, style_mel
-        )
-        model_outputs, decoder_output = parse_outputs_tflite(postnet_output, decoder_output)
+    outputs = run_model_torch(model, text_inputs, speaker_id, style_mel, d_vector=d_vector, language_id=language_id)
+    model_outputs = outputs["model_outputs"]
+    model_outputs = model_outputs[0].data.cpu().numpy()
+    alignments = outputs["alignments"]
     # convert outputs to numpy
     # plot results
     wav = None
@ -1,54 +0,0 @@
import tensorflow as tf


class ReflectionPad1d(tf.keras.layers.Layer):
    def __init__(self, padding):
        super().__init__()
        self.padding = padding

    def call(self, x):
        return tf.pad(x, [[0, 0], [self.padding, self.padding], [0, 0], [0, 0]], "REFLECT")


class ResidualStack(tf.keras.layers.Layer):
    def __init__(self, channels, num_res_blocks, kernel_size, name):
        super().__init__(name=name)

        assert (kernel_size - 1) % 2 == 0, " [!] kernel_size has to be odd."
        base_padding = (kernel_size - 1) // 2

        self.blocks = []
        num_layers = 2
        for idx in range(num_res_blocks):
            layer_kernel_size = kernel_size
            layer_dilation = layer_kernel_size ** idx
            layer_padding = base_padding * layer_dilation
            block = [
                tf.keras.layers.LeakyReLU(0.2),
                ReflectionPad1d(layer_padding),
                tf.keras.layers.Conv2D(
                    filters=channels,
                    kernel_size=(kernel_size, 1),
                    dilation_rate=(layer_dilation, 1),
                    use_bias=True,
                    padding="valid",
                    name=f"blocks.{idx}.{num_layers}",
                ),
                tf.keras.layers.LeakyReLU(0.2),
                tf.keras.layers.Conv2D(
                    filters=channels, kernel_size=(1, 1), use_bias=True, name=f"blocks.{idx}.{num_layers + 2}"
                ),
            ]
            self.blocks.append(block)
        self.shortcuts = [
            tf.keras.layers.Conv2D(channels, kernel_size=1, use_bias=True, name=f"shortcuts.{i}")
            for i in range(num_res_blocks)
        ]

    def call(self, x):
        for block, shortcut in zip(self.blocks, self.shortcuts):
            res = shortcut(x)
            for layer in block:
                x = layer(x)
            x += res
        return x
@ -1,60 +0,0 @@
import numpy as np
import tensorflow as tf
from scipy import signal as sig


class PQMF(tf.keras.layers.Layer):
    def __init__(self, N=4, taps=62, cutoff=0.15, beta=9.0):
        super().__init__()
        # define filter coefficients
        self.N = N
        self.taps = taps
        self.cutoff = cutoff
        self.beta = beta

        QMF = sig.firwin(taps + 1, cutoff, window=("kaiser", beta))
        H = np.zeros((N, len(QMF)))
        G = np.zeros((N, len(QMF)))
        for k in range(N):
            constant_factor = (2 * k + 1) * (np.pi / (2 * N)) * (np.arange(taps + 1) - ((taps - 1) / 2))
            phase = (-1) ** k * np.pi / 4
            H[k] = 2 * QMF * np.cos(constant_factor + phase)
            G[k] = 2 * QMF * np.cos(constant_factor - phase)

        # [N, 1, taps + 1] == [filter_width, in_channels, out_channels]
        self.H = np.transpose(H[:, None, :], (2, 1, 0)).astype("float32")
        self.G = np.transpose(G[None, :, :], (2, 1, 0)).astype("float32")

        # filter for downsampling & upsampling
        updown_filter = np.zeros((N, N, N), dtype=np.float32)
        for k in range(N):
            updown_filter[0, k, k] = 1.0
        self.updown_filter = updown_filter.astype(np.float32)

    def analysis(self, x):
        """
        x : :math:`[B, 1, T]`
        """
        x = tf.transpose(x, perm=[0, 2, 1])
        x = tf.pad(x, [[0, 0], [self.taps // 2, self.taps // 2], [0, 0]], constant_values=0.0)
        x = tf.nn.conv1d(x, self.H, stride=1, padding="VALID")
        x = tf.nn.conv1d(x, self.updown_filter, stride=self.N, padding="VALID")
        x = tf.transpose(x, perm=[0, 2, 1])
        return x

    def synthesis(self, x):
        """
        x : :math:`[B, D, T]`
        """
        x = tf.transpose(x, perm=[0, 2, 1])
        x = tf.nn.conv1d_transpose(
            x,
            self.updown_filter * self.N,
            strides=self.N,
            output_shape=(tf.shape(x)[0], tf.shape(x)[1] * self.N, self.N),
        )
        x = tf.pad(x, [[0, 0], [self.taps // 2, self.taps // 2], [0, 0]], constant_values=0.0)
        x = tf.nn.conv1d(x, self.G, stride=1, padding="VALID")
        x = tf.transpose(x, perm=[0, 2, 1])
        return x
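A PQMF round trip on a dummy waveform, following the `[B, 1, T]` convention in the docstrings; with `N=4`, analysis yields four sub-bands at a quarter of the input rate:

```python
import numpy as np
import tensorflow as tf

pqmf = PQMF(N=4, taps=62, cutoff=0.15, beta=9.0)
wav = tf.convert_to_tensor(np.random.rand(1, 1, 16000).astype("float32"))
subbands = pqmf.analysis(wav)       # -> [1, 4, 4000]
wav_hat = pqmf.synthesis(subbands)  # -> back to [1, 1, T] full-band audio
```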
@ -1,133 +0,0 @@
import logging
import os

import tensorflow as tf

from TTS.vocoder.tf.layers.melgan import ReflectionPad1d, ResidualStack

os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"  # FATAL
logging.getLogger("tensorflow").setLevel(logging.FATAL)


# pylint: disable=too-many-ancestors
# pylint: disable=abstract-method
class MelganGenerator(tf.keras.models.Model):
    """Melgan Generator TF implementation dedicated for inference with no
    weight norm"""

    def __init__(
        self,
        in_channels=80,
        out_channels=1,
        proj_kernel=7,
        base_channels=512,
        upsample_factors=(8, 8, 2, 2),
        res_kernel=3,
        num_res_blocks=3,
    ):
        super().__init__()

        self.in_channels = in_channels

        # assert model parameters
        assert (proj_kernel - 1) % 2 == 0, " [!] proj_kernel should be an odd number."

        # setup additional model parameters
        base_padding = (proj_kernel - 1) // 2
        act_slope = 0.2
        self.inference_padding = 2

        # initial layer
        self.initial_layer = [
            ReflectionPad1d(base_padding),
            tf.keras.layers.Conv2D(
                filters=base_channels, kernel_size=(proj_kernel, 1), strides=1, padding="valid", use_bias=True, name="1"
            ),
        ]
        num_layers = 3  # count number of layers for layer naming

        # upsampling layers and residual stacks
        self.upsample_layers = []
        for idx, upsample_factor in enumerate(upsample_factors):
            layer_out_channels = base_channels // (2 ** (idx + 1))
            layer_filter_size = upsample_factor * 2
            layer_stride = upsample_factor
            # layer_output_padding = upsample_factor % 2
            self.upsample_layers += [
                tf.keras.layers.LeakyReLU(act_slope),
                tf.keras.layers.Conv2DTranspose(
                    filters=layer_out_channels,
                    kernel_size=(layer_filter_size, 1),
                    strides=(layer_stride, 1),
                    padding="same",
                    # output_padding=layer_output_padding,
                    use_bias=True,
                    name=f"{num_layers}",
                ),
                ResidualStack(
                    channels=layer_out_channels,
                    num_res_blocks=num_res_blocks,
                    kernel_size=res_kernel,
                    name=f"layers.{num_layers + 1}",
                ),
            ]
            num_layers += num_res_blocks - 1

        self.upsample_layers += [tf.keras.layers.LeakyReLU(act_slope)]

        # final layer
        self.final_layers = [
            ReflectionPad1d(base_padding),
            tf.keras.layers.Conv2D(
                filters=out_channels, kernel_size=(proj_kernel, 1), use_bias=True, name=f"layers.{num_layers + 1}"
            ),
            tf.keras.layers.Activation("tanh"),
        ]

        # self.model_layers = tf.keras.models.Sequential(self.initial_layer + self.upsample_layers + self.final_layers, name="layers")
        self.model_layers = self.initial_layer + self.upsample_layers + self.final_layers

    @tf.function(experimental_relax_shapes=True)
    def call(self, c, training=False):
        """
        c : :math:`[B, C, T]`
        """
        if training:
            raise NotImplementedError()
        return self.inference(c)

    def inference(self, c):
        c = tf.transpose(c, perm=[0, 2, 1])
        c = tf.expand_dims(c, 2)
        # FIXME: TF has no replicate padding as in Torch
        # c = tf.pad(c, [[0, 0], [self.inference_padding, self.inference_padding], [0, 0], [0, 0]], "REFLECT")
        o = c
        for layer in self.model_layers:
            o = layer(o)
        # o = self.model_layers(c)
        o = tf.transpose(o, perm=[0, 3, 2, 1])
        return o[:, :, 0, :]

    def build_inference(self):
        x = tf.random.uniform((1, self.in_channels, 4), dtype=tf.float32)
        self(x, training=False)

    @tf.function(
        experimental_relax_shapes=True,
        input_signature=[
            tf.TensorSpec([1, None, None], dtype=tf.float32),
        ],
    )
    def inference_tflite(self, c):
        c = tf.transpose(c, perm=[0, 2, 1])
        c = tf.expand_dims(c, 2)
        # FIXME: TF has no replicate padding as in Torch
        # c = tf.pad(c, [[0, 0], [self.inference_padding, self.inference_padding], [0, 0], [0, 0]], "REFLECT")
        o = c
        for layer in self.model_layers:
            o = layer(o)
        # o = self.model_layers(c)
        o = tf.transpose(o, perm=[0, 3, 2, 1])
        return o[:, :, 0, :]
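Since the default `upsample_factors=(8, 8, 2, 2)` multiply out to a hop length of 256, a mel input of length T produces T × 256 audio samples. A usage sketch mirroring the unit test further down in this diff:

```python
import tensorflow as tf

model = MelganGenerator()
model.build_inference()  # create variables with a dummy forward pass
mel = tf.random.uniform((1, 80, 64))
wav = model(mel, training=False)  # -> (1, 1, 64 * 256)
```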
@ -1,65 +0,0 @@
import tensorflow as tf

from TTS.vocoder.tf.layers.pqmf import PQMF
from TTS.vocoder.tf.models.melgan_generator import MelganGenerator


# pylint: disable=too-many-ancestors
# pylint: disable=abstract-method
class MultibandMelganGenerator(MelganGenerator):
    def __init__(
        self,
        in_channels=80,
        out_channels=4,
        proj_kernel=7,
        base_channels=384,
        upsample_factors=(2, 8, 2, 2),
        res_kernel=3,
        num_res_blocks=3,
    ):
        super().__init__(
            in_channels=in_channels,
            out_channels=out_channels,
            proj_kernel=proj_kernel,
            base_channels=base_channels,
            upsample_factors=upsample_factors,
            res_kernel=res_kernel,
            num_res_blocks=num_res_blocks,
        )
        self.pqmf_layer = PQMF(N=4, taps=62, cutoff=0.15, beta=9.0)

    def pqmf_analysis(self, x):
        return self.pqmf_layer.analysis(x)

    def pqmf_synthesis(self, x):
        return self.pqmf_layer.synthesis(x)

    def inference(self, c):
        c = tf.transpose(c, perm=[0, 2, 1])
        c = tf.expand_dims(c, 2)
        # FIXME: TF has no replicate padding as in Torch
        # c = tf.pad(c, [[0, 0], [self.inference_padding, self.inference_padding], [0, 0], [0, 0]], "REFLECT")
        o = c
        for layer in self.model_layers:
            o = layer(o)
        o = tf.transpose(o, perm=[0, 3, 2, 1])
        o = self.pqmf_layer.synthesis(o[:, :, 0, :])
        return o

    @tf.function(
        experimental_relax_shapes=True,
        input_signature=[
            tf.TensorSpec([1, 80, None], dtype=tf.float32),
        ],
    )
    def inference_tflite(self, c):
        c = tf.transpose(c, perm=[0, 2, 1])
        c = tf.expand_dims(c, 2)
        # FIXME: TF has no replicate padding as in Torch
        # c = tf.pad(c, [[0, 0], [self.inference_padding, self.inference_padding], [0, 0], [0, 0]], "REFLECT")
        o = c
        for layer in self.model_layers:
            o = layer(o)
        o = tf.transpose(o, perm=[0, 3, 2, 1])
        o = self.pqmf_layer.synthesis(o[:, :, 0, :])
        return o
@ -1,47 +0,0 @@
import numpy as np
import tensorflow as tf


def compare_torch_tf(torch_tensor, tf_tensor):
    """Compute the average absolute difference b/w torch and tf tensors"""
    return abs(torch_tensor.detach().numpy() - tf_tensor.numpy()).mean()


def convert_tf_name(tf_name):
    """Convert certain patterns in TF layer names to Torch patterns"""
    tf_name_tmp = tf_name
    tf_name_tmp = tf_name_tmp.replace(":0", "")
    tf_name_tmp = tf_name_tmp.replace("/forward_lstm/lstm_cell_1/recurrent_kernel", "/weight_hh_l0")
    tf_name_tmp = tf_name_tmp.replace("/forward_lstm/lstm_cell_2/kernel", "/weight_ih_l1")
    tf_name_tmp = tf_name_tmp.replace("/recurrent_kernel", "/weight_hh")
    tf_name_tmp = tf_name_tmp.replace("/kernel", "/weight")
    tf_name_tmp = tf_name_tmp.replace("/gamma", "/weight")
    tf_name_tmp = tf_name_tmp.replace("/beta", "/bias")
    tf_name_tmp = tf_name_tmp.replace("/", ".")
    return tf_name_tmp


def transfer_weights_torch_to_tf(tf_vars, var_map_dict, state_dict):
    """Transfer weights from torch state_dict to TF variables"""
    print(" > Passing weights from Torch to TF ...")
    for tf_var in tf_vars:
        torch_var_name = var_map_dict[tf_var.name]
        print(f" | > {tf_var.name} <-- {torch_var_name}")
        # if tuple, it is a bias variable
        if "kernel" in tf_var.name:
            torch_weight = state_dict[torch_var_name]
            numpy_weight = torch_weight.permute([2, 1, 0]).numpy()[:, None, :, :]
        if "bias" in tf_var.name:
            torch_weight = state_dict[torch_var_name]
            numpy_weight = torch_weight
        assert np.all(
            tf_var.shape == numpy_weight.shape
        ), f" [!] weight shapes do not match: {tf_var.name} vs {torch_var_name} --> {tf_var.shape} vs {numpy_weight.shape}"
        tf.keras.backend.set_value(tf_var, numpy_weight)
    return tf_vars


def load_tf_vars(model_tf, tf_vars):
    for tf_var in tf_vars:
        model_tf.get_layer(tf_var.name).set_weights(tf_var)
    return model_tf
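`convert_tf_name` is plain string rewriting from Keras variable names to Torch `state_dict` keys, e.g.:

```python
print(convert_tf_name("model/decoder/attention_rnn/lstm_cell/recurrent_kernel:0"))
# -> model.decoder.attention_rnn.lstm_cell.weight_hh
```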
@ -1,36 +0,0 @@
import importlib
import re


def to_camel(text):
    text = text.capitalize()
    return re.sub(r"(?!^)_([a-zA-Z])", lambda m: m.group(1).upper(), text)


def setup_generator(c):
    print(" > Generator Model: {}".format(c.generator_model))
    MyModel = importlib.import_module("TTS.vocoder.tf.models." + c.generator_model.lower())
    MyModel = getattr(MyModel, to_camel(c.generator_model))
    if c.generator_model == "melgan_generator":
        model = MyModel(
            in_channels=c.audio["num_mels"],
            out_channels=1,
            proj_kernel=7,
            base_channels=512,
            upsample_factors=c.generator_model_params["upsample_factors"],
            res_kernel=3,
            num_res_blocks=c.generator_model_params["num_res_blocks"],
        )
    if c.generator_model == "melgan_fb_generator":
        pass
    if c.generator_model == "multiband_melgan_generator":
        model = MyModel(
            in_channels=c.audio["num_mels"],
            out_channels=4,
            proj_kernel=7,
            base_channels=384,
            upsample_factors=c.generator_model_params["upsample_factors"],
            res_kernel=3,
            num_res_blocks=c.generator_model_params["num_res_blocks"],
        )
    return model
@ -1,31 +0,0 @@
import datetime
import pickle

import fsspec
import tensorflow as tf


def save_checkpoint(model, current_step, epoch, output_path, **kwargs):
    """Save TF Vocoder model"""
    state = {
        "model": model.weights,
        "step": current_step,
        "epoch": epoch,
        "date": datetime.date.today().strftime("%B %d, %Y"),
    }
    state.update(kwargs)
    with fsspec.open(output_path, "wb") as f:
        pickle.dump(state, f)


def load_checkpoint(model, checkpoint_path):
    """Load TF Vocoder model"""
    with fsspec.open(checkpoint_path, "rb") as f:
        checkpoint = pickle.load(f)
    chkp_var_dict = {var.name: var.numpy() for var in checkpoint["model"]}
    tf_vars = model.weights
    for tf_var in tf_vars:
        layer_name = tf_var.name
        chkp_var_value = chkp_var_dict[layer_name]
        tf.keras.backend.set_value(tf_var, chkp_var_value)
    return model
@ -1,27 +0,0 @@
import fsspec
import tensorflow as tf


def convert_melgan_to_tflite(model, output_path=None, experimental_converter=True):
    """Convert Tensorflow MelGAN model to TFLite. Save a binary file if output_path is
    provided, else return the TFLite model."""

    concrete_function = model.inference_tflite.get_concrete_function()
    converter = tf.lite.TFLiteConverter.from_concrete_functions([concrete_function])
    converter.experimental_new_converter = experimental_converter
    converter.optimizations = []
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS, tf.lite.OpsSet.SELECT_TF_OPS]
    tflite_model = converter.convert()
    print(f"Tflite Model size is {len(tflite_model) / (1024.0 * 1024.0)} MBs.")
    if output_path is not None:
        # save the model binary if output_path is provided
        with fsspec.open(output_path, "wb") as f:
            f.write(tflite_model)
        return None
    return tflite_model


def load_tflite_model(tflite_path):
    tflite_model = tf.lite.Interpreter(model_path=tflite_path)
    tflite_model.allocate_tensors()
    return tflite_model
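Running the converted vocoder matches the `run_vocoder` helper in the tutorial notebook below; a condensed sketch with a hypothetical model path and a dummy mel input:

```python
import numpy as np

vocoder = load_tflite_model("vocoder_model.tflite")  # hypothetical path
mel = np.random.rand(1, 80, 64).astype("float32")    # [1, num_mels, T]
input_details = vocoder.get_input_details()
vocoder.resize_tensor_input(input_details[0]["index"], mel.shape)
vocoder.allocate_tensors()
vocoder.set_tensor(input_details[0]["index"], mel)
vocoder.invoke()
waveform = vocoder.get_tensor(vocoder.get_output_details()[0]["index"])
```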
@ -1,21 +0,0 @@
# Converting Torch to TF 2

Currently, 🐸TTS supports the vanilla Tacotron2 and MelGAN models in TF 2. It does not support advanced attention methods and other small tricks used by the Torch models. You can convert any Torch model trained after v0.0.2.

You can also export TF 2 models to TFLite for even faster inference.

## How to convert from Torch to TF 2.0
Make sure you have installed Tensorflow v2.2. It is not installed by default by :frog: TTS.

All the TF related code stays under the ```tf``` folder.

To convert a **compatible** Torch model, run the following command with the right arguments:

```bash
python TTS/bin/convert_tacotron2_torch_to_tf.py \
    --torch_model_path /path/to/torch/model.pth.tar \
    --config_path /path/to/model/config.json \
    --output_path /path/to/output/tf/model
```

This will create a TF model file. Notice that our model format is not compatible with the official TF checkpoints. We created our custom format to match the Torch checkpoints we use. Therefore, use the ```load_checkpoint``` and ```save_checkpoint``` functions provided under ```TTS.tf.generic_utils```.
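The vocoder side follows the same pattern. Assuming a compatible MelGAN checkpoint, the conversion mirrors the Tacotron2 command above (paths are placeholders):

```bash
python TTS/bin/convert_melgan_torch_to_tf.py \
    --torch_model_path /path/to/torch/vocoder_model.pth.tar \
    --config_path /path/to/vocoder/config.json \
    --output_path /path/to/output/tf/vocoder_model
```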
@ -27,7 +27,6 @@
     formatting_your_dataset
     what_makes_a_good_dataset
     tts_datasets
-    converting_torch_to_tf
 
 .. toctree::
     :maxdepth: 2
@ -12,12 +12,6 @@ You can install from PyPI as follows:
 pip install TTS  # from PyPI
 ```
 
-By default, this only installs the requirements for PyTorch. To install the tensorflow dependencies as well, use the `tf` extra.
-
-```bash
-pip install TTS[tf]
-```
-
 Or install from Github:
 
 ```bash
@ -1,425 +0,0 @@
{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "6LWsNd3_M3MP"
      },
      "source": [
        "# Converting Pytorch models to Tensorflow and TFLite by CoquiTTS"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "FAqrSIWgLyP0"
      },
      "source": [
        "This is a tutorial demonstrating Coqui TTS capabilities to convert\n",
        "trained PyTorch models to Tensorflow and Tflite.\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "MBJjGYnoEo4v"
      },
      "source": [
        "# Installation"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "Ku-dA4DKoeXk"
      },
      "source": [
        "### Download TF Models and configs"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 162
        },
        "colab_type": "code",
        "id": "jGIgnWhGsxU1",
        "outputId": "b461952f-8507-4dd2-af06-4e6b8692765d",
        "tags": []
      },
      "outputs": [],
      "source": [
        "!gdown --id 1dntzjWFg7ufWaTaFy80nRz-Tu02xWZos -O data/tts_model.pth.tar\n",
        "!gdown --id 18CQ6G6tBEOfvCHlPqP8EBI4xWbrr9dBc -O data/config.json"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 235
        },
        "colab_type": "code",
        "id": "4dnpE0-kvTsu",
        "outputId": "f67c3138-bda0-4b3e-ffcc-647f9feec23e",
        "tags": []
      },
      "outputs": [],
      "source": [
        "!gdown --id 1Ty5DZdOc0F7OTGj9oJThYbL5iVu_2G0K -O data/vocoder_model.pth.tar\n",
        "!gdown --id 1Rd0R_nRCrbjEdpOwq6XwZAktvugiBvmu -O data/config_vocoder.json\n",
        "!gdown --id 11oY3Tv0kQtxK_JPgxrfesa99maVXHNxU -O data/scale_stats.npy"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "3IGvvCRMEwqn"
      },
      "source": [
        "# Model Conversion PyTorch -> TF -> TFLite"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "tLhz8SAf8Pgp"
      },
      "source": [
        "## Converting PyTorch to Tensorflow\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 1000
        },
        "colab_type": "code",
        "id": "Xsrvr_WQ8Ib5",
        "outputId": "dae96616-e5f7-41b6-cdb9-5026cfcd3214",
        "tags": []
      },
      "outputs": [],
      "source": [
        "# convert TTS model to Tensorflow\n",
        "!python ../TTS/bin/convert_tacotron2_torch_to_tf.py --config_path data/config.json --torch_model_path data/tts_model.pth.tar --output_path data/tts_model_tf.pkl"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 1000
        },
        "colab_type": "code",
        "id": "VJ4NA5If9ljv",
        "outputId": "1520dca8-1db8-4e07-bc0c-b1d5941c775e",
        "tags": []
      },
      "outputs": [],
      "source": [
        "# convert Vocoder model to Tensorflow\n",
        "!python ../TTS/bin/convert_melgan_torch_to_tf.py --config_path data/config_vocoder.json --torch_model_path data/vocoder_model.pth.tar --output_path data/vocoder_model_tf.pkl"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "7d5vTkBZ-BYQ"
      },
      "source": [
        "## Converting Tensorflow to TFLite"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 927
        },
        "colab_type": "code",
        "id": "33hTfpuU99cg",
        "outputId": "8a0e5be1-23a2-4128-ee37-8232adcb8ff0",
        "tags": []
      },
      "outputs": [],
      "source": [
        "# convert TTS model to TFLite\n",
        "!python ../TTS/bin/convert_tacotron2_tflite.py --config_path data/config.json --tf_model data/tts_model_tf.pkl --output_path data/tts_model.tflite"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 364
        },
        "colab_type": "code",
        "id": "e00Hm75Y-wZ2",
        "outputId": "42381b05-3c9d-44f0-dac7-d81efd95eadf",
        "tags": []
      },
      "outputs": [],
      "source": [
        "# convert Vocoder model to TFLite\n",
        "!python ../TTS/bin/convert_melgan_tflite.py --config_path data/config_vocoder.json --tf_model data/vocoder_model_tf.pkl --output_path data/vocoder_model.tflite"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "Zlgi8fPdpRF0"
      },
      "source": [
        "# Run Inference with TFLite"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "f-Yc42nQZG5A"
      },
      "outputs": [],
      "source": [
        "def run_vocoder(mel_spec):\n",
        "    vocoder_inputs = mel_spec[None, :, :]\n",
        "    # get input and output details\n",
        "    input_details = vocoder_model.get_input_details()\n",
        "    # reshape input tensor for the new input shape\n",
        "    vocoder_model.resize_tensor_input(input_details[0]['index'], vocoder_inputs.shape)\n",
        "    vocoder_model.allocate_tensors()\n",
        "    detail = input_details[0]\n",
        "    vocoder_model.set_tensor(detail['index'], vocoder_inputs)\n",
        "    # run the model\n",
        "    vocoder_model.invoke()\n",
        "    # collect outputs\n",
        "    output_details = vocoder_model.get_output_details()\n",
        "    waveform = vocoder_model.get_tensor(output_details[0]['index'])\n",
        "    return waveform\n",
        "\n",
        "\n",
        "def tts(model, text, CONFIG, p):\n",
        "    t_1 = time.time()\n",
        "    waveform, alignment, mel_spec, mel_postnet_spec, stop_tokens, inputs = synthesis(\n",
        "        model, text, CONFIG, use_cuda, ap, speaker_id, style_wav=None,\n",
        "        truncated=False, enable_eos_bos_chars=CONFIG.enable_eos_bos_chars,\n",
        "        backend='tflite')\n",
        "    waveform = run_vocoder(mel_postnet_spec.T)\n",
        "    waveform = waveform[0, 0]\n",
        "    rtf = (time.time() - t_1) / (len(waveform) / ap.sample_rate)\n",
        "    tps = (time.time() - t_1) / len(waveform)\n",
        "    print(waveform.shape)\n",
        "    print(\" > Run-time: {}\".format(time.time() - t_1))\n",
        "    print(\" > Real-time factor: {}\".format(rtf))\n",
        "    print(\" > Time per step: {}\".format(tps))\n",
        "    IPython.display.display(IPython.display.Audio(waveform, rate=CONFIG.audio['sample_rate']))\n",
        "    return alignment, mel_postnet_spec, stop_tokens, waveform"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "ZksegYQepkFg"
      },
      "source": [
        "### Load TF Models"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "oVa0kOamprgj"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "import torch\n",
        "import time\n",
        "import IPython\n",
        "\n",
        "from TTS.tts.tf.utils.tflite import load_tflite_model\n",
        "from TTS.tts.tf.utils.io import load_checkpoint\n",
        "from TTS.utils.io import load_config\n",
        "from TTS.tts.utils.text.symbols import symbols, phonemes\n",
        "from TTS.utils.audio import AudioProcessor\n",
        "from TTS.tts.utils.synthesis import synthesis"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "EY-sHVO8IFSH"
      },
      "outputs": [],
      "source": [
        "# runtime settings\n",
        "use_cuda = False"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "_1aIUp2FpxOQ"
      },
      "outputs": [],
      "source": [
        "# model paths\n",
        "TTS_MODEL = \"data/tts_model.tflite\"\n",
        "TTS_CONFIG = \"data/config.json\"\n",
        "VOCODER_MODEL = \"data/vocoder_model.tflite\"\n",
        "VOCODER_CONFIG = \"data/config_vocoder.json\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "CpgmdBVQplbv"
      },
      "outputs": [],
      "source": [
        "# load configs\n",
        "TTS_CONFIG = load_config(TTS_CONFIG)\n",
        "VOCODER_CONFIG = load_config(VOCODER_CONFIG)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 471
        },
        "colab_type": "code",
        "id": "zmrQxiozIUVE",
        "outputId": "21cda136-de87-4d55-fd46-7d5306103d90",
        "tags": []
      },
      "outputs": [],
      "source": [
        "# load the audio processor\n",
        "TTS_CONFIG.audio['stats_path'] = 'data/scale_stats.npy'\n",
        "ap = AudioProcessor(**TTS_CONFIG.audio)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {},
        "colab_type": "code",
        "id": "8fLoI4ipqMeS"
      },
      "outputs": [],
      "source": [
        "# LOAD TTS MODEL\n",
        "# multi speaker\n",
        "speaker_id = None\n",
        "speakers = []\n",
        "\n",
        "# load the models\n",
        "model = load_tflite_model(TTS_MODEL)\n",
        "vocoder_model = load_tflite_model(VOCODER_MODEL)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "colab_type": "text",
        "id": "Ws_YkPKsLgo-"
      },
      "source": [
        "## Run Sample Sentence"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/",
          "height": 134
        },
        "colab_type": "code",
        "id": "FuWxZ9Ey5Puj",
        "outputId": "535c2df1-c27c-458b-e14b-41a977635aa1",
        "tags": []
      },
      "outputs": [],
      "source": [
        "sentence = \"Bill got in the habit of asking himself “Is that thought true?” and if he wasn’t absolutely certain it was, he just let it go.\"\n",
        "align, spec, stop_tokens, wav = tts(model, sentence, TTS_CONFIG, ap)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": []
    }
  ],
  "metadata": {
    "colab": {
      "collapsed_sections": [],
      "name": "Tutorial_Converting_PyTorch_to_TF_to_TFlite.ipynb",
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "codemirror_mode": {
        "name": "ipython",
        "version": 3
      },
      "file_extension": ".py",
      "mimetype": "text/x-python",
      "name": "python",
      "nbconvert_exporter": "python",
      "pygments_lexer": "ipython3",
      "version": "3.8.5"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 4
}
@ -1 +0,0 @@
tensorflow==2.5.0
|
5
setup.py
5
setup.py
|
@ -65,9 +65,7 @@ with open(os.path.join(cwd, "requirements.notebooks.txt"), "r") as f:
     requirements_notebooks = f.readlines()
 with open(os.path.join(cwd, "requirements.dev.txt"), "r") as f:
     requirements_dev = f.readlines()
-with open(os.path.join(cwd, "requirements.tf.txt"), "r") as f:
-    requirements_tf = f.readlines()
-requirements_all = requirements_dev + requirements_notebooks + requirements_tf
+requirements_all = requirements_dev + requirements_notebooks
 
 with open("README.md", "r", encoding="utf-8") as readme_file:
     README = readme_file.read()
@ -116,7 +114,6 @@ setup(
         "all": requirements_all,
         "dev": requirements_dev,
         "notebooks": requirements_notebooks,
-        "tf": requirements_tf,
     },
     python_requires=">=3.6.0, <3.10",
     entry_points={"console_scripts": ["tts=TTS.bin.synthesize:main", "tts-server = TTS.server.server:main"]},
@ -1,156 +0,0 @@
import os
import unittest

import numpy as np
import tensorflow as tf
import torch

from TTS.tts.configs.tacotron2_config import Tacotron2Config
from TTS.tts.tf.models.tacotron2 import Tacotron2
from TTS.tts.tf.utils.tflite import convert_tacotron2_to_tflite, load_tflite_model

tf.get_logger().setLevel("INFO")


# pylint: disable=unused-variable

torch.manual_seed(1)
use_cuda = torch.cuda.is_available()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

c = Tacotron2Config()


class TacotronTFTrainTest(unittest.TestCase):
    @staticmethod
    def generate_dummy_inputs():
        chars_seq = torch.randint(0, 24, (8, 128)).long().to(device)
        chars_seq_lengths = torch.randint(100, 128, (8,)).long().to(device)
        chars_seq_lengths = torch.sort(chars_seq_lengths, descending=True)[0]
        mel_spec = torch.rand(8, 30, c.audio["num_mels"]).to(device)
        mel_postnet_spec = torch.rand(8, 30, c.audio["num_mels"]).to(device)
        mel_lengths = torch.randint(20, 30, (8,)).long().to(device)
        stop_targets = torch.zeros(8, 30, 1).float().to(device)
        speaker_ids = torch.randint(0, 5, (8,)).long().to(device)

        chars_seq = tf.convert_to_tensor(chars_seq.cpu().numpy())
        chars_seq_lengths = tf.convert_to_tensor(chars_seq_lengths.cpu().numpy())
        mel_spec = tf.convert_to_tensor(mel_spec.cpu().numpy())
        return chars_seq, chars_seq_lengths, mel_spec, mel_postnet_spec, mel_lengths, stop_targets, speaker_ids

    @unittest.skipIf(use_cuda, " [!] Skip Test: TfLite conversion does not work on GPU.")
    def test_train_step(self):
        """test forward pass"""
        (
            chars_seq,
            chars_seq_lengths,
            mel_spec,
            mel_postnet_spec,
            mel_lengths,
            stop_targets,
            speaker_ids,
        ) = self.generate_dummy_inputs()

        for idx in mel_lengths:
            stop_targets[:, int(idx.item()) :, 0] = 1.0

        stop_targets = stop_targets.view(chars_seq.shape[0], stop_targets.size(1) // c.r, -1)
        stop_targets = (stop_targets.sum(2) > 0.0).unsqueeze(2).float().squeeze()

        model = Tacotron2(num_chars=24, r=c.r, num_speakers=5)
        # training pass
        output = model(chars_seq, chars_seq_lengths, mel_spec, training=True)

        # check model output shapes
        assert np.all(output[0].shape == mel_spec.shape)
        assert np.all(output[1].shape == mel_spec.shape)
        assert output[2].shape[2] == chars_seq.shape[1]
        assert output[2].shape[1] == (mel_spec.shape[1] // model.decoder.r)
        assert output[3].shape[1] == (mel_spec.shape[1] // model.decoder.r)

        # inference pass
        output = model(chars_seq, training=False)

    @unittest.skipIf(use_cuda, " [!] Skip Test: TfLite conversion does not work on GPU.")
    def test_forward_attention(
        self,
    ):
        (
            chars_seq,
            chars_seq_lengths,
            mel_spec,
            mel_postnet_spec,
            mel_lengths,
            stop_targets,
            speaker_ids,
        ) = self.generate_dummy_inputs()

        for idx in mel_lengths:
            stop_targets[:, int(idx.item()) :, 0] = 1.0

        stop_targets = stop_targets.view(chars_seq.shape[0], stop_targets.size(1) // c.r, -1)
        stop_targets = (stop_targets.sum(2) > 0.0).unsqueeze(2).float().squeeze()

        model = Tacotron2(num_chars=24, r=c.r, num_speakers=5, forward_attn=True)
        # training pass
        output = model(chars_seq, chars_seq_lengths, mel_spec, training=True)

        # check model output shapes
        assert np.all(output[0].shape == mel_spec.shape)
        assert np.all(output[1].shape == mel_spec.shape)
        assert output[2].shape[2] == chars_seq.shape[1]
        assert output[2].shape[1] == (mel_spec.shape[1] // model.decoder.r)
        assert output[3].shape[1] == (mel_spec.shape[1] // model.decoder.r)

        # inference pass
        output = model(chars_seq, training=False)

    @unittest.skipIf(use_cuda, " [!] Skip Test: TfLite conversion does not work on GPU.")
    def test_tflite_conversion(
        self,
    ):  # pylint:disable=no-self-use
        model = Tacotron2(
            num_chars=24,
            num_speakers=0,
            r=3,
            out_channels=80,
            decoder_output_dim=80,
            attn_type="original",
            attn_win=False,
            attn_norm="sigmoid",
            prenet_type="original",
            prenet_dropout=True,
            forward_attn=False,
            trans_agent=False,
            forward_attn_mask=False,
            location_attn=True,
            attn_K=0,
            separate_stopnet=True,
            bidirectional_decoder=False,
            enable_tflite=True,
        )
        model.build_inference()
        convert_tacotron2_to_tflite(model, output_path="test_tacotron2.tflite", experimental_converter=True)
        # init tflite model
        tflite_model = load_tflite_model("test_tacotron2.tflite")
        # fake input
        inputs = tf.random.uniform([1, 4], maxval=10, dtype=tf.int32)  # pylint:disable=unexpected-keyword-arg
        # run inference
        # get input and output details
        input_details = tflite_model.get_input_details()
        output_details = tflite_model.get_output_details()
        # reshape input tensor for the new input shape
        tflite_model.resize_tensor_input(
            input_details[0]["index"], inputs.shape
        )  # pylint:disable=unexpected-keyword-arg
        tflite_model.allocate_tensors()
        detail = input_details[0]
        input_shape = detail["shape"]
        tflite_model.set_tensor(detail["index"], inputs)
        # run the tflite_model
        tflite_model.invoke()
        # collect outputs
        decoder_output = tflite_model.get_tensor(output_details[0]["index"])
        postnet_output = tflite_model.get_tensor(output_details[1]["index"])
        # remove tflite binary
        os.remove("test_tacotron2.tflite")
@ -1,19 +0,0 @@
import unittest

import numpy as np
import tensorflow as tf
import torch

from TTS.vocoder.tf.models.melgan_generator import MelganGenerator

use_cuda = torch.cuda.is_available()


@unittest.skipIf(use_cuda, " [!] Skip Test: Loose TF support.")
def test_melgan_generator():
    hop_length = 256
    model = MelganGenerator()
    # pylint: disable=no-value-for-parameter
    dummy_input = tf.random.uniform((4, 80, 64))
    output = model(dummy_input, training=False)
    assert np.all(output.shape == (4, 1, 64 * hop_length)), output.shape
@ -1,31 +0,0 @@
import os
import unittest

import soundfile as sf
import tensorflow as tf
import torch
from librosa.core import load

from tests import get_tests_input_path, get_tests_output_path, get_tests_path
from TTS.vocoder.tf.layers.pqmf import PQMF

TESTS_PATH = get_tests_path()
WAV_FILE = os.path.join(get_tests_input_path(), "example_1.wav")
use_cuda = torch.cuda.is_available()


@unittest.skipIf(use_cuda, " [!] Skip Test: Loose TF support.")
def test_pqmf():
    w, sr = load(WAV_FILE)

    layer = PQMF(N=4, taps=62, cutoff=0.15, beta=9.0)
    w2 = tf.convert_to_tensor(w[None, None, :])
    b2 = layer.analysis(w2)
    w2_ = layer.synthesis(b2)
    w2_ = w2_.numpy()

    print(w2_.max())
    print(w2_.min())
    print(w2_.mean())
    sf.write(os.path.join(get_tests_output_path(), "tf_pqmf_output.wav"), w2_.flatten(), sr)