Skip to content

Compute

Trajectory and structure analyses. All available via from molpy.compute import ....

See the Compute overview for the Compute → Result pattern and the Dielectric Spectroscopy guide for the theory behind the dielectric classes.

Quick reference

Symbol Summary Returns
Compute Base class: configure once, call on data a Result subclass
DielectricSusceptibility \(\varepsilon^*(\omega)\) via Einstein–Helfand / Green–Kubo DielectricSusceptibilityResult
IonicConductivity Ionic conductivity \(\sigma\) (Einstein–Helfand MSD) ConductivityResult
ACFAnalyzer Autocorrelation of selected columns ACFResult
SpectralAnalyzer Windowed time→frequency transform SpectralResult
MCDCompute Mean displacement correlation (diffusion) MCDResult
PMSDCompute Polarization mean-squared displacement PMSDResult
Onsager Onsager transport coefficients \(L_{ij}\) (collective displacement cross-correlation) OnsagerResult
JACF Green–Kubo conductivity \(\sigma\) from the current ACF JACFResult
Persist Pair-survival / residence-time correlation PersistResult
RDF Radial distribution function \(g(r)\) structural result
MSD Single-particle mean-squared displacement time series
RadiusOfGyration, GyrationTensor, InertiaTensor, CenterOfMass Molecular shape descriptors per-frame values
Cluster, ClusterCenters, Pca, KMeans Clustering & decomposition labels / components
Workflow Directed graph of chained computes per-node results

Full API

Base

base

Base classes for compute operations.

This module provides the core abstractions for defining computation units: - Compute: Generic base class for all compute operations

The Compute class is compatible with molexp workflow tasks via structural typing (TaskProtocol).

Compute

Compute(**config_kwargs)

Bases: ABC

Abstract base class for compute operations.

A Compute is a callable object that takes an input of type InT and returns a result of type OutT. It is compatible with molexp workflow tasks via structural typing (implements execute() and dump() methods).

To implement a new compute operation: 1. Subclass Compute[InT, OutT] with appropriate types 2. Override the _compute() method with your core logic 3. Optionally override before() and after() for setup/cleanup 4. Pass **config_kwargs to super().init() to enable config serialization

The call method orchestrates the computation: 1. Calls before(input) for setup 2. Calls _compute(input) for the main computation 3. Calls after(input, result) for cleanup 4. Returns the result

Examples:

>>> class MyCompute(Compute[Frame, MyResult]):
...     def __init__(self, param1, param2, **config_kwargs):
...         super().__init__(param1=param1, param2=param2, **config_kwargs)
...         self.param1 = param1
...         self.param2 = param2
...
...     def _compute(self, frame: Frame) -> MyResult:
...         # Your computation logic here
...         return MyResult(value=42)
>>>
>>> compute = MyCompute(param1="value", param2=42)
>>> result = compute(frame)  # Calls __call__, which calls _compute()
>>> config = compute.dump()  # {"param1": "value", "param2": 42}
>>>
>>> # Use in molexp workflow
>>> result_dict = compute.execute(input=frame)  # {"result": MyResult(...)}

Initialize compute with configuration parameters.

Parameters:

Name Type Description Default
**config_kwargs Any

Configuration parameters stored for serialization

{}
after
after(input, result)

Hook called after _compute().

Override this method to perform cleanup or post-processing operations after the main computation. The default implementation does nothing.

Parameters:

Name Type Description Default
input InT

Input data for the computation.

required
result OutT

Result from the _compute() method.

required
before
before(input)

Hook called before _compute().

Override this method to perform setup operations before the main computation. The default implementation does nothing.

Parameters:

Name Type Description Default
input InT

Input data for the computation.

required
dump
dump()

Serialize configuration to dictionary.

Returns:

Type Description
dict[str, Any]

Configuration parameters as dict

execute
execute(ctx=None, **inputs)

Execute as a workflow task (molexp-compatible API).

This method adapts the Compute interface to work with molexp workflows. Subclasses can override to customize input/output mapping.

Parameters:

Name Type Description Default
ctx Any | None

Optional runtime context

None
**inputs Any

Named input values

{}

Returns:

Type Description
dict[str, Any]

Dictionary with output_key -> result mapping

Raises:

Type Description
ValueError

If required input is missing

Result types

result

Result classes for compute operations.

This module defines result types returned by compute operations.

ACFResult dataclass

ACFResult(meta=dict(), time=(lambda: np.array([]))(), acf=(lambda: np.array([]))(), n_lags=0)

Bases: TimeSeriesResult

Autocorrelation function result.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps)

acf NDArray[float64]

Autocorrelation values at each time lag, shape (n_lags,)

n_lags int

Number of time lags

ConductivityResult dataclass

ConductivityResult(meta=dict(), time=(lambda: np.array([]))(), msd=(lambda: np.array([]))(), sigma=float('nan'), slope=float('nan'), fit_start=0, fit_end=0)

Bases: TimeSeriesResult

Einstein-Helfand ionic-conductivity result.

Attributes:

Name Type Description
time NDArray[float64]

MSD lag times tau (ps), shape (n_lags,).

msd NDArray[float64]

Collective MSD <|M_J(t+tau) - M_J(t)|^2> of the ionic translational dipole, (e*A)^2, shape (n_lags,).

sigma float

Static ionic conductivity sigma (S/m).

slope float

Fitted MSD slope over the diffusive window, (e*A)^2/ps.

fit_start int

First lag index used in the linear fit (inclusive).

fit_end int

Last lag index used in the linear fit (exclusive).

DebyeFit dataclass

DebyeFit(tau=float('nan'), delta_eps=float('nan'), eps_inf=1.0, eps_static=float('nan'), omega_peak=float('nan'))

Single-Debye relaxation parameters fitted from a dielectric spectrum.

Attributes:

Name Type Description
tau float

Relaxation time (ps).

delta_eps float

Relaxation strength epsilon(0) - epsilon_inf (dimensionless).

eps_inf float

High-frequency permittivity used in the fit.

eps_static float

Static permittivity epsilon(0) (dimensionless).

omega_peak float

Angular frequency of the dielectric-loss peak (rad/ps).

epsilon
epsilon(omega)

Evaluate the fitted Debye model at angular frequencies omega.

Returns:

Type Description
NDArray

(epsilon_real, epsilon_imag) under the positive-loss convention

NDArray

epsilon* = epsilon' - i epsilon''.

DielectricResult dataclass

DielectricResult(meta=dict(), frequency=(lambda: np.array([]))(), epsilon_real=(lambda: np.array([]))(), epsilon_imag=(lambda: np.array([]))(), epsilon_static=float('nan'), epsilon_inf=1.0, route='', component='', conductivity=None)

Bases: Result

Single-route dielectric susceptibility result.

Attributes:

Name Type Description
frequency NDArray[float64]

Angular frequency grid omega, shape (n_freq,), units rad/ps. Bin 0 is DC; bin 1 is Delta-omega = 2 * pi / (n_pad * dt).

epsilon_real NDArray[float64]

Real part epsilon'(omega), shape (n_freq,), dimensionless.

epsilon_imag NDArray[float64]

Loss spectrum epsilon''(omega), shape (n_freq,), dimensionless, positive sign convention.

epsilon_static float

Static dielectric constant epsilon(0), dimensionless. May be nan if the route does not provide a static estimate.

epsilon_inf float

High-frequency dielectric constant.

route str

Computation route ("einstein-helfand" or "green-kubo").

component str

System component ("full", "water", "ion").

conductivity NDArray[float64] | None

Optional conductivity spectrum sigma(omega), shape (n_freq,).

fit_debye
fit_debye()

Fit a single Debye relaxation to this spectrum (NumPy only).

Uses the exact single-Debye identity epsilon''(omega) / (epsilon'(omega) - epsilon_inf) = omega * tau: tau is the least-squares slope through the origin of that ratio versus omega over the low-frequency rising branch (up to the loss peak), with a loss-peak fallback tau = 1 / omega_peak. The relaxation strength is the static limit delta_eps = epsilon(0) - epsilon_inf.

No SciPy: the estimator is closed-form linear regression. For broadened or skewed (Cole-Cole / Havriliak-Negami) line shapes do a nonlinear fit in your analysis script using :meth:DebyeFit.epsilon as the model.

Returns:

Type Description
'DebyeFit'

DebyeFit with tau (ps), delta_eps, eps_inf, eps_static, omega_peak.

DielectricSusceptibilityResult dataclass

DielectricSusceptibilityResult(meta=dict(), results=dict(), metadata=dict())

Bases: Result

Aggregate dielectric susceptibility result.

Attributes:

Name Type Description
results dict[str, DielectricResult]

Mapping from route-component key to DielectricResult

metadata dict[str, Any]

Trajectory parameters and computation info

to_dict
to_dict()

Serialize with nested DielectricResult recursion.

JACFResult dataclass

JACFResult(meta=dict(), time=(lambda: np.array([]))(), jacf=(lambda: np.array([]))(), sigma_running=(lambda: np.array([]))(), sigma=float('nan'))

Bases: TimeSeriesResult

Results from a Green-Kubo current-autocorrelation conductivity.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps), shape (n_time_lags,).

jacf NDArray[float64]

Current autocorrelation C(tau) = <J(0).J(tau)>, (e*A/ps)^2, shape (n_time_lags,).

sigma_running NDArray[float64]

Running Green-Kubo conductivity integral sigma(tau) = 1/(3 V kB T) integral_0^tau C(t) dt (S/m), shape (n_time_lags,).

sigma float

DC ionic conductivity (S/m) — sigma_running at the final lag.

MCDResult dataclass

MCDResult(meta=dict(), time=(lambda: np.array([]))(), correlations=dict())

Bases: TimeSeriesResult

Results from Mean Displacement Correlation calculation.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps)

correlations dict[str, NDArray[float64]]

Dictionary mapping tag names to correlation function arrays (MSD values). Each array has shape (n_time_lags,)

OnsagerResult dataclass

OnsagerResult(meta=dict(), time=(lambda: np.array([]))(), correlations=dict())

Bases: TimeSeriesResult

Results from an Onsager collective-displacement cross-correlation.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps), shape (n_time_lags,).

correlations dict[str, NDArray[float64]]

Mapping from tag "i,j" to the cross-correlation L_ij(tau) = <DP_i(tau).DP_j(tau)> of the collective (summed) species displacements, shape (n_time_lags,), units A^2. The diagonal "i,i" is the collective MSD of species i.

PMSDResult dataclass

PMSDResult(meta=dict(), time=(lambda: np.array([]))(), pmsd=(lambda: np.array([]))())

Bases: TimeSeriesResult

Results from Polarization Mean Square Displacement calculation.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps)

pmsd NDArray[float64]

Polarization MSD values at each time lag, shape (n_time_lags,)

PersistResult dataclass

PersistResult(meta=dict(), time=(lambda: np.array([]))(), correlations=dict())

Bases: TimeSeriesResult

Results from a pair-survival (persistence) correlation.

Attributes:

Name Type Description
time NDArray[float64]

Time lag values (in ps), shape (n_time_lags,).

correlations dict[str, NDArray[float64]]

Mapping from tag "i,j:method:r0[,r1]" to the persistence correlation C(tau) (mean surviving partners per reference particle), shape (n_time_lags,). C(0) is the mean coordination number.

Result dataclass

Result(meta=dict())

Base class for computation results.

Subclasses should define specific fields for their result data.

to_dict
to_dict()

Convert result to dictionary representation.

SpectralResult dataclass

SpectralResult(meta=dict(), frequency=(lambda: np.array([]))(), spectrum=(lambda: np.array([]))())

Bases: Result

Frequency-domain spectrum result.

Attributes:

Name Type Description
frequency NDArray[float64]

Angular frequency grid omega, shape (n_freq,), units rad/ps.

spectrum NDArray[float64]

Spectral density at each frequency, shape (n_freq,).

TimeSeriesResult dataclass

TimeSeriesResult(meta=dict(), time=(lambda: np.array([]))())

Bases: Result

Base class for time-series analysis results.

Attributes:

Name Type Description
time NDArray[float64]

Time points for the analysis (in ps or frames)

Dielectric

dielectric

Dielectric susceptibility Compute classes.

Thin glue layers bridging molpy Trajectory to molrs computational kernels. The Python side does only data extraction (positions, charges) and vectorized NumPy assembly (dipole moment via einsum, minimum-image unwrap via Box.diff_dr); all spectral physics — ACF, windowing, FFT, prefactors — is performed in Rust by molrs.dielectric and molrs.signal.

ACFAnalyzer

ACFAnalyzer(columns, max_lag, *, unwrap=True, **config_kwargs)

Bases: Compute['Trajectory', ACFResult]

Compute autocorrelation function from trajectory data.

Extracts per-atom columns from each frame, optionally unwraps coordinates via Box.diff_dr, delegates to molrs.signal.acf_fft(), normalizes the ACF (divides by zero-lag value), and returns an ACFResult.

DielectricSusceptibility

DielectricSusceptibility(dt, temperature, max_correlation_time, *, epsilon_inf=1.0, window_type='hann', routes=None, volume=None, **config_kwargs)

Bases: Compute['Trajectory', DielectricSusceptibilityResult]

Frequency-dependent dielectric susceptibility from an MD trajectory.

Extracts atomic positions and charges per frame, unwraps coordinates via minimum-image convention, builds the total dipole moment series, and runs one or more spectral routes (Einstein-Helfand and/or Green-Kubo) through molrs.dielectric. The static dielectric constant is also computed once via Neumann's fluctuation formula and attached to every result.

Parameters:

Name Type Description Default
dt float

Frame spacing in ps.

required
temperature float

Temperature in K.

required
max_correlation_time int

Longest ACF lag in frames (clamped to n_frames - 1). Practical choice: ≤ n_frames / 10.

required
epsilon_inf float

High-frequency (electronic) permittivity, dimensionless. Use 1.0 for non-polarizable force fields.

1.0
window_type str

"hann" or "blackman" window applied to the ACF before FFT.

'hann'
routes list[str] | None

Subset of ["einstein-helfand", "green-kubo"]. Default runs both.

None
volume float | None

System volume in ų. If None, uses frame.box.volume from the first frame (assumes NVT/NVE).

None
Inputs

Each frame's atoms block must contain canonical columns x, y, z (Å) and charge (e). Frames must carry a non-free Box.

IonicConductivity

IonicConductivity(dt, temperature, max_correlation_time, *, volume=None, fit_start_frac=0.1, fit_end_frac=0.5, **config_kwargs)

Bases: Compute['Trajectory', ConductivityResult]

Static ionic conductivity sigma via the Einstein-Helfand relation.

Builds the ionic translational dipole M_J(t) = sum_i q_i r_i(t) from the trajectory (minimum-image unwrapped, same as :class:DielectricSusceptibility), then delegates the collective-MSD, linear fit, and S/m unit conversion to molrs.dielectric.Dielectric.einstein_helfand_conductivity:

sigma = lim_{t->inf} (1 / (6 V k_B T)) d/dt <|M_J(t) - M_J(0)|^2>.

Decomposition is the caller's responsibility and is done with selection, not arithmetic: pass a trajectory whose charge column is non-zero only on the mobile ions (e.g. via a :class:~molpy.Selector over the ion atoms, or by zeroing solvent charges). Including the solvent rotational dipole here would contaminate the translational MSD.

Parameters:

Name Type Description Default
dt float

Frame spacing in ps.

required
temperature float

Temperature in K.

required
max_correlation_time int

Longest MSD lag in frames (clamped to n_frames - 1). Practical choice: <= n_frames / 5.

required
volume float | None

System volume in A^3. If None, uses frame.box.volume from the first frame (assumes NVT/NVE).

None
fit_start_frac, fit_end_frac

Fractions of max_lag bounding the linear-fit window over the diffusive regime (default 0.1, 0.5). sigma is window-sensitive for few-carrier systems; report a range rather than a single digit.

required
Inputs

Each frame's atoms block must contain x, y, z (A) and charge (e); frames must carry a non-free Box.

SpectralAnalyzer

SpectralAnalyzer(dt, *, window_type='hann', **config_kwargs)

Bases: Compute[ACFResult, SpectralResult]

Convert time-domain ACF to frequency-domain spectrum.

Applies a window function, generates the frequency grid, and performs the time→frequency conversion. All computation delegated to molrs.signal.

Mean displacement correlation

mcd

Mean Displacement Correlation (MCD) computation for diffusion analysis.

This module implements the MCD method for computing self and distinct displacement correlations (MSD) from molecular dynamics trajectories.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

MCDCompute

MCDCompute(tags, max_dt, dt, center_of_mass=None)

Bases: Compute['Trajectory', MCDResult]

Compute Mean Displacement Correlations (MSD) for diffusion analysis.

This class implements the MCD method which computes time correlation functions of particle displacements. It returns raw MSD values without fitting. It supports:

  • Self diffusion: MSD_i = <(r_i(t+dt) - r_i(t))²>
  • Distinct diffusion: <(r_i(t+dt) - r_i(t)) · (r_j(t+dt) - r_j(t))>

Parameters:

Name Type Description Default
tags list[str]

List of atom type specifications. Each tag can be: - Single integer (e.g., "3"): Self-diffusion MSD of type 3 - Two integers separated by comma (e.g., "3,4"): Distinct diffusion correlation between types 3 and 4

required
max_dt float

Maximum time lag in ps

required
dt float

Timestep in ps

required
center_of_mass dict[int, float] | None

Optional dict mapping element types to masses for COM removal. Format: {element_type: mass}, e.g., {1: 1.008, 6: 12.011}

None

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("trajectory.h5")
>>>
>>> # Compute self-diffusion MSD of atom type 3
>>> mcd = MCDCompute(tags=["3"], max_dt=30.0, dt=0.01)
>>> result = mcd(traj)
>>> print(result.correlations["3"])  # MSD values at each time lag
>>>
>>> # Compute distinct diffusion between types 3 and 4
>>> mcd = MCDCompute(tags=["3,4"], max_dt=30.0, dt=0.01)
>>> result = mcd(traj)
>>> print(result.correlations["3,4"])  # Correlation values

Notes (tame-port audit): - Different-species distinct term. This implementation uses the collective cross-correlation <(sum_i Dr_i).(sum_j Dr_j)>. The tame original (tame/recipes/mdc.py) instead uses mean_i for the reference species, i.e. (1/N_i) times this value — an inconsistent normalization relative to its like-species and self terms (both collective). The collective form here is the physically correct Onsager cross term; for explicit, fully-normalized Onsager coefficients use :class:~molpy.compute.Onsager. - Pair-resolved (SSP) tags. tame's mdc also accepts a "i,j:METHOD:r0,r1" form that splits the partner displacement into surviving ("in") vs broken ("out") neighbours via tpairsurvive. That decomposition is not reproduced here; the pair-survival aspect is available separately as :class:~molpy.compute.Persist.

Polarization MSD

pmsd

Polarization Mean Square Displacement (PMSD) computation.

This module implements the PMSD method for computing polarization fluctuations in ionic systems from molecular dynamics trajectories.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

PMSDCompute

PMSDCompute(cation_type, anion_type, max_dt, dt)

Bases: Compute['Trajectory', PMSDResult]

Compute Polarization Mean Square Displacement for ionic systems.

This class computes the PMSD which measures polarization fluctuations in ionic systems. The polarization is defined as:

P(t) = Σ_cations r_i(t) - Σ_anions r_j(t)

And the PMSD is:

PMSD(dt) = <(P(t+dt) - P(t))²>_t

Parameters:

Name Type Description Default
cation_type int

Atom type index for cations

required
anion_type int

Atom type index for anions

required
max_dt float

Maximum time lag in ps

required
dt float

Timestep in ps

required

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("ionic_liquid.h5")
>>>
>>> # Compute PMSD for Li+ (type 1) and PF6- (type 2)
>>> pmsd = PMSDCompute(cation_type=1, anion_type=2, max_dt=30.0, dt=0.01)
>>> result = pmsd(traj)
>>>
>>> # Plot results
>>> import matplotlib.pyplot as plt
>>> plt.plot(result.time, result.pmsd)
>>> plt.xlabel("Time lag (ps)")
>>> plt.ylabel("PMSD (Ų)")
>>> plt.show()

Notes (tame-port audit): This returns the PMSD curve only. The tame original (tame/recipes/pmsd.py) additionally fits the long-time slope to a conductivity via the Einstein relation sigma = slope / (6 V kB T). That conductivity is provided here as a dedicated compute, :class:~molpy.compute.IonicConductivity (Einstein-Helfand, S/m), with the equivalent Green-Kubo route in :class:~molpy.compute.JACF.

Onsager coefficients

onsager

Onsager transport coefficients from collective mean-displacement correlations.

The Onsager phenomenological coefficients L_ij describe coupled transport of species i and j. They follow from the cross-correlation of the collective (summed) displacements of each species::

L_ij(tau) = <DP_i(tau) . DP_j(tau)>_t ,
    P_s(t)  = sum_{a in species s} r_a(t)   (unwrapped),
    DP_s(tau) = P_s(t+tau) - P_s(t).

The diagonal L_ii is the collective MSD of species i; off-diagonal L_ij captures the cross-correlated drift distinguishing the Onsager picture from the bare Nernst-Einstein sum. A long-time linear fit of L_ij yields the transport coefficient (left to the caller).

The numerically heavy windowed (all-time-origins) cross-correlation runs in Rust (molrs.transport.Onsager); this wrapper extracts coordinates, performs the minimum-image unwrap via :meth:molrs.Box.delta, and builds the per-species collective coordinates.

Adapted from the tame library (https://github.com/Roy-Kid/tame), tame/recipes/onsager.py.

Onsager

Onsager(tags, max_dt, dt, center_of_mass=None)

Bases: Compute['Trajectory', OnsagerResult]

Compute Onsager collective-displacement cross-correlations.

Parameters:

Name Type Description Default
tags list[str]

Species pairs "i,j" (e.g. ["1,1", "1,2", "2,2"]). Each yields the cross-correlation of the collective displacements of species i and j.

required
max_dt float

Maximum time lag in ps.

required
dt float

Timestep in ps.

required
center_of_mass dict[int, float] | None

Optional {type: mass} mapping; when given, the system center of mass is removed each frame before forming the collective coordinates.

None

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("electrolyte.h5")
>>> ons = Onsager(tags=["1,1", "1,2", "2,2"], max_dt=20.0, dt=0.01)
>>> result = ons(traj)
>>> result.correlations["1,2"]  # L_12(tau), shape (n_cache,)

Current-ACF conductivity (Green–Kubo)

jacf

Ionic conductivity from the charge-current autocorrelation (Green-Kubo).

The DC ionic conductivity follows from the Green-Kubo relation for the collective charge current J(t) = sum_a q_a v_a(t)::

sigma = 1 / (3 V kB T) * integral_0^inf <J(0).J(t)> dt .

This wrapper assembles the collective current J = sum v_cation - sum v_anion (unit charges +/-1) from per-atom velocities and delegates the current autocorrelation + trapezoidal Green-Kubo integral to Rust (molrs.transport.Jacf).

Units (LAMMPS real, matching :mod:molpy.compute.dielectric): velocities in A/ps (so J is e*A/ps), dt in ps, volume in A^3, temperature in K; the output sigma is SI S/m. Pass velocities already in A/ps; conductivity scales linearly so other velocity units can be rescaled afterwards.

Adapted from the tame library (https://github.com/Roy-Kid/tame), tame/recipes/jacf.py (whose published version never evaluates the autocorrelation before integrating — corrected here).

JACF

JACF(cation_type, anion_type, max_dt, dt, temperature, volume=None)

Bases: Compute['Trajectory', JACFResult]

Green-Kubo ionic conductivity from the charge-current autocorrelation.

Parameters:

Name Type Description Default
cation_type int

Atom type index for cations (charge +1).

required
anion_type int

Atom type index for anions (charge -1).

required
max_dt float

Maximum correlation time in ps.

required
dt float

Timestep in ps.

required
temperature float

Temperature in K.

required
volume float | None

System volume in A^3. If None, the mean box volume over the trajectory is used.

None

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("ionic_liquid.h5")
>>> jacf = JACF(cation_type=1, anion_type=2, max_dt=30.0, dt=0.01,
...             temperature=300.0)
>>> result = jacf(traj)
>>> result.sigma          # DC ionic conductivity, S/m
>>> result.jacf           # <J(0).J(t)>, shape (n_cache,)

Pair persistence

persist

Pair-survival (persistence) time-correlation functions.

Measures how long pairs of particles remain within a distance cutoff as a function of time lag — residence-time / hydrogen-bond-dynamics analysis. For a reference species i and partner species j::

C(tau) = < (1/N_i) sum_i sum_j S_ij(t, t+tau) >_t ,

where S_ij in {0,1} is the survival indicator for the pair born at t and observed at t+tau. C(0) is the mean coordination number.

Three survival criteria (see :class:molrs.transport.Persist):

  • continuous — within the survival cutoff at every frame since birth.
  • intermittent — within the cutoff at t+tau (re-formation allowed).
  • ssp — stable-state picture: born within inner cutoff r0, continuously within outer cutoff r1 (r1 >= r0) since.

The per-pair, per-frame survival accounting runs in Rust (molrs.transport.Persist); this wrapper extracts per-species coordinates and per-frame orthorhombic box edge lengths.

Adapted from the tame library (https://github.com/Roy-Kid/tame), tame/recipes/persist.py / tame/ops/time.py (tpairsurvive). The published persist.py recipe is non-functional (undefined names); this port implements the intended correlation with explicit survival criteria.

Persist

Persist(tags, max_dt, dt)

Bases: Compute['Trajectory', PersistResult]

Compute pair-survival (persistence) time-correlation functions.

Parameters:

Name Type Description Default
tags list[str]

Pair specifications "t1,t2:method:r0[,r1]" — e.g. "3,4:ssp:3.0,4.0" (cation-anion stable-state pairs born within 3 A, surviving while within 4 A) or "1,1:continuous:3.5" (like-species, single cutoff). method is one of continuous / intermittent / ssp.

required
max_dt float

Maximum time lag in ps.

required
dt float

Timestep in ps.

required

Examples:

>>> from molpy.io import read_h5_trajectory
>>> traj = read_h5_trajectory("electrolyte.h5")
>>> p = Persist(tags=["3,4:ssp:3.0,4.0"], max_dt=30.0, dt=0.01)
>>> result = p(traj)
>>> result.correlations["3,4:ssp:3.0,4.0"]  # C(tau), shape (n_cache,)

Radial distribution

rdf

Radial distribution function g(r) — molrs-backed.

Returns molrs.compute.density.RDFResult directly (no molpy wrapper). The result is finalized eagerly inside RDF.compute, so result.rdf is the normalized g(r) and not the raw histogram.

RDF

RDF(n_bins, r_max, r_min=0.0)

Bases: Compute

Histogram pair distances into g(r) over one or more frames.

Parameters

n_bins : int Number of histogram bins. r_max : float Upper edge of the last bin in Angstroms. r_min : float, default 0.0 Lower edge of bin 0 in Angstroms.

Notes

RDF takes two inputs (frames + neighbor lists) and therefore overrides __call__ rather than _compute. It is not directly compatible with single-input molexp workflows; chain via an explicit dataflow node if needed.

Mean-squared displacement

msd

Mean Squared Displacement — molrs-backed.

Returns molrs.compute.msd.MSDTimeSeries directly. Frame[0] is the reference; series.mean[i] is ⟨|r(i) - r(0)|²⟩ averaged over particles.

MSD

MSD()

Bases: Compute

Mean squared displacement against frame[0].

Examples

series = MSD()(trajectory_frames) series.mean.shape # (n_frames,)

Shape descriptors

shape

Per-cluster shape descriptors — molrs-backed.

Thin wrappers around molrs.compute.cluster:

  • CenterOfMass (frames, clusters) → mass-weighted centers
  • GyrationTensor (frames, clusters, centers) → 3×3 tensors per cluster
  • InertiaTensor (frames, clusters, com) → 3×3 tensors per cluster
  • RadiusOfGyration (frames, clusters, com) → Rg per cluster

CenterOfMass

CenterOfMass(masses=None)

Bases: Compute

Mass-weighted center per cluster.

Parameters

masses : ndarray | None Per-particle masses (length N). None falls back to unit mass.

GyrationTensor

GyrationTensor()

Bases: Compute

Gyration tensor per cluster (unweighted).

InertiaTensor

InertiaTensor(masses=None)

Bases: Compute

Inertia tensor per cluster.

Parameters

masses : ndarray | None Per-particle masses (length N). None falls back to unit mass.

RadiusOfGyration

RadiusOfGyration(masses=None)

Bases: Compute

Radius of gyration per cluster.

Parameters

masses : ndarray | None Per-particle masses (length N). None falls back to unit mass.

Decomposition

decomposition

Dimensionality reduction + clustering ML primitives — molrs-backed.

  • Pca (alias for molrs Pca2): 2-component PCA over a list of DescriptorRow objects.
  • KMeans: k-means clustering over a PcaResult.

DescriptorRow is re-exported as the input wrapper: each row is a 1-D float ndarray passed through DescriptorRow(row).

KMeans

KMeans(k, max_iter=100, seed=0)

Bases: Compute

k-means over a PcaResult.

Parameters

k : int Number of clusters. max_iter : int, default 100 Lloyd-iteration cap. seed : int, default 0 RNG seed for centroid initialization.

Pca

Pca()

Bases: Compute

Two-component PCA. Input: list of DescriptorRow.

Clustering

cluster

Distance-based clustering — molrs-backed.

Cluster takes (frames, nlists) and returns one ClusterResult per frame. ClusterCenters takes (frames, clusters) and returns the geometric centers per cluster.

Both wrappers expose two inputs and therefore override __call__ (not the single-input _compute hook) — mirroring the RDF pattern.

Cluster

Cluster(min_cluster_size)

Bases: Compute

Group particles into clusters via a neighbor-list connectivity graph.

Parameters

min_cluster_size : int Minimum size for a connected component to count as a cluster.

ClusterCenters

ClusterCenters()

Bases: Compute

Geometric centers per cluster (unweighted).

ClusterProperties

ClusterProperties()

Bases: Compute

Per-cluster properties (sizes, centers, masses, gyration tensors, Rg).

Takes (frames, clusters) where clusters is the sequence of ClusterResult returned by :class:Cluster. Returns one dict per frame.

Time series

time_series

Time-series analysis operations for trajectory data.

This module provides utilities for computing time-correlation functions, mean squared displacements, and other time-series statistics commonly used in molecular dynamics trajectory analysis.

Adapted from the tame library (https://github.com/Roy-Kid/tame).

TimeAverage

TimeAverage(shape, dtype=np.float64, dropnan='partial')

Compute running time average with NaN handling.

This class accumulates data over time and computes the average, with options for handling NaN values.

Parameters:

Name Type Description Default
shape tuple[int, ...]

Shape of data arrays to average

required
dtype dtype | type

Data type for accumulated arrays

float64
dropnan Literal['none', 'partial', 'all']

How to handle NaN values: - 'none': Include NaN values in average (result may be NaN) - 'partial': Ignore individual NaN entries - 'all': Skip entire frame if any NaN is present

'partial'

Examples:

>>> avg = TimeAverage(shape=(10,), dropnan='partial')
>>> avg.update(np.array([1.0, 2.0, np.nan, 4.0]))
>>> avg.update(np.array([2.0, 3.0, 3.0, 5.0]))
>>> result = avg.get()  # [1.5, 2.5, 3.0, 4.5]
get
get()

Get current time-averaged value.

Returns:

Type Description
NDArray

Time-averaged data array

reset
reset()

Reset accumulator to initial state.

update
update(new_data)

Add new data to running average.

Parameters:

Name Type Description Default
new_data NDArray

New data array to include in average

required

TimeCache

TimeCache(cache_size, shape, dtype=np.float64, default_val=np.nan)

Cache previous N frames of trajectory data for correlation calculations.

Uses an in-place ring buffer for O(1) per update (no array allocation).

Parameters:

Name Type Description Default
cache_size int

Number of frames to cache (maximum time lag)

required
shape tuple[int, ...]

Shape of data arrays to cache (e.g., (n_atoms, 3) for coordinates)

required
dtype dtype | type

Data type for cached arrays

float64
default_val float

Default value to fill cache initially (default: NaN)

nan

Examples:

>>> cache = TimeCache(cache_size=100, shape=(10, 3))
>>> coords = np.random.randn(10, 3)
>>> cache.update(coords)
>>> cached_data = cache.get()  # Shape: (100, 10, 3)
cache property
cache

Return data ordered newest-first (for backward compatibility).

get
get()

Get cached data array, ordered newest-first.

Returns:

Type Description
NDArray

Cached data with shape (cache_size, *data_shape)

reset
reset()

Reset cache to initial state.

update
update(new_data)

Add new frame to cache (O(1) in-place write).

Parameters:

Name Type Description Default
new_data NDArray

New data array to add (shape must match self.shape)

required

compute_acf

compute_acf(data, cache_size, dropnan='partial')

Compute autocorrelation function over trajectory.

Calculates: _{i,t}

The particle dimension is averaged, and the time dimension is accumulated using a rolling cache to compute correlations at different time lags.

Parameters:

Name Type Description Default
data NDArray

Trajectory data with shape (n_frames, n_particles, n_dim)

required
cache_size int

Maximum time lag (dt) to compute, in frames

required
dropnan Literal['none', 'partial', 'all']

How to handle NaN values in averaging

'partial'

Returns:

Type Description
NDArray

ACF array with shape (cache_size,) containing ACF at each time lag

Examples:

>>> # Velocity autocorrelation
>>> n_frames, n_particles = 1000, 100
>>> velocities = np.random.randn(n_frames, n_particles, 3)
>>> acf = compute_acf(velocities, cache_size=100)

compute_msd

compute_msd(data, cache_size, dropnan='partial')

Compute mean squared displacement over trajectory.

Calculates: <(r_i(t+dt) - r_i(t))^2>_{i,t}

The particle dimension is averaged, and the time dimension is accumulated using a rolling cache to compute correlations at different time lags.

Parameters:

Name Type Description Default
data NDArray

Trajectory data with shape (n_frames, n_particles, n_dim)

required
cache_size int

Maximum time lag (dt) to compute, in frames

required
dropnan Literal['none', 'partial', 'all']

How to handle NaN values in averaging

'partial'

Returns:

Type Description
NDArray

MSD array with shape (cache_size,) containing MSD at each time lag

Examples:

>>> # Simple 1D random walk
>>> n_frames, n_particles = 1000, 100
>>> positions = np.cumsum(np.random.randn(n_frames, n_particles, 1), axis=0)
>>> msd = compute_msd(positions, cache_size=100)
>>> # MSD should grow linearly with time for random walk

Workflow

workflow

Lightweight DAG orchestration for Compute nodes.

Zero non-stdlib dependencies (uses graphlib.TopologicalSorter).

Workflow

Workflow()

Compose Compute nodes into a DAG and execute them in topological order.

Parameters are bound by name: each node is called as node(**resolved) where resolved maps its parameter names to upstream results or externally-supplied values. The Workflow never inspects node signatures.

external_inputs property
external_inputs

All source names that are not registered nodes.

nodes property
nodes

Registered node names in insertion order.

add
add(name, compute, inputs=None)

Register a compute node.

Parameters:

Name Type Description Default
name str

Unique node name.

required
compute

Callable object called as compute(**resolved).

required
inputs dict[str, str] | None

Mapping from the node's parameter names to source names. Each source is either a registered node name or an external input name.

None

Returns:

Type Description
str

name, for fluent chaining.

Raises:

Type Description
WorkflowDuplicateNodeError

name already registered.

WorkflowCycleError

Adding this node creates a cycle.

predecessors
predecessors(name)

Return the set of node-name predecessors for name.

External inputs are excluded.

run
run(**externals)

Execute all nodes in topological order.

Parameters:

Name Type Description Default
**externals Any

Values for every external input.

{}

Returns:

Type Description
dict[str, Any]

{node_name: result} for every registered node.

Raises:

Type Description
WorkflowMissingInputError

One or more external inputs are absent.

topological_order
topological_order()

Return nodes in topological execution order.

WorkflowCycleError

Bases: WorkflowError

Adding this edge would create a cycle in the DAG.

WorkflowDuplicateNodeError

Bases: WorkflowError

A node with the same name is already registered.

WorkflowError

Bases: Exception

Base exception for Workflow errors.

WorkflowMissingInputError

WorkflowMissingInputError(missing)

Bases: WorkflowError

Required external inputs were not provided to run().