Skip to content

Commit

Permalink
fix: add logical changes to code for better metics and updating state…
Browse files Browse the repository at this point in the history
… in separate function (#12)

* fix: return response on GET call

* fix: allow post only when event does not exist and delete if event exists

* fix: added put method

* fix: deployment, locust file and added ingress for grafana
  • Loading branch information
mati007thm authored Jan 19, 2024
1 parent 8f499d7 commit cf281e0
Show file tree
Hide file tree
Showing 7 changed files with 242 additions and 35 deletions.
106 changes: 103 additions & 3 deletions dapr-distributed-calendar/go/go_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,21 @@ func addEvent(w http.ResponseWriter, r *http.Request) {
}
state, _ := json.Marshal(data)
log.Print(string(state))
id := data[0]["key"]

// Check if event with given ID already exists
bodyBytes, err := checkEvent(id)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}

if string(bodyBytes) != "" {
log.Printf("Event with ID %s already exists", id)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event already exists"))
return
}

resp, err := http.Post(stateURL, "application/json", bytes.NewBuffer(state))
if err != nil {
Expand All @@ -116,6 +131,20 @@ func deleteEvent(w http.ResponseWriter, r *http.Request) {

deleteURL := stateURL + "/" + eventID.ID

// Check if event with given ID exists before delete
bodyBytes, err := checkEvent(eventID.ID)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}

if string(bodyBytes) == "" {
log.Printf("Event with ID %s does not exist exists", eventID.ID)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event does not exists"))
return
}

req, err := http.NewRequest(http.MethodDelete, deleteURL, nil)
if err != nil {
log.Fatalln("Error creating delete request", err)
Expand All @@ -130,9 +159,7 @@ func deleteEvent(w http.ResponseWriter, r *http.Request) {
log.Printf("Response after delete call: %s", resp.Status)

defer resp.Body.Close()
bodyBytes, _ := io.ReadAll(resp.Body)
eventsCounter.Add(context.Background(), -1)
log.Print(string(bodyBytes))
}

func getEvent(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -166,8 +193,80 @@ func getEvent(w http.ResponseWriter, r *http.Request) {
log.Printf("Error reading response body: %v", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(bodyBytes)
if string(bodyBytes) != "" {
log.Print(string(bodyBytes))
}
}

func updateEvent(w http.ResponseWriter, r *http.Request) {
var event Event

err := json.NewDecoder(r.Body).Decode(&event)
if err != nil {
log.Printf("Error while decoding: %e", err)
return
}
log.Printf("Event Name: %s", event.Name)
log.Printf("Event Date: %s", event.Date)
log.Printf("Event ID: %s", event.ID)

var data = make([]map[string]string, 1)
data[0] = map[string]string{
"key": event.ID,
"value": event.Name + " " + event.Date,
}
state, _ := json.Marshal(data)
log.Print(string(state))
id := data[0]["key"]

// Check if event with given ID already exists
bodyBytes, err := checkEvent(id)
if err != nil {
log.Printf("Error while checking event: %e", err)
return
}
if string(bodyBytes) == "" {
log.Printf("Event with ID %s does not exists", id)
w.Header().Set("Content-Type", "application/json")
w.Write([]byte("Event does not exists"))
return
}

req, err := http.NewRequest(http.MethodPost, stateURL, bytes.NewBuffer(state))
if err != nil {
log.Fatalln("Error posting to state", err)
return
}
client := &http.Client{}
_, err = client.Do(req)
if err != nil {
log.Fatalln("Error updating event", err)
return
}
}

func checkEvent(id string) ([]byte, error) {
req, err := http.NewRequest(http.MethodGet, stateURL+"/"+id, nil)
if err != nil {
log.Fatalln("Error creating get request", err)
return nil, err
}
client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Fatalln("Error getting event", err)
return nil, err
}

log.Print(string(bodyBytes))
defer resp.Body.Close()
bodyBytes, err := io.ReadAll(resp.Body)
if err != nil {
log.Printf("Error reading response body: %v", err)
return nil, err
}
return bodyBytes, nil
}

func main() {
Expand All @@ -194,5 +293,6 @@ func main() {
router.HandleFunc("/addEvent", addEvent).Methods("POST")
router.HandleFunc("/deleteEvent", deleteEvent).Methods("POST")
router.HandleFunc("/getEvent", getEvent).Methods("POST")
router.HandleFunc("/updateEvent", updateEvent).Methods("PUT")
log.Fatal(http.ListenAndServe(":6000", router))
}
1 change: 1 addition & 0 deletions dapr-distributed-calendar/kubernetes-deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ helm install redis bitnami/redis --namespace 12-factor-app --wait

# deploy the 12-factor-app
kubectl apply -f kubernetes/.
kubectl wait --for=condition=ready pod --all --timeout=200s -n 12-factor-app

# setup locust for loadgeneration OPTIONAL
kubectl create configmap my-loadtest-locustfile --from-file locust/main.py -n 12-factor-app
Expand Down
24 changes: 20 additions & 4 deletions dapr-distributed-calendar/locust/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
default_headers = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_4) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.100 Safari/537.36'}

class EventsUser(HttpUser):
wait_time = between(5, 10) # Adjust the wait time between tasks as needed
wait_time = between(3, 5) # Adjust the wait time between tasks as needed

@task
def event_lifecycle(self):
Expand All @@ -22,7 +22,7 @@ def event_lifecycle(self):
}

# Send the POST request to create an event
response = self.client.post('/newevent', json=data, headers=headers)
response = self.client.post(f'/newevent', json=data, headers=headers)

# Check if the request was successful
if response.status_code == 200:
Expand All @@ -31,16 +31,32 @@ def event_lifecycle(self):
print(f"Failed to create event {event_id}. Status code: {response.status_code}")

# Get the event
response = self.client.get('/event/{event_id}')
response = self.client.get(f'/event/{event_id}')

# Check if the request was successful
if response.status_code == 200:
print(f"Event {event_id} retrieved successfully")
else:
print(f"Failed to retrieve event {event_id}. Status code: {response.status_code}")

# Update the event
updated_data = {
"data": {
"name": f"Updated Event {event_id}",
"date": "2020-10-10",
"id": str(event_id)
}
}
response = self.client.put(f'/updateevent', json=updated_data, headers=headers)

# Check if the update was successful
if response.status_code == 200:
print(f"Event {event_id} updated successfully")
else:
print(f"Failed to update event {event_id}. Status code: {response.status_code}")

# Delete the event
response = self.client.delete('/event/{event_id}')
response = self.client.delete(f'/event/{event_id}')

# Check if the request was successful
if response.status_code == 200:
Expand Down
110 changes: 85 additions & 25 deletions dapr-distributed-calendar/node/node_controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const bodyParser = require('body-parser');
require('isomorphic-fetch');

const app = express();
// const { HTTP } = require("cloudevents");

app.use(bodyParser.json());

const daprPort = process.env.DAPR_HTTP_PORT || 3500;
Expand All @@ -22,24 +22,10 @@ const opentelemetry = require('@opentelemetry/api');

const myMeter = opentelemetry.metrics.getMeter('controller');

const newEventCounter = myMeter.createCounter('newEvents.counter');
const getEventCounter = myMeter.createCounter('getEvents.counter');
const deleteEventCounter = myMeter.createCounter('deleteEvents.counter');
// app.get('/dapr/subscribe', (_req, res) => {
// res.json([
// {
// pubsubname: "pubsub",
// topic: "events-topic",
// route: "getmsg"
// }
// ]);
// });

// app.post('/getmsg', (req, res) => {
// const receivedEvent = HTTP.toEvent({ headers: req.headers, body: req.body });
// console.log(receivedEvent);
// res.sendStatus(200);
// });
const newEventCounter = myMeter.createCounter('newEvents-call.counter');
const getEventCounter = myMeter.createCounter('getEvents-call.counter');
const deleteEventCounter = myMeter.createCounter('deleteEvents-call.counter');
const updateEventCounter = myMeter.createCounter('updateEvents-call.counter');

function send_notif(data) {
var message = {
Expand Down Expand Up @@ -80,13 +66,23 @@ app.post('/newevent', (req, res) => {
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
}).then(async (response) => {
// Only post if event does not already exist
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event already exists"
if (responseBodyString.includes("Event already exists")) {
console.log("Event already exists.");
res.status(404).send({ message: "Event already exists." });
return;
}

if (!response.ok) {
throw "Failed to persist state.";
}

console.log("Successfully persisted state.");
res.status(200).send();
res.status(200).send();
}).catch((error) => {
console.log(error);
res.status(500).send({message: error});
Expand All @@ -107,10 +103,19 @@ app.delete('/event/:id', (req, res) => {
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
if (!response.ok) {n
}).then(async (response) => {
if (!response.ok) {
throw "Failed to delete state.";
}
// Only delete if event does exist
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event does not exists"
if (responseBodyString.includes("Event does not exists")) {
console.log("Event does not exists.");
res.status(404).send({ message: "Event does not exists." });
return;
}

console.log("Successfully deleted state.");
res.status(200).send();
Expand All @@ -120,6 +125,8 @@ app.delete('/event/:id', (req, res) => {
});
});

const streamToString = require('stream-to-string');

app.get('/event/:id', (req, res) =>{
getEventCounter.add(1);
const key = req.params.id;
Expand All @@ -133,16 +140,69 @@ app.get('/event/:id', (req, res) =>{
headers: {
"Content-Type": "application/json"
}
}).then((response) => {
}).then(async (response) => {
if (!response.ok) {
throw "Failed to get state.";
}
console.log("Successfully got state.");
res.status(200).send();
const responseBodyString = await streamToString(response.body);

// Check if response body is empty
if (responseBodyString.trim() === "") {
console.log("No event found.");
res.status(404).send({ message: "No event found." });
return;
}

try {
const responseBody = JSON.parse(responseBodyString);
res.status(200).json(responseBody);
} catch (error) {
console.log("Error parsing JSON:", error);
res.status(500).send({ message: "Error parsing JSON" });
}
}).catch((error) => {
console.log(error);
res.status(500).send({message: error});
});
})

app.put('/updateevent', (req, res) => {
updateEventCounter.add(1);

const data = req.body.data;
const eventId = data.id;
console.log("Updating event! Event ID: " + eventId);

console.log("Data passed as body to Go", JSON.stringify(data))

// Assuming your Go service has an endpoint like '/updateEvent'
fetch(invokeUrl + `/updateEvent`, {
method: "PUT", // Use PUT method for updating
body: JSON.stringify(data),
headers: {
"Content-Type": "application/json"
}
}).then(async (response) => {
const responseBodyString = await streamToString(response.body);

// Check if response body contains "Event does not exists"
if (responseBodyString.includes("Event does not exists")) {
console.log("Event does not exists.");
res.status(404).send({ message: "Event does not exists." });
return;
}

if (!response.ok) {
throw "Failed to update event.";
}

console.log("Successfully updated event.");
res.status(200).send();
}).catch((error) => {
console.log(error);
res.status(500).send({ message: error });
});
});

app.listen(port, () => console.log(`Node App listening on port ${port}!`));
Loading

0 comments on commit cf281e0

Please sign in to comment.