# Graphs
import matplotlib.pyplot as plt
# Preprocessing
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler, MaxAbsScaler, QuantileTransformer, OneHotEncoder
from sklearn.model_selection import train_test_split
# Metrics
from sklearn.metrics import roc_curve, roc_auc_score, auc, ConfusionMatrixDisplay, classification_report
from sklearn import tree
# SHAP values
import shap
#-------------------------------------------------------------------------------
def read_csv(path, drop_id=False):
    """Load a CSV file into a DataFrame and print its shape.

    :param path str: path to the csv file (anything ``pd.read_csv`` accepts).
    :param drop_id bool: when True, the ``ID_estudo`` column is removed
        from the loaded data (default=False).
    :return: dataframe from the csv file.
    :rtype: pd.DataFrame
    """
    frame = pd.read_csv(path)
    if drop_id:
        frame = frame.drop(columns=['ID_estudo'])
    print(frame.shape)
    return frame
#-------------------------------------------------------------------------------
def save_csv(df, path):
    """Write a DataFrame to a UTF-8 encoded CSV file without the index.

    :param df pd.DataFrame: dataframe to be saved.
    :param path str: destination of the csv file.
    :return: no value
    :rtype: none
    """
    df.to_csv(path, index=False, encoding='utf-8')
    print('CSV file saved successfully!')
#-------------------------------------------------------------------------------
def variables_preprocessing(df):
    """Filter the rows of interest, clean the DRS column and drop unused columns.

    Steps, in order:

    * keep rows whose ``ECGRUP`` is neither ``'X'`` nor ``'Y'``, whose
      ``comportamento`` equals 3 and whose ``MORFO`` equals 81403;
    * replace ``DRS`` by the second space-separated token of its text,
      using 0 where that token does not exist (missing value or no space);
    * keep only rows with ``UFRESID == 'SP'``;
    * remove the columns listed in ``drop_cols`` below.

    The input DataFrame is not modified.

    :param df pd.DataFrame: DataFrame to be preprocessed.
    :return: DataFrame after be preprocessed and get some columns removed
    :rtype: pd.DataFrame
    :raises KeyError: if one of the columns in ``drop_cols`` is missing.
    """
    drop_cols = ['UFRESID', 'UFNASC', 'REC04', 'CIDADE', 'DESCTOPO', 'DESCMORFO',
                 'META01', 'META02', 'META03', 'META04', 'REC01', 'REC02', 'REC03',
                 'comportamento', 'MORFO', 'TMOAPOS', 'TOPO', 'TOPOGRUP', 'T',
                 'N', 'M', 'NAOTRAT', 'TRATAMENTO', 'TRATFAPOS', 'NENHUMAPOS',
                 'CIRURAPOS', 'RADIOAPOS', 'QUIMIOAPOS', 'HORMOAPOS', 'IMUNOAPOS',
                 'OUTROAPOS', 'RECLOCAL', 'RECREGIO', 'RECDIST', 'HABILIT']

    # One combined mask plus an explicit copy: later column assignments then
    # act on an independent frame instead of a slice of the caller's data.
    keep = (
        ~df.ECGRUP.isin(['X', 'Y'])
        & (df.comportamento == 3)
        & (df.MORFO == 81403)
    )
    df_aux = df[keep].copy()

    # Second token of the DRS text. ``.str.get(1)`` yields NaN when there is
    # no second token, so the fill below covers that case as well as missing
    # values. The result is assigned back explicitly: an in-place ``fillna``
    # on the attribute-accessed column is not guaranteed to reach the frame.
    df_aux['DRS'] = df_aux.DRS.str.split(' ').str.get(1).fillna(0)

    df_sp = df_aux[df_aux.UFRESID == 'SP']
    return df_sp.drop(columns=drop_cols)
#-------------------------------------------------------------------------------
def get_dates_diff(df, dates_list):
    """Compute day differences between the date columns.

    Rows without ``DTTRAT`` or ``DTULTINFO`` are discarded first. Every
    column named in ``dates_list`` is parsed as a datetime, then six
    difference columns (in days) are added: ``CONSDIAG``, ``DIAGTRAT``,
    ``TRATCONS``, ``ULTICONS``, ``ULTIDIAG`` and ``ULTITRAT``. The five
    original date columns are removed from the result.

    :param df pd.DataFrame: DataFrame to get the dates difference.
    :param dates_list list: list with the name of date columns.
    :return: DataFrame with the dates difference in six new columns
    :rtype: pd.DataFrame
    """
    # These columns carry a time component; the others are date-only.
    with_time = ('DTTRAT', 'DTULTINFO', 'DTRECIDIVA')

    frame = df.dropna(subset=['DTTRAT', 'DTULTINFO']).copy()
    for name in dates_list:
        date_format = '%Y-%m-%d %H:%M:%S' if name in with_time else '%Y-%m-%d'
        frame[name] = pd.to_datetime(frame[name], format=date_format)

    # (new column, later date, earlier date)
    intervals = (
        ('CONSDIAG', 'DTDIAG', 'DTCONSULT'),
        ('DIAGTRAT', 'DTTRAT', 'DTDIAG'),
        ('TRATCONS', 'DTTRAT', 'DTCONSULT'),
        ('ULTICONS', 'DTULTINFO', 'DTCONSULT'),
        ('ULTIDIAG', 'DTULTINFO', 'DTDIAG'),
        ('ULTITRAT', 'DTULTINFO', 'DTTRAT'),
    )
    for new_col, later, earlier in intervals:
        frame[new_col] = (frame[later] - frame[earlier]).dt.days

    return frame.drop(
        columns=['DTCONSULT', 'DTDIAG', 'DTTRAT', 'DTRECIDIVA', 'DTULTINFO'])
#-------------------------------------------------------------------------------
def get_labels(df):
    """Add binary label columns derived from ``ULTINFO`` and ``ULTIDIAG``.

    Five 0/1 columns are appended to a copy of the input:

    * ``obito_geral``  - 1 when ``ULTINFO`` is greater than 2;
    * ``obito_cancer`` - 1 when ``ULTINFO`` equals 3;
    * ``vivo_ano1`` / ``vivo_ano3`` / ``vivo_ano5`` - 1 when ``ULTIDIAG``
      exceeds 1, 3 or 5 times 365 days respectively.

    :param df pd.DataFrame: dataframe to be processed.
    :return: DataFrame with the new labels
    :rtype: pd.DataFrame
    """
    labelled = df.copy()

    # Start every label at 0, then switch on the rows matching each rule.
    for name in ('obito_geral', 'obito_cancer',
                 'vivo_ano1', 'vivo_ano3', 'vivo_ano5'):
        labelled[name] = 0

    rules = {
        'obito_geral': labelled.ULTINFO > 2,
        'obito_cancer': labelled.ULTINFO == 3,
        'vivo_ano1': labelled.ULTIDIAG > 365,
        'vivo_ano3': labelled.ULTIDIAG > 3*365,
        'vivo_ano5': labelled.ULTIDIAG > 5*365,
    }
    for name, mask in rules.items():
        labelled.loc[mask, name] = 1

    return labelled
#-------------------------------------------------------------------------------
def get_label_rec(df):
    """Add labels combining ``obito_geral`` with the ``RECNENHUM`` flag.

    Four 0/1 columns are appended to a copy of the input. Each one marks
    a combination of ``obito_geral`` (1 or 0) and ``RECNENHUM`` (1 or 0):

    * ``ob_sem_rec``   - ``obito_geral == 1`` and ``RECNENHUM == 1``;
    * ``ob_com_rec``   - ``obito_geral == 1`` and ``RECNENHUM == 0``;
    * ``vivo_sem_rec`` - ``obito_geral == 0`` and ``RECNENHUM == 1``;
    * ``vivo_com_rec`` - ``obito_geral == 0`` and ``RECNENHUM == 0``.

    :param df pd.DataFrame: dataframe to be processed.
    :return: DataFrame with the new labels
    :rtype: pd.DataFrame
    """
    labelled = df.copy()
    for name in ('ob_com_rec', 'ob_sem_rec', 'vivo_com_rec', 'vivo_sem_rec'):
        labelled[name] = 0

    died = labelled.obito_geral == 1
    alive = labelled.obito_geral == 0
    flag_one = labelled.RECNENHUM == 1
    flag_zero = labelled.RECNENHUM == 0

    labelled.loc[died & flag_one, 'ob_sem_rec'] = 1
    labelled.loc[died & flag_zero, 'ob_com_rec'] = 1
    labelled.loc[alive & flag_one, 'vivo_sem_rec'] = 1
    labelled.loc[alive & flag_zero, 'vivo_com_rec'] = 1
    return labelled
#-------------------------------------------------------------------------------
def get_train_test(df, drop_cols, label, test_size=0.25, random_state=0):
    """Separate features from the label and split both into train and test.

    The split is stratified on the label column.

    :param df pd.DataFrame: dataframe that will be splitted.
    :param drop_cols list: columns to be removed from the features.
    :param label str: name of the label column.
    :param test_size float: size of test (default=0.25).
    :param random_state int: value for train_test_split random_state (default=0).
    :return: X_train, X_test, y_train, y_test
    :rtype: tuple
    """
    target = df[label].copy()
    # Remove the unwanted columns first and the label second, so a label
    # that is also listed in drop_cols is still reported as an error.
    feature_names = df.columns.drop(drop_cols).drop(label)
    features = df[feature_names]
    return train_test_split(features, target,
                            test_size=test_size,
                            random_state=random_state,
                            stratify=target)
#-------------------------------------------------------------------------------
def train_preprocessing(df, encoder_type='LabelEncoder', normalizer='StandardScaler',
                        pca=False, pca_components=None, random_state=0):
    """Preprocessing the train dataset.

    Object-dtype columns are encoded, the result is normalized and,
    optionally, projected with PCA. All fitted transformers are returned
    so the same steps can be applied to the test set.

    :param df pd.DataFrame: DataFrame to be preprocessed (not modified).
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
    :param normalizer str: which normalizer to be fitted to the data (default='StandardScaler').
        options:
        * 'StandardScaler'
        * 'MinMaxScaler'
        * 'MaxAbsScaler'
        * 'QuantileTransformer'
    :param pca bool: if want to use PCA components set True (default=False).
    :param pca_components int: number of PCA components (default=None).
    :param random_state int: value for pca random_state (default=0).
    :return df: preprocessed train data, as returned by the normalizer's
        (or PCA's) ``fit_transform``
    :return enc: fitted encoders, keyed by categorical column name
    :rtype: dict
    :return norm: trained normalizer
    :rtype: object
    :return pca if param pca=True: trained PCA
    :rtype: object
    :return feat_cols: features names after encoding (before PCA)
    :rtype: pd.Index
    :raises ValueError: if ``encoder_type`` or ``normalizer`` is not one of
        the options listed above.
    """
    supported_encoders = ('LabelEncoder', 'OneHotEncoder')
    supported_normalizers = ('StandardScaler', 'MinMaxScaler', 'MaxAbsScaler',
                             'QuantileTransformer')
    # Fail fast with a clear message. Without these checks an unknown
    # normalizer left ``norm`` unbound and an unknown encoder silently
    # skipped the encoding step.
    if encoder_type not in supported_encoders:
        raise ValueError(f'Unsupported encoder_type {encoder_type!r}; '
                         f'expected one of {supported_encoders}')
    if normalizer not in supported_normalizers:
        raise ValueError(f'Unsupported normalizer {normalizer!r}; '
                         f'expected one of {supported_normalizers}')

    df_aux = df.copy()
    list_categorical = df_aux.select_dtypes(include='object').columns
    enc = {}

    if encoder_type == 'LabelEncoder':
        # One encoder per column, replacing the column in place.
        for col in list_categorical:
            enc[col] = LabelEncoder()
            df_aux[col] = enc[col].fit_transform(df_aux[col])
    else:
        # One-hot: append the indicator columns of every categorical column,
        # then remove the original categorical columns.
        for col in list_categorical:
            enc[col] = OneHotEncoder(handle_unknown='ignore')
            ohe_results = enc[col].fit_transform(df_aux[[col]])
            ohe_names = [f'{col}_{name}' for name in enc[col].categories_[0]]
            df1 = pd.DataFrame(ohe_results.toarray(), columns=ohe_names,
                               index=df_aux[col].index)
            df_aux = df_aux.merge(df1, how='left', left_index=True, right_index=True)
        df_aux = df_aux.drop(columns=list_categorical)

    feat_cols = df_aux.columns

    if normalizer == 'StandardScaler':
        norm = StandardScaler()
    elif normalizer == 'MinMaxScaler':
        norm = MinMaxScaler((0, 1))
    elif normalizer == 'MaxAbsScaler':
        norm = MaxAbsScaler()
    else:  # 'QuantileTransformer' (validated above)
        norm = QuantileTransformer(output_distribution='normal')
    df_aux = norm.fit_transform(df_aux)

    if pca:
        # Separate name for the fitted model so the boolean flag is not
        # overwritten.
        pca_model = PCA(pca_components, random_state=random_state)
        df_aux = pca_model.fit_transform(df_aux)
        return df_aux, enc, norm, pca_model, feat_cols
    return df_aux, enc, norm, feat_cols
#-------------------------------------------------------------------------------
def test_preprocessing(df, enc, norm, encoder_type='LabelEncoder', pca=None):
    """Preprocessing the test dataset.

    Applies already-fitted transformers: missing values are filled with 0,
    object-dtype columns are encoded with ``enc``, the result is passed to
    ``norm.transform`` and, when given, to ``pca.transform``.

    With 'LabelEncoder', values that are not among the encoder's
    ``classes_`` are encoded as -1.

    :param df pd.DataFrame: DataFrame to be preprocessed (not modified).
    :param enc: trained encoders, keyed by categorical column name.
    :param norm: trained normalizer.
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
    :param pca: trained PCA (default=None).
    :return: preprocessed test data, as returned by ``norm.transform``
        (or ``pca.transform`` when ``pca`` is given)
    :raises ValueError: if ``encoder_type`` is not one of the options above.
    """
    if encoder_type not in ('LabelEncoder', 'OneHotEncoder'):
        # An unknown value used to skip the encoding step silently.
        raise ValueError(f'Unsupported encoder_type {encoder_type!r}; '
                         "expected 'LabelEncoder' or 'OneHotEncoder'")

    df_aux = df.fillna(0)
    list_categorical = df_aux.select_dtypes(include='object').columns

    if encoder_type == 'LabelEncoder':
        for col in list_categorical:
            # Compute the membership mask once; unseen values keep -1.
            known = df_aux[col].isin(enc[col].classes_).to_numpy()
            encoded = pd.Series(-1, index=df_aux.index, dtype='int64')
            encoded[known] = enc[col].transform(df_aux.loc[known, col])
            # Assign raw values so the column becomes a plain integer column
            # without any index alignment.
            df_aux[col] = encoded.to_numpy()
    else:
        for col in list_categorical:
            ohe_results = enc[col].transform(df_aux[[col]])
            ohe_names = [f'{col}_{name}' for name in enc[col].categories_[0]]
            df1 = pd.DataFrame(ohe_results.toarray(), columns=ohe_names,
                               index=df_aux[col].index)
            df_aux = df_aux.merge(df1, how='left', left_index=True, right_index=True)
        df_aux = df_aux.drop(columns=list_categorical)

    df_aux = norm.transform(df_aux)
    if pca is not None:
        df_aux = pca.transform(df_aux)
    return df_aux
#-------------------------------------------------------------------------------
def preprocessing(df, cols_drop, label, test_size=0.25, encoder_type='LabelEncoder',
                  norm_name='StandardScaler', return_enc_norm=False, pca=False,
                  pca_components=None, balance_data=True, group_years=False,
                  first_year=None, last_year=None, morpho3=False, random_state=0):
    """Preprocessing the train and test datasets.

    Optionally filters the rows, splits into train and test, encodes and
    normalizes both parts (fitting on train only), and optionally balances
    the training set with SMOTE.

    :param df pd.DataFrame: DataFrame to be preprocessed (not modified).
    :param cols_drop list: list of columns to be dropped from dataset.
    :param label string: name of the column that will be the label.
    :param test_size float: size of test set (default=0.25).
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
    :param norm_name str: which normalizer to be fitted to the data (default='StandardScaler').
        - options:
        * 'StandardScaler';
        * 'MinMaxScaler';
        * 'MaxAbsScaler';
        * 'QuantileTransformer'.
    :param return_enc_norm bool: if want to return the encoder and the normalizer set True (default=False).
    :param pca bool: if want to use PCA components set True (default=False).
        PCA is only applied when ``pca_components`` is also given.
    :param pca_components int: number of PCA components (default=None).
    :param balance_data bool: balance the data using oversampling (default=True).
    :param group_years bool: create a subset with years grouped (default=False).
    :param first_year int: first year of the grouped years. Ignored if group_years = False.
    :param last_year int: last year of the grouped years. Ignored if group_years = False.
    :param morpho3 bool: use only morphologies that the last number is equal to 3 (default=False).
    :param random_state int: seed used for the split, PCA and SMOTE (default=0).
    :return X_train_: preprocessed (and optionally balanced) train features
    :return X_test_: preprocessed test features
    :return y_train_: train label (resampled when balance_data=True)
    :return y_test: test label
    :return feat_cols: features columns names
    :return enc, norm: fitted encoders and normalizer, only when
        return_enc_norm=True
    """
    df_aux = df.copy()

    # Morphology 3: keep rows whose MORFO code ends in the digit 3.
    if morpho3:
        # Numeric test instead of parsing repr(): also correct for codes
        # stored as floats, and missing values are excluded, not an error.
        df_aux = df_aux[df_aux.MORFO % 10 == 3].copy()
        # The earlier implementation went through a temporary
        # 'comportamento' column and dropped it afterwards, which also
        # removed a pre-existing column of that name; keep that effect.
        df_aux = df_aux.drop(columns='comportamento', errors='ignore')

    # Grouped years
    if group_years and first_year is not None and last_year is not None:
        in_range = (df_aux.ANODIAG >= first_year) & (df_aux.ANODIAG <= last_year)
        df_aux = df_aux[in_range].copy()

    # Train Test split
    X_train, X_test, y_train, y_test = get_train_test(df_aux, cols_drop, label,
                                                      test_size,
                                                      random_state=random_state)

    # Preprocessing: fit on train, apply to test.
    if pca and pca_components is not None:
        X_train_enc, enc, norm, pca_model, feat_cols = train_preprocessing(
            X_train, encoder_type=encoder_type, normalizer=norm_name,
            pca=pca, pca_components=pca_components, random_state=random_state)
        X_test_ = test_preprocessing(X_test, enc, norm, encoder_type, pca_model)
    else:
        X_train_enc, enc, norm, feat_cols = train_preprocessing(
            X_train, encoder_type=encoder_type, normalizer=norm_name)
        X_test_ = test_preprocessing(X_test, enc, norm, encoder_type)

    # Balancing (training data only)
    if balance_data:
        X_train_, y_train_ = SMOTE(random_state=random_state).fit_resample(X_train_enc, y_train)
    else:
        X_train_, y_train_ = X_train_enc, y_train

    print(f'X_train = {X_train_.shape}, X_test = {X_test_.shape}')
    print(f'y_train = {y_train_.shape}, y_test = {y_test.shape}')

    if return_enc_norm:
        return X_train_, X_test_, y_train_, y_test, feat_cols, enc, norm
    return X_train_, X_test_, y_train_, y_test, feat_cols
#-------------------------------------------------------------------------------
def show_tree(model, feat_cols, max_depth=3, estimator=0):
    """Draw one of the trees stored in ``model.estimators_``.

    :param model: fitted ensemble exposing ``estimators_``.
    :param feat_cols list: list of the features used in the model training.
    :param max_depth int: max_depth to show in the tree (default = 3).
    :param estimator int: index of the estimator whose tree is drawn (default = 0).
    :return: no value
    :rtype: none
    """
    plt.figure(figsize=(22, 10))
    chosen_tree = model.estimators_[estimator]
    tree.plot_tree(chosen_tree,
                   max_depth=max_depth,
                   filled=True,
                   feature_names=feat_cols)
#-------------------------------------------------------------------------------
def plot_feat_importances(model, feat_cols, n=10):
    """Plot a horizontal bar chart with the model's largest feature importances.

    :param model: fitted model exposing ``feature_importances_``.
    :param feat_cols list: list of the features used in the model training.
    :param n int: number of features to be shown (default=10).
    :return: no value
    :rtype: none
    """
    importances = pd.Series(model.feature_importances_, index=feat_cols)
    top_features = importances.nlargest(n)
    top_features.plot(kind='barh', figsize=(10, 8))
    plt.show()
#-------------------------------------------------------------------------------
def plot_roc_curve(model, X_train, X_test, y_train, y_test):
    """Plot the ROC curve for train and test sets.

    The curves use the model's predicted probability of the second class
    (column 1 of ``predict_proba``). The AUC of each curve is shown in
    the legend, together with a diagonal baseline.

    :param model: Trained machine learning model.
    :param X_train: Features of training set.
    :param X_test: Features of test set.
    :param y_train: Label of training set.
    :param y_test: Label of test set.
    :return: no value
    :rtype: None
    """
    # (legend name, line format, features, labels)
    splits = (('Train', 'b', X_train, y_train),
              ('Test', 'r', X_test, y_test))

    curves = []
    for name, line_format, features, labels in splits:
        scores = model.predict_proba(features)[:, 1]
        false_pos, true_pos, _ = roc_curve(labels, scores)
        curves.append((name, line_format, false_pos, true_pos))

    plt.figure(figsize=(10, 7))
    for name, line_format, false_pos, true_pos in curves:
        plt.plot(false_pos, true_pos, line_format,
                 label=f'{name} (AUC = {auc(false_pos, true_pos):.3f})')

    diagonal = np.linspace(0, 1, 100)
    plt.plot(diagonal, np.linspace(0, 1, 100),
             color='k', linestyle='--', label='Baseline')

    plt.xlabel('False Positives')
    plt.ylabel('True Positives')
    plt.grid(True)
    plt.legend()
    plt.show()
#-------------------------------------------------------------------------------
def plot_confusion_matrix(model, x, y, format='.3f'):
    """Plot the row-normalized confusion matrix and print a classification report.

    :param model: Trained machine learning model.
    :param x: Features set.
    :param y: Label set.
    :param format string: Format for the numbers in the confusion matrix (default ".3f")
    :return: no value
    :rtype: None
    """
    bold_font = {'font.size': 12, 'font.weight': 'bold'}
    with plt.rc_context(bold_font):
        ConfusionMatrixDisplay.from_estimator(model, x, y,
                                              normalize='true',
                                              cmap='Blues',
                                              values_format=format)
        plt.show()

    report = classification_report(y, model.predict(x), digits=3)
    print(f'\n{report}')
#-------------------------------------------------------------------------------
def plot_shap_values(model, x, features, max_display=10):
    """Plot the shap values.

    ``shap.TreeExplainer`` can return different containers depending on
    the model and on the installed SHAP version. The values for class
    index 1 are selected explicitly:

    * a list of per-class matrices -> element 1 (element 0 if the list has
      a single entry);
    * a 3-D array -> the slice for index 1 on the last axis (index 0 if
      that axis has length 1). NOTE(review): assumes the last axis is the
      class axis, as in recent SHAP releases; confirm against the
      installed version;
    * anything else (e.g. a single 2-D matrix) -> used as is.

    If the selected values are rejected by ``shap.summary_plot`` with an
    ``AssertionError``, the full explainer output is plotted instead.

    :param model: Trained machine learning model.
    :param x: Features set.
    :param features: Features names.
    :param max_display int: Max features to show shap values (default=10)
    :return: no value
    :rtype: None
    """
    shap_values = shap.TreeExplainer(model).shap_values(x)

    if isinstance(shap_values, list):
        selected = shap_values[1] if len(shap_values) > 1 else shap_values[0]
    elif getattr(shap_values, 'ndim', None) == 3:
        class_index = 1 if shap_values.shape[2] > 1 else 0
        selected = shap_values[:, :, class_index]
    else:
        selected = shap_values

    try:
        shap.summary_plot(selected, x,
                          feature_names=features,
                          max_display=max_display)
    except AssertionError:
        if selected is shap_values:
            # Nothing else to try: the full output was already rejected.
            raise
        shap.summary_plot(shap_values, x,
                          feature_names=features,
                          max_display=max_display)
#-------------------------------------------------------------------------------
def roc_together(X, y, naive_bayes=None, random_forest=None, xgboost=None,
                 lightgbm=None):
    """Plot the ROC curves of several trained models on the same axes.

    Only the models that are provided (not None) are drawn. Each curve
    uses the model's predicted probability of the second class (column 1
    of ``predict_proba``) and shows its AUC in the legend. A dotted gray
    diagonal is added as baseline.

    :param X: Features set used for every model.
    :param y: Label set.
    :param naive_bayes: trained model drawn as 'Naive Bayes' (default=None).
    :param random_forest: trained model drawn as 'Random Forest' (default=None).
    :param xgboost: trained model drawn as 'XGBoost' (default=None).
    :param lightgbm: trained model drawn as 'LightGBM' (default=None).
    :return: no value
    :rtype: None
    """
    # (legend name, model, linestyle); None keeps matplotlib's default style.
    # Naive Bayes and LightGBM share the same black dashed style, so they
    # can only be told apart through the legend when plotted together.
    candidates = (
        ('Naive Bayes', naive_bayes, 'dashed'),
        ('Random Forest', random_forest, 'dashdot'),
        ('XGBoost', xgboost, None),
        ('LightGBM', lightgbm, 'dashed'),
    )

    plt.figure(figsize=(10, 7))
    for name, model, linestyle in candidates:
        if model is None:
            continue
        scores = model.predict_proba(X)[:, 1]
        false_pos, true_pos, _ = roc_curve(y, scores)
        style = {} if linestyle is None else {'linestyle': linestyle}
        plt.plot(false_pos, true_pos, 'k',
                 label=f'{name} (AUC = {auc(false_pos, true_pos):.3f})',
                 **style)

    plt.plot(np.linspace(0, 1, 100),
             np.linspace(0, 1, 100),
             label='Baseline',
             linestyle='dotted',
             color='gray')
    plt.xlabel('False Positives')
    plt.ylabel('True Positives')
    plt.grid(True)
    plt.legend()
    plt.show()