import re
import logging
import unicodedata
import numpy as np
import pandas as pd
from collections import Counter
class PreprocessSeq2Seq:
    """
    Loads data and generates source and target token arrays for Seq2Seq model training.
    :param config_dict: Config params dictionary
    :type config_dict: dict
    """
    def __init__(self, config_dict):
        self.logger = logging.getLogger(__name__)
        self.config_dict = config_dict
        self.num_src_vocab = config_dict["dataset"]["num_src_vocab"]
        self.num_tgt_vocab = config_dict["dataset"]["num_tgt_vocab"]
        self.seq_len = config_dict["dataset"]["seq_len"]
        self.df, self.test_df = self.extract_data()
        self.get_vocab(self.df)
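
    # A minimal sketch of the expected config layout, inferred from the keys
    # read above; any further keys (e.g. the data paths consumed by
    # extract_data) are assumptions and will vary with the actual schema:
    #
    #     config_dict = {
    #         "dataset": {
    #             "num_src_vocab": 10000,
    #             "num_tgt_vocab": 10000,
    #             "seq_len": 64,
    #         }
    #     }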
    def get_data(self, df):
        """
        Generates source and target arrays of token ids
        :param df: DataFrame with Source and Target sentences
        :type df: pandas.DataFrame
        :return: Source and Target arrays
        :rtype: tuple (numpy.ndarray [num_samples, seq_len], numpy.ndarray [num_samples, seq_len])
        """
        src = list(df["Source"].map(lambda x: self.preprocess_src(x)))
        tgt = list(df["Target"].map(lambda x: self.preprocess_tgt(x)))
        # Integer dtype so the ids can be used directly as embedding indices
        # and mapped back through the int-keyed id2word dictionaries.
        tokenSrc = np.zeros((len(src), self.seq_len), dtype=np.int64)
        tokenTgt = np.zeros((len(tgt), self.seq_len), dtype=np.int64)
        for i, (s, t) in enumerate(zip(src, tgt)):
            # Truncate to seq_len - 2 so <SOS> and <EOS> fit within seq_len
            s = ["<SOS>"] + s[: self.seq_len - 2] + ["<EOS>"]
            t = ["<SOS>"] + t[: self.seq_len - 2] + ["<EOS>"]
            if len(s) < self.seq_len:
                s = s + ["<PAD>"] * (self.seq_len - len(s))
            if len(t) < self.seq_len:
                t = t + ["<PAD>"] * (self.seq_len - len(t))
            for j, (s_w, t_w) in enumerate(zip(s, t)):
                # Out-of-vocabulary words map to <UNK>
                bool_src = s_w in self.word2idSrc
                tokenSrc[i][j] = (
                    self.word2idSrc[s_w] if bool_src else self.word2idSrc["<UNK>"]
                )
                bool_tgt = t_w in self.word2idTgt
                tokenTgt[i][j] = (
                    self.word2idTgt[t_w] if bool_tgt else self.word2idTgt["<UNK>"]
                )
        return tokenSrc, tokenTgt
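
    # Illustrative only: with seq_len = 6, a source sentence "how are you ?"
    # becomes ["<SOS>", "how", "are", "you", "?", "<EOS>"] and is then mapped
    # to its ids; shorter sentences are right-padded with <PAD>.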
    def get_vocab(self, df):
        """
        Builds source and target vocabularies from the training data
        :param df: DataFrame with Source and Target sentences
        :type df: pandas.DataFrame
        """
        self.logger.info(
            "Building Vocabulary for Source and Target Languages using Training data"
        )
        src = list(df["Source"].map(lambda x: self.preprocess_src(x)))
        tgt = list(df["Target"].map(lambda x: self.preprocess_tgt(x)))
        all_src_words = [word for sent in src for word in sent]
        # Reserve 4 slots for the special tokens prepended below
        topk_src_vocab_freq = Counter(all_src_words).most_common(self.num_src_vocab - 4)
        self.vocabSrc = np.array(
            ["<PAD>", "<UNK>", "<SOS>", "<EOS>"]
            + [word[0] for word in topk_src_vocab_freq]
        )
        self.word2idSrc = {w: i for i, w in enumerate(self.vocabSrc)}
        self.id2wordSrc = {v: k for k, v in self.word2idSrc.items()}
        all_tgt_words = [word for sent in tgt for word in sent]
        topk_tgt_vocab_freq = Counter(all_tgt_words).most_common(self.num_tgt_vocab - 4)
        self.vocabTgt = np.array(
            ["<PAD>", "<UNK>", "<SOS>", "<EOS>"]
            + [word[0] for word in topk_tgt_vocab_freq]
        )
        self.word2idTgt = {w: i for i, w in enumerate(self.vocabTgt)}
        self.id2wordTgt = {v: k for k, v in self.word2idTgt.items()}
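
    # The resulting layout places the special tokens at fixed ids, e.g.
    # word2idSrc["<PAD>"] == 0, word2idSrc["<UNK>"] == 1,
    # word2idSrc["<SOS>"] == 2, word2idSrc["<EOS>"] == 3, followed by up to
    # the num_src_vocab - 4 most frequent training words.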
    def batched_ids2tokens(self, tokens, type="src"):
        """
        Converts batches of token ids back to sentences
        :param tokens: Tokens array, 2D array (num_samples, seq_len)
        :type tokens: numpy.ndarray
        :param type: {'src', 'tgt'} Type of tokens, defaults to "src"
        :type type: str, optional
        :return: List of decoded sentences
        :rtype: list
        """
        # Cast to int so the lookup also works for float id arrays
        if type == "src":
            func = lambda x: self.id2wordSrc[int(x)]
        else:
            func = lambda x: self.id2wordTgt[int(x)]
        vect_func = np.vectorize(func)
        tokens = vect_func(tokens)
        sentences = []
        for words in tokens:
            # Drop special tokens and join the remaining words
            sentences.append(
                " ".join(w for w in words if w not in ("<SOS>", "<EOS>", "<PAD>"))
            )
        return sentences
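
    # Illustrative only: batched_ids2tokens(np.array([[2, 45, 8, 3, 0, 0]]))
    # might yield ["how are"] if ids 45 and 8 map to "how" and "are"; the
    # actual ids depend on the vocabulary built by get_vocab.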
    def preprocess_src(self, text):
        """
        Preprocesses a source sentence
        :param text: Sentence
        :type text: str
        :return: List of preprocessed tokens
        :rtype: list
        """
        text = text.lower().strip()
        # Pad punctuation with spaces so it is split off as separate tokens
        text = re.sub(r"([?.!,¿_])", r" \1 ", text)
        # Drop stray quote characters and collapse whitespace runs
        text = re.sub(r'["\s]+', " ", text)
        # Keep only letters and the punctuation handled above
        text = re.sub(r"[^a-zA-Z?.!,¿_]+", " ", text)
        text = text.strip()
        return text.split()
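
    # Illustrative only: preprocess_src("He said: 'Hello!'") returns
    # ["he", "said", "hello", "!"] after lowercasing, punctuation padding,
    # and stripping characters outside a-z and the kept punctuation.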
    def preprocess_tgt(self, text):
        """
        Preprocesses a target sentence
        :param text: Sentence
        :type text: str
        :return: List of preprocessed tokens
        :rtype: list
        """
        # NFD-normalize and drop combining marks ("Mn") to strip accents
        text = "".join(
            c
            for c in unicodedata.normalize("NFD", text)
            if unicodedata.category(c) != "Mn"
        )
        # Pad punctuation with spaces so it is split off as separate tokens
        text = re.sub(r"([?.!,¿_])", r" \1 ", text)
        # Drop stray quote characters and collapse whitespace runs
        text = re.sub(r'["\s]+', " ", text)
        text = text.strip()
        return text.split()
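
# A minimal usage sketch, not part of the module: it assumes extract_data()
# (defined elsewhere in this class) can locate the train/test splits from
# this config, and that the "dataset" keys below are the only ones required.
if __name__ == "__main__":
    config = {
        "dataset": {"num_src_vocab": 10000, "num_tgt_vocab": 10000, "seq_len": 64}
    }
    prep = PreprocessSeq2Seq(config)  # builds vocabularies from the training split
    src_ids, tgt_ids = prep.get_data(prep.df)  # (num_samples, seq_len) id arrays
    print(prep.batched_ids2tokens(src_ids[:2], type="src"))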