forked from jeromekelleher/discsim
-
Notifications
You must be signed in to change notification settings - Fork 0
/
discsim.py
256 lines (231 loc) · 9.36 KB
/
discsim.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
#
# Copyright (C) 2013 Jerome Kelleher <[email protected]>
#
# This file is part of discsim.
#
# discsim is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# discsim is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with discsim. If not, see <http://www.gnu.org/licenses/>.
#
"""
Coalescent simulation in continuous space under the disc replacement
model.
"""
from __future__ import division
from __future__ import print_function
import sys
import math
import random
import numbers
import ercs
import _discsim
__version__ = '1.0.0'
class Simulator(object):
"""
Simulate the extinction/recolonisation continuum disc model
in one or two dimensions, tracking either the genetic ancestors
(and their genealogies at multiple loci) or the pedigree ancestors.
If ``simulate_pedigree`` is ``True``, simulate the locations of the
pedigree ancestors of the sample; otherwise, simulate the locations
and ancestry of the genetic ancestors of the sample over muliple
loci.
"""
def __init__(self, torus_diameter, simulate_pedigree=False):
"""
Allocates a new simulator object on a torus of the specified diameter.
"""
self.torus_diameter = torus_diameter
self.simulate_pedigree = simulate_pedigree
self.sample = None
self.event_classes = None
self.num_parents = None
self.num_loci = None
self.recombination_probability = None
self.max_occupancy = None
self.max_population_size = None
self.pixel_size = None
self.dimension = None
self.random_seed = None
self.__simulator = None
def __set_defaults(self):
"""
Sets up the default values for instances that have not been
specified.
"""
if self.recombination_probability is None:
self.recombination_probability = 0.5
if self.num_loci is None:
self.num_loci = 1
if self.random_seed is None:
self.random_seed = random.randint(0, 2**31)
if self.max_population_size is None:
self.max_population_size = 1000
if self.pixel_size is None:
self.pixel_size = 2.0
if self.num_parents is None:
if self.simulate_pedigree:
self.num_parents = 2
else:
self.num_parents = 1 if self.num_loci is 1 else 2
if self.max_occupancy is None:
r = self.event_classes[0].r
u = self.event_classes[0].u
s = self.pixel_size
N = self.num_parents / u
area = s + 2 * r
if self.dimension == 2:
area = s**2 + 4 * r * s + math.pi * r**2
# a density of N per unit area is probably overkill
self.max_occupancy = int(area * N)
def __convert_sample(self):
"""
Coverts the sample of locations in the sample instance variable to
the format used by _ercs, a zero-indexed list. The zero'th element
of this list must be None
"""
if self.sample is None:
raise ValueError("sample must be specified")
if len(self.sample) < 2:
raise ValueError("At least one sample must be specified")
if self.sample[0] is not None:
raise ValueError("zeroth element of list samples must be None")
sample = self.sample[1:]
last_dim = None
dim = None
for x in sample:
if isinstance(x, numbers.Number):
dim = 1
else:
dim = 2
if last_dim is None:
last_dim = dim
if dim != last_dim:
raise ValueError("Sample must be either 1 or 2 dimensional")
self.dimension = dim
return sample
def __convert_events(self):
"""
Converts the events to the required low-level representation.
"""
if self.event_classes is None:
raise ValueError("event_classes must be specified")
if len(self.event_classes) == 0:
raise ValueError("At least one event class must be specified")
for ec in self.event_classes:
if not isinstance(ec, ercs.DiscEventClass):
raise ValueError("Only events from the disc model are supported")
llec = [ec.get_low_level_representation() for ec in self.event_classes]
return llec
def __allocate_simulator(self):
"""
Allocates a new simulator instance with the required instance
variables.
"""
sample = self.__convert_sample()
events = self.__convert_events()
self.__set_defaults()
self.__simulator = _discsim.Simulator(sample, events,
num_loci=self.num_loci, torus_diameter=self.torus_diameter,
pixel_size=self.pixel_size, random_seed=self.random_seed,
recombination_probability=self.recombination_probability,
num_parents=self.num_parents,
simulate_pedigree=int(self.simulate_pedigree),
max_population_size=self.max_population_size,
max_occupancy=self.max_occupancy, dimension=self.dimension)
def get_ll_object(self):
"""
Returns the low-level simulator object.
"""
return self.__simulator
def run(self, until=None):
"""
Runs the simulation until coalescence or the specified time is
exceeded. If ``until`` is not specified simulate until complete
coalescence. Returns True if the sample coalesced, and False
otherwise.
:param until: the time to simulate to.
:type until: float
:return: ``True`` if the sample has completely coalesced; ``False``
otherwise.
:rtype: Boolean
:raises: :exc:`_discsim.LibraryError` when the C library encounters an
error
"""
if self.__simulator == None:
self.__allocate_simulator()
dbl_max = sys.float_info.max
t = dbl_max if until == None else until
return self.__simulator.run(t)
def get_time(self):
"""
Returns the current time of the simulator.
"""
return self.__simulator.get_time()
def get_num_reproduction_events(self):
"""
Returns the number of reproduction events since the beginning
of the simulation.
"""
return self.__simulator.get_num_reproduction_events()
def get_population(self):
"""
Returns the current state of the population. For a pedigree simulation,
this returns the current locations of all individuals; in a genetic
simulation, this also returns the ancestral material mappings for
each individual.
:return: the current state of the population.
:rtype: A list describing the state of each extant ancestor. For a
pedigree simulation, this is a list of locations. For a genetic
simulation, this is a list of tuples ``(x, a)``, where ``x``
is the location of the ancestor and ``a`` is its ancestry. The
ancestry is a dictionary mapping a locus to the node it occupies
in the genealogy for that locus.
"""
return self.__simulator.get_population()
def get_history(self):
"""
Returns the history of the current ancestral population. This is
not defined for a pedigree simulation.
:return: the simulated history of the sample, (pi, tau)
:rtype: a tuple ``(pi, tau)``; ``pi`` is a list of lists of integers,
and ``tau`` is a list of lists of doubles
:raises: :exc:`NotImplementedError` if called for a pedigree
simulation.
"""
return self.__simulator.get_history()
def reset(self):
"""
Resets the simulation so that we can perform more replicates. This must
be called if any attributes of the simulation are changed; otherwise,
these changes will have no effect.
"""
self.__simulator = None
def print_state(self):
"""
Prints a short summary of the state of the simulator.
"""
print("torus_diameter = ", self.torus_diameter)
print("simulate_pedigree = ", self.simulate_pedigree)
print("sample = ", len(self.sample) - 1)
print("event_classes = ", len(self.event_classes))
for ec in self.event_classes:
print("\t", ec.get_low_level_representation())
print("num_parents = ", self.num_parents)
print("num_loci = ", self.num_loci)
print("recombination_probability = ", self.recombination_probability)
print("max_occupancy = ", self.max_occupancy)
print("max_population_size = ", self.max_population_size)
print("pixel_size = ", self.pixel_size)
print("dimension = ", self.dimension)
print("random_seed = ", self.random_seed)
n = 0 if self.__simulator is None else len(self.get_population())
print("population size = ", n)