ai_processor.py
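"""Generate AI-powered summaries, sentiment analyses, and topic breakdowns for a
batch of newsletters, then save the combined weekly report to a local file or to
Google Docs."""
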
import hashlib
import json
import os

import matplotlib.pyplot as plt
import pandas as pd
from google.auth.transport.requests import Request
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from openai import OpenAI
from sklearn.decomposition import LatentDirichletAllocation
from sklearn.feature_extraction.text import TfidfVectorizer
from wordcloud import WordCloud

from config import OPENAI_API_KEY
from logger import logger
# Initialize the OpenAI client with the API key from config
client = OpenAI(api_key=OPENAI_API_KEY)
# In-memory cache for embeddings, persisted to embedding_cache.json between runs
embedding_cache = {}
if os.path.exists('embedding_cache.json'):
    with open('embedding_cache.json') as f:
        embedding_cache = json.load(f)
def truncate_text(text, max_tokens=1000):
"""Truncate text to a maximum number of tokens (approximate)."""
words = text.split()
return ' '.join(words[:max_tokens])
def get_cached_embedding(text):
"""Get embedding from cache or generate new one."""
text_hash = hashlib.md5(text.encode()).hexdigest()
if text_hash in embedding_cache:
return embedding_cache[text_hash]
return None
def save_cached_embedding(text, embedding):
"""Save embedding to cache."""
text_hash = hashlib.md5(text.encode()).hexdigest()
embedding_cache[text_hash] = embedding
    # Persist the cache to disk so embeddings survive between runs
with open('embedding_cache.json', 'w') as f:
json.dump(embedding_cache, f)
def generate_embeddings(text):
logger.info("Generating embeddings for text")
try:
cached_embedding = get_cached_embedding(text)
if cached_embedding:
logger.info("Using cached embedding")
return cached_embedding
truncated_text = truncate_text(text)
response = client.embeddings.create(
input=truncated_text,
model="text-embedding-ada-002"
)
embedding = response.data[0].embedding
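        # The cache key is the hash of the full (untruncated) text, so repeat calls with the same input reuse this embedding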
save_cached_embedding(text, embedding)
logger.info("Embeddings generated successfully")
return embedding
except Exception as e:
logger.error(f"Error generating embeddings: {e}")
raise
def generate_summary(content):
logger.info("Generating detailed summary from content")
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an AI assistant that creates detailed, structured summaries of newsletter content. Your summaries should be engaging, insightful, and highlight the most important information."},
{"role": "user", "content": f"""Please provide a comprehensive summary of the following newsletter content.
Include the following sections:
1. Newsletter Name and Date
2. Executive Summary (2-3 sentences)
3. Main Topics Covered (bullet points)
4. Key Insights (3-5 bullet points)
5. Important Links or Resources
6. Notable Quotes or Statistics
7. Emerging Trends or Patterns
8. Action Items or Takeaways
9. Brief Analysis or Commentary
Here's the content to summarize:
{content[:4000]}"""}  # Only the first 4,000 characters of the newsletter content are sent to the model
],
            max_tokens=1500,  # Leave room for the full structured summary
n=1,
temperature=0.7,
)
summary = response.choices[0].message.content.strip()
logger.info("Detailed summary generated successfully")
return summary
except Exception as e:
logger.error(f"Error generating detailed summary: {e}")
raise
def analyze_sentiment(content):
logger.info("Analyzing sentiment of content")
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an AI assistant that analyzes the sentiment of text."},
{"role": "user", "content": f"Analyze the sentiment of the following text and provide a score from -1 (very negative) to 1 (very positive), along with a brief explanation:\n\n{content[:1000]}"}
],
max_tokens=100,
n=1,
temperature=0.5,
)
sentiment_analysis = response.choices[0].message.content.strip()
logger.info("Sentiment analysis completed successfully")
return sentiment_analysis
except Exception as e:
logger.error(f"Error analyzing sentiment: {e}")
raise
def generate_word_cloud(text):
logger.info("Generating word cloud")
try:
wordcloud = WordCloud(width=800, height=400, background_color='white').generate(text)
plt.figure(figsize=(10, 5))
plt.imshow(wordcloud, interpolation='bilinear')
plt.axis('off')
plt.tight_layout(pad=0)
        plt.savefig('wordcloud.png')
        plt.close()
logger.info("Word cloud generated successfully")
except Exception as e:
logger.error(f"Error generating word cloud: {e}")
def extract_topics(content):
logger.info("Extracting main topics from content")
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an AI assistant that extracts main topics from text."},
{"role": "user", "content": f"Extract the 5 main topics from the following text, providing a brief description for each:\n\n{content[:2000]}"}
],
max_tokens=300,
n=1,
temperature=0.5,
)
topics = response.choices[0].message.content.strip()
logger.info("Main topics extracted successfully")
return topics
except Exception as e:
logger.error(f"Error extracting topics: {e}")
raise
def extract_topics_lda(content, num_topics=5):
logger.info("Extracting topics using LDA")
try:
# Split the content into sentences or paragraphs to create multiple documents
documents = content.split('\n')
# Ensure we have at least 2 documents
if len(documents) < 2:
documents = content.split('. ')
if len(documents) < 2:
logger.warning("Not enough content for LDA. Falling back to simple keyword extraction.")
                # Wrap the result in a list so callers can iterate over topics uniformly
                return [extract_simple_topics(content)]
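        # max_df=0.95 ignores terms that appear in more than 95% of the documents; stop_words removes common English words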
vectorizer = TfidfVectorizer(max_df=0.95, min_df=1, stop_words='english')
doc_term_matrix = vectorizer.fit_transform(documents)
lda = LatentDirichletAllocation(n_components=min(num_topics, len(documents)), random_state=42)
lda.fit(doc_term_matrix)
feature_names = vectorizer.get_feature_names_out()
topics = []
for topic_idx, topic in enumerate(lda.components_):
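            # argsort()[:-11:-1] selects the indices of the 10 highest-weighted words for this topic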
top_words = [feature_names[i] for i in topic.argsort()[:-10 - 1:-1]]
topics.append(f"Topic {topic_idx + 1}: {', '.join(top_words)}")
logger.info("LDA topic extraction completed successfully")
return topics
except Exception as e:
logger.error(f"Error extracting topics using LDA: {e}")
logger.info("Falling back to simple keyword extraction")
        # Wrap the result in a list so callers can iterate over topics uniformly
        return [extract_simple_topics(content)]
def extract_simple_topics(content):
logger.info("Extracting simple topics")
try:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "You are an AI assistant that extracts main topics from text."},
{"role": "user", "content": f"Extract the 5 main topics from the following text, providing a brief description for each:\n\n{content[:2000]}"}
],
max_tokens=300,
n=1,
temperature=0.5,
)
topics = response.choices[0].message.content.strip()
logger.info("Simple topic extraction completed successfully")
return topics
except Exception as e:
logger.error(f"Error extracting simple topics: {e}")
return "Unable to extract topics"
def analyze_newsletters(newsletters):
logger.info("Analyzing newsletters")
try:
summaries = []
sentiments = []
all_content = ""
topics = []
for i, newsletter in enumerate(newsletters, 1):
            logger.info(f"Processing newsletter {i} of {len(newsletters)}")
summary = generate_summary(newsletter['content'])
summaries.append(summary)
sentiment = analyze_sentiment(newsletter['content'])
sentiments.append(sentiment)
all_content += newsletter['content'] + " "
topic = extract_topics(newsletter['content'])
topics.append(topic)
        lda_topics = extract_topics_lda(all_content)
        # Generate the word cloud image that the report references below
        generate_word_cloud(all_content)
        # Build the combined report from the per-newsletter results and LDA topics
combined_report = "Weekly Newsletter Report\n\n"
combined_report += "=" * 50 + "\n\n"
for i, (summary, sentiment, topic) in enumerate(zip(summaries, sentiments, topics), 1):
combined_report += f"Newsletter {i}:\n\n"
combined_report += summary + "\n\n"
combined_report += f"Sentiment Analysis:\n{sentiment}\n\n"
combined_report += f"Main Topics:\n{topic}\n\n"
combined_report += "-" * 30 + "\n\n"
combined_report += "LDA Topic Analysis:\n"
for topic in lda_topics:
combined_report += f"{topic}\n"
combined_report += "\n"
combined_report += "Word Cloud: See attached image 'wordcloud.png'\n\n"
combined_report += "=" * 50 + "\n\n"
combined_report += "End of Weekly Report"
return combined_report
except Exception as e:
logger.error(f"Error analyzing newsletters: {e}")
raise
def get_google_docs_credentials():
creds = None
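    # token.json stores the user's access and refresh tokens from a previous authorization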
if os.path.exists('token.json'):
creds = Credentials.from_authorized_user_file('token.json', ['https://www.googleapis.com/auth/documents'])
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file('client_secret.json', ['https://www.googleapis.com/auth/documents'])
creds = flow.run_local_server(port=0)
with open('token.json', 'w') as token:
token.write(creds.to_json())
return creds
def save_to_google_docs(summary, title):
logger.info("Saving summary to Google Docs")
try:
creds = get_google_docs_credentials()
service = build('docs', 'v1', credentials=creds)
document = service.documents().create(body={'title': title}).execute()
doc_id = document['documentId']
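        # Index 1 is the first insertable position in a new document's body (index 0 is reserved)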
requests = [
{
'insertText': {
'location': {'index': 1},
'text': summary
}
}
]
service.documents().batchUpdate(documentId=doc_id, body={'requests': requests}).execute()
logger.info(f"Summary saved to Google Docs with ID: {doc_id}")
return doc_id
except HttpError as e:
logger.error(f"Error saving to Google Docs: {e}")
raise
def save_to_file(summary, filename):
logger.info(f"Saving summary to file: {filename}")
try:
with open(filename, 'w') as f:
f.write(summary)
logger.info(f"Summary saved to file: {filename}")
except Exception as e:
logger.error(f"Error saving to file: {e}")
raise
def process_newsletters(newsletters, output_type='file', output_name='newsletter_summary'):
summary = analyze_newsletters(newsletters)
if output_type == 'google_docs':
return save_to_google_docs(summary, output_name)
elif output_type == 'file':
filename = f"{output_name}.txt"
save_to_file(summary, filename)
return filename
else:
raise ValueError("Invalid output_type. Choose 'google_docs' or 'file'.")
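

# Example usage: a minimal sketch, assuming each newsletter is a dict with a 'content'
# key (the shape analyze_newsletters expects). The sample text below is hypothetical
# and only illustrates the call.
if __name__ == '__main__':
    sample_newsletters = [
        {'content': "AI Weekly: a roundup of this week's model releases and research highlights..."},
        {'content': "Dev Digest: notable library updates, security patches, and tooling news..."},
    ]
    # Writes the combined report to newsletter_summary.txt and returns the filename
    report_path = process_newsletters(sample_newsletters, output_type='file')
    print(f"Report written to {report_path}")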