Source code for polychrom.contactmaps
"""
Building contact maps
=====================
This module is the main workhorse of tools to calculate contactmaps, both from polymer simulations and from other
simulations (e.g. 1D simulations of loop extrusion). All of the functions here are designed to be parallelized,
and lots of efforts were put into making this possible.
The reasons we need parallel contactmap code is the following:
* Calculating contact maps is slow, esp. at large contact radii, and benefits greatly
from parallelizing
* Doing regular multiprocesing.map has limitations
* It can only handle heataps up to some size, and transferring gigabyte-sized heatmaps between processes takes minutes
* It can only do as many heatmaps as fits in RAM, which on 20-core 128GB machine is no more than 5GB/heatmap
The structure of this class is as follows.
On the outer level, it provides three methods to average contactmaps:
* :func:`monomerResolutionContactMap`
* :func:`binnedContactMap`,
* :func:`monomerResolutionContactMapSubchains`.
The first two create contact map from an entire file: either monomer-resolution or binned. The last one creates
contact maps from sub-chains in a file, starting at a given set of starting points. It is useful when doing contact
maps from several copies of a system in one simulation.
The first two methods have a legacy implementation from the old library that is still here to do the tests.
On the middle level, it provides a method "averageContacts". This method accepts a "contact iterator", and can be
used to average contacts from both a set of filenames and from a simulation of some kind (e.g. averaging positions of
loop extruding factors from a 1D loop extrusion simulation). All of the outer level functions (
monomerResolutionContactMap for example) are implemented using this method.
On the lower level, there are internals of the "averageContacts" method and an associated "worker" function. There is
generally no need to understand the code of those functions. There exists a reference implementation of both the
worker and the :func:`averageContacts` function, :class:`simpleWorker` and :func:`averageContactsSimple`. They do
all the things that "averageContacts" do, but on only one core. In fact, "averageContacts" defaults to
"averageContactsSimple" if requested to run on one core because it is a little bit faster.
"""
import ctypes
import multiprocessing as mp
import random
import warnings
from contextlib import closing
import numpy as np
from . import polymer_analyses, polymerutils
[docs]def indexing(smaller, larger, M):
"""converts x-y indexes to index in the upper triangular matrix"""
return larger + smaller * (M - 1) - smaller * (smaller - 1) // 2
[docs]def triagToNormal(triag, M):
"""Convert triangular matrix to a regular matrix"""
ar = np.arange(M)
mask = ar[:, None] <= ar[None, :]
x, y = np.nonzero(mask)
new = np.zeros((M, M), dtype=triag.dtype)
new[x, y] = triag
return new + new.T
[docs]def tonumpyarray(mp_arr):
"Converts mp.array to numpy array"
return np.frombuffer(mp_arr.get_obj(), dtype=np.int32) # .reshape((N,N))
[docs]def findN(filenames, loadFunction, exceptions):
"Finds length of data in filenames, handling the fact that files could be not loadable"
for i in range(30):
if i == 29:
raise ValueError("Could not load any of the 30 randomly selected files")
try:
N = len(loadFunction(random.choice(filenames)))
break
except tuple(exceptions):
continue
return N
[docs]def init(*args):
"""
Initializes global arguments for the worker
"""
global sharedArrays__
global contactIterator__
global contactBlock__
global N__
global classInitArgs__
global classInitKwargs__
global contactProcessing__
contactProcessing__ = args[-6]
classInitKwargs__ = args[-4]
classInitArgs__ = args[-5]
sharedArrays__ = args[:-6]
contactIterator__ = args[-3]
contactBlock__ = args[-2]
N__ = args[-1]
[docs]def chunk(mylist, chunksize):
"""
Args:
mylist: array
chunksize: int
Returns:
list of chunks of an array len chunksize (last chunk is less)
"""
N = len(mylist)
chunks = list(range(0, N, chunksize)) + [N]
return [mylist[i:j] for i, j in zip(chunks[:-1], chunks[1:])]
[docs]def simple_worker(x, uniqueContacts):
"""
A "reference" version of "worker" function below that runs on only one core.
Unlike the actual multicore worker, it can write contacts to the matrix directly
without sorting. This is useful when your contact finding is faster than sorting
a 1D array fo contacts
If uniqueContacts True, assume that contactFinder outputs only unique contacts (
like pure contact map) if False, do not assume that (like in binned contact
map). Using False is always safe, but True will add a minor speed up, especially
for very large contact radius.
"""
my_iterator = contactIterator__(x, *classInitArgs__, **classInitKwargs__)
while True:
try:
contacts = my_iterator.next()
if contacts is None:
continue
contacts = np.asarray(contacts)
assert len(contacts.shape) == 2
if contacts.shape[1] != 2:
raise ValueError("Contacts.shape[1] must be 2. Exiting.")
contacts = contactProcessing__(contacts)
ctrue = indexing(contacts[:, 0], contacts[:, 1], N__)
if not uniqueContacts:
position, counts = np.unique(ctrue, return_counts=True)
sharedArrays__[0][position] += counts
else:
sharedArrays__[0][ctrue] += 1
except StopIteration:
return
[docs]def averageContactsSimple(contactIterator, inValues, N, **kwargs):
"""
This is a reference one-core implementation
Args:
contactIterator:
an iterator. See descriptions of "filenameContactMap" class below for example and explanations
inValues:
an array of values to pass to contactIterator. Would be an array of arrays of filenames etc.
N:
Size of the resulting contactmap
**kwargs:
arrayDtype: ctypes dtype (default c_int32) for the contact map
classInitArgs: args to pass to the constructor of contact iterator as second+ args (first is the file list)
classInitKwargs: dict of keyword args to pass to the coonstructor
uniqueContacts: whether contact iterator outputs unique contacts (true) or contacts can be duplicate (False)
contactProcessing: function f(contacts), should return processed contacts
Returns:
contactmap
"""
arrayDtype = kwargs.get("arrayDtype", ctypes.c_int32)
contactBlock = 0
classInitArgs = kwargs.get("classInitArgs", [])
classInitKwargs = kwargs.get("classInitKwargs", {})
uniqueContacts = kwargs.get("uniqueContacts", False)
contactProcessing = kwargs.get("contactProcessing", lambda x: x)
finalSize = N * (N + 1) // 2
sharedArrays = [np.zeros(finalSize, dtype=arrayDtype)] # just an array, not a shared array here bc 1 core
argset = list(sharedArrays) + [
contactProcessing,
classInitArgs,
classInitKwargs,
contactIterator,
contactBlock,
N,
]
init(*argset)
[simple_worker(x, uniqueContacts) for x in inValues] # just calling workers
final = triagToNormal(sharedArrays[0], N)
return final
[docs]def worker(x):
"""This is a parallel implementation of the worker using shared memory buckets
This worker is being called by the averageContact method
It receives contacts from the contactIterator by calling .next()
And puts contacts into the shared memory buckets
All the locks etc. for shared memory objects are handeled here as well
"""
import numpy as np
np.random.seed()
import random
random.seed() # making sure our bucket selectors are really randomized
sharedNumpy = list(map(tonumpyarray, sharedArrays__)) # shared numpy arrays
allContacts = []
contactSum = 0
myIterator = contactIterator__(x, *classInitArgs__, **classInitKwargs__) # acquiring and initializing iterator
stopped = False
while True: # main event loop
try:
contacts = myIterator.next() # fetch contacts
if contacts is None:
continue
contacts = np.asarray(contacts)
assert len(contacts.shape) == 2
if contacts.shape[1] != 2:
raise ValueError("Contacts.shape[1] must be 2. Exiting.")
contactSum += contacts.shape[0]
allContacts.append(contacts)
except StopIteration:
stopped = True
if (contactSum > contactBlock__) or stopped: # we aggregated enough contacts. ready to dump them.
if len(allContacts) == 0:
return # no contacts found at all - exiting (we must be stopped)
contactSum = 0
contacts = np.concatenate(allContacts, axis=0)
contacts = contactProcessing__(contacts)
if len(contacts) == 0:
if stopped: # contactProcessing killed all contacts? are we done?
return # if yes, exiting
continue # if not, going to the next bucket
ctrue = indexing(contacts[:, 0], contacts[:, 1], N__) # converting to 1D contacts
position, counts = np.unique(ctrue, return_counts=True) # unique contacts
assert position[0] >= 0
assert position[-1] < N__ * (N__ + 1) // 2 # boundary check for contacts
chunks = np.array(np.r_[0, np.cumsum(list(map(len, sharedArrays__)))], dtype=int)
inds = np.searchsorted(position, chunks) # assinging contacts to chunks here
if inds[-1] != len(position):
raise ValueError # last chunks boundary must be after all contacts)
indszip = list(zip(inds[:-1], inds[1:])) # extents of contact buckets
indarray = list(range(len(sharedArrays__)))
random.shuffle(indarray) # shuffled array of contactmap bucket indices we are going to work with
for j, (st, end) in enumerate(indszip):
position[st:end] -= chunks[j] # pre-subtracting offsets now - not to do it when the lock is being held
while len(indarray) > 0: # continue until all contacts are put in buckets
for i in range(len(indarray)): # going over all buckets
ind = indarray[i] # select current bucket
lock = sharedArrays__[ind].get_lock() # get lock state
if i == len(indarray): # is this the last bucket?
lock.acquire() # wait for it to be free, and work with it
else:
if not lock.acquire(0): # not the last bucket? Try to acquire the lock
continue # if failed, move to the next bucket
st, end = indszip[ind] # succeeded acquiring the lock? Then do things with our bucket
sharedNumpy[ind][position[st:end]] += counts[st:end] # add to the current bucket
lock.release()
indarray.pop(i) # remove the index of the bucket because it's finished
break # back to the main loop
allContacts = []
if stopped:
return
[docs]def averageContacts(contactIterator, inValues, N, **kwargs):
"""
A main workhorse for averaging contacts on multiple cores into one shared contact
map. It mostly does managing the arguments, and initializing the variables. All
of the logic of how contacts are actually put in shared memory buckets is in the
worker defined above.
PARAMETERS
----------
contactIterator : iterator
an iterator. See descriptions of "filenameContactMap" class below for
example and explanations
inValues : iterable
an array of values to pass to contactIterator. Would be an array of arrays
of filenames or something like that.
N : int
Size of one side of the resulting contactmap
arrayDtype : ctypes dtype (default c_int32) for the contact map
classInitArgs : args to pass to the constructor of contact iterator
classInitKwargs: dict of keyword args to pass to the constructor
contactProcessing: function f(contacts), should return processed contacts
nproc : int, number of processors(default 4)
bucketNum: int (default = nproc) Number of memory buckets to use
contactBlock: int (default 500k) Number of contacts to aggregate before writing
useFmap : True, False, or callable
If True, uses mirnylib.systemutils.fmap
If False, uses multiprocessing.Pool.map
Otherwise, uses provided function, assuming it of a fork-map type
(different initializations are needed for forkmap and
multiprocessing-style map)
Sorry, no outside multiprocessing-style maps for now, it's easy to fix
Let me know if it is needed.
Code that calcualtes a contactmap from a set of polymer conformation is in the
methods below (averageMonomerResolutionContactMap, etc.)
An example code that would run a contactmap from a simulation is below:
..code-block:: python
class simContactMap(object):
"contactmap 'finder' for a simulation"
def __init__(self, ind): # accept a parameter (e.g. random number generator seed)
self.model = initModel(ind) # pass parameter to the functon that returns me a model object
self.count = 10000000 # how many times to run a step of the model
self.model.steps(10000) # initial steps of the model to equilibrate it
def next(self): # actual realization of the self.next method
if self.count == 0: # terminate the simulation if we did self.count iterations
raise StopIteration
self.count -= 1 #decrement the counter
self.model.steps(30) # advance model by 30 steps
return np.array(self.model.getSMCs()).T # return current LEF positions
mymap = polychrom.contactmaps.averageContacts(simContactMap, range(20), 30000, nproc=20 )
"""
arrayDtype = kwargs.get("arrayDtype", ctypes.c_int32)
nproc = min(kwargs.get("nproc", 4), len(inValues))
bucketNum = kwargs.get("bucketNum", nproc)
if nproc == 1:
return averageContactsSimple(contactIterator, inValues, N, **kwargs)
contactBlock = kwargs.get("contactBlock", 5000000)
classInitArgs = kwargs.get("classInitArgs", [])
classInitKwargs = kwargs.get("classInitKwargs", {})
contactProcessing = kwargs.get("contactProcessing", lambda x: x)
finalSize = N * (N + 1) // 2
boundaries = np.linspace(0, finalSize, bucketNum + 1, dtype=int)
chunks = zip(boundaries[:-1], boundaries[1:])
sharedArrays = [mp.Array(arrayDtype, int(j - i)) for i, j in chunks]
argset = list(sharedArrays) + [
contactProcessing,
classInitArgs,
classInitKwargs,
contactIterator,
contactBlock,
N,
]
with closing(mp.Pool(processes=nproc, initializer=init, initargs=argset)) as p:
p.map(worker, inValues)
res = np.concatenate([tonumpyarray(i) for i in sharedArrays])
del sharedArrays # save memory
final = triagToNormal(res, N)
return final
[docs]class filenameContactMap(object):
"""
This is the iterator for the contact map finder
"""
[docs] def __init__(
self,
filenames,
cutoff=1.7,
loadFunction=None,
exceptionsToIgnore=[],
contactFunction=None,
):
"""
Init accepts arguments to initialize the iterator. filenames will be one of
the items in the inValues list of the "averageContacts" function cutoff and
loadFunction should be provided either in classInitArgs or classInitKwargs of
averageContacts
When initialized, the iterator should store these args properly and create
all necessary constructs
"""
self.filenames = filenames
self.cutoff = cutoff
self.exceptionsToIgnore = exceptionsToIgnore
self.contactFunction = contactFunction
self.loadFunction = loadFunction
self.i = 0
[docs] def next(self):
"""
This is the method which gets called by the worker asking for contacts. This
method should return new set of contacts each time it is called When there
are no more contacts to return (all filenames are gone, or simulation is
over), then this method should raise StopIteration
"""
if self.i == len(self.filenames):
raise StopIteration
try:
data = self.loadFunction(self.filenames[self.i])
except tuple(self.exceptionsToIgnore):
print("contactmap manager could not load file", self.filenames[self.i])
self.i += 1
return None
contacts = self.contactFunction(data, cutoff=self.cutoff)
self.i += 1
return contacts
[docs]def monomerResolutionContactMap(
filenames,
cutoff=5,
n=8, # Num threads
contactFinder=polymer_analyses.calculate_contacts,
loadFunction=polymerutils.load,
exceptionsToIgnore=[],
useFmap=False,
):
N = findN(filenames, loadFunction, exceptionsToIgnore)
args = [cutoff, loadFunction, exceptionsToIgnore, contactFinder]
values = [filenames[i::n] for i in range(n)]
return averageContacts(
filenameContactMap,
values,
N,
classInitArgs=args,
useFmap=useFmap,
uniqueContacts=True,
nproc=n,
)
[docs]def binnedContactMap(
filenames,
chains=None,
binSize=5,
cutoff=5,
n=8, # Num threads
contactFinder=polymer_analyses.calculate_contacts,
loadFunction=polymerutils.load,
exceptionsToIgnore=None,
useFmap=False,
):
n = min(n, len(filenames))
N = findN(filenames, loadFunction, exceptionsToIgnore)
if chains is None:
chains = [[0, N]]
bins = []
chains = np.asarray(chains)
chainBinNums = np.ceil((chains[:, 1] - chains[:, 0]) / (0.0 + binSize))
for i in range(len(chainBinNums)):
bins.append(binSize * (np.arange(int(chainBinNums[i]))) + chains[i, 0])
bins.append(np.array([chains[-1, 1] + 1]))
bins = np.concatenate(bins) - 0.5
Nbase = len(bins) - 1
if Nbase > 25000:
warnings.warn(UserWarning("very large contact map" " may be difficult to visualize"))
chromosomeStarts = np.cumsum(chainBinNums)
chromosomeStarts = np.hstack((0, chromosomeStarts))
def contactAction(contacts, myBins=[bins]):
contacts = np.asarray(contacts, order="C")
cshape = contacts.shape
contacts.shape = (-1,)
contacts = np.searchsorted(myBins[0], contacts) - 1
contacts.shape = cshape
return contacts
args = [cutoff, loadFunction, exceptionsToIgnore, contactFinder]
values = [filenames[i::n] for i in range(n)]
mymap = averageContacts(
filenameContactMap,
values,
Nbase,
classInitArgs=args,
useFmap=useFmap,
contactProcessing=contactAction,
nproc=n,
)
return mymap, chromosomeStarts
[docs]class filenameContactMapRepeat(object):
"""
This is a interator for the repeated contact map finder
"""
[docs] def __init__(
self,
filenames,
mapStarts,
mapN,
cutoff=1.7,
loadFunction=None,
exceptionsToIgnore=[],
contactFunction=None,
):
"""
Init accepts arguments to initialize the iterator. filenames will be one of
the items in the inValues list of the "averageContacts" function cutoff and
loadFunction should be provided either in classInitArgs or classInitKwargs of
averageContacts
When initialized, the iterator should store these args properly and create
all necessary constructs
"""
self.filenames = filenames
self.cutoff = cutoff
self.exceptionsToIgnore = exceptionsToIgnore
self.mapStarts = mapStarts
self.mapN = mapN
self.contactFunction = contactFunction
self.loadFunction = loadFunction
self.i = 0
self.curStarts = []
[docs] def next(self):
"""
This is the method which gets called by the worker asking for contacts. This
method should return new set of contacts each time it is called When there
are no more contacts to return (all filenames are gone, or simulation is
over), then this method should raise StopIteration
"""
try:
if len(self.curStarts) == 0:
if self.i == len(self.filenames):
raise StopIteration
self.data = self.loadFunction(self.filenames[self.i])
self.curStarts = list(self.mapStarts)
self.i += 1
start = self.curStarts.pop()
data = self.data[start : start + self.mapN]
assert len(data) == self.mapN
except tuple(self.exceptionsToIgnore):
print("contactmap manager could not load file", self.filenames[self.i])
self.i += 1
return None
contacts = self.contactFunction(data, cutoff=self.cutoff)
return contacts
[docs]def monomerResolutionContactMapSubchains(
filenames,
mapStarts,
mapN,
cutoff=5,
n=8, # Num threads
method=polymer_analyses.calculate_contacts,
loadFunction=polymerutils.load,
exceptionsToIgnore=[],
useFmap=False,
):
args = [mapStarts, mapN, cutoff, loadFunction, exceptionsToIgnore, method]
values = [filenames[i::n] for i in range(n)]
return averageContacts(
filenameContactMapRepeat,
values,
mapN,
classInitArgs=args,
useFmap=useFmap,
uniqueContacts=True,
nproc=n,
)