forked from dazheng/gohive
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathconnection.go
156 lines (126 loc) · 3.86 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// Package gohive wraps the hiveserver2 thrift interface in a few
// related interfaces for more convenient use.
package gohive
import (
"errors"
"fmt"
inf "github.com/dazheng/gohive/inf"
"context"
"git.apache.org/thrift.git/lib/go/thrift"
)
// Options holds the settings for opened Hive sessions. A Connection stores
// the Options it was created with and passes them to newRowSet on Query.
type Options struct {
// PollIntervalSeconds is presumably the delay, in seconds, between status
// polls of a running operation — TODO confirm against the RowSet code.
PollIntervalSeconds int64
// BatchSize is presumably the number of rows requested per fetch — TODO
// confirm against the RowSet code.
BatchSize int64
}
// DefaultOptions is a ready-made Options value with PollIntervalSeconds set
// to 5 and BatchSize set to 10000.
var DefaultOptions = Options{
	PollIntervalSeconds: 5,
	BatchSize:           10000,
}
// Connection is a single HiveServer2 session together with the thrift client
// used to talk to the server. Values are created by Connect or
// ConnectWithUser.
type Connection struct {
// thrift is the generated service client used for the CloseSession and
// ExecuteStatement calls.
thrift *inf.TCLIServiceClient
// session is the handle returned by OpenSession; Close resets it to nil,
// which is what isOpen tests for.
session *inf.TSessionHandle
// options are the caller-supplied settings, forwarded to newRowSet.
options Options
}
// Connect opens a thrift socket to host and starts a HiveServer2 session on
// it without sending any credentials.
//
// host is passed unchanged to thrift.NewTSocket. options is stored on the
// returned Connection and forwarded to the RowSets created by Query.
//
// An error is returned when the socket cannot be created or opened, when the
// OpenSession call fails, or when the server answers with a non-success
// status. If a failure happens after the socket was opened, the socket is
// closed again before returning so that a failed Connect leaks nothing.
func Connect(host string, options Options) (*Connection, error) {
	transport, err := thrift.NewTSocket(host)
	if err != nil {
		return nil, err
	}
	// Guard before Open: this check is only meaningful ahead of the first
	// method call on transport.
	if transport == nil {
		return nil, errors.New("nil thrift transport")
	}
	if err := transport.Open(); err != nil {
		return nil, err
	}

	/*
		NB: hive 0.13's default is a TSaslProtocol, but
		there isn't a golang implementation in apache thrift as
		of this writing.
	*/
	protocol := thrift.NewTBinaryProtocolFactoryDefault()
	client := inf.NewTCLIServiceClientFactory(transport, protocol)

	s := inf.NewTOpenSessionReq()
	// NOTE(review): 6 is a raw protocol version number — confirm which named
	// TProtocolVersion constant it corresponds to.
	s.ClientProtocol = 6

	session, err := client.OpenSession(context.Background(), s)
	if err != nil {
		// Best-effort cleanup; the OpenSession error is the one worth
		// reporting, so a secondary Close error is deliberately dropped.
		_ = transport.Close()
		return nil, err
	}
	// Same status handling as Query and Exec: a reply that carries an error
	// status must not be turned into a usable-looking Connection.
	if !isSuccessStatus(session.Status) {
		_ = transport.Close()
		return nil, fmt.Errorf("Error from server: %s", session.Status.String())
	}

	return &Connection{client, session.SessionHandle, options}, nil
}
// ConnectWithUser opens a thrift socket to host and starts a HiveServer2
// session on it, sending username and password in the open-session request.
//
// host is passed unchanged to thrift.NewTSocket. options is stored on the
// returned Connection and forwarded to the RowSets created by Query.
//
// An error is returned when the socket cannot be created or opened, when the
// OpenSession call fails, or when the server answers with a non-success
// status. If a failure happens after the socket was opened, the socket is
// closed again before returning so that a failed call leaks nothing.
func ConnectWithUser(host, username, password string, options Options) (*Connection, error) {
	transport, err := thrift.NewTSocket(host)
	if err != nil {
		return nil, err
	}
	// Guard before Open: this check is only meaningful ahead of the first
	// method call on transport.
	if transport == nil {
		return nil, errors.New("nil thrift transport")
	}
	if err := transport.Open(); err != nil {
		return nil, err
	}

	/*
		NB: hive 0.13's default is a TSaslProtocol, but
		there isn't a golang implementation in apache thrift as
		of this writing.
	*/
	protocol := thrift.NewTBinaryProtocolFactoryDefault()
	client := inf.NewTCLIServiceClientFactory(transport, protocol)

	s := inf.NewTOpenSessionReq()
	// NOTE(review): 6 is a raw protocol version number — confirm which named
	// TProtocolVersion constant it corresponds to.
	s.ClientProtocol = 6
	s.Username = &username
	s.Password = &password

	session, err := client.OpenSession(context.Background(), s)
	if err != nil {
		// Best-effort cleanup; the OpenSession error is the one worth
		// reporting, so a secondary Close error is deliberately dropped.
		_ = transport.Close()
		return nil, err
	}
	// Same status handling as Query and Exec: a reply that carries an error
	// status must not be turned into a usable-looking Connection.
	if !isSuccessStatus(session.Status) {
		_ = transport.Close()
		return nil, fmt.Errorf("Error from server: %s", session.Status.String())
	}

	return &Connection{client, session.SessionHandle, options}, nil
}
// isOpen reports whether the connection still holds a session handle.
func (c *Connection) isOpen() bool {
	if c.session == nil {
		return false
	}
	return true
}
// Close ends the hive session held by c, if one is open. After a successful
// Close the connection is invalid for other use; calling Close on a
// connection without a session does nothing and returns nil.
//
// When the CloseSession call fails, the session handle is kept and the
// returned error includes both the response and the underlying error.
func (c *Connection) Close() error {
	if !c.isOpen() {
		return nil
	}

	closeReq := inf.NewTCloseSessionReq()
	closeReq.SessionHandle = c.session

	resp, err := c.thrift.CloseSession(context.Background(), closeReq)
	if err != nil {
		// The verbs mirror the ExecuteStatement error in Query; without
		// them fmt renders the arguments as "%!(EXTRA ...)" noise.
		return fmt.Errorf("Error closing session: %+v, %v", resp, err)
	}

	// Dropping the handle is what makes isOpen report false from now on.
	c.session = nil
	return nil
}
// Query issues a statement on the connection's session and returns a RowSet
// for the resulting operation, which can later be used to query the
// operation's status.
//
// An error is returned when the ExecuteStatement call itself fails or when
// the server replies with a non-success status.
func (c *Connection) Query(query string) (RowSet, error) {
	req := inf.NewTExecuteStatementReq()
	req.Statement = query
	req.SessionHandle = c.session

	result, execErr := c.thrift.ExecuteStatement(context.Background(), req)
	switch {
	case execErr != nil:
		return nil, fmt.Errorf("Error in ExecuteStatement: %+v, %v", result, execErr)
	case !isSuccessStatus(result.Status):
		return nil, fmt.Errorf("Error from server: %s", result.Status.String())
	}

	return newRowSet(c.thrift, result.OperationHandle, c.options), nil
}
// Exec issues a statement on the connection's session and returns the raw
// ExecuteStatement response instead of wrapping it in a RowSet.
//
// An error is returned when the ExecuteStatement call itself fails or when
// the server replies with a non-success status; the response is nil in both
// cases.
func (c *Connection) Exec(query string) (*inf.TExecuteStatementResp, error) {
	stmt := inf.NewTExecuteStatementReq()
	stmt.SessionHandle, stmt.Statement = c.session, query

	reply, err := c.thrift.ExecuteStatement(context.Background(), stmt)
	if err != nil {
		return nil, fmt.Errorf("Error in ExecuteStatement: %+v, %v", reply, err)
	}

	if status := reply.Status; !isSuccessStatus(status) {
		return nil, fmt.Errorf("Error from server: %s", status.String())
	}

	// err is known to be nil here, so the success path returns it explicitly.
	return reply, nil
}
// isSuccessStatus reports whether the status code carried by p is either
// SUCCESS_STATUS or SUCCESS_WITH_INFO_STATUS.
func isSuccessStatus(p *inf.TStatus) bool {
	switch p.GetStatusCode() {
	case inf.TStatusCode_SUCCESS_STATUS, inf.TStatusCode_SUCCESS_WITH_INFO_STATUS:
		return true
	default:
		return false
	}
}