Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add logical changes to code for better metics and updating state in separate function #12

Merged
merged 4 commits into from
Jan 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 103 additions & 3 deletions dapr-distributed-calendar/go/go_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ func addEvent(w http.ResponseWriter, r *http.Request) {
}
state, _ := json.Marshal(data)
log.Print(string(state))
id := data[0]["key"]

// Check if event with given ID already exists
bodyBytes, err := checkEvent(id)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}

if string(bodyBytes) != "" {
log.Printf("Event with ID %s already exists", id)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event already exists"))
return
}

resp, err := http.Post(stateURL, "application/json", bytes.NewBuffer(state))
if err != nil {
Expand All @@ -116,6 +131,20 @@ func deleteEvent(w http.ResponseWriter, r *http.Request) {

deleteURL := stateURL + "/" + eventID.ID

// Check if event with given ID exists before delete
bodyBytes, err := checkEvent(eventID.ID)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}

if string(bodyBytes) == "" {
log.Printf("Event with ID %s does not exist exists", eventID.ID)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event does not exists"))
return
}

req, err := http.NewRequest(http.MethodDelete, deleteURL, nil)
if err != nil {
log.Fatalln("Error creating delete request", err)
Expand All @@ -130,9 +159,7 @@ func deleteEvent(w http.ResponseWriter, r *http.Request) {
log.Printf("Response after delete call: %s", resp.Status)

defer resp.Body.Close()
bodyBytes, _ := io.ReadAll(resp.Body)
eventsCounter.Add(context.Background(), -1)
log.Print(string(bodyBytes))
}

func getEvent(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -166,8 +193,80 @@ func getEvent(w http.ResponseWriter, r *http.Request) {
log.Printf("Error reading response body: %v", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(bodyBytes)
if string(bodyBytes) != "" {
log.Print(string(bodyBytes))
}
}

func updateEvent(w http.ResponseWriter, r *http.Request) {
var event Event

err := json.NewDecoder(r.Body).Decode(&event)
if err != nil {
log.Printf("Error while decoding: %e", err)
return
}
log.Printf("Event Name: %s", event.Name)
log.Printf("Event Date: %s", event.Date)
log.Printf("Event ID: %s", event.ID)

var data = make([]map[string]string, 1)
data[0] = map[string]string{
"key": event.ID,
"value": event.Name + " " + event.Date,
}
state, _ := json.Marshal(data)
log.Print(string(state))
id := data[0]["key"]

// Check if event with given ID already exists
bodyBytes, err := checkEvent(id)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}
if string(bodyBytes) == "" {
log.Printf("Event with ID %s does not exists", id)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event does not exists"))
return
}

req, err := http.NewRequest(http.MethodPost, stateURL, bytes.NewBuffer(state))
if err != nil {
log.Fatalln("Error posting to state", err)
return
}
client := &http.Client{}
_, err = client.Do(req)
if err != nil {
log.Fatalln("Error updating event", err)
return
}
}

func checkEvent(id string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, stateURL+"/"+id, nil)
if err != nil {
log.Fatalln("Error creating get request", err)
return nil, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Fatalln("Error getting event", err)
return nil, err
}

log.Print(string(bodyBytes))
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading response body: %v", err)
return nil, err
}
return bodyBytes, nil
}

func main() {
Expand All @@ -194,5 +293,6 @@ func main() {
router.HandleFunc("/addEvent", addEvent).Methods("POST")
router.HandleFunc("/deleteEvent", deleteEvent).Methods("POST")
router.HandleFunc("/getEvent", getEvent).Methods("POST")
router.HandleFunc("/updateEvent", updateEvent).Methods("PUT")
log.Fatal(http.ListenAndServe(":6000", router))
}
1 change: 1 addition & 0 deletions dapr-distributed-calendar/kubernetes-deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ helm install redis bitnami/redis --namespace 12-factor-app --wait

# deploy the 12-factor-app
kubectl apply -f kubernetes/.
kubectl wait --for=condition=ready pod --all --timeout=200s -n 12-factor-app

# setup locust for loadgeneration OPTIONAL
kubectl create configmap my-loadtest-locustfile --from-file locust/main.py -n 12-factor-app
Expand Down
24 changes: 20 additions & 4 deletions dapr-distributed-calendar/locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
default_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36'}

class EventsUser(HttpUser):
wait_time = between(5, 10) # Adjust the wait time between tasks as needed
wait_time = between(3, 5) # Adjust the wait time between tasks as needed

@task
def event_lifecycle(self):
Expand All @@ -22,7 +22,7 @@ def event_lifecycle(self):
}

# Send the POST request to create an event
response = self.client.post('/newevent', json=data, headers=headers)
response = self.client.post(f'/newevent', json=data, headers=headers)

# Check if the request was successful
if response.status_code == 200:
Expand All @@ -31,16 +31,32 @@ def event_lifecycle(self):
print(f"Failed to create event {event_id}. Status code: {response.status_code}")

# Get the event
response = self.client.get('/event/{event_id}')
response = self.client.get(f'/event/{event_id}')

# Check if the request was successful
if response.status_code == 200:
print(f"Event {event_id} retrieved successfully")
else:
print(f"Failed to retrieve event {event_id}. Status code: {response.status_code}")

# Update the event
updated_data = {
"data": {
"name": f"Updated Event {event_id}",
"date": "2020-10-10",
"id": str(event_id)
}
}
response = self.client.put(f'/updateevent', json=updated_data, headers=headers)

# Check if the update was successful
if response.status_code == 200:
print(f"Event {event_id} updated successfully")
else:
print(f"Failed to update event {event_id}. Status code: {response.status_code}")

# Delete the event
response = self.client.delete('/event/{event_id}')
response = self.client.delete(f'/event/{event_id}')

# Check if the request was successful
if response.status_code == 200:
Expand Down
110 changes: 85 additions & 25 deletions dapr-distributed-calendar/node/node_controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const bodyParser = require('body-parser');
require('isomorphic-fetch');

const app = express();
// const { HTTP } = require("cloudevents");

app.use(bodyParser.json());

const daprPort = process.env.DAPR_HTTP_PORT || 3500;
Expand All @@ -22,24 +22,10 @@ const opentelemetry = require('@opentelemetry/api');

const myMeter = opentelemetry.metrics.getMeter('controller');

const newEventCounter = myMeter.createCounter('newEvents.counter');
const getEventCounter = myMeter.createCounter('getEvents.counter');
const deleteEventCounter = myMeter.createCounter('deleteEvents.counter');
// app.get('/dapr/subscribe', (_req, res) => {
// res.json([
// {
// pubsubname: "pubsub",
// topic: "events-topic",
// route: "getmsg"
// }
// ]);
// });

// app.post('/getmsg', (req, res) => {
// const receivedEvent = HTTP.toEvent({ headers: req.headers, body: req.body });
// console.log(receivedEvent);
// res.sendStatus(200);
// });
const newEventCounter = myMeter.createCounter('newEvents-call.counter');
const getEventCounter = myMeter.createCounter('getEvents-call.counter');
const deleteEventCounter = myMeter.createCounter('deleteEvents-call.counter');
const updateEventCounter = myMeter.createCounter('updateEvents-call.counter');

function send_notif(data) {
var message = {
Expand Down Expand Up @@ -80,13 +66,23 @@ app.post('/newevent', (req, res) => {
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
}).then(async (response) => {
// Only post if event does not already exist
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event already exists"
if (responseBodyString.includes("Event already exists")) {
console.log("Event already exists.");
res.status(404).send({ message: "Event already exists." });
return;
}

if (!response.ok) {
throw "Failed to persist state.";
}

console.log("Successfully persisted state.");
res.status(200).send();
res.status(200).send();
}).catch((error) => {
console.log(error);
res.status(500).send({message: error});
Expand All @@ -107,10 +103,19 @@ app.delete('/event/:id', (req, res) => {
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
if (!response.ok) {n
}).then(async (response) => {
if (!response.ok) {
throw "Failed to delete state.";
}
// Only delete if event does exist
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event does not exists"
if (responseBodyString.includes("Event does not exists")) {
console.log("Event does not exists.");
res.status(404).send({ message: "Event does not exists." });
return;
}

console.log("Successfully deleted state.");
res.status(200).send();
Expand All @@ -120,6 +125,8 @@ app.delete('/event/:id', (req, res) => {
});
});

const streamToString = require('stream-to-string');

app.get('/event/:id', (req, res) =>{
getEventCounter.add(1);
const key = req.params.id;
Expand All @@ -133,16 +140,69 @@ app.get('/event/:id', (req, res) =>{
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
}).then(async (response) => {
if (!response.ok) {
throw "Failed to get state.";
}
console.log("Successfully got state.");
res.status(200).send();
const responseBodyString = await streamToString(response.body);

// Check if response body is empty
if (responseBodyString.trim() === "") {
console.log("No event found.");
res.status(404).send({ message: "No event found." });
return;
}

try {
const responseBody = JSON.parse(responseBodyString);
res.status(200).json(responseBody);
} catch (error) {
console.log("Error parsing JSON:", error);
res.status(500).send({ message: "Error parsing JSON" });
}
}).catch((error) => {
console.log(error);
res.status(500).send({message: error});
});
})

app.put('/updateevent', (req, res) => {
updateEventCounter.add(1);

const data = req.body.data;
const eventId = data.id;
console.log("Updating event! Event ID: " + eventId);

console.log("Data passed as body to Go", JSON.stringify(data))

// Assuming your Go service has an endpoint like '/updateEvent'
fetch(invokeUrl + `/updateEvent`, {
method: "PUT", // Use PUT method for updating
body: JSON.stringify(data),
headers: {
"Content-Type": "application/json"
}
}).then(async (response) => {
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event does not exists"
if (responseBodyString.includes("Event does not exists")) {
console.log("Event does not exists.");
res.status(404).send({ message: "Event does not exists." });
return;
}

if (!response.ok) {
throw "Failed to update event.";
}

console.log("Successfully updated event.");
res.status(200).send();
}).catch((error) => {
console.log(error);
res.status(500).send({ message: error });
});
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));
Loading