diff --git a/Tests/Test_Buffer.py b/Tests/Test_Buffer.py new file mode 100644 index 00000000..50d27889 --- /dev/null +++ b/Tests/Test_Buffer.py @@ -0,0 +1,133 @@ +import unittest + +from pynars import Narsese +from pynars.Config import Config +from pynars.NARS.DataStructures import EventBuffer +from pynars.Narsese import Judgement, Term, Task, Stamp, Base, Statement + + +class TEST_Buffer(unittest.TestCase): + + def test(self): + pass # todo add regular Buffer tests if needed + +class TEST_EventBuffer(unittest.TestCase): + + def test_3_firstorder_event_temporal_chaining(self): + """ + Add 3 first order events to the buffer (A,B,C), each with different timestamps (A=1, B=2, C=3) + + Ensure that the compound events are all created: + (A &/ B), (B &/ C), (A &/ C) + + Ensure that the implication statement is created: + ((A &/ B) =/> C) + """ + event_buffer: EventBuffer = EventBuffer(capacity=3) + + event_A_task: Task = Narsese.parser.parse("A2>.") + event_A_time = 0 + event_A_task.stamp.t_occurrence = event_A_time + + event_B_task: Task = Narsese.parser.parse("B2>.") + event_B_time = event_A_time + (Config.temporal_duration + 1) + event_B_task.stamp.t_occurrence = event_B_time + + event_C_task: Task = Narsese.parser.parse("C2>.") + event_C_time = event_B_time + (Config.temporal_duration + 1) + event_C_task.stamp.t_occurrence = event_C_time + + event_buffer.put(event_A_task) + event_buffer.put(event_B_task) + event_buffer.put(event_C_task) + results = event_buffer.generate_temporal_sentences() + + + A_and_B: Task = Narsese.parser.parse("(&/, A2>,+" + str(event_B_time - event_A_time) + ",B2>).") + + B_and_C: Task = Narsese.parser.parse("(&/, B2>,+" + str(event_C_time - event_B_time) + ",C2>).") + + A_and_C: Task = Narsese.parser.parse("(&/, A2>,+" + str(event_C_time - event_A_time) + ",C2>).") + + A_and_B_imply_C = Narsese.parser.parse("<(&/, A2>,+" + str(event_B_time - event_A_time) + ",B2>,+" + str( + event_C_time - event_B_time) + ") =/> C2>>.") + + expected_results = [A_and_B.term, + B_and_C.term, + A_and_C.term, + A_and_B_imply_C.term] + + for result in results: + self.assertTrue(result.term in expected_results,msg=str(result.term) + " was not found in results.") + expected_results.remove(result.term) + + + def test_buffer_overflow_maintains_capacity(self): + """ + Test to ensure the number of items in the buffer doesnt + exceed its upper bound + """ + capacity = 5 + event_buffer: EventBuffer = EventBuffer(capacity=capacity) + + # ensure the buffer adds events regularly + for i in range(capacity): + self.assertEqual(i, len(event_buffer)) + event_task = Narsese.parser.parse("A2>.") + event_task.stamp.t_occurrence = 1 + event_buffer.put(event_task) + + # ensure the buffer is at max capcity + self.assertEqual(capacity, len(event_buffer)) + + # ensure the buffer does not exceed its capacity when overflowing + event_task = Narsese.parser.parse("A2>.") + event_task.stamp.t_occurrence = 1 + event_buffer.put(event_task) + self.assertEqual(capacity, len(event_buffer)) + + def test_buffer_overflow_discards_older_events(self): + """ + Test to ensure that new events are added to the buffer, + whereas old events are purged from the buffer. + """ + capacity = 5 + event_buffer: EventBuffer = EventBuffer(capacity=capacity) + + for i in range(capacity): + event_task = Narsese.parser.parse("B" + str(i) + ">.") + event_task.stamp.t_occurrence = i + 1 + event_buffer.put(event_task) + + # ensure getting older/newer events functions properly + self.assertTrue(event_buffer.get_newest_event().stamp.t_occurrence > event_buffer.get_oldest_event().stamp.t_occurrence) + + # add an event older than all the others, ensure it doesnt get into the buffer + old_event_task = Narsese.parser.parse("oldB>.") + old_event_task.stamp.t_occurrence = 0 + event_buffer.put(old_event_task) + self.assertTrue(event_buffer.get_oldest_event().term != old_event_task.term) + + # add an event newer than all the others, ensure it goes to the front of the buffer + new_event_task = Narsese.parser.parse("newB>.") + new_event_task.stamp.t_occurrence = capacity + event_buffer.put(new_event_task) + self.assertTrue(event_buffer.get_newest_event().term == new_event_task.term) + +if __name__ == '__main__': + + test_classes_to_run = [ + TEST_EventBuffer + ] + + loader = unittest.TestLoader() + + suites = [] + for test_class in test_classes_to_run: + suite = loader.loadTestsFromTestCase(test_class) + suites.append(suite) + + suites = unittest.TestSuite(suites) + + runner = unittest.TextTestRunner() + results = runner.run(suites) \ No newline at end of file diff --git a/pynars/NARS/Control/Reasoner.py b/pynars/NARS/Control/Reasoner.py index fb6d1028..a64f500c 100644 --- a/pynars/NARS/Control/Reasoner.py +++ b/pynars/NARS/Control/Reasoner.py @@ -9,7 +9,7 @@ from pynars.Narsese._py.Budget import Budget from pynars.Narsese._py.Statement import Statement from pynars.Narsese._py.Task import Belief -from ..DataStructures import Bag, Memory, NarseseChannel, Buffer, Task, Concept +from ..DataStructures import Bag, Memory, NarseseChannel, Buffer, Task, Concept, EventBuffer from ..InferenceEngine import GeneralEngine, TemporalEngine, VariableEngine from pynars import Config from pynars.Config import Enable @@ -38,6 +38,7 @@ def __init__(self, n_memory, capacity, config='./config.json', nal_rules={1, 2, self.memory = Memory(n_memory, global_eval=self.global_eval) self.overall_experience = Buffer(capacity) self.internal_experience = Buffer(capacity) + self.event_buffer = EventBuffer(3) self.narsese_channel = NarseseChannel(capacity) self.perception_channel = Channel(capacity) self.channels: List[Channel] = [ @@ -131,11 +132,17 @@ def observe(self, tasks_derived: List[Task]): Process Channels/Buffers """ judgement_revised, goal_revised, answers_question, answers_quest = None, None, None, None - # step 1. Take out an Item from `Channels`, and then put it into the `Overall Experience` + # step 1. Take out an Item from `Channels`, and then put it into the `Overall Experience` and Event Buffers for channel in self.channels: task_in: Task = channel.take() if task_in is not None: self.overall_experience.put(task_in) + if self.event_buffer.can_task_enter(task_in): + self.event_buffer.put(task_in) + # when there's a new event, run the temporal chaining + temporal_results = self.event_buffer.generate_temporal_sentences() + for result in temporal_results: + self.overall_experience.put(result) # step 2. Take out an Item from the `Internal Experience`, with putting it back afterwards, and then put it # into the `Overall Experience` diff --git a/pynars/NARS/DataStructures/_py/Buffer.py b/pynars/NARS/DataStructures/_py/Buffer.py index fd657e7c..b224ac88 100644 --- a/pynars/NARS/DataStructures/_py/Buffer.py +++ b/pynars/NARS/DataStructures/_py/Buffer.py @@ -1,8 +1,11 @@ +from pynars.NAL.Functions import Truth_intersection, Stamp_merge +from pynars.NAL.Inference.TemporalRules import induction_composition, induction_implication from .Bag import Bag from pynars.Config import Config -from pynars.Narsese import Item, Task +from pynars.Narsese import Item, Task, TermType, Compound, Interval, Statement from pynars.NAL.Functions.BudgetFunctions import * -from typing import Callable, Any +from typing import Callable, Any, List + class Buffer(Bag): ''' @@ -40,3 +43,79 @@ def __init__(self, capacity: int, n_buckets: int=None, take_in_order: bool=False def is_expired(self, put_time, current_time): return (current_time - put_time) > self.max_duration + + +class EventBuffer: + ''' + This buffer holds first-order events, sorted by time. + The purpose of this buffer is to generate temporal implication statements, e.g., (A &/ B =/> C) + and compound events, e.g., (A &/ B). + + The operation for generating temporal statements is exhaustive. That means, for generating 3-component + implication statements like (A &/ B =/> C), the algorithm scales O(n^3) for n elements + + The oldest events are at the lowest index, the newest events are at the highest index. + The larger the event's timestamp, the newer it is. + ''' + def __init__(self, capacity: int): + self.buffer: List[Task] = [] + self.capacity: int = capacity + + def __len__(self): + return len(self.buffer) + + def get_oldest_event(self): + return self.buffer[0] + + def get_newest_event(self): + return self.buffer[-1] + + def generate_temporal_sentences(self): + results: List[Task] = [] + # first event A occurred, then event B occurred, then event C + for i in range(len(self.buffer)): + event_A_task = self.buffer[i] + for j in range(i+1,len(self.buffer)): + # create (A &/ B) + event_B_task = self.buffer[j] + compound_event_task = induction_composition(event_A_task, event_B_task) + results.append(compound_event_task) # append + for k in range(j + 1, len(self.buffer)): + # create (A &/ B) =/> C + event_C = self.buffer[k] + temporal_implication_task = induction_implication(compound_event_task, event_C) + results.append(temporal_implication_task) # append + + return results + + def put(self, event_task_to_insert: Task): + if not self.can_task_enter(event_task_to_insert): + raise Exception("ERROR! Only events with first-order statements can enter the EventBuffer.") + + if len(self.buffer) == 0: # if nothing in the buffer, just insert it + self.buffer.append(event_task_to_insert) + return + + newest_event = self.get_newest_event() + + if event_task_to_insert.stamp.t_occurrence >= newest_event.stamp.t_occurrence: + # if its newer than even the newest event, just insert it at the end + self.buffer.append(event_task_to_insert) + else: + # otherwise, we have to go through the list to insert it properly + for i in range(len(self.buffer)): + buffer_event = self.buffer[i] + if event_task_to_insert.stamp.t_occurrence <= buffer_event.stamp.t_occurrence: + # the inserted event occurs first, so insert it here + self.buffer.insert(i, event_task_to_insert) + break + + + if len(self.buffer) > self.capacity: + # if too many events, take out the oldest event + self.buffer.pop(0) + + def can_task_enter(self, task: Task): + return task.is_event \ + and task.term.type == TermType.STATEMENT \ + and not task.term.is_higher_order \ No newline at end of file