Source code for polychrom.legacy.contactmaps
# Code written by: Maksim Imakaev (imakaev@mit.edu)
"""
This file contains a bunch of method to work on contact maps of a Hi-C data.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import sys
import warnings
import numpy as np
from polychrom.polymer_analyses import calculate_contacts as giveContacts
from polychrom.polymerutils import load
[docs]
def rescalePoints(points, bins):
"converts array of contacts to the reduced resolution contact map"
a = np.histogram2d(points[:, 0], points[:, 1], bins)[0]
a = a + np.transpose(a)
return a
[docs]
def rescaledMap(data, bins, cutoff=1.7, contactMap=None):
# print data.sum(), bins.sum(), cutoff
"""calculates a rescaled contact map of a structure
Parameters
----------
data : Nx3 or 3xN array
polymer conformation
bins : Lx1 array
bin starts
cutoff : float, optional
cutoff for contacts
Returns
-------
resXres array with the contact map
"""
t = giveContacts(data, cutoff)
x = np.searchsorted(bins, t[:, 0]) - 1
y = np.searchsorted(bins, t[:, 1]) - 1
assert x.min() >= 0
assert y.min() >= 0
assert x.max() < len(bins) - 1
assert y.max() < len(bins) - 1
matrixSize = len(bins) - 1
index = matrixSize * x + y
unique, inds = np.unique(index, return_counts=True)
uniquex = unique // matrixSize
uniquey = unique % matrixSize
if contactMap is None:
contactMap = np.zeros((matrixSize, matrixSize), dtype=int)
contactMap[uniquex, uniquey] += inds
return contactMap
[docs]
def pureMap(data, cutoff=1.7, contactMap=None):
"""calculates an all-by-all contact map of a single polymer chain.
Doesn't work for multi-chain polymers!!!
If contact map is supplied, it just updates it
Parameters
----------
data : Nx3 or 3xN array
polymer conformation
cutoff : float
cutoff for contacts
contactMap : NxN array, optional
contact map to update, if averaging is used
"""
data = np.asarray(data)
if len(data.shape) != 2:
raise ValueError("Wrong dimensions of data")
if 3 not in data.shape:
raise ValueError("Wrong size of data: %s,%s" % data.shape)
if data.shape[0] == 3:
data = data.T
data = np.asarray(data, float, order="C")
t = giveContacts(data, cutoff)
N = data.shape[0]
if contactMap is None:
contactMap = np.zeros((N, N), "int32")
contactMap[t[:, 0], t[:, 1]] += 1
contactMap[t[:, 1], t[:, 0]] += 1
return contactMap
[docs]
def averageBinnedContactMap(
filenames,
chains=None,
binSize=None,
cutoff=1.7,
n=4, # Num threads
loadFunction=load,
exceptionsToIgnore=None,
printProbability=1,
map_function=map,
):
"""
Returns an average contact map of a set of conformations.
Non-existing files are ignored if exceptionsToIgnore is set to IOError.
example:\n
An example:
.. code-block:: python
>>> filenames = ["myfolder/blockd%d.dat" % i for i in range(1000)]
>>> cmap = averageBinnedContactMap(filenames) + 1 #getting cmap
#either showing a log of a map (+1 for zeros)
>>> plt.imshow(numpy.log(cmap +1))
#or truncating a map
>>> vmax = np.percentile(cmap, 99.9)
>>> plt.imshow(cmap, vmax=vmax)
>>> plt.show()
Parameters
----------
filenames : list of strings
Filenames to average map over
chains : list of tuples or Nx2 array
(start,end+1) of each chain
binSize : int
size of each bin in monomers
cutoff : float, optional
Cutoff to calculate contacts
n : int, optional
Number of threads to use.
By default 4 to minimize RAM consumption.
exceptionsToIgnore : list of Exceptions
List of exceptions to ignore when finding the contact map.
Put IOError there if you want it to ignore missing files.
Returns
-------
tuple of two values:
(i) MxM numpy array with the conntact map binned to binSize resolution.
(ii) chromosomeStarts a list of start sites for binned map.
"""
n = min(n, len(filenames))
subvalues = [filenames[i::n] for i in range(n)]
getResolution = 0
fileInd = 0
while getResolution == 0:
try:
data = loadFunction(filenames[fileInd]) # load filename
getResolution = 1
except Exception:
fileInd = fileInd + 1
if fileInd >= len(filenames):
print("no valid files found in filenames")
raise ValueError("no valid files found in filenames")
if chains is None:
chains = [[0, len(data)]]
if binSize is None:
binSize = int(np.floor(len(data) / 500))
bins = []
chains = np.asarray(chains)
chainBinNums = np.ceil((chains[:, 1] - chains[:, 0]) / (0.0 + binSize))
for i in range(len(chainBinNums)):
bins.append(binSize * (np.arange(int(chainBinNums[i]))) + chains[i, 0])
bins.append(np.array([chains[-1, 1] + 1]))
bins = np.concatenate(bins)
bins = bins - 0.5
Nbase = len(bins) - 1
if Nbase > 10000:
warnings.warn(UserWarning("very large contact map" " may be difficult to visualize"))
chromosomeStarts = np.cumsum(chainBinNums)
chromosomeStarts = np.hstack((0, chromosomeStarts))
def myaction(values): # our worker receives some filenames
mysum = None # future contact map.
for i in values:
try:
data = loadFunction(i)
if np.random.random() < printProbability:
print(i)
except tuple(exceptionsToIgnore):
print("file not found", i)
continue
if data.shape[0] == 3:
data = data.T
if mysum is None: # if it's the first filename,
mysum = rescaledMap(data, bins, cutoff) # create a map
else: # if not
rescaledMap(data, bins, cutoff, mysum)
# use existing map and fill in contacts
return mysum
blocks = list(map_function(myaction, subvalues))
blocks = [i for i in blocks if i is not None]
a = blocks[0]
for i in blocks[1:]:
a = a + i
a = a + a.T
return a, chromosomeStarts
[docs]
def averagePureContactMap(
filenames,
cutoff=1.7,
n=4, # Num threads
loadFunction=load,
exceptionsToIgnore=[],
printProbability=0.005,
map_function=map,
):
"""
Parameters
----------
filenames : list of strings
Filenames to average map over
cutoff : float, optional
Cutoff to calculate contacts
n : int, optional
Number of threads to use.
By default 4 to minimize RAM consumption with pure maps.
exceptionsToIgnore : list of Exceptions
List of exceptions to ignore when finding the contact map.
Put IOError there if you want it to ignore missing files.
Returns
-------
An NxN (for pure map) numpy array with the contact map.
"""
"""
Now we actually need to modify our contact map by adding
contacts from each new file to the contact map.
We do it this way because our contact map is huge (maybe a gigabyte!),
so we can't just add many gigabyte-sized arrays together.
Instead of this each worker creates an empty "average contact map",
and then loads files one by one and adds contacts from each file to a contact map.
Maps from different workers are then added together manually.
"""
n = min(n, len(filenames))
subvalues = [filenames[i::n] for i in range(n)]
def myaction(values): # our worker receives some filenames
mysum = None # future contact map.
for i in values:
try:
data = loadFunction(i)
if np.random.random() < printProbability:
print(i)
except tuple(exceptionsToIgnore):
print("file not found", i)
continue
except Exception:
print("Unexpected error:", sys.exc_info()[0])
print("File is: ", i)
return -1
if data.shape[0] == 3:
data = data.T
if mysum is None: # if it's the first filename,
if len(data) > 6000:
warnings.warn(
UserWarning("very large contact map" " may cause errors. these may be fixed with n=1 threads.")
)
if len(data) > 20000:
warnings.warn(UserWarning("very large contact map" " may be difficult to visualize."))
mysum = pureMap(data, cutoff) # create a map
else: # if not
pureMap(data, cutoff, mysum)
# use existing map and fill in contacts
return mysum
blocks = list(map_function(myaction, subvalues))
blocks = [i for i in blocks if i is not None]
a = blocks[0]
for i in blocks[1:]:
a = a + i
return a