Source code for moana.estimators

import matplotlib as mpl
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator, MaxNLocator, AutoMinorLocator
import moana
import numpy as np
import pandas as pd
import scipy
from scipy.interpolate import interp1d
from scipy.optimize import minimize


[docs] class SampledPosterior: """Class to derive statistical properties from a sample. Args: sample: sample for 1+ parameters. labels: labels of columns to include in statistics. weights: this str indicates which column is used to weight the samples. Attributes: sample: :obj:`pandas.DataFrame` of the parameters. cdf: dict and for each label, cdf[label] is a :obj:`numpy.array` with sorted values in cdf[label][0] and the cumulative function in cdf[label][1]. """ def __init__( self, sample: pd.DataFrame, labels: list, limit=None, weights: str = None ): self.sample = sample self.request = labels self.weights = weights if limit == None: self.limit = [0.1585, 0.5, 0.8415] # Default limits else: self.limit = limit self.cdf = dict() self.ci = dict() for l in self.request: try: self.cdf.update({l: self.build_cdf(l, weights=weights)}) self.ci.update({l: self.find_limits(l, self.limit)}) except: print(f"Cannot determine credible intervals for {l}") print("Values cannot be NaN, they must be real.")
[docs] def build_cdf(self, label: str, weights: str = None) -> np.array: """Build the cumulative distribution function. Args: label: this str indicates which column is used. weights: this str indicates which column is used to weight the samples. Return: Array of the sorted values (column 0) and the cumulative function (column 1). """ if not weights == None: table = self.sample.sort_values(label) w = table[weights].values return np.array([table[label].values, np.cumsum(w) / np.sum(w)]) else: return np.array( [ self.sample.sort_values(label)[label].values, (np.arange(len(self.sample)) + 1) / len(self.sample), ] )
[docs] def find_limits( self, label: str, limit: list, ) -> np.array: """Determine the parameter values based on a given probability. Args: label: this str indicates which column is used. limit: list of probability values. weights: this str indicates which column is used to weight the samples. Return: Array of the corresponding values of the parameter. """ ci = np.array([]) cdf = self.cdf[label] for l in limit: fintp = interp1d(cdf[0], cdf[1] - l, kind="linear") ci = np.append( ci, scipy.optimize.brentq(fintp, np.min(cdf[0]), np.max(cdf[0])) ) return ci
def get2dcontours(self, cdf_levels=None, weights=None, **bins_opts): labels = self.request samples = self.sample N = len(labels) bins = dict() [bins.update({a + b: [50, 50]}) for a in labels for b in labels] [bins.update({b + a: [50, 50]}) for a in labels for b in labels] bins.update(bins_opts) xhist = [[None] * N for a in [None] * N] yhist = [[None] * N for a in [None] * N] hist = [[None] * N for a in [None] * N] hist_levels = [[None] * N for a in [None] * N] for i in range(N): for j in range(N): if j < i: x = samples[labels[j]].values y = samples[labels[i]].values a, b, c, d = moana.corner.compute_2dcontours( x, y, bins=[ bins[labels[j] + labels[i]][0], bins[labels[j] + labels[i]][1], ], cdf_levels=cdf_levels, weights=weights, ) xhist[i][j] = a yhist[i][j] = b hist[i][j] = c hist_levels[i][j] = d return xhist, yhist, hist, hist_levels def _get_plot_config_scatter_plots(self, rcfile=None, rotation=0, rcparams=dict()): """Default plot configuration for scatter plots.""" try: if not rcfile == None: plt.style.use(rcfile) else: path = "/".join(moana.__file__.split("/")[:-1]) plt.style.use(f"{path}/stylelib/corner_plots_times.mplstyle") except: pass if rotation > 10: mpl.rcParams["xtick.major.pad"] = 2 # Useful when ticks are rotated try: mpl.rcParams.update(rcparams) except: pass plt.close() plt.clf()
[docs] def scatter_plot_scaling( self, width: float = 2 * 84 / 25.4, optimize: bool = True ) -> list: """Compute size of scatter plots and margins Args: width: Size of the plot. optimize: If True, the function will reduce each subplot so that the total plot size is the value of width. If False, then the margins will enlarge a little bit the plot, and the total size will be slightly larger than the given width parameter. Return: List of float with with: - the total size of the plot; - the bottom and left margins; - the top and right margins; - the interplot spacing. """ N = len(self.request) if optimize: width = self._get_scatter_plots_size(target=width) wh = 0.05 # w/hspace size plot_size = width + width * (N - 1) * wh / N width_new = 0.8 * width / N + plot_size + 0.8 * width / N # inches bl = 0.8 * width / (N * width_new) tr = (0.8 * width / N + plot_size) / width_new return (width_new, bl, tr, wh)
def _plotsize(self, width: float, target: float) -> float: """Define a metric for actual plot size, and the target. Args: width: initial condition in inches. target: target width in inches. Return: Quadratic distance between target and actual plot size. """ width_new, _, _, _ = self.scatter_plot_scaling(width, optimize=False) return np.power(width_new - target, 2) def _get_scatter_plots_size(self, target: float) -> float: """Find the optimal plot size so that plot+margins are matching the target. Args: target: target width in inches. Return: Optimal plot size in inches. """ bnds = [[1e-2, target]] res = minimize(self._plotsize, [target * 0.9], args=(target), bounds=bnds) try: final = moana.dbc.custom_floor(res.x[0], precision=3) except IndexError: final = target print( f"Each sub-plot must be {final:.3f}in x {final:.3f}in to have a figure of {target:.3f}in. Okay! I'm using it!" ) return final def corner_plot( self, axes_options=dict(), contourf_options=dict(), figure=None, axes=None, bins_options=None, credible_intervals=True, fill_ci=True, show_samples=False, diagonal="cumul", display_plot_coords=False, labels=dict(), bins=dict(), filename=None, saving_options=dict(), align_xlabels=True, align_ylabels=True, cdf_levels=None, weights=True, rotation=0, display_1sigma=False, rcfile=None, rcparams=dict(), bins_density=dict(), ): N = len(self.request) # Definition of the figure and axes if isinstance(figure, mpl.figure.Figure) | isinstance(axes, np.ndarray): fig = figure ax = axes else: self._get_plot_config_scatter_plots( rcfile=rcfile, rotation=rotation, rcparams=rcparams ) width, lb, tr, wh_margin = self.scatter_plot_scaling() fig, ax = plt.subplots(N, N, figsize=[width, width]) fig.subplots_adjust( left=lb, bottom=lb, right=tr, top=tr, wspace=wh_margin, hspace=wh_margin ) # Compute credible intervals if weights & isinstance(self.weights, str): xhist, yhist, hist, hist_levels = self.get2dcontours( cdf_levels=cdf_levels, weights=self.sample[self.weights].values, **bins ) else: xhist, yhist, hist, hist_levels = self.get2dcontours( cdf_levels=cdf_levels, weights=None, **bins ) # How label are displayed label_names = dict() [label_names.update({a: a}) for a in self.request] label_names.update(labels) labels = self.request # Range, major and minor ticks locators ax_options = dict() [ax_options.update({a: 4 * [None]}) for a in self.request] ax_options.update(axes_options) spectral_map = plt.get_cmap("Spectral") alpha = [0.5, 0.9, 0.9, 0.9, 0.9] s_list = [1, 5, 5, 5, 5] dchi2_list = np.array([16, 9, 4, 1, 0]) colors = [ spectral_map(255.0 / 255.0), spectral_map(170.0 / 255.0), spectral_map(85.0 / 255.0), spectral_map(0.0), ] samples = self.sample.copy(deep=True) samples["reject"] = 0 if cdf_levels == None: contourf_opts = { "colors": [ (0.72, 0.72, 0.9003921568627451), (0.36, 0.36, 0.7011764705882353), (0.0, 0.0, 0.5019607843137255), ], "antialiased": False, } # with blue shades contourf_opts.update(contourf_options) else: if len(cdf_levels) <= 3: contourf_opts = { "colors": [ (0.72, 0.72, 0.9003921568627451), (0.36, 0.36, 0.7011764705882353), (0.0, 0.0, 0.5019607843137255), ], "antialiased": False, } # with blue shades contourf_opts.update(contourf_options) else: contourf_opts = dict(contourf_options) # Create subplots for i in range(N): for j in range(N): if j > i: ax[i][j].set_xlabel("") ax[i][j].set_ylabel("") ax[i][j].axes.get_xaxis().set_visible(False) ax[i][j].axes.get_yaxis().set_visible(False) ax[i][j].set_frame_on(False) else: ax[i][j].xaxis.set_minor_locator(AutoMinorLocator(2)) ax[i][j].yaxis.set_minor_locator(AutoMinorLocator(2)) ax[i][j].set_facecolor("none") if i == N - 1: ax[i][j].set_xlabel(rf"${label_names[labels[j]]}$", labelpad=0) ax[i][j].tick_params(axis="x", which="major", pad=2) for tick in ax[i][j].get_xticklabels(): tick.set_rotation(rotation) if rotation > 10: tick.set_ha("right") # Pour la rotation de 30 deg else: ax[i][j].set_xlabel("") ax[i][j].set_xticklabels([]) ax[i][j].tick_params(labelbottom=False) if j != N - 2: ax[i][j].sharex(ax[i + 1][j]) if j == 0: if i == 0: ax[i][j].set_ylabel(rf"${label_names[labels[i]]}$") else: ax[i][j].set_ylabel( rf"${label_names[labels[i]]}$", labelpad=0 ) else: ax[i][j].set_ylabel("") if not i == j: ax[i][j].set_yticklabels([]) if (j <= i) and display_plot_coords: ax[i][j].annotate( f"({i}, {j})", (0.05, 0.05), xycoords="axes fraction", c="k", size=11, weight=500, ha="left", va="bottom", bbox=dict(boxstyle="square, pad=0", fc="None", ec="None"), ) if not i == j: if credible_intervals: levels = np.concatenate( [hist_levels[i][j], [hist[i][j].max() * (1 + 1e-6)]] ) if fill_ci: ax[i][j].contourf( xhist[i][j], yhist[i][j], hist[i][j].T, levels, **contourf_opts, ) else: ax[i][j].contour( xhist[i][j], yhist[i][j], hist[i][j].T, levels, **contourf_opts, ) if show_samples: for id_dchi2 in range(len(dchi2_list) - 1): cond = samples.reject == 1 samples.loc[cond, "reject"] = 0 samples.loc[ ( (samples.dchi2 < dchi2_list[id_dchi2 + 1]) | (samples.dchi2 >= dchi2_list[id_dchi2]) ), "reject", ] = 1 cond = samples.reject == 0 ax[i][j].scatter( samples[cond][labels[j]].values, samples[cond][labels[i]].values, s=s_list[id_dchi2], facecolors=colors[id_dchi2], marker="o", alpha=alpha[id_dchi2], linewidths=0, zorder=-100 + id_dchi2, ) # Limits if not ax_options[labels[j]][0] == None: ax[i][j].set_xlim( ax_options[labels[j]][0][0], ax_options[labels[j]][0][1] ) if not ax_options[labels[i]][1] == None: ax[i][j].set_ylim( ax_options[labels[i]][1][0], ax_options[labels[i]][1][1] ) # Ticks position and occurence if not ax_options[labels[j]][2] == None: ax[i][j].xaxis.set_major_locator( MultipleLocator(ax_options[labels[j]][2][0]) ) ax[i][j].xaxis.set_minor_locator( AutoMinorLocator(ax_options[labels[j]][2][1]) ) if not ax_options[labels[i]][3] == None: ax[i][j].yaxis.set_major_locator( MultipleLocator(ax_options[labels[i]][3][0]) ) ax[i][j].yaxis.set_minor_locator( AutoMinorLocator(ax_options[labels[i]][3][1]) ) else: if diagonal == "chi2": x = np.linspace( samples[labels[j]].min(), samples[labels[j]].max(), 100 ) xx = list() yy = list() for k in range(100 - 1): mask = (samples[labels[j]] > x[k]) & ( samples[labels[j]] <= x[k + 1] ) if mask.sum() > 0: y = samples.loc[mask, "dchi2"] xx.append(np.mean([x[k], x[k + 1]])) yy.append(np.min(y)) ax[i][j].plot(xx, yy, ls="-", lw=1, c="k") ax[i][i].yaxis.set_label_position("right") ax[i][i].spines["left"].set_visible(False) ax[i][i].spines["top"].set_visible(False) ax[i][i].tick_params( which="both", bottom=True, top=False, left=False, right=True, labelbottom=True, labeltop=False, labelleft=False, labelright=True, pad=2, ) ax[i][i].set_ylabel(r"$\Delta\chi^2$") ax[i][i].set_ylim(-0.4, 9) ax[i][i].yaxis.set_major_locator(MultipleLocator(2)) ax[i][i].yaxis.set_minor_locator(AutoMinorLocator(4)) # Choose same ticks for a column if i < N - 1: ax[i][i].sharex(ax[i + 1][i]) elif diagonal == "cumul": cdf = self.cdf[labels[j]] ax[i][i].plot(cdf[0], cdf[1], ls="-", lw=1, c="k") if display_1sigma: x = self.ci[labels[j]][1] y = 0.1 xerr = np.array( [ [ x - self.ci[labels[j]][0], self.ci[labels[j]][2] - x, ] ] ).T ax[i][i].errorbar( x, y, xerr=xerr, marker="o", ms=1, lw=0.5, capsize=0, color="k", ) ax[i][i].yaxis.set_label_position("right") ax[i][i].spines["left"].set_visible(False) ax[i][i].spines["top"].set_visible(False) ax[i][i].tick_params( which="both", bottom=True, top=False, left=False, right=True, labelbottom=True, labeltop=False, labelleft=False, labelright=True, pad=2, ) ax[i][i].set_ylabel(r"CDF") ax[i][i].set_ylim(0, 1.05) ax[i][i].yaxis.set_major_locator(MultipleLocator(0.2)) ax[i][i].yaxis.set_minor_locator(AutoMinorLocator(4)) # Choose same ticks for a column if i < N - 1: ax[i][i].sharex(ax[i + 1][i]) # Limits if not ax_options[labels[j]][0] == None: ax[i][j].set_xlim( ax_options[labels[j]][0][0], ax_options[labels[j]][0][1], ) # Ticks position and occurence if not ax_options[labels[j]][2] == None: ax[i][j].xaxis.set_major_locator( MultipleLocator(ax_options[labels[j]][2][0]) ) ax[i][j].xaxis.set_minor_locator( AutoMinorLocator(ax_options[labels[j]][2][1]) ) elif diagonal == "density": bins_density_hist = dict() [bins_density_hist.update({label: 100}) for label in labels] bins_density_hist.update(bins_density) if weights & isinstance(self.weights, str): weight_bins, edge_bins, patches = ax[i][i].hist( self.sample[labels[i]], bins_density_hist[labels[i]], density=True, histtype="step", color="k", weights=self.sample[self.weights].values, ) else: weight_bins, edge_bins, patches = ax[i][i].hist( self.sample[labels[i]], bins_density_hist[labels[i]], density=True, histtype="step", color="k", ) if display_1sigma: x = self.ci[labels[j]][1] y = 0.1 * np.amax(weight_bins) xerr = np.array( [ [ x - self.ci[labels[j]][0], self.ci[labels[j]][2] - x, ] ] ).T ax[i][i].errorbar( x, y, xerr=xerr, marker="o", ms=2, lw=0.5, capsize=0, color="k", ) ax[i][i].yaxis.set_label_position("right") ax[i][i].spines["left"].set_visible(False) ax[i][i].spines["top"].set_visible(False) ax[i][i].tick_params( which="both", bottom=True, top=False, left=False, right=True, labelbottom=True, labeltop=False, labelleft=False, labelright=True, pad=2, ) ax[i][i].set_ylabel(r"Density") # Choose same ticks for a column if i < N - 1: ax[i][i].sharex(ax[i + 1][i]) # Limits if not ax_options[labels[j]][0] == None: ax[i][j].set_xlim( ax_options[labels[j]][0][0], ax_options[labels[j]][0][1], ) # Ticks position and occurence if not ax_options[labels[j]][2] == None: ax[i][j].xaxis.set_major_locator( MultipleLocator(ax_options[labels[j]][2][0]) ) ax[i][j].xaxis.set_minor_locator( AutoMinorLocator(ax_options[labels[j]][2][1]) ) ax[N-2][N-2].tick_params(labelbottom=False) if align_xlabels: fig.align_xlabels(ax[N - 1, :]) if align_ylabels: fig.align_ylabels(ax[:, 0]) if not filename == None: opts = dict( { "transparent": False, "bbox_inches": "tight", "dpi": 300, "pad_inches": 0.01, } ) opts.update(saving_options) fig.savefig(filename, **opts) else: return fig, ax