Source code for darshan.backend.cffi_backend

# -*- coding: utf-8 -*-
"""The cfii_backend package will read a darshan log
using the functions defined in libdarshan-util.so
and is interfaced via the python CFFI module.
"""

import functools

import cffi
import ctypes

import numpy as np
import pandas as pd

from collections import namedtuple

import logging
logger = logging.getLogger(__name__)


from darshan.backend.api_def_c import load_darshan_header
from darshan.discover_darshan import find_utils
from darshan.discover_darshan import check_version

addins = ""

#
# Optional APXC module
#
try:
    from darshan.backend.apxc import *
    addins += get_apxc_defs()
except Exception:
    pass

#
# Optional APMPI module
#
try:
    from darshan.backend.apmpi import *
    addins += get_apmpi_defs()
except Exception:
    pass

API_def_c = load_darshan_header(addins)

ffi = cffi.FFI()
ffi.cdef(API_def_c)

libdutil = None
libdutil = find_utils(ffi, libdutil)

check_version(ffi, libdutil)


_mod_names = [
    "NULL",
    "POSIX",
    "MPI-IO",
    "H5F",
    "H5D",
    "PNETCDF_FILE",
    "PNETCDF_VAR",
    "BG/Q",
    "LUSTRE",
    "STDIO",
    "DXT_POSIX",
    "DXT_MPIIO",
    "MDHIM",
    "APXC",
    "APMPI",
    "HEATMAP",
]
def mod_name_to_idx(mod_name):
    return _mod_names.index(mod_name)

_structdefs = {
    "BG/Q": "struct darshan_bgq_record **",
    "DXT_MPIIO": "struct dxt_file_record **",
    "DXT_POSIX": "struct dxt_file_record **",
    "HEATMAP": "struct darshan_heatmap_record **",
    "H5F": "struct darshan_hdf5_file **",
    "H5D": "struct darshan_hdf5_dataset **",
    "LUSTRE": "struct darshan_lustre_record **",
    "MPI-IO": "struct darshan_mpiio_file **",
    "PNETCDF_FILE": "struct darshan_pnetcdf_file **",
    "PNETCDF_VAR": "struct darshan_pnetcdf_var **",
    "POSIX": "struct darshan_posix_file **",
    "STDIO": "struct darshan_stdio_file **",
    "APXC-HEADER": "struct darshan_apxc_header_record **",
    "APXC-PERF": "struct darshan_apxc_perf_record **",
    "APMPI-HEADER": "struct darshan_apmpi_header_record **",
    "APMPI-PERF": "struct darshan_apmpi_perf_record **",
}

def get_lib_version():
    """
    Return the version information hardcoded into the shared library.

    Args:
        None

    Return:
        version (str): library version number
    """
    ver = libdutil.darshan_log_get_lib_version()
    version = ffi.string(ver).decode("utf-8")
    return version

def log_open(filename):
    """
    Opens a darshan logfile.

    Args:
        filename (str): Path to a darshan log file

    Return:
        log handle
    """
    b_fname = filename.encode()
    handle = libdutil.darshan_log_open(b_fname)
    log = {"handle": handle, "modules": None, "name_records": None}
    return log

def log_close(log):
    """
    Closes the logfile and releases allocated memory.
    """
    libdutil.darshan_log_close(log['handle'])

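# Illustrative usage sketch (not part of the upstream API): shows the
# open/read/close life cycle around log_open()/log_close(). The helper name
# and the "example.darshan" path are hypothetical.
def _example_open_close(path="example.darshan"):
    log = log_open(path)
    try:
        # read job-level metadata while the handle is still valid
        return log_get_job(log)
    finally:
        # always release the C-side resources
        log_close(log)
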
def log_get_job(log):
    """
    Returns a dictionary with information about the current job.
    """
    job = {}
    jobrec = ffi.new("struct darshan_job *")
    libdutil.darshan_log_get_job(log['handle'], jobrec)

    job['uid'] = jobrec[0].uid
    job['start_time_sec'] = jobrec[0].start_time_sec
    job['start_time_nsec'] = jobrec[0].start_time_nsec
    job['end_time_sec'] = jobrec[0].end_time_sec
    job['end_time_nsec'] = jobrec[0].end_time_nsec
    job['nprocs'] = jobrec[0].nprocs
    job['jobid'] = jobrec[0].jobid

    runtime = ffi.new("double *")
    libdutil.darshan_log_get_job_runtime(log['handle'], jobrec[0], runtime)
    job['run_time'] = runtime[0]

    # dirty hack to get log format version -- we know it's currently stored
    # at the very beginning of the log handle structure, so we just cast the
    # struct pointer as a string...
    job['log_ver'] = ffi.string(ffi.cast("char *", log['handle'])).decode("utf-8")

    mstr = ffi.string(jobrec[0].metadata).decode("utf-8")
    md = {}
    for kv in mstr.split('\n')[:-1]:
        k, v = kv.split('=', maxsplit=1)
        md[k] = v
    job['metadata'] = md
    return job

def log_get_exe(log):
    """
    Get details about the executable (path and arguments).

    Args:
        log: handle returned by darshan.open

    Return:
        string: executable path and arguments
    """
    exestr = ffi.new("char[]", 4096)
    libdutil.darshan_log_get_exe(log['handle'], exestr)
    return ffi.string(exestr).decode("utf-8")

def log_get_mounts(log):
    """
    Returns a list of available mounts recorded for the log.

    Args:
        log: handle returned by darshan.open
    """
    mntlst = []
    mnts = ffi.new("struct darshan_mnt_info **")
    cnt = ffi.new("int *")
    libdutil.darshan_log_get_mounts(log['handle'], mnts, cnt)

    for i in range(0, cnt[0]):
        mntlst.append((ffi.string(mnts[0][i].mnt_path).decode("utf-8"),
                       ffi.string(mnts[0][i].mnt_type).decode("utf-8")))
    libdutil.darshan_free(mnts[0])
    return mntlst

def log_get_modules(log):
    """
    Return a dictionary of the modules available in the current log,
    including information about the contents available for each module.

    Args:
        log: handle returned by darshan.open

    Return:
        dict: Modules with additional info for current log.
    """
    # use cached module index if already present
    if log['modules'] is not None:
        return log['modules']

    modules = {}
    mods = ffi.new("struct darshan_mod_info **")
    cnt = ffi.new("int *")
    libdutil.darshan_log_get_modules(log['handle'], mods, cnt)
    for i in range(0, cnt[0]):
        modules[ffi.string(mods[0][i].name).decode("utf-8")] = \
            {'len': mods[0][i].len, 'ver': mods[0][i].ver, 'idx': mods[0][i].idx,
             'partial_flag': bool(mods[0][i].partial_flag)}

    # add to cache
    log['modules'] = modules
    libdutil.darshan_free(mods[0])
    return modules

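# Illustrative sketch (hypothetical helper, not part of the upstream API):
# list the modules recorded in a log together with the per-module metadata
# returned by log_get_modules().
def _example_list_modules(log):
    modules = log_get_modules(log)
    for name, info in modules.items():
        # info carries the record region length, module version,
        # module index, and whether the data is partial
        print(name, info['len'], info['ver'], info['idx'], info['partial_flag'])
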
def log_get_name_records(log):
    """
    Return a dictionary resolving hash values to strings
    (typically filepaths).

    Args:
        log: handle returned by darshan.open

    Return:
        dict: the name records
    """
    # use cached name_records if already present
    if log['name_records'] is not None:
        return log['name_records']

    name_records = {}
    nrecs = ffi.new("struct darshan_name_record **")
    cnt = ffi.new("int *")
    libdutil.darshan_log_get_name_records(log['handle'], nrecs, cnt)

    for i in range(0, cnt[0]):
        name_records[nrecs[0][i].id] = ffi.string(nrecs[0][i].name).decode("utf-8")
        libdutil.darshan_free(nrecs[0][i].name)
    libdutil.darshan_free(nrecs[0])

    # add to cache
    log['name_records'] = name_records
    return name_records

def log_lookup_name_records(log, ids=[]):
    """
    Resolve a list of hash values to their name record strings
    (typically filepaths).

    Args:
        log: handle returned by darshan.open
        ids: list of hash values (numbers) to resolve

    Return:
        dict: the filtered name records
    """
    name_records = {}

    whitelist = (ctypes.c_ulonglong * len(ids))(*ids)
    whitelist_cnt = len(ids)
    whitelistp = ffi.from_buffer(whitelist)

    nrecs = ffi.new("struct darshan_name_record **")
    cnt = ffi.new("int *")
    libdutil.darshan_log_get_filtered_name_records(
        log['handle'], nrecs, cnt,
        ffi.cast("darshan_record_id *", whitelistp), whitelist_cnt)

    for i in range(0, cnt[0]):
        name_records[nrecs[0][i].id] = ffi.string(nrecs[0][i].name).decode("utf-8")
        libdutil.darshan_free(nrecs[0][i].name)
    libdutil.darshan_free(nrecs[0])

    # cache the (filtered) result
    log['name_records'] = name_records
    return name_records

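# Illustrative sketch (hypothetical helper): map a record id, as found in a
# record's base_rec.id field, back to its file path via the name-record table.
def _example_resolve_path(log, rec_id):
    name_records = log_get_name_records(log)
    # returns None if the hash is not present in the log
    return name_records.get(rec_id)
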
def log_get_record(log, mod, dtype='numpy'):
    """
    Standard entry point to fetch a log record via its module name.

    Args:
        log: Handle returned by darshan.open
        mod (str): Name of the Darshan module

    Return:
        log record of type dtype
    """
    if mod in ['LUSTRE']:
        rec = _log_get_lustre_record(log, dtype=dtype)
    elif mod in ['HEATMAP']:
        rec = _log_get_heatmap_record(log)
    elif mod in ['DXT_POSIX', 'DXT_MPIIO']:
        rec = log_get_dxt_record(log, mod, dtype=dtype)
    else:
        rec = log_get_generic_record(log, mod, dtype=dtype)
    return rec

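# Illustrative sketch (hypothetical helper): repeated calls to
# log_get_record() step through a module's records in order and return None
# once the log is exhausted, so a drain loop like this collects them all.
def _example_iter_records(log, mod_name="POSIX", dtype="dict"):
    recs = []
    rec = log_get_record(log, mod_name, dtype=dtype)
    while rec is not None:
        recs.append(rec)
        rec = log_get_record(log, mod_name, dtype=dtype)
    return recs
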
def log_get_generic_record(log, mod_name, dtype='numpy'):
    """
    Returns a dictionary holding a generic darshan log record.

    Args:
        log: Handle returned by darshan.open
        mod_name (str): Name of the Darshan module

    Return:
        dict: generic log record

    Example:
        The typical darshan log record provides two arrays, one for integer
        counters and one for floating point counters:

        >>> log_get_generic_record(log, "POSIX")
        {'counters': array([...], dtype=int64), 'fcounters': array([...])}
    """
    modules = log_get_modules(log)
    if mod_name not in modules:
        return None
    mod_type = _structdefs[mod_name]

    buf = ffi.new("void **")
    r = libdutil.darshan_log_get_record(log['handle'], modules[mod_name]['idx'], buf)
    if r < 1:
        return None
    rbuf = ffi.cast(mod_type, buf)
    rec = _make_generic_record(rbuf, mod_name, dtype)
    libdutil.darshan_free(buf[0])
    return rec

def _make_generic_record(rbuf, mod_name, dtype='numpy'):
    """
    Returns a record dictionary for an input record buffer for a given module.
    """
    rec = {}
    rec['id'] = rbuf[0].base_rec.id
    rec['rank'] = rbuf[0].base_rec.rank
    if mod_name == 'H5D' or mod_name == 'PNETCDF_VAR':
        rec['file_rec_id'] = rbuf[0].file_rec_id

    clst = np.copy(np.frombuffer(ffi.buffer(rbuf[0].counters), dtype=np.int64))
    flst = np.copy(np.frombuffer(ffi.buffer(rbuf[0].fcounters), dtype=np.float64))

    c_cols = counter_names(mod_name)
    fc_cols = fcounter_names(mod_name)

    if dtype == "numpy":
        rec['counters'] = clst
        rec['fcounters'] = flst
    elif dtype == "dict":
        rec['counters'] = dict(zip(c_cols, clst))
        rec['fcounters'] = dict(zip(fc_cols, flst))
    elif dtype == "pandas":
        # prepend id/rank columns
        new_cols = ["id", "rank"]
        new_c_cols = new_cols + c_cols
        new_f_cols = new_cols + fc_cols
        rec_id = np.uint64(rec["id"])

        # prepend the id/rank values
        id_rank_list = [rec["id"], rec["rank"]]
        new_clst = np.asarray([id_rank_list + clst.tolist()]).reshape(1, -1)
        new_flst = np.asarray([id_rank_list + flst.tolist()], dtype=np.float64).reshape(1, -1)

        # create the dataframes
        df_c = pd.DataFrame(data=new_clst, columns=new_c_cols)
        df_fc = pd.DataFrame(data=new_flst, columns=new_f_cols)
        # correct the data type for the file hash/id
        df_c['id'] = rec_id
        df_fc['id'] = rec_id
        # assign the dataframes to the record
        rec['counters'] = df_c
        rec['fcounters'] = df_fc
    return rec

@functools.lru_cache(maxsize=32)
def counter_names(mod_name, fcnts=False, special=''):
    """
    Returns a list of available counter names for the module.
    By default only integer counter names are listed, unless fcnts is
    set to True, in which case only the floating point counter names
    are listed.

    Args:
        mod_name (str): Name of the module to return counter names for.
        fcnts (bool): Switch to request floating point counters instead
            of integer counters. (Default: False)

    Return:
        list: Counter names as strings.
    """
    if mod_name == 'MPI-IO':
        mod_name = 'MPIIO'

    names = []
    i = 0

    if fcnts:
        F = "f_"
    else:
        F = ""

    end = "{0}_{1}{2}NUM_INDICES".format(mod_name.upper(), F.upper(), special.upper())
    var_name = "{0}_{1}{2}counter_names".format(mod_name.lower(), F.lower(), special.lower())

    while True:
        try:
            var = getattr(libdutil, var_name)
        except AttributeError:
            var = None
        if not var:
            return None
        name = ffi.string(var[i]).decode("utf-8")
        if name == end:
            break
        names.append(name)
        i += 1
    return names

@functools.lru_cache(maxsize=32)
def fcounter_names(mod_name):
    """
    Returns a list of available floating point counter names for the module.

    Args:
        mod_name (str): Name of the module to return counter names for.

    Return:
        list: Available floating point counter names as strings.
    """
    return counter_names(mod_name, fcnts=True)

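# Illustrative sketch (hypothetical helper): pair a record's raw counter
# arrays with their names, mirroring what dtype='dict' does internally in
# _make_generic_record().
def _example_named_counters(rec, mod_name="POSIX"):
    c = dict(zip(counter_names(mod_name), rec['counters']))
    fc = dict(zip(fcounter_names(mod_name), rec['fcounters']))
    return c, fc
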
def _log_get_lustre_record(log, dtype='numpy'):
    """
    Returns a darshan log record for Lustre.

    Args:
        log: handle returned by darshan.open
    """
    modules = log_get_modules(log)
    if 'LUSTRE' not in modules:
        return None

    rec = {}
    buf = ffi.new("void **")
    r = libdutil.darshan_log_get_record(log['handle'], modules['LUSTRE']['idx'], buf)
    if r < 1:
        return None
    rbuf = ffi.cast("struct darshan_lustre_record **", buf)

    rec['id'] = rbuf[0].base_rec.id
    rec['rank'] = rbuf[0].base_rec.rank

    # components
    rec['components'] = []
    ost_ids = ffi.cast("int64_t *", rbuf[0].ost_ids)
    ost_idx = 0
    for i in range(0, rbuf[0].num_comps):
        component = {}
        component['counters'] = np.copy(np.frombuffer(ffi.buffer(rbuf[0].comps[i].counters), dtype=np.int64))
        component['pool_name'] = ffi.string(rbuf[0].comps[i].pool_name).decode("utf-8")
        cdict = dict(zip(counter_names('LUSTRE_COMP'), component['counters']))

        # ost info
        stripe_count = cdict['LUSTRE_COMP_STRIPE_COUNT']
        ostlst = ffi.unpack(ost_ids + ost_idx, int(stripe_count))
        ost_idx += int(stripe_count)
        component['ost_ids'] = np.array(ostlst, dtype=np.int64)

        # dtype conversion
        if dtype == "dict":
            component.update({'counters': cdict, 'ost_ids': ostlst})
        elif dtype == "pandas":
            df_c = pd.DataFrame(cdict, index=[0])

            # prepend id and rank
            df_c = df_c[df_c.columns[::-1]]  # flip column order
            df_c['id'] = rec['id']
            df_c['rank'] = rec['rank']
            df_c = df_c[df_c.columns[::-1]]  # flip back

            # add pool_name string to df
            df_c['LUSTRE_POOL_NAME'] = component['pool_name']
            # add ost list to df
            df_c['LUSTRE_OST_IDS'] = [component['ost_ids']]

            # overwrite component with comprehensive dataframe
            component = df_c

        rec['components'].append(component)

    if dtype == "pandas":
        combined_c = None
        for component in rec['components']:
            if combined_c is None:
                combined_c = component
            else:
                combined_c = pd.concat([combined_c, component])
        rec['components'] = combined_c

    libdutil.darshan_free(buf[0])
    return rec

def log_get_dxt_record(log, mod_name, reads=True, writes=True, dtype='dict'):
    """
    Returns a dictionary holding a DXT darshan log record.

    Args:
        log: Handle returned by darshan.open
        mod_name (str): Name of the Darshan module

    Return:
        dict: DXT log record

    Example:
        A DXT record provides the individual trace segments for reads
        and writes:

        >>> log_get_dxt_record(log, "DXT_POSIX")
        {'rank': 0, 'read_count': 11, 'read_segments': array([...]), ...}
    """
    modules = log_get_modules(log)
    if mod_name not in modules:
        return None
    mod_type = _structdefs[mod_name]

    rec = {}
    buf = ffi.new("void **")
    r = libdutil.darshan_log_get_record(log['handle'], modules[mod_name]['idx'], buf)
    if r < 1:
        return None
    filerec = ffi.cast(mod_type, buf)

    rec['id'] = filerec[0].base_rec.id
    rec['rank'] = filerec[0].base_rec.rank
    rec['hostname'] = ffi.string(filerec[0].hostname).decode("utf-8")

    wcnt = filerec[0].write_count
    rcnt = filerec[0].read_count

    rec['write_count'] = wcnt
    rec['read_count'] = rcnt

    rec['write_segments'] = []
    rec['read_segments'] = []

    size_of = ffi.sizeof("struct dxt_file_record")
    segments = ffi.cast("struct segment_info *", buf[0] + size_of)

    for i in range(wcnt):
        seg = {
            "offset": segments[i].offset,
            "length": segments[i].length,
            "start_time": segments[i].start_time,
            "end_time": segments[i].end_time
        }
        rec['write_segments'].append(seg)

    # read segments are stored directly after the write segments
    for i in range(wcnt, wcnt + rcnt):
        seg = {
            "offset": segments[i].offset,
            "length": segments[i].length,
            "start_time": segments[i].start_time,
            "end_time": segments[i].end_time
        }
        rec['read_segments'].append(seg)

    if dtype == "pandas":
        rec['read_segments'] = pd.DataFrame(rec['read_segments'])
        rec['write_segments'] = pd.DataFrame(rec['write_segments'])

    libdutil.darshan_free(buf[0])
    return rec

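# Illustrative sketch (hypothetical helper): sum the total bytes written in
# the per-access trace segments of a single DXT record.
def _example_dxt_bytes_written(log, mod_name="DXT_POSIX"):
    rec = log_get_dxt_record(log, mod_name)
    if rec is None:
        return 0
    return sum(seg["length"] for seg in rec["write_segments"])
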
def _log_get_heatmap_record(log):
    """
    Returns a dictionary holding a heatmap darshan log record.

    Args:
        log: Handle returned by darshan.open

    Return:
        dict: heatmap log record
    """
    mod_name = "HEATMAP"
    modules = log_get_modules(log)
    if mod_name not in modules:
        return None
    mod_type = _structdefs[mod_name]

    rec = {}
    buf = ffi.new("void **")
    r = libdutil.darshan_log_get_record(log['handle'], modules[mod_name]['idx'], buf)
    if r < 1:
        return None
    filerec = ffi.cast(mod_type, buf)

    rec['id'] = filerec[0].base_rec.id
    rec['rank'] = filerec[0].base_rec.rank

    bin_width_seconds = filerec[0].bin_width_seconds
    nbins = filerec[0].nbins
    rec['bin_width_seconds'] = bin_width_seconds
    rec['nbins'] = nbins

    # write/read bins
    sizeof_64 = ffi.sizeof("int64_t")
    write_bins = np.copy(np.frombuffer(ffi.buffer(filerec[0].write_bins, sizeof_64 * nbins), dtype=np.int64))
    rec['write_bins'] = write_bins
    read_bins = np.copy(np.frombuffer(ffi.buffer(filerec[0].read_bins, sizeof_64 * nbins), dtype=np.int64))
    rec['read_bins'] = read_bins

    libdutil.darshan_free(buf[0])
    return rec

def _df_to_rec(rec_dict, mod_name, rec_index_of_interest=None):
    """
    Pack the DataFrames-format PyDarshan data back into
    a C buffer of records that can be consumed by darshan-util
    C code.

    Parameters
    ----------
    rec_dict: dict
        Dictionary containing the counter and fcounter dataframes.

    mod_name: str
        Name of the darshan module.

    rec_index_of_interest: int or None
        If ``None``, use all records in the dataframe. Otherwise,
        repack only the record at the provided integer index.

    Returns
    -------
    buf: Raw char array containing a buffer of record(s) or a single record.
    """
    counters_df = rec_dict["counters"]
    fcounters_df = rec_dict["fcounters"]
    counters_n_cols = counters_df.shape[1]
    fcounters_n_cols = fcounters_df.shape[1]
    id_col = counters_df.columns.get_loc("id")
    rank_col = counters_df.columns.get_loc("rank")
    if rec_index_of_interest is None:
        num_recs = counters_df.shape[0]
        # newer pandas versions can support ...
        # but we use a slice for now
        rec_index_of_interest = slice(0, counters_df.shape[0])
    else:
        num_recs = 1
    # id and rank columns are duplicated
    # in counters and fcounters
    rec_arr = np.recarray(shape=(num_recs), dtype=[("id", "<u8", (1,)),
                                                   ("rank", "<i8", (1,)),
                                                   ("counters", "<i8", (counters_n_cols - 2,)),
                                                   ("fcounters", "<f8", (fcounters_n_cols - 2,))])
    rec_arr.fcounters = fcounters_df.iloc[rec_index_of_interest, 2:].to_numpy()
    rec_arr.counters = counters_df.iloc[rec_index_of_interest, 2:].to_numpy()
    if num_recs > 1:
        rec_arr.id = counters_df.iloc[rec_index_of_interest, id_col].to_numpy().reshape((num_recs, 1))
        rec_arr.rank = counters_df.iloc[rec_index_of_interest, rank_col].to_numpy().reshape((num_recs, 1))
    else:
        rec_arr.id = counters_df.iloc[rec_index_of_interest, id_col]
        rec_arr.rank = counters_df.iloc[rec_index_of_interest, rank_col]
    buf = rec_arr.tobytes()
    return buf

def accumulate_records(rec_dict, mod_name, nprocs):
    """
    Passes a set of records (in pandas format) to the Darshan accumulator
    interface, and returns the corresponding derived metrics struct and
    summary record.

    Parameters:
        rec_dict: Dictionary containing the counter and fcounter dataframes.
        mod_name: Name of the Darshan module.
        nprocs: Number of processes participating in accumulation.

    Returns:
        namedtuple containing derived_metrics (cdata object) and
        summary_record (dict).
    """
    mod_idx = mod_name_to_idx(mod_name)
    darshan_accumulator = ffi.new("darshan_accumulator *")
    r = libdutil.darshan_accumulator_create(mod_idx, nprocs, darshan_accumulator)
    if r != 0:
        raise RuntimeError("A nonzero exit code was received from "
                           "darshan_accumulator_create() at the C level. "
                           f"This could mean that the {mod_name} module does not "
                           "support derived metric calculation, or that "
                           "another kind of error occurred. It may be possible "
                           "to retrieve additional information from the stderr "
                           "stream.")

    num_recs = rec_dict["fcounters"].shape[0]
    record_array = _df_to_rec(rec_dict, mod_name)

    r_i = libdutil.darshan_accumulator_inject(darshan_accumulator[0], record_array, num_recs)
    if r_i != 0:
        raise RuntimeError("A nonzero exit code was received from "
                           "darshan_accumulator_inject() at the C level. "
                           "It may be possible to retrieve additional "
                           "information from the stderr stream.")

    derived_metrics = ffi.new("struct darshan_derived_metrics *")
    summary_rbuf = ffi.new(_structdefs[mod_name].replace("**", "*"))
    r = libdutil.darshan_accumulator_emit(darshan_accumulator[0], derived_metrics, summary_rbuf)
    libdutil.darshan_accumulator_destroy(darshan_accumulator[0])
    if r != 0:
        raise RuntimeError("A nonzero exit code was received from "
                           "darshan_accumulator_emit() at the C level. "
                           "It may be possible to retrieve additional "
                           "information from the stderr stream.")

    summary_rec = _make_generic_record(summary_rbuf, mod_name, dtype='pandas')

    # create namedtuple type to hold return values
    AccumulatedRecords = namedtuple("AccumulatedRecords",
                                    ['derived_metrics', 'summary_record'])
    return AccumulatedRecords(derived_metrics, summary_rec)
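
# Illustrative sketch (hypothetical helper): accumulate a module's records
# and return the derived-metrics struct alongside the summary record.
# Records must be fetched with dtype='pandas', since accumulate_records()
# repacks them via _df_to_rec().
def _example_accumulate(log, mod_name="POSIX"):
    nprocs = log_get_job(log)['nprocs']
    rec = log_get_record(log, mod_name, dtype='pandas')
    acc = accumulate_records(rec, mod_name, nprocs)
    return acc.derived_metrics, acc.summary_record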