# Code written by: Maksim Imakaev (imakaev@mit.edu)
"""
Analyses of polymer conformations
------------------------------
This module presents a collection of utils to work with polymer conformations.
Tools for calculating contacts
------------------------------
The main function calculating contacts is: :py:func:`polychrom.polymer_analyses.calculate_contacts`
Right now it is a simple wrapper around scipy.cKDTree.
Another function :py:func:`polychrom.polymer_analyses.smart_contacts` was added recently to help build contact maps
with a large contact radius. It randomly sub-samples the monomers; by default selecting N/cutoff monomers. It then
calculates contacts from sub-sampled monomers only. It is especially helpful when the same code needs to calculate
contacts at large and small contact radii. Because of sub-sampling at large contact radius, it avoids the problem of
having way-too-many-contacts at a large contact radius. For ordinary contacts, the number of contacts scales as
contact_radius^3; however, with smart_contacts it would only scale linearly with contact radius, which leads to
significant speedups.
Tools to calculate P(s) and R(s)
--------------------------------
We provide functions to calculate P(s), Rg^2(s) and R^2(s) for polymers. By default, they use log-spaced bins on the
X axis, with about 10 bins per order of magnitude, but aligned such that the last bin ends exactly at (N-1). They
output (bin, scaling) for Rg^2 and R^2, and (bin_mid, scaling) for contacts. In either case, the returned values are
ready to plot. The difference is that Rg and R^2 are evaluated at a given value of s, while contacts are aggregated
for (bins[0].. bins[1]), (bins[1]..bins[2]). Therefore, we have to return bin mids for contacts.
"""
from math import sqrt
import numpy as np
import pandas as pd
from scipy.spatial import cKDTree
from scipy.ndimage import gaussian_filter1d
try:
from . import _polymer_math
except Exception:
pass
def generate_bins(N, start=4, bins_per_order_magn=10):
    """Generate log-spaced integer bins from `start` up to exactly N - 1.

    Produces roughly `bins_per_order_magn` edges per decade, rounded to
    integers and deduplicated, aligned so the last edge lands on N - 1.
    """
    log_lo = np.log10(start)
    # tiny epsilon so that integer truncation of the last edge still yields N - 1
    log_hi = np.log10(N - 1) + 1e-6
    n_edges = max(int(np.ceil((log_hi - log_lo) * bins_per_order_magn)), 0)
    edges = np.unique(np.logspace(log_lo, log_hi, num=n_edges, dtype=int))
    if len(edges):
        assert edges[-1] == N - 1
    return edges
def Rg2_scaling(data, bins=None, ring=False):
    """Average squared gyration radius of subchains as a function of subchain length.

    Parameters
    ----------
    data: Nx3 array of monomer positions
    bins: subchain lengths at which to evaluate Rg^2 (default: log-spaced bins)
    ring: treat the polymer as a ring (default: False)

    Returns
    -------
    (bins, rads): array of subchain lengths, list of mean Rg^2 values
    """
    data = np.asarray(data, float)
    n_monomers = data.shape[0]
    assert data.shape[1] == 3
    # prepend a zero row so the cumulative sums start with a leading zero entry
    data = np.concatenate([[[0, 0, 0]], data])
    if bins is None:
        bins = generate_bins(n_monomers)
    # prefix sums of positions and squared positions: Rg^2 of any window
    # is <r^2> - <r>^2, both obtainable from these cumulative sums
    cum_pos = np.cumsum(data, 0)
    cum_sq = np.cumsum(data**2, 0)

    def _mean_rg2(window):
        if ring:
            # wrap the prefix sums so windows crossing the origin are covered
            wrap_pos = cum_pos[1:window, :] + cum_pos[-1, :][None, :]
            wrap_sq = cum_sq[1:window, :] + cum_sq[-1, :][None, :]
            pos = np.concatenate([cum_pos, wrap_pos], axis=0)
            sq = np.concatenate([cum_sq, wrap_sq], axis=0)
        else:
            pos, sq = cum_pos, cum_sq
        mean_of_sq = (sq[window:, :] - sq[:-window, :]) / window
        sq_of_mean = ((pos[:-window, :] - pos[window:, :]) / window) ** 2
        # per-window variance summed over x, y, z; then averaged over windows
        return np.mean(np.sum(mean_of_sq - sq_of_mean, 1))

    rads = [_mean_rg2(int(b)) for b in bins]
    return np.array(bins), rads
def R2_scaling(data, bins=None, ring=False):
    """Mean squared end-to-end distance of subchains as a function of length.

    .. warning:: Averages over all possible subchains of a given length.

    Parameters
    ----------
    data: Nx3 array of monomer positions
    bins: subchain lengths at which to evaluate R^2 (default: log-spaced bins)
    ring: treat the polymer as a ring (default: False)
    """
    data = np.asarray(data, float)
    n_monomers = data.shape[0]
    assert data.shape[1] == 3
    data = data.T  # 3xN layout so separations act along the second axis
    if bins is None:
        bins = generate_bins(n_monomers)
    if ring:
        # duplicate coordinates so separations can wrap around the ring
        data = np.concatenate([data, data], axis=1)
    rads = []
    for sep in bins:
        if ring:
            sq_dist = np.sum((data[:, :n_monomers] - data[:, sep : sep + n_monomers]) ** 2, 0)
        else:
            sq_dist = np.sum((data[:, :-sep] - data[:, sep:]) ** 2, 0)
        rads.append(np.mean(sq_dist))
    return np.array(bins), rads
def Rg2(data):
    """Squared radius of gyration of the whole chain.

    Mean squared deviation of monomers from their center of mass,
    summed over the three coordinates.
    """
    data = np.asarray(data)
    assert data.shape[1] == 3
    centered = data - data.mean(axis=0)
    return np.mean(centered**2) * 3
def Rg2_matrix(data):
    """Rg^2 of every subchain, computed via prefix sums (dynamic programming).

    Returns an NxN matrix whose element [i, j] is the squared gyration
    radius of the subchain spanning monomers i..j, inclusive of both ends.
    """
    data = np.asarray(data, float)
    assert data.shape[1] == 3
    n = data.shape[0]
    data = np.concatenate([[[0, 0, 0]], data])  # leading zero for prefix sums
    cum_pos = np.cumsum(data, 0)  # prefix sums of positions (for the COM)
    cum_sq = np.cumsum(data**2, 0)  # prefix sums of squared positions
    # number of monomers in the subchain i..j
    counts = np.abs(np.arange(n)[:, None] - np.arange(n)[None, :]) + 1
    mean_of_sq = (cum_sq[None, 1:, :] - cum_sq[:-1, None, :]) / counts[:, :, None]
    sq_of_mean = ((cum_pos[:-1, None, :] - cum_pos[None, 1:, :]) / counts[:, :, None]) ** 2
    rg2 = np.sum(mean_of_sq - sq_of_mean, 2)
    np.fill_diagonal(rg2, 0)  # a single monomer has zero gyration radius
    # only the upper triangle is meaningful; mirror it onto the lower one
    lower = np.arange(n)[:, None] > np.arange(n)[None, :]
    rg2[lower] = rg2.T[lower]
    return rg2
def ndarray_groupby_aggregate(
    df,
    ndarray_cols,
    aggregate_cols,
    value_cols=None,
    sample_cols=None,
    preset="sum",
    ndarray_agg=None,
    value_agg=None,
):
    """
    A version of pd.groupby that is aware of numpy arrays as values of columns.

    * aggregates columns ndarray_cols using the ndarray_agg aggregator,
    * aggregates value_cols using the value_agg aggregator,
    * takes the first element in sample_cols,
    * groups over aggregate_cols.

    It has presets for sum, mean and nanmean.

    Parameters
    ----------
    df : pd.DataFrame
    ndarray_cols : list of str - columns whose cell values are numpy arrays
    aggregate_cols : list of str - columns to group by
    value_cols : list of str, optional - columns with scalar values to aggregate
    sample_cols : list of str, optional - columns whose first element is kept
    preset : {"sum", "mean", "nanmean"} - selects the default aggregators
    ndarray_agg : callable, optional - aggregator for ndarray columns (takes a
        list of arrays, returns one array); takes precedence over `preset`
    value_agg : callable, optional - aggregator for value columns (takes a
        DataFrame, returns a Series); takes precedence over `preset`

    Raises
    ------
    ValueError if `preset` is unknown and a default aggregator is needed.
    """
    # Avoid mutable default arguments; treat None as "no columns".
    value_cols = [] if value_cols is None else list(value_cols)
    sample_cols = [] if sample_cols is None else list(sample_cols)

    # BUGFIX: previously the preset branch unconditionally overwrote
    # caller-supplied ndarray_agg/value_agg (preset defaulted to "sum"),
    # making those parameters dead. Presets now only fill in aggregators
    # the caller did not provide.
    _presets = {
        "sum": (lambda x: np.sum(x, axis=0), lambda x: x.sum()),
        "mean": (lambda x: np.mean(x, axis=0), lambda x: x.mean()),
        "nanmean": (lambda x: np.nanmean(x, axis=0), lambda x: x.mean()),
    }
    if ndarray_agg is None or value_agg is None:
        if preset not in _presets:
            raise ValueError(f"Unknown preset: {preset!r}; use 'sum', 'mean' or 'nanmean'")
        preset_ndarray_agg, preset_value_agg = _presets[preset]
        if ndarray_agg is None:
            ndarray_agg = preset_ndarray_agg
        if value_agg is None:
            value_agg = preset_value_agg

    def combine_values(in_df):
        """Aggregate one group: ndarray cols, scalar cols, and sampled cols."""
        average_arrs = pd.Series(
            index=ndarray_cols,
            data=[ndarray_agg([np.asarray(j) for j in in_df[i].values]) for i in ndarray_cols],
        )
        average_values = value_agg(in_df[value_cols])
        sample_values = in_df[sample_cols].iloc[0]
        return pd.concat([average_arrs, average_values, sample_values])

    return df.groupby(aggregate_cols).apply(combine_values)
def streaming_ndarray_agg(
    in_stream,
    ndarray_cols,
    aggregate_cols,
    value_cols=None,
    sample_cols=None,
    chunksize=30000,
    add_count_col=False,
    divide_by_count=False,
):
    """
    Takes in_stream of dataframes.

    Applies ndarray-aware groupby-sum or groupby-mean: treats ndarray_cols as
    numpy arrays, value_cols as normal values, and for sample_cols takes the
    first element. Does groupby over aggregate_cols.

    If add_count_col is True, adds column "count"; if it's a string, adds a
    column with that name. If divide_by_count is True, divides the result by
    column "count"; if it's a string, divides by that column.

    This function can be used for automatically aggregating P(s), R(s) etc.
    for a set of conformations that is so large that all P(s) won't fit in RAM,
    and when averaging needs to be done over so many parameters that for-loops
    are not an issue. Examples may include simulations in which a sweep over
    many parameters has been performed.
    """
    # Avoid mutable default arguments, and copy the caller's lists before
    # (possibly) appending the count column below.
    ndarray_cols = list(ndarray_cols)
    aggregate_cols = list(aggregate_cols)
    value_cols = [] if value_cols is None else list(value_cols)
    sample_cols = [] if sample_cols is None else list(sample_cols)
    value_cols_orig = list(value_cols)  # columns to divide later, sans count
    if add_count_col is not False:
        if add_count_col is True:
            add_count_col = "count"
        value_cols.append(add_count_col)

    def _with_count(frame):
        """Return a copy of the frame with the count column set to 1."""
        # BUGFIX: previously the count column was assigned in place,
        # mutating the caller's DataFrames as a side effect.
        frame = frame.copy()
        frame[add_count_col] = 1
        return frame

    def agg_one(dfs, aggregate):
        """Group-sum a batch of DataFrames together with the running aggregate."""
        if add_count_col is not False:
            dfs = [_with_count(d) for d in dfs]
        df = pd.concat(dfs + ([aggregate] if aggregate is not None else []), sort=False)
        combined = ndarray_groupby_aggregate(
            df,
            ndarray_cols=ndarray_cols,
            aggregate_cols=aggregate_cols,
            value_cols=value_cols,
            sample_cols=sample_cols,
            preset="sum",
        )
        return combined.reset_index()

    aggregate = None
    batch = []
    batch_rows = 0
    for frame in in_stream:
        batch.append(frame)
        batch_rows += len(frame)
        if batch_rows > chunksize:
            aggregate = agg_one(batch, aggregate)
            batch = []
            batch_rows = 0
    if batch:
        aggregate = agg_one(batch, aggregate)

    if divide_by_count is not False:
        if divide_by_count is True:
            divide_by_count = "count"
        for col in ndarray_cols + value_cols_orig:
            aggregate[col] = aggregate[col] / aggregate[divide_by_count]
    return aggregate
def kabsch_msd(P, Q):
    """
    Calculates MSD between two conformations using the Kabsch algorithm
    Borrowed from https://github.com/charnley/rmsd with some changes
    rmsd is licensed with a 2-clause BSD license
    Copyright (c) 2013, Jimmy Charnley Kromann <jimmy@charnley.dk> & Lars Bratholm
    All rights reserved.
    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions are met:
    1. Redistributions of source code must retain the above copyright notice, this
    list of conditions and the following disclaimer.
    2. Redistributions in binary form must reproduce the above copyright notice,
    this list of conditions and the following disclaimer in the documentation
    and/or other materials provided with the distribution.
    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
    ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
    ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    """
    # center both point sets at their respective centers of mass
    centered_p = P - np.mean(P, axis=0)
    centered_q = Q - np.mean(Q, axis=0)
    covariance = np.dot(np.transpose(centered_p), centered_q)
    V, S, W = np.linalg.svd(covariance)
    # if the optimal transform is a reflection, flip the last singular
    # direction so that the resulting matrix is a proper rotation (det = +1)
    if (np.linalg.det(V) * np.linalg.det(W)) < 0.0:
        S[-1] = -S[-1]
        V[:, -1] = -V[:, -1]
    rotation = np.dot(V, W)
    # MSD after optimal superposition; *3 sums over the three coordinates
    return np.mean((np.dot(centered_p, rotation) - centered_q) ** 2) * 3


kabsch_rmsd = kabsch_msd
def mutualSimplify(a, b, verbose=False):
    """
    Ported here from openmmlib.

    Given two polymer rings, repeatedly attempts to remove monomers one-by-one
    from each ring while preserving the linking between them. A monomer is
    removed only if no other bond passes through the triangle formed by its
    two old bonds and the one new bond. Iterates until neither ring can be
    reduced any further, then returns the simplified pair.
    """
    if verbose:
        print("Starting mutual simplification of polymers")
    while True:
        size_a, size_b = len(a), len(b)
        if verbose:
            print(len(a), len(b), "before; ", end=" ")
        # one simplification pass over each ring, in both orders
        a, b = _polymer_math.mutualSimplify(a, b)
        if verbose:
            print(len(a), len(b), "after one; ", end=" ")
        b, a = _polymer_math.mutualSimplify(b, a)
        if verbose:
            print(len(a), len(b), "after two; ")
        if len(a) == size_a and len(b) == size_b:
            # neither ring shrank this round: we have converged
            if verbose:
                print("Mutual simplification finished")
            return a, b
def getLinkingNumber(data1, data2, simplify=True, randomOffset=True, verbose=False):
    """
    Ported here from openmmlib as well.

    Computes the linking number of two closed polymer rings, optionally
    mutually simplifying them first (see :func:`mutualSimplify`).
    """
    if not simplify:
        return _polymer_math.getLinkingNumber(data1, data2, randomOffset=randomOffset)
    simplified1, simplified2 = mutualSimplify(a=data1, b=data2, verbose=verbose)
    return _polymer_math.getLinkingNumber(simplified1, simplified2, randomOffset=randomOffset)
def calculate_cistrans(data, chains, chain_id=0, cutoff=5, pbc_box=False, box_size=None):
    """
    Territoriality analysis of polymer chains via the cis/trans contact ratio.

    The cis signal counts contacts of the marked chain ('chain_id') with
    itself; the trans signal counts contacts of the marked chain with the
    other chains from 'chains' (and with all periodic replicas when
    'pbc_box' is True). Returns (cis_signal, trans_signal).
    """
    if data.shape[1] != 3:
        raise ValueError("Incorrect polymer data shape. Must be Nx3.")
    if np.isnan(data).any():
        raise RuntimeError("Data contains NANs")

    n_monomers = len(data)
    if pbc_box:
        if box_size is None:
            raise ValueError("Box size is not given")
        # wrap coordinates into the periodic box
        data_scaled = np.mod(data, box_size)
    else:
        box_size = None
        data_scaled = np.copy(data)

    if chains is None:
        # no chain annotation: treat the whole polymer as one marked chain
        chains = [[0, n_monomers]]
        chain_id = 0
    chain_start, chain_end = chains[chain_id][0], chains[chain_id][1]

    # all contact pairs available in the (possibly wrapped) coordinates
    pairs = cKDTree(data_scaled, boxsize=box_size).query_pairs(cutoff, output_type="ndarray")
    # total contact count of the marked chain: each pair contributes once per
    # endpoint inside the chain, so intra-chain contacts are counted twice
    all_signal = len(pairs[pairs < chain_end]) - len(pairs[pairs < chain_start])

    # contact pairs of the marked chain with itself (no periodic wrapping)
    cis_pairs = cKDTree(data[chain_start:chain_end], boxsize=None).query_pairs(cutoff, output_type="ndarray")
    cis_signal = 2 * len(cis_pairs)  # doubled to match the counting above
    assert all_signal >= cis_signal
    trans_signal = all_signal - cis_signal
    return cis_signal, trans_signal