forked from phreeza/cells
-
Notifications
You must be signed in to change notification settings - Fork 1
/
cells_helpers.pyx
183 lines (165 loc) · 6.35 KB
/
cells_helpers.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
# cython: language_level=3, binding=True
cimport numpy
from agent import Agent
from constants import BODY_ENERGY, ENERGY_CAP, MOVE_COST, SPAWN_TOTAL_ENERGY, SUSTAIN_COST
import random
from worldview import WorldView
# Action type codes: each agent's chosen Action carries one of these in
# ``action.type``; they are dispatched on in run_agents_core below.
cdef enum:
    ACT_SPAWN = 0
    ACT_MOVE = 1
    ACT_EAT = 2
    ACT_RELEASE = 3
    ACT_ATTACK = 4
    ACT_LIFT = 5
    ACT_DROP = 6
cpdef get_small_view_fast(self, int x, int y):
    """Collect the views of everything in the 8 cells adjacent to (x, y).

    ``self`` is a grid-map object exposing ``values`` (a 2-D object ndarray),
    ``width`` and ``height``.  Out-of-range neighbours and empty (None) cells
    are skipped; for each occupant its ``get_view()`` result is collected.

    Returns a list of view objects (possibly empty).

    Fix: the original bound ``get = self.get`` but never used it; the dead
    local has been removed.
    """
    cdef numpy.ndarray[object, ndim=2] values = self.values
    cdef int width = self.width
    cdef int height = self.height
    cdef int dr
    cdef int dc
    cdef int adj_x
    cdef int adj_y
    assert self.values.dtype == object
    ret = []
    for dr in range(-1, 2):
        for dc in range(-1, 2):
            if not dr and not dc:
                # Skip the centre cell (the caller itself).
                continue
            adj_x = x + dr
            if not 0 <= adj_x < width:
                continue
            adj_y = y + dc
            if not 0 <= adj_y < height:
                continue
            a = values[adj_x, adj_y]
            if a is not None:
                ret.append(a.get_view())
    return ret
def sign(int x):
    """Return -1, 0 or 1 according to whether *x* is negative, zero or positive."""
    # Branchless form: each comparison yields 0 or 1, so the difference is
    # exactly the sign of x.
    return (x > 0) - (x < 0)
cpdef get_next_move(int old_x, int old_y, int x, int y):
    """Return the coordinate one grid step from (old_x, old_y) towards (x, y).

    Each axis moves by at most 1 in the direction of the target, so the
    result is always one of the 8 neighbours of the start (or the start
    itself when already at the target).
    """
    cdef int step_x = sign(x - old_x)
    cdef int step_y = sign(y - old_y)
    return (old_x + step_x, old_y + step_y)
cpdef collect_agent_actions(self):
    """Ask every agent in the population for its next action.

    Each agent receives a WorldView populated with itself, the views of its
    adjacent agents and plants, the terrain and energy maps, and its team's
    message queue.  Returns a list of (agent, action) pairs in population
    order.
    """
    cdef int ax
    cdef int ay
    agents_grid = self.agent_map
    plants_grid = self.plant_map
    shared_view = WorldView(None, None, None, self.terr, self.energy_map)
    team_messages = self.messages
    pending = []
    for agent in self.agent_population:
        ax = agent.x
        ay = agent.y
        shared_view.me = agent
        shared_view.agent_views = get_small_view_fast(agents_grid, ax, ay)
        shared_view.plant_views = get_small_view_fast(plants_grid, ax, ay)
        pending.append((agent, agent.act(shared_view, team_messages[agent.team])))
    return pending
def run_agents_core(self):
    """Run one simulation tick: every agent acts, pays upkeep, and agents
    whose energy drops below zero are removed.

    Returns a tuple ``(alive, winner)``:
      * ``alive``  - number of teams that still have at least one agent;
      * ``winner`` - count of teams found empty after at least one living
        team was seen (used by the caller for win/draw bookkeeping).

    Fixes vs. the original:
      * the bare ``except:`` around the contested-attack lookup (which
        silently swallowed *any* error, including KeyboardInterrupt) is
        replaced by an explicit ``dict.get`` check;
      * the unused locals ``new_x, new_y`` in the ACT_ATTACK branch are
        removed.
    """
    actions = collect_agent_actions(self)
    actions_dict = dict(actions)
    # Shuffle so no agent gets a systematic first-mover advantage.
    random.shuffle(actions)
    self.agent_map.lock()
    # Apply the action for each agent - in doing so agent uses up energy.
    for (agent, action) in actions:
        # This is the cost of mere survival.
        agent.energy -= SUSTAIN_COST
        if action.type == ACT_MOVE:  # Changes position of agent.
            act_x, act_y = action.get_data()
            (new_x, new_y) = get_next_move(agent.x, agent.y,
                                           act_x, act_y)
            # Move to the new position if it is in range and it's not
            # currently occupied by another agent.
            if (self.agent_map.in_range(new_x, new_y) and
                    not self.agent_map.get(new_x, new_y)):
                self.move_agent(agent, new_x, new_y)
                agent.energy -= MOVE_COST
        elif action.type == ACT_SPAWN:
            # Creates a new agent; costs SPAWN_TOTAL_ENERGY plus half of the
            # parent's remaining energy (which the child inherits).
            act_x, act_y = action.get_data()[:2]
            (new_x, new_y) = get_next_move(agent.x, agent.y,
                                           act_x, act_y)
            if (self.agent_map.in_range(new_x, new_y) and
                    not self.agent_map.get(new_x, new_y) and
                    agent.energy >= SPAWN_TOTAL_ENERGY):
                agent.energy -= SPAWN_TOTAL_ENERGY
                agent.energy //= 2
                a = Agent(new_x, new_y, agent.energy, agent.get_team(),
                          self.minds[agent.get_team()],
                          action.get_data()[2:])
                self.add_agent(a)
        elif action.type == ACT_EAT:
            # Eat only as much as possible (never exceed ENERGY_CAP).
            intake = min(self.energy_map.get(agent.x, agent.y),
                         ENERGY_CAP - agent.energy)
            agent.energy += intake
            self.energy_map.change(agent.x, agent.y, -intake)
        elif action.type == ACT_RELEASE:
            # Dump some energy onto an adjacent field.
            # No Seppuku: never release more than (energy - 1).
            output = action.get_data()[2]
            output = min(agent.energy - 1, output)
            act_x, act_y = action.get_data()[:2]
            # Use get_next_move to simplify things if you know
            # where the energy is supposed to end up.
            (out_x, out_y) = get_next_move(agent.x, agent.y,
                                           act_x, act_y)
            if (self.agent_map.in_range(out_x, out_y) and
                    agent.energy >= 1):
                agent.energy -= output
                self.energy_map.change(out_x, out_y, output)
        elif action.type == ACT_ATTACK:
            # Make sure agent is attacking an adjacent field: the target
            # must equal the one-step move towards it.
            act_x, act_y = act_data = action.get_data()
            next_pos = get_next_move(agent.x, agent.y, act_x, act_y)
            victim = self.agent_map.get(act_x, act_y)
            # Height advantage: attacking downhill is stronger.
            terr_delta = (self.terr.get(agent.x, agent.y)
                          - self.terr.get(act_x, act_y))
            if (victim is not None and victim.alive and
                    next_pos == act_data):
                # If both agents attack each other, both lose double energy.
                # Think twice before attacking.
                victim_action = actions_dict.get(victim)
                contested = (victim_action is not None and
                             victim_action.type == ACT_ATTACK)
                agent.attack(victim, terr_delta, contested)
                if contested:
                    victim.attack(agent, -terr_delta, True)
        elif action.type == ACT_LIFT:
            # Pick up one unit of terrain, if any is present.
            if not agent.loaded and self.terr.get(agent.x, agent.y) > 0:
                agent.loaded = True
                self.terr.change(agent.x, agent.y, -1)
        elif action.type == ACT_DROP:
            # Put the carried terrain unit back down.
            if agent.loaded:
                agent.loaded = False
                self.terr.change(agent.x, agent.y, 1)
    # Kill all agents with negative energy; their body energy is returned
    # to the energy map. Survivors are tallied per team.
    team = [0 for n in self.minds]
    for agent in self.agent_population:
        if agent.energy < 0 and agent.alive:
            self.energy_map.change(agent.x, agent.y, BODY_ENERGY)
            self.del_agent(agent)
        else:
            team[agent.team] += 1
    # Team wins (and game ends) if opposition team has 0 agents remaining.
    # Draw if time exceeds time limit (enforced by the caller).
    winner = 0
    alive = 0
    for t in team:
        if t != 0:
            alive += 1
        else:
            if alive == 0:
                winner += 1
    return (alive, winner)