Skip to content
This repository was archived by the owner on Dec 2, 2019. It is now read-only.

Commit

Permalink
Allow asynchronous query execution, log fetching and query cancellation.
Browse files Browse the repository at this point in the history
Closes dropbox#48
  • Loading branch information
ptallada authored and jingw committed May 28, 2016
1 parent 1193601 commit b9973d1
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 6 deletions.
22 changes: 22 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,28 @@ DB-API
print cursor.fetchone()
print cursor.fetchall()
DB-API (asynchronous)
---------------------
.. code-block:: python
from pyhive import hive
from TCLIService.ttypes import TOperationState
cursor = hive.connect('localhost').cursor()
cursor.execute('SELECT * FROM my_awesome_data LIMIT 10', async=True)
status = cursor.poll().operationState
while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
logs = cursor.fetch_logs()
for message in logs:
print message
# If needed, an asynchronous query can be cancelled at any time with:
# cursor.cancel()
status = cursor.poll().operationState
print cursor.fetchall()
SQLAlchemy
----------
First install this package to register it with SQLAlchemy (see ``setup.py``).
Expand Down
70 changes: 64 additions & 6 deletions pyhive/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ def commit(self):
"""Hive does not support transactions, so this does nothing."""
pass

def cursor(self):
def cursor(self, *args, **kwargs):
"""Return a new :py:class:`Cursor` object using the connection."""
return Cursor(self)
return Cursor(self, *args, **kwargs)

@property
def client(self):
Expand All @@ -151,9 +151,10 @@ class Cursor(common.DBAPICursor):
visible by other cursors or connections.
"""

def __init__(self, connection):
def __init__(self, connection, arraysize=1000):
self._operationHandle = None
super(Cursor, self).__init__()
self.arraysize = arraysize
self._connection = connection

def _reset_state(self):
Expand Down Expand Up @@ -214,7 +215,7 @@ def close(self):
"""Close the operation handle"""
self._reset_state()

def execute(self, operation, parameters=None):
def execute(self, operation, parameters=None, async=False):
"""Prepare and execute a database operation (query or command).
Return values are not defined.
Expand All @@ -230,12 +231,20 @@ def execute(self, operation, parameters=None):
self._state = self._STATE_RUNNING
_logger.info('%s', sql)

req = ttypes.TExecuteStatementReq(self._connection.sessionHandle, sql.encode('utf-8'))
req = ttypes.TExecuteStatementReq(self._connection.sessionHandle,
sql.encode('utf-8'), runAsync=async)
_logger.debug(req)
response = self._connection.client.ExecuteStatement(req)
_check_status(response)
self._operationHandle = response.operationHandle

def cancel(self):
    """Ask the server to cancel the operation held by this cursor.

    Builds a ``TCancelOperationReq`` for the cursor's current operation handle,
    sends it through the connection's client and validates the response with
    ``_check_status``.
    """
    cancel_request = ttypes.TCancelOperationReq(operationHandle=self._operationHandle)
    _check_status(self._connection.client.CancelOperation(cancel_request))

def _fetch_more(self):
"""Send another TFetchResultsReq and update state"""
assert(self._state == self._STATE_RUNNING), "Should be running when in _fetch_more"
Expand All @@ -245,7 +254,7 @@ def _fetch_more(self):
req = ttypes.TFetchResultsReq(
operationHandle=self._operationHandle,
orientation=ttypes.TFetchOrientation.FETCH_NEXT,
maxRows=1000,
maxRows=self.arraysize,
)
response = self._connection.client.FetchResults(req)
_check_status(response)
Expand All @@ -256,6 +265,55 @@ def _fetch_more(self):
for row in response.results.rows:
self._data.append([_unwrap_col_val(val) for val in row.colVals])

def poll(self):
    """Fetch the current status of this cursor's operation from the server.

    The Thrift response is handed back unmodified so that callers can inspect
    its fields (for example ``operationState``) themselves.

    :returns: ``ttypes.TGetOperationStatusResp``
    :raises: ``ProgrammingError`` when no query has been started

    .. note::
        This is not a part of DB-API.
    """
    no_query_started = self._state == self._STATE_NONE
    if no_query_started:
        raise ProgrammingError("No query yet")

    status_request = ttypes.TGetOperationStatusReq(operationHandle=self._operationHandle)
    status_response = self._connection.client.GetOperationStatus(status_request)
    # Validate the response status before exposing the response to the caller.
    _check_status(status_response)
    return status_response

def fetch_logs(self):
    """Collect the log lines emitted by the query since the previous call.

    Issues ``FETCH_NEXT`` log requests of at most ``self.arraysize`` rows each
    until the server returns an empty batch, so repeated calls only return
    log lines that were not fetched before.

    :returns: list<str>
    :raises: ``ProgrammingError`` when no query has been started

    .. note::
        This is not a part of DB-API.
    """
    if self._state == self._STATE_NONE:
        raise ProgrammingError("No query yet")

    collected = []
    more_available = True
    while more_available:
        log_request = ttypes.TFetchResultsReq(
            operationHandle=self._operationHandle,
            orientation=ttypes.TFetchOrientation.FETCH_NEXT,
            maxRows=self.arraysize,
            fetchType=1,  # 0: results, 1: logs
        )
        log_response = self._connection.client.FetchResults(log_request)
        _check_status(log_response)

        batch = log_response.results.rows
        for entry in batch:
            # Each log row is expected to carry exactly one column: the message.
            assert len(entry.colVals) == 1, entry.colVals
            collected.append(_unwrap_col_val(entry.colVals[0]))

        # An empty batch means the server has no further log lines for now.
        more_available = bool(batch)

    return collected


#
# Type Objects and Constructors
Expand Down
28 changes: 28 additions & 0 deletions pyhive/tests/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,34 @@ def test_complex(self, cursor):
'0.1',
]])

@with_cursor
def test_async(self, cursor):
cursor.execute('SELECT * FROM one_row', async=True)
unfinished_states = (
ttypes.TOperationState.INITIALIZED_STATE,
ttypes.TOperationState.RUNNING_STATE,
)
while cursor.poll().operationState in unfinished_states:
cursor.fetch_logs()
assert cursor.poll().operationState == ttypes.TOperationState.FINISHED_STATE

self.assertEqual(len(cursor.fetchall()), 1)

@unittest.skipIf(os.environ.get('CDH') == 'cdh4', "feature doesn't exist?")
@with_cursor
def test_cancel(self, cursor):
# Need to do a JOIN to force a MR job. Without it, Hive optimizes the query to a fetch
# operator and prematurely declares the query done.
cursor.execute(
"SELECT reflect('java.lang.Thread', 'sleep', 1000L * 1000L * 1000L) "
"FROM one_row a JOIN one_row b",
async=True
)
self.assertEqual(cursor.poll().operationState, ttypes.TOperationState.RUNNING_STATE)
cursor.cancel()
self.assertEqual(cursor.poll().operationState, ttypes.TOperationState.CANCELED_STATE)
assert any('Stage' in line for line in cursor.fetch_logs())

def test_noops(self):
"""The DB-API specification requires that certain actions exist, even though they might not
be applicable."""
Expand Down

0 comments on commit b9973d1

Please sign in to comment.