#!/usr/bin/env python3
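"""Agent-based color-consensus simulation drawn with turtle graphics.

Agents perform a correlated random walk inside a bounded arena and repeatedly adopt the
majority color among their nearest neighbors until every agent holds the same color.
Optional behaviors (going dark, teleporting) and triggers (overwhelm, random) are
controlled by the command-line flags defined at the bottom of this file.
"""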
import turtle
import random # for choice & randint
import time # for sleep
import math # for sqrt
import argparse
import numpy as np


class Agent:
    """One simulated agent: a turtle with a fill-color opinion, a sensing radius, and a node degree."""
def __init__(self, x=None, y=None, node_degree=5, half_height=100, half_width=100, weights=None, args=None,
radius=20.0):
if args is None:
args = dict()
        # only randomize a coordinate when none was given (an explicit 0 is kept)
        x = x if x is not None else random.randint(-half_width, half_width)
        y = y if y is not None else random.randint(-half_height, half_height)
self.turt = turtle.Turtle()
self.radius = radius
self.node_degree = node_degree
self.group_id = 0 # for later clumping into different step timings
self.duplex = None # 4 states: does it get influenced or is an influencer
self.original_pose = (x, y)
self.turt.penup()
self.turt.goto(x, y)
self.turt.shape("circle")
self.turt.shapesize(outline=3)
self.turt.color("black", random.choices(args['colors'], weights=weights, k=1)[0])
self.dark = False
self.dark_steps_remaining = 0

    def within_range(self, agents, rad=None, for_overwhelm=False):
        """Return the agents within range of self, nearest first (at most node_degree + 1 unless for_overwhelm)."""
agents_in_range = []
r = self.radius if rad is None else rad
for o_agent in agents:
            dist = math.hypot(self.turt.xcor() - o_agent.turt.xcor(),
                              self.turt.ycor() - o_agent.turt.ycor())
if dist <= r:
agents_in_range.append((dist, o_agent))
        agents_in_range.sort(key=lambda pair: pair[0])  # nearest first
        if for_overwhelm:
            return [pair[1] for pair in agents_in_range]
        # node_degree + 1 because the agent itself (distance 0) is included in the list
        return [pair[1] for pair in agents_in_range[:(self.node_degree + 1)]]


def consensus(agents, colors, agent=None, count_dark=False):
    # count how many agents currently hold each color (dark agents are skipped unless count_dark)
    agent_colors_count = [[oagent.turt.color()[1] for oagent in agents
                           if count_dark or not oagent.dark].count(color) for color in colors]
max_colors_count = max(agent_colors_count)
max_colors = [colors[index] for index in range(len(colors)) if agent_colors_count[index] == max_colors_count]
if agent:
# if the turtle's own color is in the max then use that
# this will also cover the case where there is a tie
if agent.turt.color()[1] in max_colors:
return agent.turt.color()[1]
        else:  # the agent's own color is not among the most common, so pick one of them at random
            return random.choice(max_colors)
if len(max_colors) == 1:
return max_colors[0]
return "tie: " + str(max_colors)


def consensus_reached(agents):
    # consensus: every agent holds the same fill color as the first agent
    return all(agent.turt.color()[1] == agents[0].turt.color()[1] for agent in agents)


def main(args):
# @NOTE we are always opening the window right now
# print(args)
# args for time(speed), radius, colors, color balance, walk angles/type,rng seed
# Initialize turtle environment
screen = turtle.Screen()
screen.title("LCA")
screen.tracer(False)
half_width = 300
# half_width = int(screen.window_width() / 2)
# print(half_width)
half_height = 300
# half_height = int(screen.window_height() / 2)
# print(half_height)
# Config values
random.seed(args['seed'])
np.random.seed(args['seed'])
radius = args['radius']
colors = args['colors']
degree = args['node_degree']
print("degree", degree)
weights = [float(weight) for weight in args['weights']]
# Initialize all of the agents
agents = [Agent(half_width=half_width, half_height=half_height, node_degree=degree, weights=weights, args=args,
radius=radius) for _ in range(args['agents'])]
if args['vis']:
screen.update()
print("Seed:", args['seed'])
print("Starting consensus: ", consensus(agents, colors, count_dark=True))
print(colors)
if args['output']:
turtle.getscreen().getcanvas().postscript(file=args['output'] + "_start.eps")
dark = args['dark']
min_dark_steps = args['min_dark_steps']
max_dark_steps = args['max_dark_steps']
teleport = args['teleport']
teleport_forward = args['teleport_forward']
min_teleport_distance = args['min_teleport_distance']
max_teleport_distance = args['max_teleport_distance']
overwhelm = args['overwhelm']
overwhelm_radius_scaler = args['overwhelm_radius_scaler']
random_trigger = args['random_trigger']
scaled_radius = overwhelm_radius_scaler * radius # for overwhelm behavior
if teleport_forward and not teleport:
teleport = True
# Main loop
loop_times = 0
while not consensus_reached(agents):
if loop_times % 100 == 0:
print([[agent.turt.color()[1] for agent in agents].count(color) for color in colors])
if loop_times > 1500:
print("terminated early")
print("After", str(loop_times), "iterations")
exit()
# movement
for agent in agents:
if abs(agent.turt.ycor()) > half_height or abs(agent.turt.xcor()) > half_width:
# if at edge bounce, @NOTE still can get stuck
my_x = agent.turt.xcor()
my_y = agent.turt.ycor()
if abs(agent.turt.ycor()) > half_height:
if agent.turt.ycor() <= 0:
my_y = -half_height + 1
else:
my_y = half_height - 1
if abs(agent.turt.xcor()) > half_width:
if agent.turt.xcor() <= 0:
my_x = -half_width + 1
else:
my_x = half_width - 1
                # agent.turt.back(args['max_walk_distance'] + 1)  # this should reduce getting stuck at the edge
                # a 180-degree turn would be a perfect bounce, but in a corner that could leave the agent oscillating
                agent.turt.right(random.randint(160, 200))
agent.turt.goto(my_x, my_y)
# correlated random walk
agent.turt.right(random.randint(args['min_walk_angle'], args['max_walk_angle']))
agent.turt.forward(random.randint(args['min_walk_distance'], args['max_walk_distance']))
teleported = False
            # trigger the behavior if the agent is overwhelmed or if the random trigger fires
            if (overwhelm != 0 and (len(agent.within_range(agents, rad=scaled_radius, for_overwhelm=True)) >= overwhelm)) \
                    or (random_trigger != 0 and random.random() < (random_trigger / 100.0)):
if dark:
agent.dark = True
agent.dark_steps_remaining = random.randint(min_dark_steps, max_dark_steps)
if teleport:
teleported = True
                    # mean at the midpoint of [min, max]; sigma = range / 6 so min and max sit about 3 sigma out
                    d = np.random.normal(loc=(max_teleport_distance + min_teleport_distance) / 2,
                                         scale=(max_teleport_distance - min_teleport_distance) / 6)
if teleport_forward:
agent.turt.forward(d)
                    else:  # teleport a Gaussian-distributed distance in a uniformly random direction
                        theta = np.random.uniform(-np.pi, np.pi)
                        agent.turt.setheading(theta * (180 / np.pi))  # convert radians to degrees for turtle
                        agent.turt.goto(agent.turt.xcor() + (d * np.cos(theta)),
                                        agent.turt.ycor() + (d * np.sin(theta)))
if agent.dark:
if agent.dark_steps_remaining < 0:
agent.dark = False
agent.turt.color("black", agent.turt.color()[1])
agent.dark_steps_remaining -= 1
else:
# update agent color value with black outline
new_consensus = consensus(agent.within_range(agents), colors, agent=agent)
if agent.turt.color()[1] != new_consensus:
agent.turt.color("black", new_consensus)
if args['bounce']:
agent.turt.right(180) # if we changed state then bounce, could use random.randint(160, 200)
if not teleported:
# correlated random walk
agent.turt.right(random.randint(args['min_walk_angle'], args['max_walk_angle']))
agent.turt.forward(random.randint(args['min_walk_distance'], args['max_walk_distance']))
if args['vis']:
screen.update()
loop_times = loop_times + 1
# Print out results from experiment
turtle.write("Consensus of " + agents[0].turt.color()[1] + " reached\nAfter " + str(loop_times) + " iterations",
move=True, align="center")
if args['output']:
turtle.getscreen().getcanvas().postscript(file=args['output'] + "_end.eps")
print("Consensus of", agents[0].turt.color()[1], "reached")
print("After", str(loop_times), "iterations")
""" running in repl not in command line
import lca
lca.main({'agents': 256, 'seed': 10, 'node_degree': 2, 'radius': 20, 'vis': True, 'bounce': False,
'colors': ["white", "black"], 'weights': [0.6, 0.4],
'min_walk_angle': -45, 'max_walk_angle': 45, 'min_walk_distance': 5, 'max_walk_distance': 10, 'output': None})
"""
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--agents', type=int, default=256, help='number of agents')
parser.add_argument('--seed', type=int, default=random.randint(0, 9000), help='rng seed')
parser.add_argument('--node_degree', type=int, default=2, help='node degree')
parser.add_argument('--radius', type=float, default=20.0, help='radius of local network')
    parser.add_argument('--sleep', type=float, default=0.0, help='Step sleep in seconds')  # parsed but not currently used
    parser.add_argument('--vis', action='store_true', default=False, help='Enable visualization')  # False unless --vis is given
parser.add_argument('--bounce', action='store_true', help='Bounce on state change') # False default
parser.add_argument('-c', '--colors', nargs='+', default=["white", "black"], help='list of colors')
parser.add_argument('-w', '--weights', nargs='+', default=[0.6, 0.4], help='list of weights of said colors')
parser.add_argument('--min_walk_angle', type=int, default=-45, help='walk step min angle turned')
parser.add_argument('--max_walk_angle', type=int, default=45, help='walk step max angle turned')
parser.add_argument('--min_walk_distance', type=float, default=5, help='min walk step distance forward')
parser.add_argument('--max_walk_distance', type=float, default=10, help='max walk step distance forward')
parser.add_argument('--output', type=str, default=None, help='output name postfix')
    parser.add_argument('--dark', action='store_true',
                        help='Behavior: agents will go dark (i.e. stop communicating)')  # False by default
parser.add_argument('--min_dark_steps', type=int, default=4, help='min number of steps will walk while dark')
parser.add_argument('--max_dark_steps', type=int, default=10, help='max number of steps will walk while dark')
parser.add_argument('--teleport', action='store_true',
help='Behavior: Agents Will Teleport to a Random Location') # False default
parser.add_argument('--teleport_forward', action='store_true',
help='Behavior: Agents Will Teleport Forward') # False default
parser.add_argument('--min_teleport_distance', type=float, default=20, help='min teleport distance')
parser.add_argument('--max_teleport_distance', type=float, default=40, help='max teleport distance')
    parser.add_argument('--overwhelm', type=int, default=0,
                        help='Trigger: number of agents within the scaled radius that causes the "scaredy cat" behavior')
    parser.add_argument('--overwhelm_radius_scaler', type=float, default=1,
                        help='Trigger: scale factor applied to the local network radius for the behavior trigger')
parser.add_argument('--random_trigger', type=float, default=0,
help='Trigger: percent chance of an agent randomly executing a behavior')
main(vars(parser.parse_args()))