Module dalex.aspect.utils

Expand source code Browse git
import numpy as np
import pandas as pd

from . import checks
from .. import _global_checks

def calculate_cat_assoc_matrix(data, categorical_variables, n):
    from scipy.stats import chi2_contingency
    cat_assoc_matrix = pd.DataFrame(
        index=categorical_variables, columns=categorical_variables, dtype=float
    )
    for i, variable_name_a in enumerate(categorical_variables):
        for j, variable_name_b in enumerate(categorical_variables):
            if i > j:
                continue   # matrix is symmetric
            elif i == j:
                cramers_V = 1
            else:
                # calculate Cramér’s V with bias correction
                cont_tab = pd.crosstab(data[variable_name_a], data[variable_name_b])
                r = cont_tab.shape[0]
                c = cont_tab.shape[1]
                chi2_test = chi2_contingency(cont_tab)
                phi = max(0, (chi2_test[0] / n) - (((r - 1) * (c - 1)) / (n - 1)))
                r = r - ((r - 1) ** 2 / (n - 1))
                c = c - ((c - 1) ** 2 / (n - 1))
                cramers_V = np.sqrt(phi / (min(c, r) - 1))
                cramers_V = checks.check_assoc_value(cramers_V)
            cat_assoc_matrix.loc[variable_name_a, variable_name_b] = cramers_V
            cat_assoc_matrix.loc[variable_name_b, variable_name_a] = cramers_V
    return cat_assoc_matrix


def calculate_cat_num_assoc_matrix(data, categorical_variables, numerical_variables, n):
    from scipy.stats import kruskal
    cat_num_assoc_matrix = pd.DataFrame(
        index=categorical_variables, columns=numerical_variables, dtype=float
    )
    for cat_variable_name in categorical_variables:
        for num_variable_name in numerical_variables:
            # calculate eta-squared
            samples = [
                gr_data[num_variable_name].values
                for gr_name, gr_data in data.groupby(cat_variable_name, as_index=False)
            ]
            kruskal_wallis_test = kruskal(*samples, nan_policy="omit")
            n_samples = len(samples)
            eta_squared = (kruskal_wallis_test.statistic - n_samples + 1) / (n - n_samples)
            eta_squared = checks.check_assoc_value(eta_squared)
            cat_num_assoc_matrix.loc[cat_variable_name, num_variable_name] = eta_squared
    return cat_num_assoc_matrix


def calculate_assoc_matrix(data, corr_method):
    corr_matrix = abs(data.corr(corr_method)) # get submatrix for only numerical variables
    numerical_variables = corr_matrix.columns 
    categorical_variables = list(set(data.columns) - set(numerical_variables))
    n = len(data)
    cat_assoc_matrix = calculate_cat_assoc_matrix(data, categorical_variables, n) # get submatrix for only categorical variables
    cat_num_assoc_matrix = calculate_cat_num_assoc_matrix(          # get submatrix for mixed variables
        data, categorical_variables, numerical_variables, n
    )
    tmp_matrix = pd.concat([corr_matrix, cat_num_assoc_matrix.T], axis=1)
    tmp_matrix_2 = pd.concat([cat_num_assoc_matrix, cat_assoc_matrix], axis=1)
    assoc_matrix = pd.concat([tmp_matrix, tmp_matrix_2])
    return assoc_matrix


def calculate_pps_matrix(data, agg_method):
    _global_checks.global_check_import('ppscore', 'depend_matrix with PPS calculation')
    import ppscore as pps
    pps_result = pps.matrix(data, sample=None)
    pps_matrix = pps_result[["x", "y", "ppscore"]].pivot(
        columns="x", index="y", values="ppscore"
    )
    pps_matrix.rename_axis(None, axis=1, inplace=True)
    pps_matrix.rename_axis(None, axis=0, inplace=True)
    # aggregate values to make symmetric matrix
    if agg_method == "max":
        pps_matrix = np.maximum(pps_matrix, pps_matrix.transpose())
    if agg_method == "min":
        pps_matrix = np.minimum(pps_matrix, pps_matrix.transpose())
    if agg_method == "mean":
        pps_matrix = (pps_matrix + pps_matrix.transpose()) / 2
    return pps_matrix


def calculate_depend_matrix(
    data, depend_method, corr_method, agg_method
):
    depend_matrix = pd.DataFrame()
    if depend_method == "assoc":
        depend_matrix = calculate_assoc_matrix(data, corr_method)
    if depend_method == "pps":
        depend_matrix = calculate_pps_matrix(data, agg_method)
    if callable(depend_method):
        try:
            depend_matrix = depend_method(data)
        except:
            raise ValueError(
                "You have passed wrong callable in depend_method argument. "
                "'depend_method' is the callable to use for calculating dependency matrix."
            )
    return depend_matrix


def calculate_linkage_matrix(depend_matrix, clust_method):
    from scipy.cluster.hierarchy import linkage
    from scipy.spatial.distance import squareform
    dissimilarity = 1 - abs(depend_matrix)
    ## https://www.kaggle.com/sgalella/correlation-heatmaps-with-hierarchical-clustering?scriptVersionId=24045077&cellId=10
    linkage_matrix = linkage(squareform(dissimilarity), clust_method)

    return linkage_matrix


def get_dendrogram_aspects_ordered(hierarchical_clustering_dendrogram, depend_matrix):
    # names of variables on axis
    tick_dict = dict(
        zip(
            hierarchical_clustering_dendrogram.layout.yaxis.tickvals,
            [[var] for var in hierarchical_clustering_dendrogram.layout.yaxis.ticktext]
        )
    )
    d = {k:v for v,k in enumerate(depend_matrix.columns)} # original order of columns
    
    # get names of variables for dendrogram traces
    _aspects_dendrogram_order = []
    for scatter in hierarchical_clustering_dendrogram.data:
        vars_list = tick_dict[scatter.y[1]] + tick_dict[scatter.y[2]]
        vars_list.sort(key=d.get)
        tick_dict[np.mean(scatter.y[1:3])] = vars_list
        _aspects_dendrogram_order.append(
            vars_list
        )
    
    # get min dependency between grouped variables
    _vars_min_depend, _min_depend = get_min_depend_from_matrix(depend_matrix, _aspects_dendrogram_order)

    return pd.DataFrame({
        "variable_names": _aspects_dendrogram_order,
        "min_depend": _min_depend,
        "vars_min_depend": _vars_min_depend
    })


def get_min_depend_from_matrix(depend_matrix, variables_list): 
    _vars_min_depend = []
    _min_depend = []
    for vars in variables_list:
        if isinstance(vars, (list, tuple, np.ndarray)):
            depend_submatrix = abs(depend_matrix).loc[list(vars), list(vars)]
            _min = np.array(depend_submatrix).min()
            _min_idx = np.where(depend_submatrix == _min)
            var_a_idx, var_b_idx = _min_idx[0][0], _min_idx[1][0]
            _vars_min_depend.append(np.array(depend_submatrix.columns[[var_a_idx, var_b_idx]]))
            _min_depend.append(_min)
        else:
            _vars_min_depend.append(None)
            _min_depend.append(None)
    return _vars_min_depend, _min_depend


def calculate_min_depend(
    variables_list,
    data,
    depend_method="assoc",
    corr_method="spearman",
    agg_method="max",
): 
    depend_matrix = calculate_depend_matrix(data, depend_method, corr_method, agg_method)
    return get_min_depend_from_matrix(depend_matrix, variables_list)

Functions

def calculate_assoc_matrix(data, corr_method)
Expand source code Browse git
def calculate_assoc_matrix(data, corr_method):
    corr_matrix = abs(data.corr(corr_method)) # get submatrix for only numerical variables
    numerical_variables = corr_matrix.columns 
    categorical_variables = list(set(data.columns) - set(numerical_variables))
    n = len(data)
    cat_assoc_matrix = calculate_cat_assoc_matrix(data, categorical_variables, n) # get submatrix for only categorical variables
    cat_num_assoc_matrix = calculate_cat_num_assoc_matrix(          # get submatrix for mixed variables
        data, categorical_variables, numerical_variables, n
    )
    tmp_matrix = pd.concat([corr_matrix, cat_num_assoc_matrix.T], axis=1)
    tmp_matrix_2 = pd.concat([cat_num_assoc_matrix, cat_assoc_matrix], axis=1)
    assoc_matrix = pd.concat([tmp_matrix, tmp_matrix_2])
    return assoc_matrix
def calculate_cat_assoc_matrix(data, categorical_variables, n)
Expand source code Browse git
def calculate_cat_assoc_matrix(data, categorical_variables, n):
    from scipy.stats import chi2_contingency
    cat_assoc_matrix = pd.DataFrame(
        index=categorical_variables, columns=categorical_variables, dtype=float
    )
    for i, variable_name_a in enumerate(categorical_variables):
        for j, variable_name_b in enumerate(categorical_variables):
            if i > j:
                continue   # matrix is symmetric
            elif i == j:
                cramers_V = 1
            else:
                # calculate Cramér’s V with bias correction
                cont_tab = pd.crosstab(data[variable_name_a], data[variable_name_b])
                r = cont_tab.shape[0]
                c = cont_tab.shape[1]
                chi2_test = chi2_contingency(cont_tab)
                phi = max(0, (chi2_test[0] / n) - (((r - 1) * (c - 1)) / (n - 1)))
                r = r - ((r - 1) ** 2 / (n - 1))
                c = c - ((c - 1) ** 2 / (n - 1))
                cramers_V = np.sqrt(phi / (min(c, r) - 1))
                cramers_V = checks.check_assoc_value(cramers_V)
            cat_assoc_matrix.loc[variable_name_a, variable_name_b] = cramers_V
            cat_assoc_matrix.loc[variable_name_b, variable_name_a] = cramers_V
    return cat_assoc_matrix
def calculate_cat_num_assoc_matrix(data, categorical_variables, numerical_variables, n)
Expand source code Browse git
def calculate_cat_num_assoc_matrix(data, categorical_variables, numerical_variables, n):
    from scipy.stats import kruskal
    cat_num_assoc_matrix = pd.DataFrame(
        index=categorical_variables, columns=numerical_variables, dtype=float
    )
    for cat_variable_name in categorical_variables:
        for num_variable_name in numerical_variables:
            # calculate eta-squared
            samples = [
                gr_data[num_variable_name].values
                for gr_name, gr_data in data.groupby(cat_variable_name, as_index=False)
            ]
            kruskal_wallis_test = kruskal(*samples, nan_policy="omit")
            n_samples = len(samples)
            eta_squared = (kruskal_wallis_test.statistic - n_samples + 1) / (n - n_samples)
            eta_squared = checks.check_assoc_value(eta_squared)
            cat_num_assoc_matrix.loc[cat_variable_name, num_variable_name] = eta_squared
    return cat_num_assoc_matrix
def calculate_depend_matrix(data, depend_method, corr_method, agg_method)
Expand source code Browse git
def calculate_depend_matrix(
    data, depend_method, corr_method, agg_method
):
    depend_matrix = pd.DataFrame()
    if depend_method == "assoc":
        depend_matrix = calculate_assoc_matrix(data, corr_method)
    if depend_method == "pps":
        depend_matrix = calculate_pps_matrix(data, agg_method)
    if callable(depend_method):
        try:
            depend_matrix = depend_method(data)
        except:
            raise ValueError(
                "You have passed wrong callable in depend_method argument. "
                "'depend_method' is the callable to use for calculating dependency matrix."
            )
    return depend_matrix
def calculate_linkage_matrix(depend_matrix, clust_method)
Expand source code Browse git
def calculate_linkage_matrix(depend_matrix, clust_method):
    from scipy.cluster.hierarchy import linkage
    from scipy.spatial.distance import squareform
    dissimilarity = 1 - abs(depend_matrix)
    ## https://www.kaggle.com/sgalella/correlation-heatmaps-with-hierarchical-clustering?scriptVersionId=24045077&cellId=10
    linkage_matrix = linkage(squareform(dissimilarity), clust_method)

    return linkage_matrix
def calculate_min_depend(variables_list, data, depend_method='assoc', corr_method='spearman', agg_method='max')
Expand source code Browse git
def calculate_min_depend(
    variables_list,
    data,
    depend_method="assoc",
    corr_method="spearman",
    agg_method="max",
): 
    depend_matrix = calculate_depend_matrix(data, depend_method, corr_method, agg_method)
    return get_min_depend_from_matrix(depend_matrix, variables_list)
def calculate_pps_matrix(data, agg_method)
Expand source code Browse git
def calculate_pps_matrix(data, agg_method):
    _global_checks.global_check_import('ppscore', 'depend_matrix with PPS calculation')
    import ppscore as pps
    pps_result = pps.matrix(data, sample=None)
    pps_matrix = pps_result[["x", "y", "ppscore"]].pivot(
        columns="x", index="y", values="ppscore"
    )
    pps_matrix.rename_axis(None, axis=1, inplace=True)
    pps_matrix.rename_axis(None, axis=0, inplace=True)
    # aggregate values to make symmetric matrix
    if agg_method == "max":
        pps_matrix = np.maximum(pps_matrix, pps_matrix.transpose())
    if agg_method == "min":
        pps_matrix = np.minimum(pps_matrix, pps_matrix.transpose())
    if agg_method == "mean":
        pps_matrix = (pps_matrix + pps_matrix.transpose()) / 2
    return pps_matrix
def get_dendrogram_aspects_ordered(hierarchical_clustering_dendrogram, depend_matrix)
Expand source code Browse git
def get_dendrogram_aspects_ordered(hierarchical_clustering_dendrogram, depend_matrix):
    # names of variables on axis
    tick_dict = dict(
        zip(
            hierarchical_clustering_dendrogram.layout.yaxis.tickvals,
            [[var] for var in hierarchical_clustering_dendrogram.layout.yaxis.ticktext]
        )
    )
    d = {k:v for v,k in enumerate(depend_matrix.columns)} # original order of columns
    
    # get names of variables for dendrogram traces
    _aspects_dendrogram_order = []
    for scatter in hierarchical_clustering_dendrogram.data:
        vars_list = tick_dict[scatter.y[1]] + tick_dict[scatter.y[2]]
        vars_list.sort(key=d.get)
        tick_dict[np.mean(scatter.y[1:3])] = vars_list
        _aspects_dendrogram_order.append(
            vars_list
        )
    
    # get min dependency between grouped variables
    _vars_min_depend, _min_depend = get_min_depend_from_matrix(depend_matrix, _aspects_dendrogram_order)

    return pd.DataFrame({
        "variable_names": _aspects_dendrogram_order,
        "min_depend": _min_depend,
        "vars_min_depend": _vars_min_depend
    })
def get_min_depend_from_matrix(depend_matrix, variables_list)
Expand source code Browse git
def get_min_depend_from_matrix(depend_matrix, variables_list): 
    _vars_min_depend = []
    _min_depend = []
    for vars in variables_list:
        if isinstance(vars, (list, tuple, np.ndarray)):
            depend_submatrix = abs(depend_matrix).loc[list(vars), list(vars)]
            _min = np.array(depend_submatrix).min()
            _min_idx = np.where(depend_submatrix == _min)
            var_a_idx, var_b_idx = _min_idx[0][0], _min_idx[1][0]
            _vars_min_depend.append(np.array(depend_submatrix.columns[[var_a_idx, var_b_idx]]))
            _min_depend.append(_min)
        else:
            _vars_min_depend.append(None)
            _min_depend.append(None)
    return _vars_min_depend, _min_depend