-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHIVABM_Evolution.py
executable file
·830 lines (727 loc) · 31.7 KB
/
HIVABM_Evolution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
#!/usr/bin/env python
# encoding: utf-8
"""
Module that carries the defintions for the Population for
the ABM of drug use and HIV.
+ Author: Lars Seemann
+ Email: [email protected]
+ Date: 2011-01-28
Copyright (c) 2010, under the Simplified BSD License. \n
For more information on FreeBSD see: http://www.opensource.org/licenses/bsd-license.php \n
All rights reserved.
"""
__author__="Lars Seemann ([email protected])"
import random
import unittest
from copy import deepcopy, copy
try: from HIVABM_PartialNetwork import ScaleFreeNetworkClass
except ImportError: raise ImportError("Can't import ScaleFreeNetworkClass")
class HIVModel(ScaleFreeNetworkClass):
"""
:Purpose:
This is the core class used to simulate
the spread of HIV and drug use in one MSA
(Metropolitan Statistical Area).
:Input:
N : int
Number of agents. Default: 10000
tmax: int
Number of simulation steps (years).
:py:class:`SocialNetworkClass` : Inherited
:py:class:`PopulationClass` : Inherited
:Attributes:
:py:attr:`tmax` : int
Number of time steps simulated.
:py:attr:`CleanSyringeUsers` : list
:py:attr:`SEPAgents` : dict
Dictionary of users who participated in a
syringe exchange program (SEP) {agent:time}.
:py:attr:`DrugTreatmentAgents` : dict
Dictionary of users who underwent drug
treatment {agent:time}.
:py:attr:`TestedAgents` : list
List of agents who get tested for HIV every time step.
:py:attr:`tmp_Agents` : dict
Changes resulting from parsing through the agents
and applying the update rules are stored
in :py:attr:`tmp_agent_dict`.
All attributes from :py:class:`SocialNetworkClass` \n
All attributes from :py:class:`PopulationClass`
:Methods:
:py:meth:`_update_population` \n
:py:meth:`_needle_transmission` \n
:py:meth:`_sex_transmission` \n
:py:meth:`_drug_transition` \n
:py:meth:`_update_IDU` \n
:py:meth:`_update_NIDU_ND` \n
:py:meth:`_update_AllAgents` \n
:py:meth:`_VCT` \n
:py:meth:`_SEP` \n
:py:meth:`_enter_drug_treatment` \n
:py:meth:`_initiate_HAART` \n
:py:meth:`_get_partner` \n
:py:meth:`_update_AllAgents` \n
:py:meth:`run` \n
:py:meth:`store_results` \n
:py:meth:`get_HIV_prevalence_drugs` \n
:py:meth:`get_HIV_prevalence` \n
:py:meth:`plot_results` \n
All methods from :py:class:`SocialNetworkClass` \n
All methods from :py:class:`PopulationClass`
"""
def __init__(self, N = 10000, tmax = 40):
""" Initialize HIVModel object """
if (type(tmax) is not int):
raise ValueError("Number of time steps must be integer")
else:
self.tmax = tmax
self.N_TreatmentSpots = 0 # initiate total number of treatment spots
# Initialize inherited social Network
#print 'Create Social Network...'
#PopulationClass.__init__(self, n = N) # Create population
# OLD
# Clean syring users
#self.CleanSyringeUsers =[]
#for agent in self.Agents.keys():
# agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
# if agent_drug_type == 'IDU' and random.random() < 0.1:
# self.CleanSyringeUsers.append(agent)
# Other lists / dictionaries
self.SafeSexUsers = dict() # dict of users who had safe sex (agent:time)
self.SEPAgents = dict() # dict of users who used SEP ( agent:time)
self.DrugTreatmentAgents_current = dict() # dictionary of users who are currently undergoing
# drug treatent (agent:time)
self.DrugTreatmentAgents_past = dict() # dictionary of users who underwent drug treat-
# ment in the past(agent:time)
self.VCTAgents = dict() # list of agents who get tested for HIV ((agent:time)
self.HAARTAgents = dict() # dict of agents in HIV treatment (agent:time)
self.HAARTAdherence = dict() # dict of agents HIV treatment (HAART) adherence (agent:adherence)
self.tmp_Agents = dict() # dictionary that keeps track of the update
# Keep track of results
self.IDUprevalence = [0]*self.tmax
self.NIDUprevalence = [0]*self.tmax
self.NDprevalence = [0]*self.tmax
self.HIVtotalPrevalence = [0]*self.tmax
self.HIVinIDU = [0]*self.tmax
self.HIVinNIDU = [0]*self.tmax
self.HIVinND = [0]*self.tmax
def _update_population(self):
"""
:Purpose:
Update the population. Changes resulting from parsing
through the agents and applying the update rules are stored
in :py:attr:`tmp_agent_dict`. This method updates the whole
population, i.e., it copies changes from the :py:attr:`tmp_agent_dict`
dictionary and copies it into the :py:attr:`Agents` dictionary.
:Input:
none
:Output:
none
"""
for (u,d) in self.tmp_Agents.iteritems():
self.Agents.update({u:d})
def _needle_transmission(self, agent, partner):
"""
:Purpose:
Simulate random transmission of HIV between two IDU agents through needle.\n
Needed in _update_IDUand
:Input:
agents : int
partner : int
:Output: -
"""
# both must be IDU
partner_drug_type = self.get_agent_charactersitic(partner, 'Drug Type')
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
if not (partner_drug_type == 'IDU' and agent_drug_type == 'IDU' ):
raise ValueError("To share a needle both agents must be IDU")
else:
# Do they share a needle?
if (agent in self.SEPAgents.keys() or partner in self.SEPAgents.keys() ):
pass # no needle sharing
elif (random.random() < 0.66):
pass # no needle sharing
else: # they do share a needle
# HIV+ ?
HIV_agent = self.get_agent_charactersitic(agent,'HIV')
HIV_partner = self.get_agent_charactersitic(partner,'HIV')
# if agent HIV+
if HIV_agent == 1 and random.random() < 0.009:
tmp_partner_dict = deepcopy(self.Agents[partner])
tmp_partner_dict.update({'HIV' : 1})
self.tmp_Agents.update({partner : tmp_partner_dict})
# if partner HIV+
if HIV_partner == 1 and random.random() < 0.009:
tmp_agent_dict = deepcopy(self.Agents[agent])
tmp_agent_dict.update({'HIV' : 1})
self.tmp_Agents.update({agent : tmp_partner_dict})
def _sex_transmission(self, agent, partner, time):
"""
:Purpose:
Simulate random transmission of HIV between two agents through Sex.
Needed for all users. Sex is not possible in case the agent and
assigned partner have incompatible Sex behavior.
:Input:
agents : int
partner : int
time : int
:Output:
none
"""
# Double check: Sex possible?
Type_agent = self.get_agent_charactersitic(agent,'Sex Type')
Type_partner = self.get_agent_charactersitic(partner,'Sex Type')
if Type_agent == 'HM' and Type_partner in [ 'HF', 'MSM']:
pass
elif Type_partner == 'HM' and Type_agent in [ 'HF', 'MSM']:
pass
elif Type_agent == 'MSM' and Type_partner in ['MSM', 'WSW' , 'HF']:
pass
elif Type_partner == 'MSM' and Type_agent in ['MSM', 'WSW' , 'HF']:
pass
elif Type_agent == 'WSW' and Type_partner in ['MSM', 'WSW' , 'HM']:
pass
elif Type_partner == 'WSW' and Type_agent in ['MSM', 'WSW' , 'HM']:
pass
else:
raise ValueError("Sex must be possible! %s %s"%(str(Type_agent),str(Type_partner)))
# HIV status of agent and partner
# Everything from here is only run if one of them is HIV+
HIVstatus_Agent = self.get_agent_charactersitic(agent,'HIV')
HIVstatus_Partner = self.get_agent_charactersitic(partner,'HIV')
if HIVstatus_Agent == 1 or HIVstatus_Partner == 1:
# Sex between men?
if Type_agent in ['HM', 'MSM'] and Type_partner in ['HM', 'MSM']:
SexBetweenMen = 1
elif Type_partner in ['HM', 'MSM'] and Type_agent in ['HM', 'MSM']:
SexBetweenMen = 1
else:
SexBetweenMen = 0
# Define probabilities for unsafe sex
UnsafeSexProb_UnknowHIVStatus = {'MSM':{'IDU':0.65, 'NIDU':0.55, 'ND':0.45},
'HM':{'IDU':0.75, 'NIDU':0.75, 'ND':0.75},
'HF':{'IDU':0.75, 'NIDU':0.75, 'ND':0.75},
'WSW':{'IDU':0.80, 'NIDU':0.80, 'ND':0.80}}
UnsafeSexProb_HIVPosStatus = {'MSM':{'IDU':0.65, 'NIDU':0.55, 'ND':0.45},
'HM':{'IDU':0.40, 'NIDU':0.40, 'ND':0.40},
'HF':{'IDU':0.40, 'NIDU':0.40, 'ND':0.40},
'WSW':{'IDU':0.45, 'NIDU':0.45, 'ND':0.45}}
DrugType_Agent = self.get_agent_charactersitic(agent,'Drug Type')
DrugType_Partner = self.get_agent_charactersitic(partner,'Drug Type')
if agent in self.VCTAgents and HIVstatus_Agent == 1:
p_SafeSex_Agent = UnsafeSexProb_HIVPosStatus[Type_agent][DrugType_Agent]
else:
p_SafeSex_Agent = UnsafeSexProb_HIVPosStatus[Type_agent][DrugType_Agent]
if partner in self.VCTAgents and HIVstatus_Partner == 1:
p_SafeSex_Partner = UnsafeSexProb_HIVPosStatus[Type_partner][DrugType_Partner]
else:
p_SafeSex_Partner= UnsafeSexProb_HIVPosStatus[Type_partner][DrugType_Partner]
p_SafeSex = 0.5*(p_SafeSex_Agent + p_SafeSex_Partner)
# Safe Sex?
if random.random() < p_SafeSex:
SafeSexFlag = 0
else:
SafeSexFlag = 1
# HIV transmission only possible if unprotected sex
if SafeSexFlag == 0:
# HIV transmission probability
if agent in self.HAARTAgents:
if SexBetweenMen == 1:
if random.random() < 0.4:
p_transmission = random.choice([0.452, 0.382, 0.214, 0.113])
else:
p_transmission = 0.010
elif SexBetweenMen == 0:
if random.random() < 0.4:
p_transmission = random.choice([0.113, 0.092, 0.047, 0.024, 0.002])
else:
p_transmission = 0.002
else:
raise ValueError("Check SexBetweenMen flag! %s"%str(SexBetweenMen))
else:
if SexBetweenMen == 1:
p_transmission = 0.452
elif SexBetweenMen == 0:
p_transmission = 0.113
else:
raise ValueError("Check SexBetweenMen flag! %s"%str(SexBetweenMen))
# HIV transmission
# if agent HIV+
if HIVstatus_Agent == 1 and random.random() < p_transmission:
tmp_partner_dict = deepcopy(self.Agents[partner])
tmp_partner_dict.update({'HIV' : 1})
self.tmp_Agents.update({partner : tmp_partner_dict})
# if partner HIV+
if HIVstatus_Partner == 1 and random.random() < p_transmission:
tmp_agent_dict = deepcopy(self.Agents[agent])
tmp_agent_dict.update({'HIV' : 1})
self.tmp_Agents.update({agent : tmp_agent_dict})
def _drug_transition(self, agent, partner):
"""
:Purpose:
Simulate transition of drug behavior. The following scenarios are
possible:
+ ND agent might become NIDU when meeting NIDU
+ NIDU might become IDU when meeting IDU
The function is only applied for NIDU and ND users.
:Input:
agents : int
partner : int
:Output: -
"""
partner_drug_type = self.get_agent_charactersitic(partner, 'Drug Type')
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
# NIDU -> IDU
if agent_drug_type == 'NIDU' and partner_drug_type == 'IDU':
if random.random() < 0.12:
# agent becomes IDU
tmp_agent_dict = deepcopy(self.Agents[agent])
tmp_agent_dict.update({'Drug Type' : 'IDU'})
self.tmp_Agents.update({agent : tmp_agent_dict})
if partner_drug_type == 'NIDU' and agent_drug_type == 'IDU':
if random.random() < 0.12:
# partner becomes IDU
tmp_partner_dict = deepcopy(self.Agents[partner])
tmp_partner_dict.update({'Drug Type' : 'IDU'})
self.tmp_Agents.update({partner : tmp_partner_dict})
# ND -> NIDU
if agent_drug_type == 'ND' and partner_drug_type == 'NIDU':
if random.random() < 0.30:
# agent becomes NIDU
tmp_agent_dict = deepcopy(self.Agents[agent])
tmp_agent_dict.update({'Drug Type' : 'NIDU'})
self.tmp_Agents.update({agent : tmp_agent_dict})
if partner_drug_type == 'ND' and agent_drug_type == 'NIDU':
if random.random() < 0.30:
# partner becomes NIDU
tmp_partner_dict = deepcopy(self.Agents[partner])
tmp_partner_dict.update({'Drug Type' : 'NIDU'})
self.tmp_Agents.update({partner : tmp_partner_dict})
def _update_IDU(self, agent, partner, time):
"""
:Purpose:
Let IDU agent interact with a partner.
Update IDU agents:
1 - determine transition type
2 - Injection rules
3 - Sex rules
4 - HIV transmission
5 - SEP
:Input:
agent : int
partner : int
time : int
Output:
none
"""
partner_drug_type = self.get_agent_charactersitic(partner, 'Drug Type')
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
if partner_drug_type == 'IDU' and agent_drug_type == 'IDU':
if random.random() < 0.5: self. _needle_transmission(agent, partner)
else: self._sex_transmission(agent, partner, time)
elif partner_drug_type in ['NIDU', 'ND'] or agent_drug_type in ['NIDU', 'ND']:
self._drug_transition(agent, partner)
self._sex_transmission(agent, partner, time)
else:
raise ValueError("Agents must be either IDU, NIDU, or ND")
# SEP
if partner_drug_type == 'IDU':
if random.random() < 0.6: self.SEPAgents.update({partner:time})
if agent_drug_type == 'IDU':
if random.random() < 0.6: self.SEPAgents.update({agent:time})
def _update_NIDU_ND(self, agent, partner, time):
"""
:Purpose:
Let NIDU or ND agent interact.
:Input:
agent : int
partner : int
time : int
Output: none
"""
self._drug_transition(agent, partner)
self._sex_transmission(agent, partner, time)
def _VCT(self, agent, time):
"""
:Purpose:
Account for voluntary Counseling and Testing(VCT)
:Input:
agent : int
partner : int
time : int
:Output:
none
"""
# Drug Treatment
# SEP
drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
SEPstat = FALSE
if agent in self.SEPAgents:
if time == self.SEPAgents[agent]:
SEPstat = TRUE
# MSM
if drug_type == 'IDU':
if SEPstat:
if random.random() < 0.75:
self.VCTAgents.update({agent:time})
else:
if random.random() < 0.60:
self.VCTAgents.update({agent:time})
elif agent in self.MSM_agents and random.random() < 0.25:
self.VCTAgents.update({agent:time})
else:
if random.random() < 0.09:
self.VCTAgents.update({agent:time})
def _SEP(self, agent, time):
"""
:Purpose:
Account for SEP (Syringe Exchange Program) for IDU agents. \n
SEP use depends on the year and the functional relationship
is given by
P(SEP USE (t+1) | IDU) = P(SEP USE (t) | IDU)*f_t(SEP density)
where f_t(SEP density) = (1+? + ln(?))
:Input:
time : int
:Output:
bool
"""
if agent not in self.IDU_agents:
raise ValueError("_SEP only valid for IDU agents! %s"%
str(self.get_agent_charactersitic(agent,'Drug Type')))
# The following values are estimated from the SEP prob plot in the documentation
P_SEPuse_dict={1:0.25, 2:0.275, 3:0.3, 4:0.32, 5:0.34, 6:0.36, 7:0.38, 8:0.39, 9:0.41, 10:0.42, 11:0.44}
if random.random()<P_SEPuse_dict[time]:
self.SEPAgents.update({agent:time})
def _enter_drug_treatment(self, agent, time):
"""
:Purpose:
Account for drug treatment. \n
Entering drug treatment is similar to SEP, drug treatment has a functional relationship given as
follows:
P(treatment (t+1) | IDU or NIDU) = P(treatment (agent,t) | IDU) if x < N
else: 0 (x >= 0)
where N is the total number of treatment slots available. \n
An agent who was already in drug treatment and relapsed, has a pobability twice as strong
to reenter drug treatment at a later point.
:Input:
agent : int
time : int
:Output:
bool
"""
N_TrSpots_Max = 500 # max number of treatment spots
CurrentlyTreatedFlag = FALSE
if self.N_TreatmentSpots < 500:
# Determine probability of treatment
if agent in self.DrugTreatmentAgents_current.keys():
CurrentlyTreatedFlag = TRUE
prob = 0.6
elif agent in self.DrugTreatmentAgents_past.keys():
if agent in self.SEPAgents:
prob = 2.*0.09
elif agent in self.IDU_agents or agent in self.NIDU_agents:
prob = 2.*0.18
else:
# Probability considering SEP status
if agent in self.SEPAgents:
prob = 0.09
elif agent in self.IDU_agents or agent in self.NIDU_agents:
prob = 0.18
else:
raise ValueError("_SEP only valid for NIDU or IDU agents! %s"%
str(self.get_agent_charactersitic(agent,'Drug Type')))
# Assign treatment randomly
if random.random() < factor*0.09:
self.DrugTreatmentAgents_current.update({agent:time})
self.N_TreatmentSpots += 1
return TRUE
else:
if CurrentlyTreatedFlag:
# Exit drug treatment
self.DrugTreatmentAgents_past.update({agent:self.DrugTreatmentAgents_current[agent]})
del self.DrugTreatmentAgents_current[agent]
return FALSE
elif self.N_TreatmentSpots == 500:
return FALSE
else:
raise ValueError("Check self.N_TreatmentSpots! Max value = 500! %d"%self.N_TreatmentSpots)
def _initiate_HAART(self, agent, time):
"""
:Purpose:
Account for HIV treatment through highly active antiretroviral therapy (HAART).
HAART was implemented in 1996, hence, there is treatment only after 1996.
HIV treatment assumes that the agent knows his HIV+ status.
:Input:
time : int
:Output:
none
"""
# Determine probability of HIV treatment
if time >= 5: # HAART implemented 1996
if agent in self.HAARTAgents:
pass # agents never discontinure therapy
if agent in self.VCTAgents:
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
if agent_drug_type == 'IDU':
if agent in self.VCTAgents:
if agent in self.DrugTreatmentAgents_current:
prob = 0.21
else:
prob = 0.12
elif agent_drug_type == 'NIDU':
if agent in self.VCTAgents:
prob = 0.14
else:
if agent in self.VCTAgents:
prob = 0.22
else:
prob = 0.0
else:
prob = 0.0
if random.random() < prob:
self.HAARTAgents.update({agent:time})
# adherence
if random.random() < 0.6:
adherence = 1
else:
adherence = 0
self.HAARTAdherence.update({agent:adherence})
def _get_partner(self, agent, agent_drug_type):
"""
:Purpose:
Get a partner for agent.
:Input:
agent : int
agent_drug_type : str
Either 'IDU', 'NIDU', 'ND'
:Output:
partner : int
"""
if agent not in range(self.PopulationSize):
raise ValueError("Invalid agent! %s"%str(agent))
if agent_drug_type not in ['IDU', 'NIDU', 'ND']:
raise ValueError("Invalid drug type! %s"%str(agent_drug_type))
if agent_drug_type == 'NIDU':
pass
elif agent_drug_type == 'IDU':
pass
else:
pass
def _get_number_of_partners(self,
agent,
agent_drug_type,
agent_sex_type):
"""
:Purpose:
Get number of partners for a agent.
Drawn from Poisson distribution.
:Input:
agent : int
agent_drug_type : str
Either 'IDU', 'NIDU', 'ND'
:Output:
NumPartners : int
"""
# Check input
# Drug type
if agent not in range(self.PopulationSize):
raise ValueError("Invalid agent! %s"%str(agent))
if agent_drug_type not in ['IDU', 'NIDU', 'ND']:
raise ValueError("Invalid drug type! %s"%str(agent_drug_type))
# Sex type
if agent_sex_type not in ['HM','NF','MSM','WSW']:
raise ValueError("Invalid sex type! %s"%str(agent_sextype))
if agent_drug_type == 'NIDU':
PoissonLambda = 3
elif agent_drug_type == 'IDU':
PoissonLambda = 5
elif agent_sex_type == 'MSM':
PoissonLambda = 1
RandNumCont = np.random.poisson(PoissonLambda, 1)
return RandNumCont[0]
def _update_AllAgents(self, time):
"""
:Purpose:
Update IDU agents:\n
For each agent:\n
1 - determine agent type
2 - update accordingly
3 - VCT (Voluntsry Counseling and Testing)
:Input:
agent, time
:Output:
none
"""
for agent in self.Agents.keys():
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
NumPartners = self._get_number_of_partners(agent_drug_type)
n = 0
while n < NumPartners:
# Choose a random partner
partner = self._get_partner(agent, agent_drug_type)
# Transmission of HIV / Transition of agents
if agent_drug_type == 'IDU':
self._update_IDU(agent, partner, time)
else:
self._update_NIDU_ND(agent, partner, time)
# Counseling and Testing (VCT)
self._VCT(agent,time)
#
def run(self):
"""
Core of the model
:param agent: agent
"""
for t in range(1,self.tmax):
print ('TIME = '+str(t)+' ________________________________________')
self._update_AllAgents(t)
self._update_population()
self.store_results(t)
def store_results(self, time):
""" Store result into appropriate arrays """
NumIDU = 0
NumNIDU = 0
NumND = 0
for agent in self.Agents.keys():
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
if agent_drug_type == 'IDU': NumIDU+=1
elif agent_drug_type == 'NIDU': NumNIDU+=1
elif agent_drug_type == 'ND': NumND+=1
else: raise ValueError("Agents must be either IDU, NIDU, or ND")
self.IDUprevalence[time] = NumIDU
self.NIDUprevalence[time] = NumNIDU
self.NDprevalence[time] = NumND
self.HIVtotalPrevalence[time] = self.get_HIV_prevalence()
HIVinDrugsResults = self.get_HIV_prevalence_drugs()
self.HIVinIDU[time] = HIVinDrugsResults[0]
self.HIVinNIDU[time] = HIVinDrugsResults[1]
self.HIVinND[time] = HIVinDrugsResults[2]
def get_HIV_prevalence_drugs(self):
"""
get HIV prevalence within all three drug user groups
"""
count_HIV_IDU = 0
count_HIV_NIDU = 0
count_HIV_ND = 0
for agent in self.Agents:
HIVstatus = self.get_agent_charactersitic(agent, 'HIV')
if HIVstatus == 1:
agent_drug_type = self.get_agent_charactersitic(agent, 'Drug Type')
if agent_drug_type == 'IDU': count_HIV_IDU += 1
elif agent_drug_type == 'NIDU': count_HIV_NIDU += 1
elif agent_drug_type == 'ND': count_HIV_ND += 1
else: raise ValueError("Agent must be either IDU, NIDU or ND !")
elif HIVstatus != 0:
print HIVstatus
raise ValueError("HIV status must be either 0 or 1 !")
print [count_HIV_IDU, count_HIV_NIDU, count_HIV_ND]
return [count_HIV_IDU, count_HIV_NIDU, count_HIV_ND]
def get_HIV_prevalence(self):
""" get HIV prevalence within IDUs """
HIVcount = 0.0
for agent in self.Agents.keys():
HIVstatus = self.get_agent_charactersitic(agent, 'HIV')
if HIVstatus == 1: HIVcount += 1
print HIVcount
return HIVcount
def plot_results(self):
""" Plot Results """
plt.subplot(231)
plt.plot(self.IDUprevalence[1:], 'bo-', markerfacecolor='green')
plt.ylabel('IDU')
plt.xlabel('Time')
plt.subplot(232)
plt.plot(self.NIDUprevalence[1:], 'bo-', markerfacecolor='green')
plt.ylabel('NIDU')
plt.xlabel('Time')
plt.subplot(233)
plt.plot(self.NDprevalence[1:], 'bo-', markerfacecolor='green')
plt.ylabel('ND')
plt.xlabel('Time')
plt.subplot(234)
plt.plot(self.HIVinIDU[1:], 'bo-', markerfacecolor='red')
plt.ylabel('IDU HIV+')
plt.xlabel('Time')
plt.subplot(235)
plt.plot(self.HIVinNIDU[1:], 'bo-', markerfacecolor='red')
plt.ylabel('NIDU HIV+ ')
plt.xlabel('Time')
plt.subplot(236)
plt.plot(self.HIVinND[1:], 'bo-', markerfacecolor='red')
plt.ylabel('ND HIV+ ')
plt.xlabel('Time')
plt.show()
class TestClassMethods(unittest.TestCase):
"""
:Purpose:
Unittest for testing the methods of the HIV class.
"""
def setUp(self):
"""
:Purpose:
Tests that all models from setup pass inspection.
``setUp`` is perfomed before each method.
"""
self.N_pop = 1000
self.T = 40
def test_update_population(self):
""" Test: :py:meth:`_update_population` """
print " ... Test: method _update_population"
myModel = HIVModel(N = self.N_pop, tmax = self.T)
self.assertTrue(len(myModel.tmp_Agents) == 0)
myModel.tmp_Agents = {1:{'N':1, 'M':2}, 2:{'N':3, 'M':4}}
myModel._update_population()
self.assertTrue(myModel.Agents[1]['N'] == 1)
self.assertTrue(myModel.Agents[1]['M'] == 2)
self.assertTrue(myModel.Agents[2]['N'] == 3)
self.assertTrue(myModel.Agents[2]['M'] == 4)
def test_needle_transmission(self):
""" Test: :py:meth:`_needle_transmission` """
print " ... Test: method _needle_transmission"
myModel = HIVModel(N = self.N_pop, tmax = self.T)
self.assertTrue(len(myModel.tmp_Agents) == 0)
chosenpairs = {}
for agent in myModel.IDU_agents:
partner = random.choice(myModel.IDU_agents)
chosenpairs.update({agent:partner})
myModel._needle_transmission(agent, partner)
if len(myModel.tmp_Agents) > 0:
for (agent, d) in myModel.tmp_Agents.iteritems():
self.assertTrue(agent in myModel.IDU_agents)
self.assertTrue(d['HIV'] == 1)
# agent or partner must have been HIV=1 before transmission
HIVstatus = (myModel.Agents[chosenpairs[agent]]['HIV'] == 1
or myModel.Agents[agent]['HIV'] == 1)
self.assertTrue(HIVstatus == 1)
def test_sex_transmission(self):
""" Test: :py:meth:`_sex_transmission` """
print " ... Test: method _sex_transmission"
myModel = HIVModel(N = self.N_pop, tmax = self.T)
self.assertTrue(len(myModel.tmp_Agents) == 0)
chosenpairs = {}
for agent in myModel.IDU_agents:
partner = random.choice(myModel.IDU_agents)
chosenpairs.update({agent:partner})
time = random.choice(range(self.T))
myModel._sex_transmission(agent, partner, time)
if len(myModel.tmp_Agents) > 0:
for (agent, d) in myModel.tmp_Agents.iteritems():
self.assertTrue(d['HIV'] == 1)
# agent or partner must have been HIV=1 before transmission
HIVstatus = (myModel.Agents[chosenpairs[agent]]['HIV'] == 1
or myModel.Agents[agent]['HIV'] == 1)
self.assertTrue(HIVstatus == 1)
# check if sex behavior matches
Type_agent = myModel.get_agent_charactersitic(agent, 'Sex Type')
Type_partner = myModel.get_agent_charactersitic(partner, 'Sex Type')
Type_agent = self.get_agent_charactersitic(agent,'Sex Type')
Type_partner = self.get_agent_charactersitic(partner,'Sex Type')
if Type_agent == 'HM': self.assertTrue(Type_partner in [ 'HF', 'MSM'])
elif Type_partner == 'HM': self.assertTrue(Type_agent in [ 'HF', 'MSM'])
elif Type_agent == 'MSM': self.assertTrue(Type_partner in ['MSM', 'WSW' , 'HF'])
elif Type_partner == 'MSM': self.assertTrue(Type_agent in ['MSM', 'WSW' , 'HF'])
elif Type_agent == 'WSW': self.assertTrue(Type_partner in ['MSM', 'WSW' , 'HM'])
elif Type_partner == 'WSW': self.assertTrue(Type_agent in ['MSM', 'WSW' , 'HM'])
if __name__=='__main__':
"""unittest"""
unittest.main()