-
Notifications
You must be signed in to change notification settings - Fork 8
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
6ee8035
commit 7209145
Showing
30 changed files
with
1,248 additions
and
1,618 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import random | ||
import socket | ||
|
||
from pynars.NARS import Reasoner | ||
from pynars.NARS.DataStructures.MC.SensorimotorChannel import SensorimotorChannel | ||
from pynars.Narsese import parser | ||
|
||
""" | ||
The sensorimotor channel for the Pong game. | ||
It is placed here because I believe that every channel needs to be designed by the user. | ||
""" | ||
|
||
nars_address = ("127.0.0.1", 54321) | ||
game_address = ("127.0.0.1", 12345) | ||
|
||
|
||
class PongChannel(SensorimotorChannel): | ||
|
||
def __init__(self, ID, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, | ||
num_reactions, N=1): | ||
super().__init__(ID, num_slot, num_events, num_anticipations, num_operations, num_predictive_implications, | ||
num_reactions, N) | ||
""" | ||
Babbling is designed to be very simple. If the current channel cannot generate an operation based on the | ||
reaction, then there is a certain probability that an operation will be generated through babbling. It is worth | ||
noting that currently a channel can only execute one operation in a cycle. Of course, in the future, this | ||
restriction can be lifted after the mutually exclusive relationship between operations is specified. | ||
Babbling can be performed a limited number of times and cannot be replenished. | ||
""" | ||
self.num_babbling = 200 | ||
self.babbling_chance = 0.5 | ||
|
||
def information_gathering(self): | ||
""" | ||
Receive a string from the game and parse it into a task. | ||
""" | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.bind(nars_address) | ||
data, _ = sock.recvfrom(1024) | ||
status = data.decode() | ||
if status != "GAME FAILED": | ||
try: | ||
return [parser.parse(each) for each in status.split("|")] | ||
except: # for unexpected game input error | ||
print(status) | ||
exit() | ||
else: | ||
return [] | ||
|
||
def babbling(self): | ||
""" | ||
Based on the probability and remaining counts. | ||
""" | ||
if random.random() < self.babbling_chance and self.num_babbling > 0: | ||
self.num_babbling -= 1 | ||
return random.choice(list(self.operations.keys())) | ||
|
||
|
||
def execute_MLeft(): | ||
""" | ||
All channels need to register for its own operations. It is recommended to list them in the channel created. | ||
""" | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.sendto("^left".encode(), game_address) | ||
|
||
|
||
def execute_MRight(): | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.sendto("^right".encode(), game_address) | ||
|
||
|
||
def execute_Hold(): | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
sock.sendto("^hold".encode(), game_address) | ||
|
||
|
||
if __name__ == "__main__": | ||
""" | ||
Open the game first and run this script to see all predictions generated. | ||
"+1, +2" will not be in the Narsese. | ||
""" | ||
r = Reasoner(100, 100) | ||
pc = PongChannel("Pong", 2, 5, 50, 5, 50, 50, 1) | ||
pc.register_operation("^left", execute_MLeft, ["^left", "left"]) | ||
pc.register_operation("^right", execute_MRight, ["^right", "right"]) | ||
pc.register_operation("^hold", execute_Hold, ["^hold", "mid"]) | ||
for _ in range(1000): | ||
pc.channel_cycle(r.memory) | ||
pc.input_buffer.predictive_implications.show(lambda x: x.task.sentence) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
import random | ||
import socket | ||
import sys | ||
from threading import Thread | ||
|
||
import pygame | ||
|
||
# initialize pygame | ||
pygame.init() | ||
|
||
# game canvas | ||
screen_width = 600 | ||
screen_height = 400 | ||
screen = pygame.display.set_mode((screen_width, screen_height)) | ||
|
||
""" | ||
For NARS, the game is run independently. It is very likely that in the game, the ball runs one frame, but NARS has read | ||
this frame dozens of times. Adjust the overall movement speed to solve this problem. | ||
""" | ||
ball_speed_augment = 1 | ||
|
||
# default variables | ||
ball_speed_x = random.choice([1, -1]) * random.randint(2 * ball_speed_augment, 4 * ball_speed_augment) | ||
ball_speed_y = -random.randint(2 * ball_speed_augment, 4 * ball_speed_augment) | ||
paddle_speed = 0 # paddle initial speed, 0 := not moving | ||
paddle_width = 100 | ||
paddle_height = 20 | ||
ball_radius = 10 | ||
ball_pos = [screen_width // 2, screen_height // 2] | ||
paddle_pos = [screen_width // 2 - paddle_width // 2, screen_height - paddle_height] | ||
font = pygame.font.Font(None, 36) | ||
|
||
# game score, for display | ||
survive_time = 0 | ||
|
||
""" | ||
The socket is used to establish communication between NARS and the game, and as expected, NARS will also use the exact | ||
same method to print content to the UI. | ||
""" | ||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | ||
game_address = ("localhost", 12345) | ||
control_address = ("localhost", 54321) | ||
|
||
|
||
def reset_game(): | ||
global ball_pos, ball_speed_x, ball_speed_y, paddle_pos, paddle_speed | ||
ball_pos = [screen_width // 2, screen_height // 2] | ||
ball_speed_x = random.choice([1, -1]) * random.randint(2 * ball_speed_augment, 4 * ball_speed_augment) | ||
ball_speed_y = -random.randint(2 * ball_speed_augment, 4 * ball_speed_augment) | ||
paddle_pos = [screen_width // 2 - paddle_width // 2, screen_height - paddle_height] | ||
paddle_speed = 0 | ||
|
||
|
||
def send_status(message=None): | ||
""" | ||
In this game, information is sent to NARS every frame, describing: 1) the position of the ball relative to the | ||
paddle, and 2) whether the current situation is satisfactory. | ||
Each message is just a string. | ||
Messages are separated by "|". | ||
""" | ||
if message is None: | ||
|
||
if ball_pos[0] < paddle_pos[0]: | ||
msg_1 = "<{left} --> [on]>. %1;0.9%" | ||
# sock.sendto(msg.encode(), control_address) | ||
msg_2 = "<{SELF} --> [good]>. %" + str( | ||
1 - (paddle_pos[0] - ball_pos[0]) / (screen_width - paddle_width)) + ";0.9%" | ||
sock.sendto((msg_1 + "|" + msg_2).encode(), control_address) | ||
elif ball_pos[0] > paddle_pos[0] + paddle_width: | ||
msg_1 = "<{right} --> [on]>. %1;0.9%" | ||
# sock.sendto(msg.encode(), control_address) | ||
msg_2 = "<{SELF} --> [good]>. %" + str( | ||
1 - (ball_pos[0] - (paddle_pos[0] + paddle_width)) / (screen_width - paddle_width)) + ";0.9%" | ||
sock.sendto((msg_1 + "|" + msg_2).encode(), control_address) | ||
else: | ||
msg = "<{SELF} --> [good]>. %1;0.9%" | ||
sock.sendto(msg.encode(), control_address) | ||
|
||
|
||
def game_loop(): | ||
global ball_pos, ball_speed_x, ball_speed_y, paddle_pos, paddle_speed, survive_time, survive_time_curve | ||
|
||
running = True | ||
clock = pygame.time.Clock() | ||
|
||
while running: | ||
|
||
survive_time += 1 | ||
|
||
for event in pygame.event.get(): | ||
if event.type == pygame.QUIT: | ||
running = False | ||
|
||
if not running: | ||
break | ||
|
||
# update ball pos | ||
ball_pos[0] += ball_speed_x | ||
ball_pos[1] += ball_speed_y | ||
|
||
# boundary collision check | ||
if ball_pos[0] <= ball_radius or ball_pos[0] >= screen_width - ball_radius: | ||
ball_speed_x = -ball_speed_x | ||
if ball_pos[1] <= ball_radius: | ||
ball_speed_y = -ball_speed_y | ||
|
||
# paddle collision check | ||
if ball_pos[1] + ball_radius >= paddle_pos[1] and paddle_pos[0] < ball_pos[0] < paddle_pos[0] + paddle_width: | ||
ball_speed_y = -ball_speed_y | ||
|
||
# game failed | ||
if ball_pos[1] >= screen_height: | ||
send_status("GAME FAILED") | ||
survive_time = 0 | ||
reset_game() # restart | ||
|
||
# update paddle | ||
paddle_pos[0] += paddle_speed | ||
|
||
if paddle_pos[0] < 0: | ||
paddle_pos[0] = 0 | ||
if paddle_pos[0] > screen_width - paddle_width: | ||
paddle_pos[0] = screen_width - paddle_width | ||
|
||
screen.fill((0, 0, 0)) | ||
pygame.draw.rect(screen, (255, 255, 255), | ||
pygame.Rect(paddle_pos[0], paddle_pos[1], paddle_width, paddle_height)) | ||
pygame.draw.circle(screen, (255, 255, 255), ball_pos, ball_radius) | ||
# show the survived time | ||
score_text = font.render(f"Score: {survive_time}", True, (255, 255, 255)) | ||
screen.blit(score_text, (10, 10)) # in the top left corner | ||
pygame.display.flip() | ||
|
||
send_status() | ||
clock.tick(60) | ||
|
||
pygame.quit() | ||
sys.exit() | ||
|
||
|
||
def receive_commands(): | ||
global paddle_speed | ||
sock.bind(game_address) | ||
|
||
while True: | ||
data, _ = sock.recvfrom(1024) | ||
command = data.decode() | ||
print(f"command received: {command}") | ||
|
||
if command == "^left": | ||
paddle_speed = -5 * ball_speed_augment | ||
elif command == "^right": | ||
paddle_speed = 5 * ball_speed_augment | ||
elif command == "^stop": | ||
paddle_speed = 0 | ||
|
||
|
||
if __name__ == "__main__": | ||
t = Thread(target=receive_commands) | ||
t.daemon = True | ||
t.start() | ||
|
||
game_loop() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
from pynars.NARS.DataStructures.MC import Utils | ||
from pynars.NARS.DataStructures.MC.Utils import BufferTask, PriorityQueue | ||
|
||
|
||
class Buffer: | ||
|
||
def __init__(self, size, N=1, D=0.9): | ||
""" | ||
Buffer, which is not event buffer, currently only used in the Narsese channel. | ||
pq: a queue of buffer_tasks (in Utils.py) | ||
size: the length of the buffer | ||
N: pop top-N tasks in the pq | ||
D: decay rate for the remaining tasks in the buffer | ||
""" | ||
self.pq = PriorityQueue(size) | ||
self.N = N | ||
self.D = D | ||
|
||
def select(self): | ||
""" | ||
Select the top-N BufferTasks from the buffer, decay the remaining. | ||
""" | ||
ret = [] | ||
push_back = [] | ||
for i in range(len(self.pq)): | ||
if i <= self.N: | ||
buffer_task, _ = self.pq.pop() | ||
ret.append(buffer_task) | ||
else: | ||
buffer_task, _ = self.pq.pop() | ||
buffer_task.expiration_effect *= self.D | ||
push_back.append(buffer_task) | ||
for each in push_back: | ||
self.pq.push(each, each.priority) | ||
return [each.task for each in ret] | ||
|
||
def add(self, tasks, memory): | ||
""" | ||
Convert input tasks (Task) into BufferTasks. | ||
""" | ||
for each in tasks: | ||
buffer_task = BufferTask(each) | ||
buffer_task.preprocess_effect = Utils.preprocessing(each, memory) | ||
self.pq.push(buffer_task, buffer_task.priority) |
This file was deleted.
Oops, something went wrong.
Oops, something went wrong.