Skip to content

Feature/revise pkg download #374

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 11, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Procfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ debug: NODE_ENV=production PORT=5000 node -r dotenv/config node_modules/sails/bi
console: node -r dotenv/config node_modules/sails/bin/sails console
clean-task: grunt sails_tasks:authorCleaning
recover-maintainers: grunt sails_tasks:maintainerRecover
index-stats: grunt sails_tasks:indexStats
18 changes: 4 additions & 14 deletions api/controllers/WorkerController.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,6 @@ module.exports = {
}
},

indexStats: function(req, res) {
CronService.indexAggregatedDownloadStats().then(function(result) {
console.log("Finished indexing stats");
res.send(200, "done");
}).catch(function(err){
return res.negotiate(err.errors);
});
},

updatePercentile: function(req, res) {
ElasticSearchService.updateLastMonthPercentiles().then(function(){
console.log("Finished updating percentiles");
Expand All @@ -61,8 +52,7 @@ module.exports = {
DownloadStatistic.getNotIndexedDates().then(function(days) {
return days.map(function(day) {
var date = new Date(day.absents);
var now = new Date();
return Utils.dateDiffInDays(date, now);
return date;
});
}).then(function(diffs) {
if (diffs.length <= 0) {
Expand All @@ -71,9 +61,9 @@ module.exports = {
}
res.send(200, "scheduled");
DownloadStatsService.reverseDependenciesCache = {}; //clean old cache
return Promise.map(diffs, function (nDay) {
console.log("Started indexing for today - " + nDay + "days");
return CronService.splittedAggregatedDownloadstats(nDay)
return Promise.map(diffs, function (day) {
console.log(`Started indexing for ${day}.`);
return CronService.splittedAggregatedDownloadstats(day)
.catch({message: "empty"}, function() {
console.log("No stats for this time range yet");
return 1;
Expand Down
46 changes: 2 additions & 44 deletions api/services/CronService.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,55 +4,13 @@
* All async tasks
*/

var _ = require('lodash');
var Promise = require('bluebird');
var http = require('http');

var getIndexOfMonth= function(month){
return ["Jan","Feb","Mar","Apr","May","Jun","Jul","Aug","Sep","Oct","Nov","Dec"].indexOf(month);
}

module.exports = {

//We need aggregated download stats in elasticsearch to be able to rescore the search result based on popularity
indexAggregatedDownloadStats: function() {
console.log('Started index aggregate stats job');
//Get aggregated data
return ElasticSearchService.lastMonthDownloadCount().then(function (buckets) {

var stats = _.reduce(buckets, function(acc, bucket) {
acc[bucket.key] = bucket.download_count.value;
return acc;
}, {});


return Package.findAll({
attributes: ['name']
}).then(function(packages) {
var records = _.map(packages, function(_package) {
return {
package_name: _package.name,
last_month_downloads: stats[_package.name] || 0
};
});

var groups = _.chunk(records, 500);

return Promise.map(groups, function(group) {
return DownloadStatistic.bulkCreate(group, {
updateOnDuplicate:true,
});
}, {concurrency: 1});
});


});

},

splittedAggregatedDownloadstats :function(days,callback){
splittedAggregatedDownloadstats :function(day,callback){
console.log('Started splitted aggregated download count');
return Promise.promisify(ElasticSearchService.dailyDownloadsBulk)(days);
return Promise.promisify(DownloadStatsService.getDailyDownloads)(day);
},

};
Expand Down
124 changes: 86 additions & 38 deletions api/services/DownloadStatsService.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
var _ = require('lodash');
var Promise = require('bluebird');
var dateFormat = require('dateformat');
var CSV = require('csv-js');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to also add thoses package to package.json

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably removed it when removing other packages I added but didn't need eventually

var r = require('request');
var zlib = require('zlib');

module.exports = {

Expand Down Expand Up @@ -28,67 +31,112 @@ module.exports = {
return _.sortedIndexOf(haystack, needle) !== -1 ;
},

processDownloads:function(response,directDownloads,indirectDownloads,total,callback) {
var hits = response.hits.hits;
var _response = {
hits: { total: response.hits.total },
_scroll_id: response._scroll_id
getDailyDownloads: function (day, callback) {

var dayDateString = dateFormat(day, "yyyy-mm-dd").toString();
var url = `http://cran-logs.rstudio.com/${day.getFullYear()}/${dayDateString}.csv.gz`;

var requestSettings = {
method: 'GET',
url,
encoding: null,
};
var hit_date = hits[1].fields.datetime[0];
var date = new Date(hit_date);
var formattedDate = dateFormat(date, "yyyy-mm-dd").toString();

Promise.map(hits, function(hit, i) {
//execute queries to find inverse dependencies for all hits asynchronous, and find indirect hits before and after in ordered records
var package_name = hit.fields.package[0];
console.info('Sending request ...');
r(requestSettings, function(error, response, buf) {
if(response.statusCode === 404){
return callback({message: "empty"});
}
else if(response.statusCode === 200){
console.info('Unzipping ...');
zlib.gunzip(buf, function(err, dezipped) {
if(err){
return callback(err);
}
console.info('Parsing csv ...');
var downloads = CSV.parse(dezipped.toString());
downloads.shift(); // remove header line
var downloads = _.map(downloads, function(download){
return {
date: download[0],
time: download[1],
dateTime: new Date(`${download[0]}T${download[1]}Z`),
package: download[6],
ip_id: download[9]
}
});
downloads.sort(function(download1, download2){
return download1.dateTime.getTime - download2.dateTime.getTime;
});
DownloadStatsService.processDailyDownloads(day, downloads, callback);
});
}
});


},

processDailyDownloads: function(date, downloads, callback) {
console.info('Processing downloads ...');
var indirectDownloads = {}; // The value for every key is a set with ip_id's, this will automatically only count unique ip's
var directDownloads = {};
Promise.map(downloads, function(download, i) {
var package_name = download.package;

function addDownloadTo(hash, download) {
if(!hash[package_name])
hash[package_name] = new Set();
hash[package_name].add(download.ip_id);
}

return DownloadStatsService.getReverseDependencies(package_name).then(function(rootPackageNames) {

var indirect = false;
var j=i+1;

var thisHitTimestamp = new Date(hit.fields.datetime[0]).getTime();
var downloadTime = download.dateTime.getTime();

while (!indirect && j<hits.length && hits[j].fields.ip_id[0] == hit.fields.ip_id[0] &&
new Date(hits[j].fields.datetime[0]).getTime()< (thisHitTimestamp+60000)
) {
if(DownloadStatsService.binarySearchIncludes(rootPackageNames,hits[j].fields.package[0])) {
indirectDownloads[package_name] = indirectDownloads[package_name]+1 || 1;
indirect=true;
for(j= i + 1; j < downloads.length; j++) {
if(indirect || downloads[j].dateTime.getTime() > downloadTime + 60 * 1000)
break;
if(downloads[j].ip_id === download.ip_id && DownloadStatsService.binarySearchIncludes(rootPackageNames, downloads[j].package)){
addDownloadTo(indirectDownloads, download)
indirect = true;
}
j+=1;
}
j=i-1;
while (j>=0 && hits[j].fields.ip_id[0] == hit.fields.ip_id[0] &&
new Date(hits[j].fields.datetime[0]).getTime()+60000> (thisHitTimestamp) &&
!(indirect)
) {
if(DownloadStatsService.binarySearchIncludes(rootPackageNames,hits[j].fields.package[0])) {
indirectDownloads[package_name] = indirectDownloads[package_name]+1 || 1;
indirect=true;

for(j= i - 1; j >= 0; j--) {
if(indirect || downloads[j].dateTime.getTime() < downloadTime - 60 * 1000)
break;
if(downloads[j].ip_id === download.ip_id && DownloadStatsService.binarySearchIncludes(rootPackageNames, downloads[j].package)){
addDownloadTo(indirectDownloads, download)
indirect = true;
}
j-=1;
}

if(!indirect){
directDownloads[package_name] = directDownloads[package_name]+1 || 1;
addDownloadTo(directDownloads, download)
}
});

}, {concurrency: 10}).then(function(){

return ElasticSearchService.scrollDailyDownloadsBulk(_response,formattedDate,directDownloads,indirectDownloads,total,callback);
}, {concurrency: 10})
.then(function(){
DownloadStatsService.writeDownloadsToDB(date, directDownloads, indirectDownloads)
.then(function(result){
console.info('Downloads written to database!');
callback(null,result);
});
});
},

//write all splitted download counts to the database
writeSplittedDownloadCounts: function(date,directDownloads,indirectDownloads){
console.log("writing data");
writeDownloadsToDB: function(date,directDownloads,indirectDownloads){
console.info("Writing data to database ...");
return Package.findAll({attributes: ['name']}).then(function(packages) {
var records = _.map(packages, function(_package) {
return {
package_name: _package.name,
date: date,
indirect_downloads: indirectDownloads[_package.name] || 0,
direct_downloads: directDownloads[_package.name] || 0
indirect_downloads: (indirectDownloads[_package.name] || new Set()).size,
direct_downloads: (directDownloads[_package.name] || new Set()).size
};
});
var groups = _.chunk(records,500);
Expand Down
45 changes: 0 additions & 45 deletions api/services/ElasticSearchService.js
Original file line number Diff line number Diff line change
Expand Up @@ -400,51 +400,6 @@ module.exports = {
});
},

//download first 10000 results and proceed by processing and scrolling
dailyDownloadsBulk:function(days, callback){
var body = ElasticSearchService.queries.filters.lastMonthDownloads(days);
if (days < 1) return Promise.resolve("Nothing to do");

return es.search({
scroll:'5M',
index: 'stats',
body: body,
}, function processAndGetMore(error,response){
//check the response
if (typeof response === "undefined") {
var err ="you received an undefined response, response:"+response+
"\n this was probably caused because there were no stats yet for this day"+
"\n or processing time took over 5 minutes (the scroll interval";
callback(err);
} else if (typeof response.hits === "undefined" || response.hits.total === 0) { return callback({message: "empty"}); }
else DownloadStatsService.processDownloads(response,{},{},10000,callback);
});
},

//scroll further in search result, when response already contains a scroll id
scrollDailyDownloadsBulk: function(response,date,directDownloads,indirectDownloads,total,callback) {
console.log("processing next 10000 records");
if (response.hits.total > total) {
// now we can call scroll over and over
es.scroll({
scrollId: response._scroll_id,
scroll: '5M'
}, function processScroll(error,response){
if (typeof response == "undefined" || typeof response.hits == "undefined") {
var err ="you received an undefined response, response:"+response+
"\n this was probably caused because there were no stats yet for this day"+
"\n or processing time took over 5 minutes (the scroll interval";
callback(err);
}
return DownloadStatsService.processDownloads(response,directDownloads,indirectDownloads,total+10000,callback);
});
} else {
//write the responses to the database when done
DownloadStatsService.writeSplittedDownloadCounts(date,directDownloads,indirectDownloads).then(function(result){
callback(null,result);
});
}
},
helpSearchQuery:function(pattern,fields,fuzzy,max_dist){
var highlighting = false;
var t = {}
Expand Down
1 change: 0 additions & 1 deletion config/env/docker.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ module.exports = {

routes: {
'post /tasks': 'WorkerController.processMessage',
'get /index-stats': 'WorkerController.indexStats',
'get /last-day-splitted-stats': 'WorkerController.lastDaySplittedDownloads'
}

Expand Down
1 change: 0 additions & 1 deletion config/env/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ module.exports = {

routes: {
'post /tasks': 'WorkerController.processMessage',
'post /index-stats': 'WorkerController.indexStats',
'post /last-day-splitted-stats': 'WorkerController.lastDaySplittedDownloads',
'post /update-percentile': 'WorkerController.updatePercentile'
}
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"cheerio": "^0.20.0",
"connect-flash": "^0.1.1",
"connect-redis": "^3.1.0",
"csv-js": "^1.0.0",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ludov04 Seems like I already did that

Copy link
Contributor

@ludov04 ludov04 Aug 9, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jhoobergs I missed it somehow, but zlib is still missing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ludov04 zlib is part of node

"dateformat": "^1.0.12",
"db-migrate": "^0.10.0-beta.12",
"db-migrate-mysql": "^1.1.7",
Expand Down Expand Up @@ -51,6 +52,7 @@
"passport-local": "^1.0.0",
"rc": "1.0.1",
"redis": "^2.6.2",
"request": "^2.81.0",
"sails": "~0.12.11",
"sails-hook-sequelize": "^1.0.1",
"sails-hook-sequelize-blueprints": "^0.3.0",
Expand Down
11 changes: 0 additions & 11 deletions tasks/config/sails-tasks.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,6 @@ module.exports = function(grunt) {
}
]
},

indexStats: {
functions: [
function (callback) {
CronService.indexAggregatedDownloadStats().then(function(result) {
console.log("Finished indexing stats");
callback();
});
}
]
},
});


Expand Down