Skip to content

Commit 8b70b50

Browse files
authored
Merge pull request #374 from datacamp/feature/revise-pkg-download
Feature/revise pkg download
2 parents 81b297f + f305725 commit 8b70b50

File tree

9 files changed

+94
-155
lines changed

9 files changed

+94
-155
lines changed

Procfile

-1
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,3 @@ debug: NODE_ENV=production PORT=5000 node -r dotenv/config node_modules/sails/bi
55
console: node -r dotenv/config node_modules/sails/bin/sails console
66
clean-task: grunt sails_tasks:authorCleaning
77
recover-maintainers: grunt sails_tasks:maintainerRecover
8-
index-stats: grunt sails_tasks:indexStats

api/controllers/WorkerController.js

+4-14
Original file line numberDiff line numberDiff line change
@@ -39,15 +39,6 @@ module.exports = {
3939
}
4040
},
4141

42-
indexStats: function(req, res) {
43-
CronService.indexAggregatedDownloadStats().then(function(result) {
44-
console.log("Finished indexing stats");
45-
res.send(200, "done");
46-
}).catch(function(err){
47-
return res.negotiate(err.errors);
48-
});
49-
},
50-
5142
updatePercentile: function(req, res) {
5243
ElasticSearchService.updateLastMonthPercentiles().then(function(){
5344
console.log("Finished updating percentiles");
@@ -61,8 +52,7 @@ module.exports = {
6152
DownloadStatistic.getNotIndexedDates().then(function(days) {
6253
return days.map(function(day) {
6354
var date = new Date(day.absents);
64-
var now = new Date();
65-
return Utils.dateDiffInDays(date, now);
55+
return date;
6656
});
6757
}).then(function(diffs) {
6858
if (diffs.length <= 0) {
@@ -71,9 +61,9 @@ module.exports = {
7161
}
7262
res.send(200, "scheduled");
7363
DownloadStatsService.reverseDependenciesCache = {}; //clean old cache
74-
return Promise.map(diffs, function (nDay) {
75-
console.log("Started indexing for today - " + nDay + "days");
76-
return CronService.splittedAggregatedDownloadstats(nDay)
64+
return Promise.map(diffs, function (day) {
65+
console.log(`Started indexing for ${day}.`);
66+
return CronService.splittedAggregatedDownloadstats(day)
7767
.catch({message: "empty"}, function() {
7868
console.log("No stats for this time range yet");
7969
return 1;

api/services/CronService.js

+2-44
Original file line numberDiff line numberDiff line change
@@ -4,55 +4,13 @@
44
* All async tasks
55
*/
66

7-
var _ = require('lodash');
87
var Promise = require('bluebird');
9-
var http = require('http');
10-
11-
var getIndexOfMonth= function(month){
12-
return ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"].indexOf(month);
13-
}
148

159
module.exports = {
1610

17-
//We need aggregated download stats in elasticsearch to be able to rescore the search result based on popularity
18-
indexAggregatedDownloadStats: function() {
19-
console.log('Started index aggregate stats job');
20-
//Get aggregated data
21-
return ElasticSearchService.lastMonthDownloadCount().then(function (buckets) {
22-
23-
var stats = _.reduce(buckets, function(acc, bucket) {
24-
acc[bucket.key] = bucket.download_count.value;
25-
return acc;
26-
}, {});
27-
28-
29-
return Package.findAll({
30-
attributes: ['name']
31-
}).then(function(packages) {
32-
var records = _.map(packages, function(_package) {
33-
return {
34-
package_name: _package.name,
35-
last_month_downloads: stats[_package.name] || 0
36-
};
37-
});
38-
39-
var groups = _.chunk(records, 500);
40-
41-
return Promise.map(groups, function(group) {
42-
return DownloadStatistic.bulkCreate(group, {
43-
updateOnDuplicate:true,
44-
});
45-
}, {concurrency: 1});
46-
});
47-
48-
49-
});
50-
51-
},
52-
53-
splittedAggregatedDownloadstats :function(days,callback){
11+
splittedAggregatedDownloadstats :function(day,callback){
5412
console.log('Started splitted aggregated download count');
55-
return Promise.promisify(ElasticSearchService.dailyDownloadsBulk)(days);
13+
return Promise.promisify(DownloadStatsService.getDailyDownloads)(day);
5614
},
5715

5816
};

api/services/DownloadStatsService.js

+86-38
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
var _ = require('lodash');
22
var Promise = require('bluebird');
33
var dateFormat = require('dateformat');
4+
var CSV = require('csv-js');
5+
var r = require('request');
6+
var zlib = require('zlib');
47

58
module.exports = {
69

@@ -28,67 +31,112 @@ module.exports = {
2831
return _.sortedIndexOf(haystack, needle) !== -1 ;
2932
},
3033

31-
processDownloads:function(response,directDownloads,indirectDownloads,total,callback) {
32-
var hits = response.hits.hits;
33-
var _response = {
34-
hits: { total: response.hits.total },
35-
_scroll_id: response._scroll_id
34+
getDailyDownloads: function (day, callback) {
35+
36+
var dayDateString = dateFormat(day, "yyyy-mm-dd").toString();
37+
var url = `http://cran-logs.rstudio.com/${day.getFullYear()}/${dayDateString}.csv.gz`;
38+
39+
var requestSettings = {
40+
method: 'GET',
41+
url,
42+
encoding: null,
3643
};
37-
var hit_date = hits[1].fields.datetime[0];
38-
var date = new Date(hit_date);
39-
var formattedDate = dateFormat(date, "yyyy-mm-dd").toString();
4044

41-
Promise.map(hits, function(hit, i) {
42-
//execute queries to find inverse dependencies for all hits asynchronous, and find indirect hits before and after in ordered records
43-
var package_name = hit.fields.package[0];
45+
console.info('Sending request ...');
46+
r(requestSettings, function(error, response, buf) {
47+
if(response.statusCode === 404){
48+
return callback({message: "empty"});
49+
}
50+
else if(response.statusCode === 200){
51+
console.info('Unzipping ...');
52+
zlib.gunzip(buf, function(err, dezipped) {
53+
if(err){
54+
return callback(err);
55+
}
56+
console.info('Parsing csv ...');
57+
var downloads = CSV.parse(dezipped.toString());
58+
downloads.shift(); // remove header line
59+
var downloads = _.map(downloads, function(download){
60+
return {
61+
date: download[0],
62+
time: download[1],
63+
dateTime: new Date(`${download[0]}T${download[1]}Z`),
64+
package: download[6],
65+
ip_id: download[9]
66+
}
67+
});
68+
downloads.sort(function(download1, download2){
69+
return download1.dateTime.getTime - download2.dateTime.getTime;
70+
});
71+
DownloadStatsService.processDailyDownloads(day, downloads, callback);
72+
});
73+
}
74+
});
75+
76+
77+
},
78+
79+
processDailyDownloads: function(date, downloads, callback) {
80+
console.info('Processing downloads ...');
81+
var indirectDownloads = {}; // The value for every key is a set with ip_id's, this will automatically only count unique ip's
82+
var directDownloads = {};
83+
Promise.map(downloads, function(download, i) {
84+
var package_name = download.package;
85+
86+
function addDownloadTo(hash, download) {
87+
if(!hash[package_name])
88+
hash[package_name] = new Set();
89+
hash[package_name].add(download.ip_id);
90+
}
91+
4492
return DownloadStatsService.getReverseDependencies(package_name).then(function(rootPackageNames) {
4593

4694
var indirect = false;
4795
var j=i+1;
4896

49-
var thisHitTimestamp = new Date(hit.fields.datetime[0]).getTime();
97+
var downloadTime = download.dateTime.getTime();
5098

51-
while (!indirect && j<hits.length && hits[j].fields.ip_id[0] == hit.fields.ip_id[0] &&
52-
new Date(hits[j].fields.datetime[0]).getTime()< (thisHitTimestamp+60000)
53-
) {
54-
if(DownloadStatsService.binarySearchIncludes(rootPackageNames,hits[j].fields.package[0])) {
55-
indirectDownloads[package_name] = indirectDownloads[package_name]+1 || 1;
56-
indirect=true;
99+
for(j= i + 1; j < downloads.length; j++) {
100+
if(indirect || downloads[j].dateTime.getTime() > downloadTime + 60 * 1000)
101+
break;
102+
if(downloads[j].ip_id === download.ip_id && DownloadStatsService.binarySearchIncludes(rootPackageNames, downloads[j].package)){
103+
addDownloadTo(indirectDownloads, download)
104+
indirect = true;
57105
}
58-
j+=1;
59106
}
60-
j=i-1;
61-
while (j>=0 && hits[j].fields.ip_id[0] == hit.fields.ip_id[0] &&
62-
new Date(hits[j].fields.datetime[0]).getTime()+60000> (thisHitTimestamp) &&
63-
!(indirect)
64-
) {
65-
if(DownloadStatsService.binarySearchIncludes(rootPackageNames,hits[j].fields.package[0])) {
66-
indirectDownloads[package_name] = indirectDownloads[package_name]+1 || 1;
67-
indirect=true;
107+
108+
for(j= i - 1; j >= 0; j--) {
109+
if(indirect || downloads[j].dateTime.getTime() < downloadTime - 60 * 1000)
110+
break;
111+
if(downloads[j].ip_id === download.ip_id && DownloadStatsService.binarySearchIncludes(rootPackageNames, downloads[j].package)){
112+
addDownloadTo(indirectDownloads, download)
113+
indirect = true;
68114
}
69-
j-=1;
70115
}
116+
71117
if(!indirect){
72-
directDownloads[package_name] = directDownloads[package_name]+1 || 1;
118+
addDownloadTo(directDownloads, download)
73119
}
74120
});
75-
76-
}, {concurrency: 10}).then(function(){
77-
78-
return ElasticSearchService.scrollDailyDownloadsBulk(_response,formattedDate,directDownloads,indirectDownloads,total,callback);
121+
}, {concurrency: 10})
122+
.then(function(){
123+
DownloadStatsService.writeDownloadsToDB(date, directDownloads, indirectDownloads)
124+
.then(function(result){
125+
console.info('Downloads written to database!');
126+
callback(null,result);
127+
});
79128
});
80129
},
81130

82-
//write all splitted download counts to the database
83-
writeSplittedDownloadCounts: function(date,directDownloads,indirectDownloads){
84-
console.log("writing data");
131+
writeDownloadsToDB: function(date,directDownloads,indirectDownloads){
132+
console.info("Writing data to database ...");
85133
return Package.findAll({attributes: ['name']}).then(function(packages) {
86134
var records = _.map(packages, function(_package) {
87135
return {
88136
package_name: _package.name,
89137
date: date,
90-
indirect_downloads: indirectDownloads[_package.name] || 0,
91-
direct_downloads: directDownloads[_package.name] || 0
138+
indirect_downloads: (indirectDownloads[_package.name] || new Set()).size,
139+
direct_downloads: (directDownloads[_package.name] || new Set()).size
92140
};
93141
});
94142
var groups = _.chunk(records,500);

api/services/ElasticSearchService.js

-45
Original file line numberDiff line numberDiff line change
@@ -400,51 +400,6 @@ module.exports = {
400400
});
401401
},
402402

403-
//download first 10000 results and proceed by processing and scrolling
404-
dailyDownloadsBulk:function(days, callback){
405-
var body = ElasticSearchService.queries.filters.lastMonthDownloads(days);
406-
if (days < 1) return Promise.resolve("Nothing to do");
407-
408-
return es.search({
409-
scroll:'5M',
410-
index: 'stats',
411-
body: body,
412-
}, function processAndGetMore(error,response){
413-
//check the response
414-
if (typeof response === "undefined") {
415-
var err ="you received an undefined response, response:"+response+
416-
"\n this was probably caused because there were no stats yet for this day"+
417-
"\n or processing time took over 5 minutes (the scroll interval";
418-
callback(err);
419-
} else if (typeof response.hits === "undefined" || response.hits.total === 0) { return callback({message: "empty"}); }
420-
else DownloadStatsService.processDownloads(response,{},{},10000,callback);
421-
});
422-
},
423-
424-
//scroll further in search result, when response already contains a scroll id
425-
scrollDailyDownloadsBulk: function(response,date,directDownloads,indirectDownloads,total,callback) {
426-
console.log("processing next 10000 records");
427-
if (response.hits.total > total) {
428-
// now we can call scroll over and over
429-
es.scroll({
430-
scrollId: response._scroll_id,
431-
scroll: '5M'
432-
}, function processScroll(error,response){
433-
if (typeof response == "undefined" || typeof response.hits == "undefined") {
434-
var err ="you received an undefined response, response:"+response+
435-
"\n this was probably caused because there were no stats yet for this day"+
436-
"\n or processing time took over 5 minutes (the scroll interval";
437-
callback(err);
438-
}
439-
return DownloadStatsService.processDownloads(response,directDownloads,indirectDownloads,total+10000,callback);
440-
});
441-
} else {
442-
//write the responses to the database when done
443-
DownloadStatsService.writeSplittedDownloadCounts(date,directDownloads,indirectDownloads).then(function(result){
444-
callback(null,result);
445-
});
446-
}
447-
},
448403
helpSearchQuery:function(pattern,fields,fuzzy,max_dist){
449404
var highlighting = false;
450405
var t = {}

config/env/docker.js

-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ module.exports = {
4949

5050
routes: {
5151
'post /tasks': 'WorkerController.processMessage',
52-
'get /index-stats': 'WorkerController.indexStats',
5352
'get /last-day-splitted-stats': 'WorkerController.lastDaySplittedDownloads'
5453
}
5554

config/env/worker.js

-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ module.exports = {
3636

3737
routes: {
3838
'post /tasks': 'WorkerController.processMessage',
39-
'post /index-stats': 'WorkerController.indexStats',
4039
'post /last-day-splitted-stats': 'WorkerController.lastDaySplittedDownloads',
4140
'post /update-percentile': 'WorkerController.updatePercentile'
4241
}

package.json

+2
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
"cheerio": "^0.20.0",
1313
"connect-flash": "^0.1.1",
1414
"connect-redis": "^3.1.0",
15+
"csv-js": "^1.0.0",
1516
"dateformat": "^1.0.12",
1617
"db-migrate": "^0.10.0-beta.12",
1718
"db-migrate-mysql": "^1.1.7",
@@ -51,6 +52,7 @@
5152
"passport-local": "^1.0.0",
5253
"rc": "1.0.1",
5354
"redis": "^2.6.2",
55+
"request": "^2.81.0",
5456
"sails": "~0.12.11",
5557
"sails-hook-sequelize": "^1.0.1",
5658
"sails-hook-sequelize-blueprints": "^0.3.0",

tasks/config/sails-tasks.js

-11
Original file line numberDiff line numberDiff line change
@@ -23,17 +23,6 @@ module.exports = function(grunt) {
2323
}
2424
]
2525
},
26-
27-
indexStats: {
28-
functions: [
29-
function (callback) {
30-
CronService.indexAggregatedDownloadStats().then(function(result) {
31-
console.log("Finished indexing stats");
32-
callback();
33-
});
34-
}
35-
]
36-
},
3726
});
3827

3928

0 commit comments

Comments
 (0)