import pytest
import numpy as np
from numpy.testing import assert_array_equal, assert_allclose
import pandas as pd
import darshan
from darshan.experimental.plots import heatmap_handling
from darshan.log_utils import get_log_path
@pytest.fixture(scope="function")
def dict_list():
    """Build a mock dictionary list for ``heatmap_handling.get_rd_wr_dfs()``.

    Returns a list of 3 dictionaries, each containing a ``rank`` index and
    ``read_segments``/``write_segments`` dataframes; only the middle entry
    (rank 2) carries non-empty write data.
    """
    # create a small data set to store in a dataframe
    n_data_points = 10
    start_arr = np.linspace(0, 3, n_data_points)
    end_arr = start_arr + 0.5
    len_arr = np.arange(1, n_data_points + 1)
    offset_arr = np.arange(n_data_points) + n_data_points
    # use the data above to create a base dataframe to use
    # for creating the dictionary list
    # NOTE(review): the column labels do not line up with the stacked arrays
    # (e.g. "length" holds start_arr values); harmless here because the tests
    # only assert shapes/column names/ranks, but worth confirming upstream
    base_df = pd.DataFrame(
        data=np.column_stack((start_arr, end_arr, len_arr, offset_arr)),
        columns=["length", "start_time", "end_time", "offset"],
    )
    # initialize an empty list for storing dictionaries (containing dataframes)
    dict_list = []
    # only iterate 3 times to keep data structure simple
    for i in range(1, 4):
        # create a dictionary with a rank index, a read segment (dataframe)
        # and a write segment (dataframe)
        _dict = {}
        # assign the rank using the index
        _dict["rank"] = i
        # for the read segment, multiply the dataframe data by the index so
        # each segment has distinguishable values
        _dict["read_segments"] = i * base_df
        # do the same for the write segments, but only for the middle iteration
        if i == 2:
            # again, assign a modified dataframe for uniqueness
            _dict["write_segments"] = i * (base_df + 10)
        else:
            # assign an empty dataframe for first and last iterations
            _dict["write_segments"] = pd.DataFrame()
        dict_list.append(_dict)
    return dict_list
@pytest.fixture(scope="function")
def dict_list_no_writes():
    """Build a mock dictionary list with no write data.

    Similar to ``dict_list``, this fixture is targeted at creating a
    dictionary list for ``heatmap_handling.get_rd_wr_dfs()`` such that
    the returned ``write_df`` is an empty dataframe.
    """
    # create a small data set to store in a dataframe
    n_data_points = 10
    start_arr = np.linspace(0, 3, n_data_points)
    end_arr = start_arr + 0.5
    len_arr = np.arange(1, n_data_points + 1)
    offset_arr = np.arange(n_data_points) + n_data_points
    # use the data above to create a base dataframe to use
    # for creating the dictionary list
    base_df = pd.DataFrame(
        data=np.column_stack((start_arr, end_arr, len_arr, offset_arr)),
        columns=["length", "start_time", "end_time", "offset"],
    )
    # initialize an empty list for storing dictionaries (containing dataframes)
    dict_list = []
    # only iterate 3 times to keep data structure simple
    for i in range(1, 4):
        # create a dictionary with a rank index, a read segment (dataframe)
        # and a write segment (dataframe)
        _dict = {}
        # assign the rank using the index
        _dict["rank"] = i
        # for the read segment, multiply the dataframe data by the index so
        # each segment has distinguishable values
        _dict["read_segments"] = i * base_df
        # for the write segments assign an empty dataframe
        _dict["write_segments"] = pd.DataFrame()
        dict_list.append(_dict)
    return dict_list
def test_get_rd_wr_dfs(dict_list):
    """Regression test for ``heatmap_handling.get_rd_wr_dfs()``."""
    rd_wr_dfs = heatmap_handling.get_rd_wr_dfs(dict_list=dict_list)
    # retrieve read/write dataframes from the dictionary
    read_df = rd_wr_dfs["read"]
    write_df = rd_wr_dfs["write"]
    # check that we get the correct data shape after
    # combining the read/write dataframes
    assert read_df.shape == (30, 4)
    assert write_df.shape == (10, 4)
    # check that the correct column names are generated. We expect "offset"
    # to be missing and "rank" to be added
    df_keys = ["length", "start_time", "end_time", "rank"]
    assert list(read_df.columns) == df_keys
    assert list(write_df.columns) == df_keys
    # verify the correct rank values are displayed. Since a read segment was
    # generated for each iteration, there should be ranks 1-3, and since
    # a write segment was only generated for the middle iteration we should
    # only get 2
    assert_array_equal(np.unique(read_df["rank"].values), [1, 2, 3])
    assert_array_equal(np.unique(write_df["rank"].values), [2])
    # since we ignore the original row indices in the individual dataframes
    # make sure we get the correct indices (0-29) and (0-9) for read and write,
    # respectively
    assert_array_equal(read_df.index, np.arange(30))
    assert_array_equal(write_df.index, np.arange(10))
def test_get_rd_wr_dfs_no_write(dict_list_no_writes):
    """Regression test for ``heatmap_handling.get_rd_wr_dfs()`` covering the
    case where there are no write events found (based on
    ``test_get_rd_wr_dfs``)."""
    rd_wr_dfs = heatmap_handling.get_rd_wr_dfs(dict_list=dict_list_no_writes)
    # retrieve read/write dataframes from the dictionary
    read_df = rd_wr_dfs["read"]
    write_df = rd_wr_dfs["write"]
    # since there are no write dataframes we should get an empty write dataframe
    assert write_df.empty
    # check that we get the correct data shape after
    # combining the read dataframes
    assert read_df.shape == (30, 4)
    # check that the correct column names are generated. We expect "offset"
    # to be missing and "rank" to be added
    df_keys = ["length", "start_time", "end_time", "rank"]
    assert list(read_df.columns) == df_keys
    # verify the correct rank values are displayed. Since a read segment was
    # generated for each iteration, there should be ranks 1-3
    assert_array_equal(np.unique(read_df["rank"].values), [1, 2, 3])
    # since we ignore the original row indices in the individual dataframes
    # make sure we get the correct indices (0-29) for read
    assert_array_equal(read_df.index, np.arange(30))
@pytest.mark.parametrize(
    # all 3 test cases are based on the outputs for
    # `tests/input/sample-dxt-simple.darshan`, which only has write data
    "ops, expected_df_dict",
    [
        (
            # check the result using both operations
            ["read", "write"],
            {
                "read": pd.DataFrame(),
                "write": pd.DataFrame(
                    columns=["length", "start_time", "end_time", "rank"],
                    data=np.array(
                        [
                            [40, 0.10337884305045009, 0.10338771319948137, 0],
                            [4000, 0.10421665315516293, 0.10423145908862352, 0],
                        ]
                    ),
                ),
            },
        ),
        (
            # check the result for only the "read" operation, should be empty
            ["read"],
            {"read": pd.DataFrame()},
        ),
        (
            # the results for only checking the "write" data should be the same
            # as checking both operations
            ["write"],
            {
                "write": pd.DataFrame(
                    columns=["length", "start_time", "end_time", "rank"],
                    data=np.array(
                        [
                            [40, 0.10337884305045009, 0.10338771319948137, 0],
                            [4000, 0.10421665315516293, 0.10423145908862352, 0],
                        ]
                    ),
                ),
            },
        ),
    ],
)
def test_get_single_df_dict(expected_df_dict, ops):
    """Regression test for ``heatmap_handling.get_single_df_dict()``."""
    with darshan.DarshanReport(get_log_path("sample-dxt-simple.darshan")) as report:
        actual_df_dict = heatmap_handling.get_single_df_dict(
            report=report, mod="DXT_POSIX", ops=ops
        )
    # make sure we get the same key(s) ("read", "write")
    assert actual_df_dict.keys() == expected_df_dict.keys()
    # also check that we only get the key(s) we requested
    assert list(actual_df_dict.keys()) == ops
    if "read" in ops:
        # for the read case, check that we get an empty dataframe
        assert actual_df_dict["read"].empty
    if "write" in ops:
        # check that we get the same column names
        assert_array_equal(
            actual_df_dict["write"].columns,
            expected_df_dict["write"].columns,
        )
        # verify the returned values are the same
        assert_allclose(
            actual_df_dict["write"].values,
            expected_df_dict["write"].values,
        )
@pytest.mark.parametrize(
    "log_file, mod, ops, expected_agg_data",
    [
        # most test cases are based on the outputs for
        # `tests/input/sample-dxt-simple.darshan`, which only has write data
        (
            "sample-dxt-simple.darshan",
            "DXT_POSIX",
            ["read", "write"],
            np.array(
                [
                    [40, 0.10337884305045009, 0.10338771319948137, 0],
                    [4000, 0.10421665315516293, 0.10423145908862352, 0],
                ]
            ),
        ),
        # for "read" case input None since there is no data to compare
        ("sample-dxt-simple.darshan", "DXT_POSIX", ["read"], None),
        ("sample-dxt-simple.darshan", "DXT_MPIIO", ["read"], None),
        (
            "sample-dxt-simple.darshan",
            "DXT_POSIX",
            ["write"],
            np.array(
                [
                    [40, 0.10337884305045009, 0.10338771319948137, 0],
                    [4000, 0.10421665315516293, 0.10423145908862352, 0],
                ]
            ),
        ),
        (
            "ior_hdf5_example.darshan",
            "DXT_MPIIO",
            ["write"],
            np.array(
                [
                    [262144, 0.029964923858642578, 0.033110857009887695, 0],
                    [262144, 0.03313708305358887, 0.03374886512756348, 0],
                    [262144, 0.03376293182373047, 0.03420686721801758, 0],
                    [262144, 0.03422093391418457, 0.1820380687713623, 0],
                    [40, 0.22188901901245117, 0.23144793510437012, 0],
                    [96, 0.2314610481262207, 0.23147892951965332, 0],
                    [96, 0.23216795921325684, 0.2321760654449463, 0],
                    [262144, 0.0299680233001709, 0.03130483627319336, 1],
                    [262144, 0.03133583068847656, 0.18091988563537598, 1],
                    [262144, 0.1809389591217041, 0.18172383308410645, 1],
                    [262144, 0.18174386024475098, 0.18261194229125977, 1],
                    [544, 0.2218928337097168, 0.23146295547485352, 1],
                    [120, 0.23146700859069824, 0.23148202896118164, 1],
                    [262144, 0.0299680233001709, 0.03239917755126953, 2],
                    [262144, 0.03243207931518555, 0.03294110298156738, 2],
                    [262144, 0.03295707702636719, 0.1809689998626709, 2],
                    [262144, 0.18098902702331543, 0.2218320369720459, 2],
                    [272, 0.22189807891845703, 0.23153114318847656, 2],
                    [262144, 0.029965877532958984, 0.031455039978027344, 3],
                    [262144, 0.03148388862609863, 0.03171586990356445, 3],
                    [262144, 0.03172898292541504, 0.03197503089904785, 3],
                    [262144, 0.03198695182800293, 0.032212018966674805, 3],
                    [328, 0.2218940258026123, 0.23151302337646484, 3],
                ]
            ),
        ),
    ],
)
def test_get_aggregate_data(log_file, expected_agg_data, mod, ops):
    """Regression test for ``heatmap_handling.get_aggregate_data()``."""
    log_file = get_log_path(log_file)
    with darshan.DarshanReport(log_file) as report:
        if ops == ["read"]:
            expected_msg = (
                "No data available for selected module\\(s\\) and operation\\(s\\)."
            )
            with pytest.raises(ValueError, match=expected_msg):
                # expect an error because there are no read segments
                # in sample-dxt-simple.darshan
                actual_agg_data = heatmap_handling.get_aggregate_data(
                    report=report, mod=mod, ops=ops
                )
        else:
            actual_agg_data = heatmap_handling.get_aggregate_data(
                report=report, mod=mod, ops=ops
            )
            # for other cases, make sure the value arrays are identically valued
            assert_allclose(actual_agg_data.values, expected_agg_data)
@pytest.mark.parametrize(
    "filepath, xbins, ops, expected_hmap_data",
    [
        # iterate over 3 different darshan logs, various bin counts, and
        # combinations of operations, checking the heatmap data array
        # output for each case.
        # For `sample-dxt-simple.darshan` the selected
        # operations are not changed because there is no "read" data
        (
            "sample-dxt-simple.darshan",
            1,
            ["read", "write"],
            np.array([[4040, 0, 0, 0, 0, 0, 0, 0, 0,
                       0, 0, 0, 0, 0, 0, 0]]).reshape(16, 1),
        ),
        (
            "sample-dxt-simple.darshan",
            4,
            ["read", "write"],
            np.vstack((
                np.array([[0, 0, 0, 4040]]),
                np.zeros((15, 4)))),
        ),
        (
            "sample-dxt-simple.darshan",
            10,
            ["read", "write"],
            np.vstack((
                np.array([[0, 0, 0, 0, 0, 0, 0, 0, 0, 4040]]),
                np.zeros((15, 10)))),
        ),
        # `dxt.darshan` is complex enough to warrant changing the
        # selected operations
        ("dxt.darshan", 1, ["read"], np.array([[22517726]])),
        (
            "dxt.darshan",
            4,
            ["read"],
            np.array([[10214363, 0, 8070137, 4233226]]),
        ),
        (
            "dxt.darshan",
            10,
            ["read"],
            np.array([[10214363, 0, 0, 0, 0, 0, 8070137, 0, 0, 4233226]]),
        ),
        ("dxt.darshan", 1, ["write"], np.array([[13021781]])),
        (
            "dxt.darshan",
            4,
            ["write"],
            np.array([[4381, 0, 10915913, 2101487]]),
        ),
        (
            "dxt.darshan",
            10,
            ["write"],
            np.array([[4381, 0, 0, 0, 0, 0, 10915913, 0, 0, 2101487]]),
        ),
        (
            "dxt.darshan",
            1,
            ["read", "write"],
            np.array([[35539507]]),
        ),
        (
            "dxt.darshan",
            4,
            ["read", "write"],
            np.array([[10218744, 0, 18986050, 6334713]]),
        ),
        (
            "dxt.darshan",
            10,
            ["read", "write"],
            np.array([[10218744, 0, 0, 0, 0, 0, 18986050, 0, 0, 6334713]]),
        ),
        # `ior_hdf5_example.darshan` is the only log with multiple ranks (4),
        # so it also gets different operation combinations
        (
            "ior_hdf5_example.darshan",
            1,
            ["read"],
            np.array([[1051088], [1050472], [1050472], [1050472]]),
        ),
        (
            "ior_hdf5_example.darshan",
            10,
            ["read"],
            np.array(
                [
                    [0, 0, 0, 0, 0, 0, 0, 0, 1051088, 0],
                    [0, 0, 0, 0, 0, 0, 0, 0, 107988.68001937, 942483.31998063],
                    [0, 0, 0, 0, 0, 0, 0, 0, 1050472, 0],
                    [0, 0, 0, 0, 0, 0, 0, 0, 1050472, 0],
                ]
            ),
        ),
        (
            "ior_hdf5_example.darshan",
            1,
            ["write"],
            np.array(
                [
                    [1048808],
                    [1049240],
                    [1048848],
                    [1048904],
                ]
            ),
        ),
        (
            "ior_hdf5_example.darshan",
            10,
            ["write"],
            np.array(
                [
                    [
                        0,
                        808091.3650729951,
                        41175.65189967951,
                        41175.6518996795,
                        41175.6518996795,
                        41175.65189967952,
                        41175.65189967947,
                        34606.37542860738,
                        0,
                        232,
                    ],
                    [
                        0,
                        288603.1671221,
                        40689.00335231,
                        40689.00335231,
                        40689.00335231,
                        40689.00335231,
                        40689.00335231,
                        556527.81611634,
                        0,
                        664,
                    ],
                    [
                        0,
                        548158.6819154,
                        41120.86590861,
                        41120.86590861,
                        41120.86590861,
                        41120.86590861,
                        41120.86590861,
                        63152.77796803,
                        149027.37037175,
                        82904.84020176,
                    ],
                    [0, 1048576, 0, 0, 0, 0, 0, 0, 0, 328],
                ]
            ),
        ),
        (
            "ior_hdf5_example.darshan",
            1,
            ["read", "write"],
            np.array(
                [
                    [2099896],
                    [2099712],
                    [2099320],
                    [2099376],
                ]
            ),
        ),
        (
            "ior_hdf5_example.darshan",
            10,
            ["read", "write"],
            np.array(
                [
                    [
                        0,
                        827385.0734944909,
                        50822.506110427385,
                        50822.50611042739,
                        50822.50611042739,
                        50822.50611042737,
                        17900.90206379957,
                        30.830417529761142,
                        1051289.1695824703,
                        0,
                    ],
                    [
                        0,
                        307668.84624124144,
                        50221.842911882275,
                        50221.84291188228,
                        50221.84291188228,
                        50221.84291188226,
                        540019.7821112294,
                        418.49887595643366,
                        108234.18114341467,
                        942483.3199806289,
                    ],
                    [
                        0,
                        567426.7192208751,
                        50754.88456134755,
                        50754.88456134756,
                        50754.88456134756,
                        50754.884561347535,
                        141846.72456250372,
                        136490.77078712088,
                        1050536.24718411,
                        0,
                    ],
                    [
                        0,
                        1048576,
                        0,
                        0,
                        0,
                        0,
                        0,
                        251.0322619047617,
                        1050548.9677380952,
                        0,
                    ],
                ]
            ),
        ),
    ],
)
def test_get_heatmap_df(
    filepath,
    expected_hmap_data,
    xbins,
    ops,
):
    """Regression test for ``heatmap_handling.get_heatmap_df()``."""
    # generate the report and use it to obtain the aggregated data
    filepath = get_log_path(filepath)
    with darshan.DarshanReport(filepath) as report:
        agg_df = heatmap_handling.get_aggregate_data(
            report=report, mod="DXT_POSIX", ops=ops
        )
        nprocs = report.metadata["job"]["nprocs"]
    # run the aggregated data through the heatmap data code
    actual_hmap_data = heatmap_handling.get_heatmap_df(agg_df=agg_df, xbins=xbins, nprocs=nprocs)
    if "sample-dxt-simple.darshan" in filepath:
        # check the data is conserved
        assert actual_hmap_data.values.sum() == 4040
        # make sure the output array is the correct shape
        assert actual_hmap_data.shape == (16, xbins)
        # make sure the output data contains identical values
        assert_allclose(actual_hmap_data.values, expected_hmap_data)
    elif "dxt.darshan" in filepath:
        # make sure the output array is the correct shape
        assert actual_hmap_data.shape == (1, xbins)
        # make sure the output data contains identical values
        assert_allclose(actual_hmap_data.values, expected_hmap_data)
        # for each combination of operations, make sure the sum is correct
        if len(ops) == 2:
            assert actual_hmap_data.values.sum() == 35539507
        elif ops[0] == "read":
            assert actual_hmap_data.values.sum() == 22517726
        elif ops[0] == "write":
            assert actual_hmap_data.values.sum() == 13021781
    elif "ior_hdf5_example.darshan" in filepath:
        # make sure the output array is the correct shape
        assert actual_hmap_data.shape == (4, xbins)
        # make sure the output data contains identical values
        assert_allclose(actual_hmap_data.values, expected_hmap_data)
        # for each combination of operations, make sure the sum is correct
        if len(ops) == 2:
            assert actual_hmap_data.values.sum() == 8398304
        elif ops[0] == "read":
            assert actual_hmap_data.values.sum() == 4202504
        elif ops[0] == "write":
            assert actual_hmap_data.values.sum() == 4195800