-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlamda-test.py
144 lines (122 loc) · 5.14 KB
/
lamda-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import boto3
import time
import json
import base64
from botocore.exceptions import ClientError
from botocore.exceptions import ConnectionError as awsConnectionError
# -----------------------------------------------------------------------------
# function to write to dynamodb
# -----------------------------------------------------------------------------
def write_to_dynamo(tweet_id_str, tweet_sentiment, **kwargs):
    """Store one analysed tweet as an item in the DynamoDB table 'tweets'.

    Args:
        tweet_id_str: Value stored under the 'tweet_id_str' attribute.
        tweet_sentiment: Value stored under the 'tweet_sentiment' attribute.
        **kwargs: Any additional attributes; each keyword becomes an
            attribute of the item. The handler in this file passes:
            sentiment_pos_pct, sentiment_neg_pct, sentiment_neu_pct,
            sentiment_mix_pct, tweet_user_id_str, tweet_user_name,
            tweet_text, tweet_hashtags and tweet_json.

    Returns:
        None
    """
    # Function-local import, as in the original implementation.
    import boto3

    # The two named arguments always come first, followed by the extras.
    item = dict(tweet_id_str=tweet_id_str, tweet_sentiment=tweet_sentiment)
    for attribute, value in kwargs.items():
        item[attribute] = value

    tweets_table = boto3.resource('dynamodb').Table('tweets')
    tweets_table.put_item(Item=item)
# -----------------------------------------------------------------------------
def get_tweet_details(tweet):
    """Pull the fields needed for sentiment analysis out of a raw tweet.

    Retweets are not analysed: for them only the raw payload is returned,
    flagged with ``RT`` set to True. Quote tweets are kept and treated like
    any other tweet.

    When a tweet is marked ``truncated`` and carries an ``extended_tweet``
    payload, the full text and the hashtags are taken from that payload,
    whether or not the tweet is a quote. Previously a truncated quote
    tweet was analysed on its cut-off ``text`` field.

    Args:
        tweet: JSON document describing one tweet, in any form accepted by
            ``json.loads``.

    Returns:
        dict: ``{'tweet_json': tweet, 'RT': True}`` for a retweet.
        Otherwise a dict with the keys ``id_str``, ``user_id_str``,
        ``user_name``, ``text``, ``hashtags`` (list of hashtag texts),
        ``RT`` (False) and ``tweet_json`` (the unmodified input).

    Raises:
        ValueError: If ``tweet`` is not valid JSON.
        KeyError: If a required tweet field is missing.
    """
    parsed = json.loads(tweet)

    # Retweets carry no new text; hand back only the raw JSON and the flag.
    if 'retweeted_status' in parsed:
        return {'tweet_json': tweet, 'RT': True}

    # Truncation is checked first so that quote tweets also get their
    # full text. If the extended payload is absent, fall back to 'text'.
    extended = parsed.get('extended_tweet') if parsed.get('truncated') else None
    if extended:
        tweet_text = extended['full_text']
    else:
        tweet_text = parsed['text']

    # Prefer the extended entities for truncated tweets, since the
    # top-level ones describe the shortened text only.
    if extended and 'entities' in extended:
        entities = extended['entities']
    else:
        entities = parsed['entities']
    hashtags = [tag['text'] for tag in entities['hashtags']]

    return {
        'id_str': parsed['id_str'],
        'user_id_str': parsed['user']['id_str'],
        'user_name': parsed['user']['screen_name'],
        'text': tweet_text,
        'hashtags': hashtags,
        'RT': False,
        'tweet_json': tweet,
    }
def get_sentiment(text, language_code='en'):
    """Run Amazon Comprehend sentiment detection on a piece of text.

    Args:
        text: The text to analyse (the tweet text).
        language_code: Value passed to Comprehend as ``LanguageCode``.
            Defaults to 'en', the value that used to be hard-coded.

    Returns:
        The response of ``detect_sentiment``. The handler in this file
        relies on it having this shape:

            {
                'Sentiment': 'POSITIVE'|'NEGATIVE'|'NEUTRAL'|'MIXED',
                'SentimentScore': {
                    'Positive': ...,
                    'Negative': ...,
                    'Neutral': ...,
                    'Mixed': ...
                }
            }
    """
    comprehend = boto3.client("comprehend")
    return comprehend.detect_sentiment(Text=text, LanguageCode=language_code)
def lambda_handler(event, context):
    """Process a batch of Kinesis records that each carry one tweet.

    Every record's base64 payload is decoded and parsed with
    ``get_tweet_details``. Retweets are skipped; every other tweet is scored
    with ``get_sentiment`` and written out with ``write_to_dynamo``.

    Args:
        event: Event dict with a 'Records' list; each record holds the
            base64-encoded tweet JSON under ['kinesis']['data'].
        context: Lambda context object; not used.

    Returns:
        str: A summary message with the number of records in the event
        (retweets included).
    """
    records = event['Records']

    for entry in records:
        # NOTE(review): on Python 3 b64decode returns bytes, and this value
        # is what ends up stored as 'tweet_json' - confirm that is intended.
        raw_tweet = base64.b64decode(entry['kinesis']['data'])
        details = get_tweet_details(raw_tweet)

        # Retweets carry no new text, so nothing is analysed or stored.
        if details['RT']:
            continue

        sentiment = get_sentiment(details['text'])
        # Scores are written as strings rather than as float values.
        write_to_dynamo(
            tweet_id_str=details['id_str'],
            tweet_sentiment=sentiment['Sentiment'],
            sentiment_pos_pct=str(sentiment['SentimentScore']['Positive']),
            sentiment_neg_pct=str(sentiment['SentimentScore']['Negative']),
            sentiment_neu_pct=str(sentiment['SentimentScore']['Neutral']),
            sentiment_mix_pct=str(sentiment['SentimentScore']['Mixed']),
            tweet_user_id_str=details['user_id_str'],
            tweet_user_name=details['user_name'],
            tweet_text=details['text'],
            tweet_hashtags=details['hashtags'],
            tweet_json=details['tweet_json'],
        )

    return 'Successfully processed {} records.'.format(len(records))