-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathnode-hive.js
107 lines (98 loc) · 3.08 KB
/
node-hive.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
var thrift = require('thrift'),
ttransport = require('thrift/transport'),
ThriftHive = require('gen-nodejs/ThriftHive'),
ResultSet = require('./result_set');
var hiveClient = function(config) {
var connect = function(onError, connected) {
var server = config.server;
var port = config.port || 10000;
var options = {transport: ttransport.TBufferedTransport, timeout: config.timeout || 1000};
var connection = thrift.createConnection(server, port, options);
var client = thrift.createClient(ThriftHive, connection);
var propagate = function(func, arguments) {
var args = [];
for (var i=2; i < arguments.length; i++) {
args.push(arguments[i]);
};
func.apply(null, args);
}
var continueOnSuccess = function(err, onSuccess) {
if (err) {
connection.end();
onError(true, err);
} else {
propagate(onSuccess, arguments)
}
};
connected({
execute: function(query, onSuccess) {
client.execute(query, function(err) {
continueOnSuccess(err, onSuccess);
});
},
getSchema: function(onSuccess) {
client.getSchema(function(err, schema) {
continueOnSuccess(err, onSuccess, schema);
});
},
fetchAll: function(onSuccess) {
client.fetchAll(function(err, data) {
continueOnSuccess(err, onSuccess, data);
});
},
fetchN: function(batchSize, onSuccess) {
client.fetchN(batchSize, function(err, data) {
continueOnSuccess(err, onSuccess, data);
});
},
closeConnection: function() {
connection.end();
}
});
};
return {
fetch: function(query, onCompletion) {
connect(onCompletion, function(client) {
client.execute(query, function() {
client.getSchema(function(schema) {
client.fetchAll(function(data) {
client.closeConnection();
onCompletion(null, ResultSet.create(data, schema));
});
});
});
});
},
fetchInBatch: function(batchSize, query, onBatchCompletion, onCompletion) {
connect(onBatchCompletion, function(client) {
client.execute(query, function() {
client.getSchema(function(schema) {
var fetchBatch = function() {
client.fetchN(batchSize, function(data) {
if(data.length > 0) {
onBatchCompletion(null, ResultSet.create(data, schema));
process.nextTick(fetchBatch);
} else {
client.closeConnection();
if (onCompletion) onCompletion(null, null);
}
});
};
fetchBatch();
});
});
});
},
execute: function(query, onCompletion){
connect(onCompletion, function(client) {
client.execute(query, function(){
client.closeConnection();
onCompletion(null, null);
});
});
},
};
};
exports.for = function(config) {
return hiveClient(config);
};