chapters/compute: Add checker for lab 8
Add checkers for laboratory 8 tasks.

Signed-off-by: NickZaharia308 <[email protected]>
NickZaharia308 committed Nov 24, 2024
1 parent dc21b9b commit 3051616
Showing 94 changed files with 3,338 additions and 516 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/support
@@ -0,0 +1,9 @@
PYTHON = python3
SCRIPT = generate_skels.py

skels:
	mkdir -p support/src
	$(PYTHON) $(SCRIPT) --input ./solution/src --output ./support/src

clean:
	rm -rf support/
@@ -0,0 +1,64 @@
# `apache2` Simulator - Condition

## Conditions

Another way we can implement our `apache2` simulator is to use a condition variable.
This one is probably the most intuitive synchronization primitive.
It's a means by which a thread can tell another one: "Hey, wake up, _this_ happened!".
So it's a way for threads to notify each other.
For this reason, the main methods associated with conditions are `notify()` and `wait()`.
As you might expect, they are complementary:

- `wait()` puts the thread in the WAITING state until it's woken up by another one.
- `notify()` wakes up one or more `wait()`-ing threads.
  If `notify()` is called when no thread is waiting, it does nothing: the notification is not stored, so a thread that calls `wait()` afterwards still blocks until the next `notify()`.
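
A minimal sketch of this handshake, assuming a hypothetical `ready` flag as the thing being waited for.
The names `waiter`, `notifier`, `ready` and `events` are illustrative and do not appear in the lab code.

```python
import threading

condition = threading.Condition()
ready = False   # hypothetical predicate the waiter cares about
events = []     # records what happened, in order


def waiter():
    # `with` takes the lock that every Condition carries
    with condition:
        # Re-check the predicate in a loop: it may not hold when wait() returns
        while not ready:
            condition.wait()
        events.append("waiter woke up")


def notifier():
    global ready
    with condition:
        ready = True
        events.append("notifier signalled")
        condition.notify()


t = threading.Thread(target=waiter)
t.start()
notifier()
t.join()
print(events)
```

The waiter checks `ready` before waiting, so it proceeds correctly whether it reaches `wait()` before or after the notifier runs.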

Let's talk about a classic problem in synchronization and parallel computing: **the producer-consumer problem**.
The problem states that one or more producers generate data and place it in a shared buffer (a queue, for example), while one or more consumers take data from the buffer to process it further.
More about it in this [video](https://www.youtube.com/watch?v=Qx3P2wazwI0).
There are a few rules though, such as:

- The producer must not insert data when the buffer is full.
- The consumer must not retrieve data if the buffer is empty.
- The producer and the consumer can't access the shared buffer at the same time.
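
The three rules can be sketched as a small bounded buffer built on a `Condition`.
This is only an illustration under assumed names (`BoundedBuffer`, `put`, `get`, `capacity`), not the implementation used in the lab.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Hypothetical bounded buffer; not the lab's implementation."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.items = deque()
        self.condition = threading.Condition()

    def put(self, item):
        with self.condition:  # no concurrent access to the buffer
            while len(self.items) >= self.capacity:
                self.condition.wait()  # do not insert while full
            self.items.append(item)
            self.condition.notify_all()

    def get(self):
        with self.condition:  # no concurrent access to the buffer
            while not self.items:
                self.condition.wait()  # do not retrieve while empty
            item = self.items.popleft()
            self.condition.notify_all()
            return item


buffer = BoundedBuffer(capacity=2)
consumed = []


def produce():
    for i in range(5):
        buffer.put(i)


def consume():
    for _ in range(5):
        consumed.append(buffer.get())


threads = [threading.Thread(target=produce), threading.Thread(target=consume)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(consumed)
```

`notify_all()` is used here because producers and consumers wait on the same condition, so waking a single thread could wake one that still cannot proceed.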

Now enter `chapters/compute/synchronization/drills/tasks/apache2-simulator-condition/` and run `make skels`.
Look at the code in `chapters/compute/synchronization/drills/tasks/apache2-simulator/support/src/producer_consumer.py`.
We have one producer and one consumer for simplicity.
Observe that the producer calls `notify()` once there is data available, and the consumer calls `notify()` once data has been read.
Notice that each of these calls is preceded by an `acquire()` call and followed by a `release()` call.

`acquire()` and `release()` are commonly associated with mutexes or semaphores.
What do they have to do with condition variables?

Well, a `Condition` variable also stores an inner lock (mutex).
It is this lock that we `acquire()` and `release()`.
In fact, the [documentation](https://docs.python.org/3/library/threading.html#condition-objects) states we should only call `Condition` methods with its inner lock taken.
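
A quick way to see this requirement in action is to call `notify()` without holding the inner lock.
The sketch below assumes nothing beyond the standard `threading` module; the variable names `outcome` and `held_outcome` are illustrative.

```python
import threading

condition = threading.Condition()

try:
    condition.notify()  # inner lock NOT held
except RuntimeError as error:
    outcome = f"refused: {error}"
else:
    outcome = "accepted"

with condition:  # inner lock held
    condition.notify()  # allowed, even though nobody is waiting
    held_outcome = "accepted"

print(outcome)
print(held_outcome)
```

Python raises `RuntimeError` in the first case, so forgetting the lock fails loudly instead of silently corrupting state.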

Why is this necessary?
We don't want both the consumer and the producer to modify the buffer at the same time.
This could lead to a race condition, especially if we have several producers and consumers.

So now we know that a mutex alone is not enough.
The mutex only lets us access and modify the `queue` atomically; it cannot tell a thread that the state of the queue has changed.
Now, you might be thinking that this code causes a deadlock:

```Python
condition.acquire()
while not queue:
    condition.wait()
```

The thread gets the lock and then, if there is no data, it switches its state to WAITING.
A classic deadlock, right?
No.
`wait()` also releases the inner lock of the `Condition` and, upon being woken up, reacquires it.
Neat!
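
This behaviour can be observed directly.
In the sketch below, the main thread manages to take the lock while the consumer is blocked in `wait()`, which would be impossible if `wait()` kept holding it.
The names `data_available`, `waiting` and `log` are assumptions made for the illustration.

```python
import threading

condition = threading.Condition()
data_available = False       # hypothetical predicate
waiting = threading.Event()  # hypothetical helper: set once the consumer holds the lock
log = []


def consumer():
    with condition:
        waiting.set()
        while not data_available:
            condition.wait()  # releases the inner lock while blocked
        log.append("consumer resumed with the lock held again")


t = threading.Thread(target=consumer)
t.start()
waiting.wait()

# If wait() kept the lock, this acquire() would block forever.
condition.acquire()
log.append("main acquired the lock while the consumer was waiting")
data_available = True
condition.notify()
condition.release()

t.join()
print(log)
```

The consumer appends its entry only after the main thread releases the lock, because `wait()` must reacquire it before returning.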

So now we have both synchronization **and** signalling.
This is what conditions are for, ultimately.

Open `chapters/compute/synchronization/drills/tasks/apache2-simulator/support/src/apache2_simulator_condition.py` and follow the TODOs.
The code is similar to `apache2_simulator_semaphore.py`, but this time we use condition variables as shown in `producer_consumer.py`.

[Quiz](../../../drills/questions/notify-only-with-mutex.md)
@@ -0,0 +1,151 @@
#!/usr/bin/python3 -u
# SPDX-License-Identifier: BSD-3-Clause

import sys
import argparse
import os.path
import re


def process_file(src, dst, pattern, replace, remove, replace_pairs, end_string=None):
    if not pattern or not replace or not remove:
        print(
            f"ERROR: The script behaviour is not properly specified for {src}",
            file=sys.stderr,
        )
        sys.exit(1)

    fin = open(src, "r")
    fout = open(dst, "w")
    remove_lines = 0
    skip_lines = 0
    uncomment_lines = 0
    end_found = True

    for l in fin.readlines():
        # Skip generation of file.
        if "SKIP_GENERATE" in l:
            fout.close()
            os.remove(dst)
            return

        if end_string and end_found == False:
            fout.write(l)
            if end_string in l:
                end_found = True
            continue

        if remove_lines > 0:
            remove_lines -= 1
            continue

        if skip_lines > 0:
            skip_lines -= 1
            m = re.search(pattern, l)
            if m:
                l = "%s%s\n" % (m.group(1), m.group(3))
                fout.write(l)
            continue

        if uncomment_lines > 0:
            uncomment_lines -= 1
            for fro, to in replace_pairs:
                l = re.sub(fro, to, l)
            fout.write(l)
            continue

        m = re.search(pattern, l)
        if m:
            if m.group(2):
                skip_lines = int(m.group(2))
            else:
                skip_lines = 1

            if end_string and end_string not in l:
                end_found = False

            l = "%s%s\n" % (m.group(1), m.group(3))

        m = re.search(replace, l)
        if m:
            if m.group(2):
                uncomment_lines = int(m.group(2))
            else:
                uncomment_lines = 1
            continue

        m = re.search(remove, l)
        if m:
            if m.group(2):
                remove_lines = int(m.group(2))
            else:
                remove_lines = 1
            continue

        fout.write(l)

    fout.close()


def main():
    parser = argparse.ArgumentParser(
        description="Generate skeletons sources from reference solution sources"
    )
    parser.add_argument(
        "--input", help="input directory to process files", required=True
    )
    parser.add_argument(
        "--output", help="output directory to copy processed files", required=True
    )
    args = parser.parse_args()

    for root, dirs, files in os.walk(args.input):
        new_root = os.path.join(args.output, os.path.relpath(root, args.input))
        for d in dirs:
            os.makedirs(os.path.join(new_root, d), exist_ok=True)

        for src in files:
            if (
                re.match("Makefile.*$", src)
                or re.match(r".*\.sh$", src)
                or re.match(r".*\.[sS]$", src)
                or re.match(r".*\.py$", src)
            ):
                pattern = r"(^\s*#\s*TODO)( [0-9]*)(:.*)"
                replace = r"(^\s*#\s*REPLACE)( [0-9]*)"
                remove = r"(^\s*#\s*REMOVE)( [0-9]*)"
                replace_pairs = [("# ", "")]
                end_string = None
            elif re.match(r".*\.asm$", src):
                pattern = r"(^\s*;\s*TODO)( [0-9]*)(:.*)"
                replace = r"(^\s*;\s*REPLACE)( [0-9]*)"
                remove = r"(^\s*;\s*REMOVE)( [0-9]*)"
                replace_pairs = [("; ", "")]
                end_string = None
            elif (
                re.match(r".*\.[ch]$", src)
                or re.match(r".*\.cpp$", src)
                or re.match(r".*\.hpp$", src)
            ):
                pattern = r"(.*/\*\s*TODO)([ 0-9]*)(:.*)"
                replace = r"(.*/\*\s*REPLACE)( [0-9]*)"
                remove = r"(.*/\*\s*REMOVE)( [0-9]*)"
                replace_pairs = [(r"/\* ", ""), (r" \*/", "")]
                end_string = "*/"
            elif re.match(r".*\.d$", src):
                pattern = r"(.*//\s*TODO)([ 0-9]*)(:.*)"
                replace = r"(.*//\s*REPLACE)( [0-9]*)"
                remove = r"(.*//\s*REMOVE)( [0-9]*)"
                replace_pairs = [(r"// ", "")]
                end_string = None
            else:
                continue

            dst = os.path.join(new_root, src)
            src = os.path.join(root, src)
            print(dst)
            process_file(src, dst, pattern, replace, remove, replace_pairs, end_string)


if __name__ == "__main__":
    sys.exit(main())
@@ -1,3 +1,4 @@
#!/usr/bin/python3 -u
# SPDX-License-Identifier: BSD-3-Clause

from random import randint
@@ -13,15 +14,20 @@ def worker(event, id):
    msg = ""

    while True:
        # TODO 1: Acquire the lock before accessing the shared resource (messages)
        event.acquire()
        while len(messages) == 0:
            # TODO 1: Wait for a message to be available if the list is empty
            event.wait()

        print(f"Worker {id} started handling message...")
        sleep(randint(1, 5))

        # TODO 2: Safely retrieve the message while holding the lock (use "msg")
        msg = messages[0]
        messages.pop(0)

        # TODO 1: Release the lock after modifying the shared resource
        event.release()

        print(f"Worker {id} handling message: {msg}")
@@ -33,7 +39,7 @@ def worker(event, id):
def main():
    event = Condition()

-    # Create and start the worker threads.
+    # Create and start the worker threads
    thread_pool = [Thread(target=worker, args=(event, i)) for i in range(NUM_WORKERS)]
    for t in thread_pool:
        t.daemon = True
@@ -49,9 +55,14 @@ def main():
        if msg == "exit":
            break

        # TODO 1: Acquire the lock before appending a message to the list
        event.acquire()

        # TODO 2: Append the message to "messages" and notify the workers that a message is available
        messages.append(msg)
        event.notify()

        # TODO 1: Release the lock after writing (appending) in the shared resource
        event.release()


@@ -0,0 +1,82 @@
#!/usr/bin/python3 -u
# SPDX-License-Identifier: BSD-3-Clause

import threading
import time
import random
from collections import deque

# Shared resources
queue = deque()
MAX_QUEUE_SIZE = 5
condition = threading.Condition()


# Producer part
def producer():
    while True:
        item = random.randint(1, 100)

        # Acquire the condition's inner lock
        condition.acquire()

        while len(queue) >= MAX_QUEUE_SIZE:
            print("The queue is full, producer is waiting...")

            # Wait until there's space in the queue
            condition.wait()
        queue.append(item)
        print(f"Produced value: {item}")

        # Notify the consumer that an item is available
        condition.notify()

        # Release the lock
        condition.release()

        # Simulate work by sleeping
        time.sleep(random.uniform(1, 1.5))


# Consumer part
def consumer():
    while True:
        # Acquire the condition's inner lock
        condition.acquire()

        while not queue:
            print("The queue is empty, consumer is waiting...")

            # Wait until there's an item in the queue
            condition.wait()
        item = queue.popleft()
        print(f"Consumed value: {item}")

        # Notify the producer that there's space in the queue
        condition.notify()

        # Release the lock
        condition.release()

        # Simulate work by sleeping
        time.sleep(random.uniform(1, 1.5))


def main():
    # Create both threads
    producer_thread = threading.Thread(target=producer)
    consumer_thread = threading.Thread(target=consumer)

    # Start threads
    producer_thread.start()
    consumer_thread.start()

    # Join threads
    producer_thread.join()
    consumer_thread.join()


if __name__ == "__main__":
    exit(main())
@@ -0,0 +1 @@
/support
@@ -0,0 +1,11 @@
PYTHON = python3
SCRIPT = generate_skels.py

skels:
	mkdir -p support/src
	$(PYTHON) $(SCRIPT) --input ./solution/src --output ./support/src
	cp -r ./solution/tests ./support/tests
	chmod +x ./support/tests/checker.sh

clean:
	rm -rf support/