forked from elasticsearch-dump/elasticsearch-dump
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathelasticdump.js
100 lines (86 loc) · 3.04 KB
/
elasticdump.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
var util = require("util");
var http = require("http");
var https = require("https");
var EventEmitter = require('events').EventEmitter;
/**
 * Moves batches of objects from an input transport to an output transport.
 *
 * The transport type for each side is derived from the matching option:
 *   - "$"                    -> 'stdio'
 *   - a value containing ":" -> 'elasticsearch'
 *   - anything else          -> 'file'
 *
 * Each transport is loaded from `lib/transports/<type>` (the module must
 * export a constructor under the same name as the type) and instantiated with
 * `(this, location)`.
 *
 * Emits 'log' (unless an `options.logger` function is supplied or the output
 * is stdio) and 'error'.
 *
 * @param {*} input - stored on the instance, then replaced by the input transport.
 * @param {*} output - stored on the instance, then replaced by the output transport.
 * @param {Object} options - reads `input`, `output`, `maxSockets`, `logger`,
 *   `limit`, `offset` and `ignore-errors`.
 */
var elasticdump = function(input, output, options){
  // Same `self` alias the prototype methods use; the original constructor
  // referenced `self` without ever declaring it.
  var self = this;

  self.input = input;
  self.output = output;
  self.options = options;

  self.validateOptions();
  self.toLog = true;

  if(self.options.input == "$"){
    self.inputType = 'stdio';
  }else if(self.options.input.indexOf(":") >= 0){
    self.inputType = 'elasticsearch';
  }else{
    self.inputType = 'file';
  }

  if(self.options.output == "$"){
    self.outputType = 'stdio';
    // No 'log' events while the output is stdio
    // (presumably so log lines never mix with the dumped data - confirm).
    self.toLog = false;
  }else if(self.options.output.indexOf(":") >= 0){
    self.outputType = 'elasticsearch';
  }else{
    self.outputType = 'file';
  }

  if(options.maxSockets != null){
    self.log('globally setting maxSockets=' + options.maxSockets);
    // This changes the process-wide default agents, not just this instance.
    http.globalAgent.maxSockets = options.maxSockets;
    https.globalAgent.maxSockets = options.maxSockets;
  }

  var inputProto = require(__dirname + "/lib/transports/" + self.inputType)[self.inputType];
  var outputProto = require(__dirname + "/lib/transports/" + self.outputType)[self.outputType];

  self.input = (new inputProto(self, self.options.input));
  self.output = (new outputProto(self, self.options.output));
};

util.inherits(elasticdump, EventEmitter);

/**
 * Reports a progress message.
 *
 * A function in `options.logger` takes precedence; otherwise a 'log' event is
 * emitted, but only while `toLog` is exactly `true`.
 *
 * @param {string} message - text to report.
 */
elasticdump.prototype.log = function(message){
  var self = this;

  if(typeof self.options.logger === 'function'){
    self.options.logger(message);
  }else if(self.toLog === true){
    self.emit("log", message);
  }
};

/**
 * Placeholder for option validation; currently performs no checks.
 */
elasticdump.prototype.validateOptions = function(){
  // TODO
};

/**
 * Copies objects from the input transport to the output transport one batch
 * at a time, re-invoking itself until the input yields an empty batch.
 *
 * Only `callback` is meant to be supplied by outside callers; the remaining
 * parameters carry state between the recursive invocations.
 *
 * @param {function(number)} [callback] - called once with the accumulated
 *   write count when the dump completes or stops on an error.
 * @param {boolean} [continuing] - `true` on recursive calls; suppresses the
 *   'starting dump' message.
 * @param {number} [limit] - batch size; defaults to `options.limit`.
 * @param {number} [offset] - position of the next batch; defaults to
 *   `options.offset`.
 * @param {number} [total_writes] - writes accumulated so far; defaults to 0.
 */
elasticdump.prototype.dump = function(callback, continuing, limit, offset, total_writes){
  var self = this;

  if(limit == null){ limit = self.options.limit; }
  if(offset == null){ offset = self.options.offset; }
  if(total_writes == null){ total_writes = 0; }

  if(continuing !== true){
    self.log('starting dump');
  }

  // Evaluated at the moment an error occurs, as the original code did.
  function ignoringErrors(){
    return self.options['ignore-errors'] == true || self.options['ignore-errors'] == 'true';
  }

  // Logs the closing message and hands the write count to the caller.
  function finish(message){
    self.log(message);
    if(typeof callback === 'function'){ callback(total_writes); }
  }

  self.input.get(limit, offset, function(err, data){
    if(err){
      self.emit('error', err);
      // Without a usable batch there is nothing to forward; previously the
      // code fell through and crashed on `data.length`.
      if(!ignoringErrors() || !Array.isArray(data)){
        return finish('dump ended with error');
      }
    }

    self.log("got " + data.length + " objects from source " + self.inputType + " (offset: "+offset+")");

    self.output.set(data, limit, offset, function(err, writes){
      if(err){
        self.emit('error', err);
        if(!ignoringErrors()){
          return finish('dump ended with error');
        }
        // NOTE(review): on an ignored error the offset is left unchanged, so
        // the same batch is requested again - confirm this retry is intended.
      }else{
        total_writes += writes;
        self.log("sent " + data.length + " objects to destination " + self.outputType + ", wrote " + writes);
        offset = offset + limit;
      }

      if(data.length > 0){
        self.dump(callback, true, limit, offset, total_writes);
      }else{
        finish('dump complete');
      }
    });
  });
};
// Expose the constructor as the module's `elasticdump` export.
exports.elasticdump = elasticdump;