-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathapi.py
109 lines (87 loc) · 3.47 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import falcon
import logging
import json
from google.cloud import language
from google.cloud.language import enums
from google.cloud.language import types
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "<<ADD_KEY_FILE_HERE>>"
logger = logging.getLogger()
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s %(name)-12s %(levelname)-8s %(message)s")
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
class RequestLogger(object):
def process_request(self, req, resp):
logger.info("{} request received... {}".format(req.method, req.uri))
class RequireJSON(object):
def process_request(self, req, resp):
if not req.client_accepts_json:
raise falcon.HTTPNotAcceptable(
"This API only supports responses encoded as JSON.",
href="http://docs.examples.com/api/json")
if req.method in ("POST", "PUT"):
if "application/json" not in req.content_type:
raise falcon.HTTPUnsupportedMediaType(
"This API only supports requests encoded as JSON.",
href="http://docs.examples.com/api/json")
class Status(object):
def on_get(self, req, resp):
"""Status check"""
resp.status = falcon.HTTP_200 # This is the default status
resp.body = ("Hello World!!")
class SentimentAnalysis(object):
def on_post(self, req, resp):
"""Return sentiment scores for input sentences"""
client = language.LanguageServiceClient()
document = types.Document(
content=req.media.get("data"),
type=enums.Document.Type.PLAIN_TEXT)
annotations = client.analyze_sentiment(document=document)
# Process results
response = {}
score = annotations.document_sentiment.score
magnitude = annotations.document_sentiment.magnitude
document_response = {
"document": {
"score": score,
"magnitude": magnitude
}
}
logger.info("Document sentiment analysis: Score: {}, Magnitude: {}".format(
score, magnitude))
sentence_response_list =[]
sentences_response = {
"sentences" : sentence_response_list
}
for index, sentence in enumerate(annotations.sentences):
sentence_sentiment = round(sentence.sentiment.score, 2)
sentence_emotional_magnitude = round(
sentence.sentiment.magnitude, 2)
pos_or_neg = "positive" if sentence_sentiment >= 0.5 else "negative"
sentence_response = {
"sentence": sentence.text.content,
"score": sentence_sentiment,
"magnitude": sentence_emotional_magnitude,
"pos_or_neg": pos_or_neg
}
sentence_response_list.append(sentence_response)
logger.info("Input sentence: {} - has a sentiment score of {} and is {} with an emotional magnitude of {}".format(
sentence.text.content, sentence_sentiment, pos_or_neg, sentence_emotional_magnitude))
resp.media = {**document_response, **sentences_response}
# Configure app middleware
app = falcon.API(
middleware=[
RequestLogger(),
RequireJSON(),
]
)
logger.info("API is up!!!!")
# Setup resources
status = Status()
sentiment_analysis = SentimentAnalysis()
# Add routes
app.add_route("/status", status)
app.add_route("/analyze", sentiment_analysis)