Importações

[1]:
# !pip install shap
# !pip install optuna
[2]:
seed = 10 # semente para o random_state

# Básicas
import pandas as pd
import numpy as np

# Modelos
from sklearn.naive_bayes import GaussianNB
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

from sklearn.manifold import TSNE
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.model_selection import RandomizedSearchCV

import optuna
from optuna.samplers import RandomSampler
from optuna.visualization import plot_optimization_history
/home/docs/checkouts/readthedocs.org/user_builds/colorectal-site/envs/latest/lib/python3.8/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html
  from .autonotebook import tqdm as notebook_tqdm

Funções

[3]:
# Graphs
import matplotlib.pyplot as plt

# Preprocessing
from sklearn.preprocessing import LabelEncoder, StandardScaler, MinMaxScaler, MaxAbsScaler, QuantileTransformer, OneHotEncoder
from sklearn.model_selection import train_test_split

# Metrics
from sklearn.metrics import roc_curve, roc_auc_score, auc, ConfusionMatrixDisplay, classification_report
from sklearn import tree

# SHAP values
import shap

#-------------------------------------------------------------------------------

def read_csv(path, drop_id=False):
    """Load a csv file into a DataFrame and print its shape.

    :param path str: path to the csv file.
    :param drop_id bool: if True, the 'ID_estudo' column is removed (default=False).

    :return: dataframe from the csv file.
    :rtype: pd.DataFrame
    """

    frame = pd.read_csv(path)

    if drop_id:
        frame = frame.drop(columns=['ID_estudo'])

    # Shape is printed after the optional column removal.
    print(frame.shape)

    return frame
#-------------------------------------------------------------------------------

def save_csv(df, path):
    """Write a DataFrame to a csv file and print a confirmation message.

    The file is written with utf-8 encoding and without the index column.

    :param df pd.DataFrame: dataframe to be saved.
    :param path str: path to save the csv file.

    :return: no value
    :rtype: none
    """

    df.to_csv(path, index=False, encoding='utf-8')

    print('CSV file saved successfully!')
#-------------------------------------------------------------------------------

def variables_preprocessing(df):
    """Filter the rows of interest, clean the DRS column and drop unused columns.

    Steps, in order:

    * keep rows whose ECGRUP is not 'X' or 'Y', whose 'comportamento' is 3 and
      whose MORFO is 81403;
    * replace DRS by the second space-separated token of its original value,
      using 0 where that token does not exist (missing value or no space);
    * keep only rows with UFRESID equal to 'SP';
    * drop the columns that are not used afterwards.

    The input DataFrame is not modified.

    :param df pd.DataFrame: DataFrame to be preprocessed.

    :return: DataFrame after being preprocessed and having some columns removed
    :rtype: pd.DataFrame
    :raises KeyError: if any of the columns to be dropped is not in the DataFrame.
    """

    keep_rows = (
        ~df.ECGRUP.isin(['X', 'Y'])
        & (df.comportamento == 3)
        & (df.MORFO == 81403)
    )
    # Explicit copy so the column assignment below never writes into a view of `df`.
    df_aux = df[keep_rows].copy()

    # DRS: `.str[1]` yields NaN (instead of raising) when there is no second token.
    drs_code = df_aux.DRS.str.split(' ').str[1]
    # Assign the filled Series back instead of calling fillna(inplace=True) on
    # `df_aux.DRS`: the chained in-place form does not update the DataFrame when
    # pandas Copy-on-Write is active. Object dtype lets the integer filler 0
    # coexist with the string codes.
    df_aux['DRS'] = drs_code.astype(object).fillna(0)

    df_sp = df_aux[df_aux.UFRESID == 'SP']

    drop_cols = ['UFRESID', 'UFNASC', 'REC04', 'CIDADE', 'DESCTOPO', 'DESCMORFO',
                 'META01', 'META02', 'META03', 'META04', 'REC01', 'REC02', 'REC03',
                 'comportamento', 'MORFO', 'TMOAPOS', 'TOPO', 'TOPOGRUP', 'T',
                 'N', 'M', 'NAOTRAT', 'TRATAMENTO', 'TRATFAPOS', 'NENHUMAPOS',
                 'CIRURAPOS', 'RADIOAPOS', 'QUIMIOAPOS', 'HORMOAPOS', 'IMUNOAPOS',
                 'OUTROAPOS', 'RECLOCAL', 'RECREGIO', 'RECDIST', 'HABILIT']

    return df_sp.drop(columns=drop_cols)
#-------------------------------------------------------------------------------

def get_dates_diff(df, dates_list):
    """Compute differences, in days, between the date columns.

    Rows with a missing DTTRAT or DTULTINFO are discarded, every column named
    in `dates_list` is parsed as a datetime, six difference columns are added
    (CONSDIAG, DIAGTRAT, TRATCONS, ULTICONS, ULTIDIAG, ULTITRAT) and the five
    original date columns are removed. The input DataFrame is not modified.

    :param df pd.DataFrame: DataFrame to get the dates difference.
    :param dates_list list: list with the name of date columns.

    :return: DataFrame with the dates differences in six new columns
    :rtype: pd.DataFrame
    """

    frame = df.copy()
    frame.dropna(subset=['DTTRAT', 'DTULTINFO'], inplace=True)

    # These columns carry a time component; the others are plain dates.
    with_time = ('DTTRAT', 'DTULTINFO', 'DTRECIDIVA')
    for name in dates_list:
        date_format = '%Y-%m-%d %H:%M:%S' if name in with_time else '%Y-%m-%d'
        frame[name] = pd.to_datetime(frame[name], format=date_format)

    # (new column, later date, earlier date) -> days from earlier to later.
    intervals = (
        ('CONSDIAG', 'DTDIAG', 'DTCONSULT'),
        ('DIAGTRAT', 'DTTRAT', 'DTDIAG'),
        ('TRATCONS', 'DTTRAT', 'DTCONSULT'),
        ('ULTICONS', 'DTULTINFO', 'DTCONSULT'),
        ('ULTIDIAG', 'DTULTINFO', 'DTDIAG'),
        ('ULTITRAT', 'DTULTINFO', 'DTTRAT'),
    )
    for new_col, later, earlier in intervals:
        frame[new_col] = (frame[later] - frame[earlier]).dt.days

    raw_dates = ['DTCONSULT', 'DTDIAG', 'DTTRAT', 'DTRECIDIVA', 'DTULTINFO']
    return frame.drop(columns=raw_dates)
#-------------------------------------------------------------------------------

def get_labels(df):
    """Add binary death and survival label columns.

    New columns (1 when the condition holds, 0 otherwise):

    * obito_geral: ULTINFO greater than 2;
    * obito_cancer: ULTINFO equal to 3;
    * vivo_ano1 / vivo_ano3 / vivo_ano5: ULTIDIAG greater than 1, 3 and 5
      times 365 days, respectively.

    The input DataFrame is not modified.

    :param df pd.DataFrame: dataframe to be processed.

    :return: DataFrame with the new labels
    :rtype: pd.DataFrame
    """

    labeled = df.copy()

    last_info = labeled.ULTINFO
    days_followed = labeled.ULTIDIAG

    conditions = (
        ('obito_geral', last_info > 2),
        ('obito_cancer', last_info == 3),
        ('vivo_ano1', days_followed > 365),
        ('vivo_ano3', days_followed > 3*365),
        ('vivo_ano5', days_followed > 5*365),
    )
    for column, holds in conditions:
        labeled[column] = holds.astype('int64')

    return labeled
#-------------------------------------------------------------------------------

def get_label_rec(df):
    """Add binary labels combining the death flag with the recurrence flag.

    Uses the columns obito_geral and RECNENHUM. New columns (1 when the
    condition holds, 0 otherwise):

    * ob_com_rec: obito_geral == 1 and RECNENHUM == 0;
    * ob_sem_rec: obito_geral == 1 and RECNENHUM == 1;
    * vivo_com_rec: obito_geral == 0 and RECNENHUM == 0;
    * vivo_sem_rec: obito_geral == 0 and RECNENHUM == 1.

    The input DataFrame is not modified.

    :param df pd.DataFrame: dataframe to be processed.

    :return: DataFrame with the new labels
    :rtype: pd.DataFrame
    """

    labeled = df.copy()

    died = labeled.obito_geral == 1
    alive = labeled.obito_geral == 0
    without_rec = labeled.RECNENHUM == 1
    with_rec = labeled.RECNENHUM == 0

    groups = (
        ('ob_com_rec', died & with_rec),
        ('ob_sem_rec', died & without_rec),
        ('vivo_com_rec', alive & with_rec),
        ('vivo_sem_rec', alive & without_rec),
    )
    for column, members in groups:
        labeled[column] = members.astype('int64')

    return labeled
#-------------------------------------------------------------------------------

def get_train_test(df, drop_cols, label, test_size=0.25, random_state=0):
    """Separate features and label, then split them into train and test sets.

    The split is stratified on the label.

    :param df pd.DataFrame: dataframe that will be split.
    :param drop_cols list: columns to be removed from the DataFrame.
    :param label str: name of the label column.
    :param test_size float: size of test (default=0.25).
    :param random_state int: value for train_test_split random_state (default=0).

    :return: X_train, X_test, y_train, y_test
    :rtype: tuple
    """

    data = df.copy()

    target = data[label].copy()
    # Two separate drops: the label must still exist after drop_cols is removed.
    features = data.drop(columns=drop_cols).drop(columns=label)

    return train_test_split(features, target,
                            stratify=target,
                            random_state=random_state,
                            test_size=test_size)
#-------------------------------------------------------------------------------

def train_preprocessing(df, encoder_type='LabelEncoder', normalizer='StandardScaler',
                        pca=False, pca_components=None, random_state=0):
    """Preprocessing the train dataset.

    Encodes the object-dtype columns, fits the chosen normalizer and,
    optionally, fits a PCA on the normalized data. The input DataFrame is not
    modified.

    :param df pd.DataFrame: DataFrame to be preprocessed.
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
        Any other value leaves the categorical columns untouched.
    :param normalizer str: which normalizer to be fitted to the data (default='StandardScaler').
        options:
        * 'StandardScaler'
        * 'MinMaxScaler'
        * 'MaxAbsScaler'
        * 'QuantileTransformer'
    :param pca bool: if want to use PCA components set True (default=False).
    :param pca_components int: number of PCA components (default=None).
    :param random_state int: value for pca random_state (default=0).

    :return df: preprocessed train data, as returned by the normalizer
        (or by the PCA when pca=True)
    :rtype: array-like
    :return enc: fitted encoders, keyed by categorical column name
    :rtype: dict
    :return norm: trained normalizer
    :rtype: object
    :return pca if param pca=True: trained PCA
    :rtype: object
    :return feat_cols: features names after encoding
    :rtype: pd.Index
    :raises ValueError: if `normalizer` is not one of the supported options.
    """

    # Factories, so that only the requested normalizer is instantiated.
    normalizers = {
        'StandardScaler': StandardScaler,
        'MinMaxScaler': lambda: MinMaxScaler((0, 1)),
        'MaxAbsScaler': MaxAbsScaler,
        'QuantileTransformer': lambda: QuantileTransformer(output_distribution='normal'),
    }
    # Fail early with a clear message instead of hitting an unbound `norm` later.
    if normalizer not in normalizers:
        raise ValueError(f'Unknown normalizer {normalizer!r}; '
                         f'expected one of {sorted(normalizers)}')

    df_aux = df.copy()

    list_categorical = df_aux.select_dtypes(include='object').columns

    enc = dict()
    if encoder_type == 'LabelEncoder':
        for col in list_categorical:
            enc[col] = LabelEncoder()
            df_aux[col] = enc[col].fit_transform(df_aux[col])

    elif encoder_type == 'OneHotEncoder':
        for col in list_categorical:
            enc[col] = OneHotEncoder(handle_unknown='ignore')
            ohe_results = enc[col].fit_transform(df_aux[[col]])
            ohe_names = [f'{col}_{name}' for name in enc[col].categories_[0]]
            df1 = pd.DataFrame(ohe_results.toarray(), columns=ohe_names,
                               index=df_aux[col].index)
            df_aux = df_aux.merge(df1, how='left', left_index=True, right_index=True)

        # The original categorical columns are replaced by their one-hot columns.
        df_aux.drop(columns=list_categorical, inplace=True)

    # Captured before normalization, while the column labels are still available.
    feat_cols = df_aux.columns

    norm = normalizers[normalizer]()
    df_aux = norm.fit_transform(df_aux)

    if not pca:
        return df_aux, enc, norm, feat_cols

    # PCA is not imported at the top of the file, so bring it in where it is used.
    from sklearn.decomposition import PCA

    # Separate name so the boolean `pca` flag is not rebound to the fitted model.
    pca_model = PCA(pca_components, random_state=random_state)
    df_aux = pca_model.fit_transform(df_aux)

    return df_aux, enc, norm, pca_model, feat_cols
#-------------------------------------------------------------------------------

def test_preprocessing(df, enc, norm, encoder_type='LabelEncoder', pca=None):
    """Preprocessing the test dataset.

    Missing values are filled with 0, the object-dtype columns are encoded
    with the already fitted encoders, and the fitted normalizer (and PCA, if
    given) is applied. The input DataFrame is not modified.

    :param df pd.DataFrame: DataFrame to be preprocessed.
    :param enc: trained encoders, keyed by categorical column name.
    :param norm: trained normalizer.
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
    :param pca: trained PCA (default=None).

    :return: preprocessed test data, as returned by the normalizer
        (or by the PCA when one is given)
    :rtype: array-like
    """

    df_aux = df.copy()

    df_aux.fillna(0, inplace=True)

    list_categorical = df_aux.select_dtypes(include='object').columns

    if encoder_type == 'LabelEncoder':
        for col in list_categorical:
            # Computed once: which rows hold a category seen during training.
            known = df_aux[col].isin(enc[col].classes_)
            # Categories not seen in training get the sentinel code -1.
            df_aux.loc[~known, col] = -1
            df_aux.loc[known, col] = enc[col].transform(df_aux.loc[known, col])

    elif encoder_type == 'OneHotEncoder':
        for col in list_categorical:
            ohe_results = enc[col].transform(df_aux[[col]])
            ohe_names = [f'{col}_{name}' for name in enc[col].categories_[0]]
            df1 = pd.DataFrame(ohe_results.toarray(), columns=ohe_names,
                               index=df_aux[col].index)
            df_aux = df_aux.merge(df1, how='left', left_index=True, right_index=True)

        # The original categorical columns are replaced by their one-hot columns.
        df_aux.drop(columns=list_categorical, inplace=True)

    df_aux = norm.transform(df_aux)

    # Identity check: never rely on the PCA object's own comparison operators.
    if pca is not None:
        df_aux = pca.transform(df_aux)

    return df_aux
#-------------------------------------------------------------------------------

def preprocessing(df, cols_drop, label, test_size=0.25, encoder_type='LabelEncoder',
                  norm_name='StandardScaler', return_enc_norm=False, pca=False,
                  pca_components=None, balance_data=True, group_years=False,
                  first_year=None, last_year=None, morpho3=False, random_state=0):

    """Preprocessing the train and test datasets.

    Optionally filters the rows (morphology and diagnosis years), splits the
    data into train and test, encodes and normalizes both sets using encoders
    and a normalizer fitted on the train set only, and optionally oversamples
    the train set. The shapes of the resulting sets are printed.

    :param df pd.DataFrame: DataFrame to be preprocessed.
    :param cols_drop list: list of columns to be dropped from dataset.
    :param label string: name of the column that will be the label.
    :param test_size float: size of test set (default=0.25).
    :param encoder_type string: Encoder type to use for categorical features (default='LabelEncoder').
        options:
        * 'LabelEncoder'
        * 'OneHotEncoder'
    :param norm_name str: which normalizer to be fitted to the data (default='StandardScaler').
        options (the ones handled by train_preprocessing):
        * 'StandardScaler';
        * 'MinMaxScaler';
        * 'MaxAbsScaler';
        * 'QuantileTransformer'.
    :param return_enc_norm bool: if want to return the encoder and the normalizer set True (default=False).
    :param pca bool: if want to use PCA components set True (default=False).
        Only takes effect when pca_components is also given.
    :param pca_components int: number of PCA components (default=None).
    :param balance_data bool: balance the data using oversampling (default=True).
    :param group_years bool: create a subset with years grouped (default=False).
        Only takes effect when first_year and last_year are both given.
    :param first_year int: first year of the grouped years. Ignored if group_years = False.
    :param last_year int: last year of the grouped years. Ignored if group_years = False.
    :param morpho3 bool: use only morphologies that the last number is equal to 3 (default=False).
    :param random_state int: value used for the train/test split, the PCA and
        the oversampling (default=0).

    :return X_train_: preprocessed train features
    :rtype: array-like
    :return X_test_: preprocessed test features
    :rtype: array-like
    :return y_train_: train label
    :rtype: array-like
    :return y_test: test label
    :rtype: pd.Series
    :return feat_cols: the features columns names
    :rtype: pd.Index
    :return enc, norm if param return_enc_norm=True: fitted encoders and normalizer
    :rtype: dict, object
    """

    df_aux = df.copy()

    # Morphology 3: keep rows whose MORFO code ends with the digit 3.
    if morpho3:
        df_aux['comportamento'] = [int(repr(i)[-1]) for i in df_aux.MORFO]
        df_aux = df_aux[df_aux.comportamento == 3].copy()
        df_aux.drop(columns='comportamento', inplace=True)

    # Grouped years
    if group_years and first_year is not None and last_year is not None:
        in_range = (df_aux.ANODIAG >= first_year) & (df_aux.ANODIAG <= last_year)
        df_aux = df_aux[in_range].copy()

    # Train Test split
    X_train, X_test, y_train, y_test = get_train_test(df_aux, cols_drop, label,
                                                      test_size,
                                                      random_state=random_state)

    # Preprocessing: everything is fitted on the train set and only applied to
    # the test set.
    if pca and pca_components is not None:
        # Separate name so the boolean `pca` flag is not rebound to the fitted model.
        X_train_enc, enc, norm, pca_model, feat_cols = train_preprocessing(
            X_train, encoder_type=encoder_type, normalizer=norm_name, pca=pca,
            pca_components=pca_components, random_state=random_state)
        X_test_ = test_preprocessing(X_test, enc, norm,
                                     encoder_type, pca_model)

    else:
        X_train_enc, enc, norm, feat_cols = train_preprocessing(
            X_train, encoder_type=encoder_type, normalizer=norm_name)
        X_test_ = test_preprocessing(X_test, enc, norm, encoder_type)

    # Balancing (train set only; the test set is left untouched).
    if balance_data:
        # NOTE(review): SMOTE is not imported in the visible part of this file;
        # it must already be in scope (presumably imblearn.over_sampling.SMOTE)
        # when this branch runs - confirm.
        X_train_, y_train_ = SMOTE(random_state=random_state).fit_resample(X_train_enc, y_train)

    else:
        X_train_, y_train_ = X_train_enc, y_train

    print(f'X_train = {X_train_.shape}, X_test = {X_test_.shape}')
    print(f'y_train = {y_train_.shape}, y_test = {y_test.shape}')

    if return_enc_norm:
        return X_train_, X_test_, y_train_, y_test, feat_cols, enc, norm
    else:
        return X_train_, X_test_, y_train_, y_test, feat_cols
#-------------------------------------------------------------------------------

def show_tree(model, feat_cols, max_depth=3, estimator=0):
    """Draw one of the trees of an ensemble model.

    :param model: machine learning model exposing `estimators_`.
    :param feat_cols list: list of the features used in the model training.
    :param max_depth int: max_depth to show in the tree (default = 3).
    :param estimator int: index of the estimator whose tree is shown (default = 0).

    :return: no value
    :rtype: none
    """

    plt.figure(figsize=(22, 10))

    chosen_tree = model.estimators_[estimator]
    tree.plot_tree(chosen_tree,
                   max_depth=max_depth,
                   filled=True,
                   feature_names=feat_cols)
#-------------------------------------------------------------------------------

def plot_feat_importances(model, feat_cols, n=10):
    """Plot a horizontal bar chart with the n largest feature importances.

    :param model: machine learning model exposing `feature_importances_`.
    :param feat_cols list: list of the features used in the model training.
    :param n int: number of features to be shown (default=10).

    :return: no value
    :rtype: none
    """

    importances = pd.Series(model.feature_importances_, index=feat_cols)

    top_features = importances.nlargest(n)
    top_features.plot(kind='barh', figsize=(10, 8))

    plt.show()
#-------------------------------------------------------------------------------

def plot_roc_curve(model, X_train, X_test, y_train, y_test):
    """Plot the ROC curves of the train and test sets on the same figure.

    The score used is the second column of `model.predict_proba`. Each curve
    is labelled with its AUC, and a diagonal baseline is drawn.

    :param model: Trained machine learning model.
    :param X_train: Features of training set.
    :param X_test: Features of test set.
    :param y_train: Label of training set.
    :param y_test: Label of test set.

    :return: no value
    :rtype: None
    """
    # Scores first, curves second, so everything is computed before any drawing.
    scores = [model.predict_proba(features)[:, 1] for features in (X_train, X_test)]
    curves = [roc_curve(target, score)[:2]
              for target, score in zip((y_train, y_test), scores)]

    plt.figure(figsize=(10, 7))

    for (fpr, tpr), line_fmt, set_name in zip(curves, ('b', 'r'), ('Train', 'Test')):
        plt.plot(fpr, tpr, line_fmt, label=f'{set_name} (AUC = {auc(fpr, tpr):.3f})')

    diagonal = np.linspace(0, 1, 100)
    plt.plot(diagonal, diagonal, color='k', linestyle='--', label='Baseline')

    plt.xlabel('False Positives')
    plt.ylabel('True Positives')
    plt.grid(True)
    plt.legend()
    plt.show()
#-------------------------------------------------------------------------------

def plot_confusion_matrix(model, x, y, format='.3f'):
    """Plot the row-normalized confusion matrix and print the classification report.

    :param model: Trained machine learning model.
    :param x: Features set.
    :param y: Label set.
    :param format string: Format for the numbers in the confusion matrix (default ".3f")

    :return: no value
    :rtype: None
    """
    # Font settings apply only while the matrix is being drawn.
    font_settings = {'font.size': 12, 'font.weight': 'bold'}

    with plt.rc_context(font_settings):
        ConfusionMatrixDisplay.from_estimator(model, x, y,
                                              normalize='true',
                                              cmap='Blues',
                                              values_format=format)
        plt.show()

    report = classification_report(y, model.predict(x), digits=3)
    print(f'\n{report}')
#-------------------------------------------------------------------------------

def plot_shap_values(model, x, features, max_display=10):
    """Plot the SHAP summary of a tree model.

    The plot is first attempted with `shap_values[1]`; if that raises an
    AssertionError, the full `shap_values` object is plotted instead.

    :param model: Trained machine learning model.
    :param x: Features set.
    :param features: Features names.
    :param max_display int: Max features to show shap values (default=10)

    :return: no value
    :rtype: None
    """

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(x)

    plot_options = dict(feature_names=features, max_display=max_display)

    try:
        # NOTE(review): presumably index 1 selects the positive class when the
        # explainer returns one set of values per class - confirm.
        shap.summary_plot(shap_values[1], x, **plot_options)
    except AssertionError:
        shap.summary_plot(shap_values, x, **plot_options)

#-------------------------------------------------------------------------------

def roc_together(X, y, naive_bayes=None, random_forest=None, xgboost=None,
                 lightgbm=None):
    """Plot the ROC curves of several trained models on the same figure.

    Only the models that are given (not None) are drawn. The score used is the
    second column of each model's `predict_proba`. Each curve is labelled with
    the model name and its AUC, and a diagonal baseline is drawn.

    :param X: Features set.
    :param y: Label set.
    :param naive_bayes: Trained Naive Bayes model (default=None).
    :param random_forest: Trained Random Forest model (default=None).
    :param xgboost: Trained XGBoost model (default=None).
    :param lightgbm: Trained LightGBM model (default=None).

    :return: no value
    :rtype: None
    """
    plt.figure(figsize=(10, 7))

    # (model, legend name, linestyle); None keeps matplotlib's default style.
    # NOTE(review): Naive Bayes and LightGBM share the same black dashed style,
    # so their curves cannot be told apart when both are drawn - confirm
    # whether that is intended.
    candidates = (
        (naive_bayes, 'Naive Bayes', 'dashed'),
        (random_forest, 'Random Forest', 'dashdot'),
        (xgboost, 'XGBoost', None),
        (lightgbm, 'LightGBM', 'dashed'),
    )

    for model, model_name, linestyle in candidates:
        # Identity check: never rely on the estimator's own comparison operators.
        if model is None:
            continue

        probas = model.predict_proba(X)[:, 1]
        fp, tp, _ = roc_curve(y, probas)

        style = {} if linestyle is None else {'linestyle': linestyle}
        plt.plot(fp, tp, 'k',
                 label=f'{model_name} (AUC = {auc(fp, tp):.3f})',
                 **style)

    diagonal = np.linspace(0, 1, 100)
    plt.plot(diagonal,
             diagonal,
             label='Baseline',
             linestyle='dotted',
             color='gray')
    plt.xlabel('False Positives')
    plt.ylabel('True Positives')
    plt.grid(True)
    plt.legend()
    plt.show()