Source code for darshan.experimental.plots.heatmap_handling

"""
Module of data pre-processing functions for constructing the heatmap figure.
"""

from typing import Dict, Any, Tuple, Sequence, TYPE_CHECKING, Optional

import sys

if sys.version_info >= (3, 8):
    from typing import TypedDict
else:
    from typing_extensions import TypedDict

import pandas as pd
import numpy as np

if TYPE_CHECKING:
    import numpy.typing as npt


class SegDict(TypedDict):
    """
    Custom type hint class for `dict_list` argument in `get_rd_wr_dfs()`.
    """
    id: int
    rank: int
    hostname: str
    write_count: int
    read_count: int
    write_segments: pd.DataFrame
    read_segments: pd.DataFrame
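
# Illustrative sketch (not part of the original module): a ``SegDict``-shaped
# record built from the synthetic values shown in the ``get_rd_wr_dfs()``
# docstring below (a single write segment on rank 0, no reads).
def _example_seg_dict() -> SegDict:
    return {
        "id": 14388265063268455899,
        "rank": 0,
        "hostname": "sn176.localdomain",
        "write_count": 1,
        "read_count": 0,
        "write_segments": pd.DataFrame(
            {"offset": [0], "length": [40],
             "start_time": [0.103379], "end_time": [0.103388]}
        ),
        "read_segments": pd.DataFrame(),
    }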
def get_rd_wr_dfs(
    dict_list: Sequence[SegDict], ops: Sequence[str] = ["read", "write"]
) -> Dict[str, pd.DataFrame]:
    """
    Uses the DXT records to construct individual dataframes
    for both read and write segments.

    Parameters
    ----------
    dict_list: a sequence of DXT records, where each record is a Python
    dictionary with the following keys: 'id', 'rank', 'hostname',
    'write_count', 'read_count', 'write_segments', and 'read_segments'.
    The read/write data is stored in ``read_segments`` and
    ``write_segments``, where each is a ``pd.DataFrame`` containing the
    following data (columns): 'offset', 'length', 'start_time', 'end_time'.

    ops: a sequence of keys designating which Darshan operations to collect
    data for. Default is ``["read", "write"]``.

    Returns
    -------
    rd_wr_dfs: dictionary where each key is an operation from the input
    ``ops`` parameter (i.e. "read", "write") and each value is a
    ``pd.DataFrame`` object containing all of the read/write events.

    Notes
    -----
    Used in ``get_single_df_dict()``.

    Examples
    --------
    ``dict_list`` and ``rd_wr_dfs`` generated from
    ``tests/input/sample-dxt-simple.darshan``:

        dict_list = [
            {
                'id': 14388265063268455899,
                'rank': 0,
                'hostname': 'sn176.localdomain',
                'write_count': 1,
                'read_count': 0,
                'write_segments':
                    offset  length  start_time  end_time
                    0    0      40    0.103379  0.103388,
                'read_segments':
                    Empty DataFrame
                    Columns: []
                    Index: []
            },
            {
                'id': 9457796068806373448,
                'rank': 0,
                'hostname': 'sn176.localdomain',
                'write_count': 1,
                'read_count': 0,
                'write_segments':
                    offset  length  start_time  end_time
                    0    0    4000    0.104217  0.104231,
                'read_segments':
                    Empty DataFrame
                    Columns: []
                    Index: []
            },
        ]

        rd_wr_dfs = {
            'read': Empty DataFrame
                    Columns: []
                    Index: [],
            'write':    length  start_time  end_time  rank
                    0       40    0.103379  0.103388     0
                    1     4000    0.104217  0.104231     0
        }

    """
    # columns to drop when accumulating the dataframes.
    # Currently "offset" data is not utilized
    drop_columns = ["offset"]
    # create empty dictionary to store
    # the concatenated read/write dataframes
    rd_wr_dfs = {}
    # iterate over each operation
    for op_key in ops:
        # mypy can't tell that these keys are in fact
        # in the ``SegDict``, so just ignore the type
        seg_key = op_key + "_segments"
        # create empty list to store each dataframe
        df_list = []
        # iterate over all records/dictionaries
        for _dict in dict_list:
            # ignore for the same reason as above
            seg_df = _dict[seg_key]  # type: ignore
            if seg_df.size:
                # drop unused columns from the dataframe
                seg_df = seg_df.drop(columns=drop_columns)
                # create new column for the ranks
                seg_df["rank"] = _dict["rank"]
                # add the dataframe to the list
                df_list.append(seg_df)
        if df_list:
            # concatenate the list of pandas dataframes into
            # a single one with new row indices
            rd_wr_dfs[op_key] = pd.concat(df_list, ignore_index=True)
        else:
            # if the list is empty assign an empty dataframe
            rd_wr_dfs[op_key] = pd.DataFrame()
    return rd_wr_dfs
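
# Illustrative usage sketch (not part of the original module): feeding the
# synthetic record from ``_example_seg_dict()`` above through
# ``get_rd_wr_dfs()`` yields an empty "read" dataframe and a one-row "write"
# dataframe with columns 'length', 'start_time', 'end_time', and 'rank'.
def _example_get_rd_wr_dfs() -> Dict[str, pd.DataFrame]:
    return get_rd_wr_dfs(dict_list=[_example_seg_dict()])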
def get_single_df_dict(
    report: Any,
    mod: str = "DXT_POSIX",
    ops: Sequence[str] = ["read", "write"],
) -> Dict[str, pd.DataFrame]:
    """
    Reorganizes segmented read/write data into a single ``pd.DataFrame``
    per operation and stores them in a dictionary keyed by operation.

    Parameters
    ----------
    report: a ``darshan.DarshanReport``.

    mod: the DXT module to do analysis for (i.e. "DXT_POSIX"
    or "DXT_MPIIO"). Default is ``"DXT_POSIX"``.

    ops: a sequence of keys designating which Darshan operations to use for
    data aggregation. Default is ``["read", "write"]``.

    Returns
    -------
    flat_data_dict: a dictionary with an entry for each input operation
    (i.e. "read"/"write") that maps to a dataframe containing all events
    for the specified operation.

    Examples
    --------
    `flat_data_dict` generated from `tests/input/sample-dxt-simple.darshan`:

        {
            'read': Empty DataFrame
                    Columns: []
                    Index: [],
            'write':    length  start_time  end_time  rank
                    0       40    0.103379  0.103388     0
                    1     4000    0.104217  0.104231     0
        }

    """
    # initialize an empty dictionary for storing
    # the read/write data for the selected module
    flat_data_dict = {}  # type: Dict[str, pd.DataFrame]
    # retrieve the list of records in pd.DataFrame() form
    dict_list = report.records[mod].to_df()
    # retrieve the list of read/write dataframes from the list of records
    rd_wr_dfs = get_rd_wr_dfs(dict_list=dict_list, ops=ops)
    for op_key in ops:
        # add the concatenated dataframe to the flat dictionary
        flat_data_dict[op_key] = rd_wr_dfs[op_key]
    return flat_data_dict
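
# Illustrative usage sketch (not part of the original module), assuming a
# DXT-enabled log such as ``tests/input/sample-dxt-simple.darshan`` is
# available on disk; ``darshan.DarshanReport`` is the standard PyDarshan
# report object expected by the ``report`` argument.
def _example_get_single_df_dict(log_path: str) -> Dict[str, pd.DataFrame]:
    import darshan
    report = darshan.DarshanReport(log_path)
    return get_single_df_dict(report=report, mod="DXT_POSIX", ops=["read", "write"])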
def get_aggregate_data(
    report: Any,
    mod: str = "DXT_POSIX",
    ops: Sequence[str] = ["read", "write"],
) -> pd.DataFrame:
    """
    Aggregates the data based on which modules and operations are selected.

    Parameters
    ----------
    report: a ``darshan.DarshanReport``.

    mod: the DXT module to do analysis for (i.e. "DXT_POSIX"
    or "DXT_MPIIO"). Default is ``"DXT_POSIX"``.

    ops: a sequence of keys designating which Darshan operations to use for
    data aggregation. Default is ``["read", "write"]``.

    Returns
    -------
    agg_df: a ``pd.DataFrame`` containing the aggregated data determined
    by the input modules and operations.

    Raises
    ------
    ValueError: raised if the selected modules/operations
    don't contain any data.

    Notes
    -----
    Since read and write events are considered unique events,
    if both are selected their dataframes are simply concatenated.

    Examples
    --------
    `agg_df` generated from `tests/input/sample-dxt-simple.darshan`:

            length  start_time  end_time  rank
        0       40    0.103379  0.103388     0
        1     4000    0.104217  0.104231     0

    """
    # collect the concatenated dataframe data from the darshan report
    df_dict = get_single_df_dict(report=report, mod=mod, ops=ops)
    # iterate over each dataframe based on which operations are selected
    df_list = []
    for op_key, op_df in df_dict.items():
        # if the dataframe has data, append it to the list
        if op_df.size:
            df_list.append(op_df)
    if df_list:
        # if there are dataframes in the list, concatenate them into 1 dataframe
        agg_df = pd.concat(df_list, ignore_index=True)
    else:
        raise ValueError("No data available for selected module(s) and operation(s).")
    return agg_df
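
# Illustrative usage sketch (not part of the original module): aggregate only
# write events from a hypothetical DXT-enabled log; selecting operations with
# no recorded events raises the ``ValueError`` documented above.
def _example_get_aggregate_data(log_path: str) -> pd.DataFrame:
    import darshan
    report = darshan.DarshanReport(log_path)
    return get_aggregate_data(report=report, mod="DXT_POSIX", ops=["write"])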
def get_heatmap_df(
    agg_df: pd.DataFrame, xbins: int, nprocs: int, max_time: Optional[float] = None
) -> pd.DataFrame:
    """
    Builds an array similar to a 2D-histogram, where the y data is the unique
    ranks and the x data is time. Each bin is populated with the data sum
    and/or proportionate data sum for all IO events read/written during the
    time spanned by the bin.

    Parameters
    ----------
    agg_df: a ``pd.DataFrame`` containing the aggregated data determined
    by the input modules and operations.

    xbins: the number of x-axis bins to create.

    nprocs: the number of MPI ranks/processes used at runtime.

    max_time: the maximum time to bin up to, since input DXT data is not
    necessarily bounded by the wallclock duration.

    Returns
    -------
    hmap_df: dataframe with time intervals for columns and rank index
    (0, 1, etc.) for rows, where each element contains the data read/written
    by the corresponding rank in the given time interval.

    Examples
    --------
    The first column/bin for the `hmap_df` generated from
    "examples/example-logs/ior_hdf5_example.darshan":

              (0.0, 0.09552296002705891]
        rank
        0                   8.951484e+05
        1                   3.746313e+05
        2                   6.350999e+05
        3                   1.048576e+06

    """
    # generate the bin edges by creating an array of length xbins+1, then
    # taking pairs of data points as the min/max bin values
    if max_time is None:
        max_time = agg_df["end_time"].max()
    bin_edge_data = np.linspace(0.0, max_time, xbins + 1)
    # create dummy variables for start/end time data, where dataframe columns
    # are the x-axis bin ranges
    # pin dtype here because of pandas 2.0+ change--see:
    # gh-909 and
    # https://github.com/pandas-dev/pandas/pull/48022#issuecomment-1448755561
    cats_start = pd.get_dummies(
        pd.cut(agg_df["start_time"], bin_edge_data, precision=16),
        dtype=np.uint8,
    )
    cats_end = pd.get_dummies(pd.cut(agg_df["end_time"], bin_edge_data, precision=16))
    # get series for the elapsed times for each dxt segment
    elapsed_times_dxt_segments = agg_df["end_time"] - agg_df["start_time"]
    # calculate the time interval spanned by each bin
    bin_size = bin_edge_data[1] - bin_edge_data[0]
    # get the ratio of the bin time interval over the dxt segment elapsed times
    true_fraction_of_dxt_segment_in_a_bin = bin_size / elapsed_times_dxt_segments
    # create a version of the above series where any ratio above 1 is set to 1
    fraction_of_dxt_segment_in_a_bin = true_fraction_of_dxt_segment_in_a_bin.copy()
    fraction_of_dxt_segment_in_a_bin.mask(
        fraction_of_dxt_segment_in_a_bin > 1, 1, inplace=True
    )
    # create a dataframe of binned dxt start events where occupied bins contain
    # the start time and unoccupied bins are populated with NaNs
    start_times_in_bins = cats_start.mul(agg_df["start_time"], axis=0).replace(
        0, np.nan
    )
    # create a dataframe of binned dxt start events where occupied bins contain
    # the proportional time spent in the bin
    start_fraction_bin_occupancy = (bin_edge_data[1:] - start_times_in_bins) / bin_size
    # create a dataframe of binned dxt end events where occupied bins contain
    # the end time and unoccupied bins are populated with NaNs
    end_times_in_bins = cats_end.mul(agg_df["end_time"], axis=0).replace(0, np.nan)
    # create a dataframe of binned dxt end events where occupied bins contain
    # the proportional time spent in the bin
    end_fraction_bin_occupancy = (end_times_in_bins - bin_edge_data[:-1]) / bin_size
    # combine the start/end fractional bin occupancy dataframes by multiplying
    # them together; fill any missing values with 1
    combo = start_fraction_bin_occupancy.mul(
        end_fraction_bin_occupancy, fill_value=1, axis=0
    )
    # add the start/end dummy variable dataframes
    # and replace any zeros with NaN's
    cats = cats_start.add(cats_end, fill_value=0)
    cats.replace(0, np.nan, inplace=True)
    # for each row (IO event) fill in any empty (NaN) bins
    # between filled bins because those are time spans b/w start
    # and stop events
    # interpolation is pointless when there is
    # a single non-null value in a row
    null_mask = cats.notna().sum(axis=1) > 1
    null_mask = null_mask.loc[null_mask == True].index
    cats_vals = cats.iloc[null_mask].values
    nan_mask = ~np.isnan(cats_vals)
    fill_locs = np.where(nan_mask)
    num_rows = cats_vals.shape[0]
    start_index = 0
    row = 0
    # TODO: this is already much faster than
    # pandas per gh-798, but consider moving
    # to e.g., Cython or numba?
    while start_index < (2 * num_rows):
        start_bound = fill_locs[1][start_index]
        end_bound = fill_locs[1][start_index + 1]
        cats_vals[row, start_bound:end_bound] = 1
        start_index += 2
        row += 1
    cats.iloc[null_mask] = cats_vals
    # each time bin containing an event has a 1 in it, otherwise NaN
    # store mask for restoring fully occupied bins
    mask_occ = cats == 2
    # set the fraction of segment per occupied bin
    cats = cats.mul(fraction_of_dxt_segment_in_a_bin, axis=0)
    # start/end in adjacent bins require special treatment
    adjacent_mask = (cats.count(axis=1) == 2) & (cats.sum(axis=1) == 2)
    # multiply them by the true fractions of DXT segments, which
    # may actually be > 1 (i.e., the non-1-capped fractions)
    cats[adjacent_mask] = cats[adjacent_mask].mul(
        true_fraction_of_dxt_segment_in_a_bin[adjacent_mask], axis=0
    )
    # adjust the fractions of the starts/ends for partial occupancy
    # of bins
    cats = cats.mul(combo, fill_value=1, axis=0)
    # start and end are in the same bin, so restore it to full occupancy
    cats = cats.mask(mask_occ, 1)
    # each full or fractional bin event is now multiplied by
    # the bytes data
    cats = cats.mul(agg_df["length"], axis=0)
    cats.index = agg_df["rank"]
    hmap_df = cats.groupby("rank").sum()
    hmap_df = hmap_df.reindex(index=range(nprocs), fill_value=0.0)
    return hmap_df
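
# Illustrative sketch (not part of the original module): bin two synthetic
# write events from ranks 0 and 1 into 4 equal-width time bins for a
# hypothetical 2-process run; each cell of the result holds the bytes
# attributed to that rank within the corresponding time interval.
def _example_get_heatmap_df() -> pd.DataFrame:
    agg_df = pd.DataFrame(
        {
            "length": [40, 4000],
            "start_time": [0.10, 0.30],
            "end_time": [0.20, 0.90],
            "rank": [0, 1],
        }
    )
    return get_heatmap_df(agg_df=agg_df, xbins=4, nprocs=2)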