Examples
This page curates examples of usage. These snippets are meant to help illustrate functionality. Feel free to add your own!
This example illustrates how to create an engine against a Teradata database. You will need access to a machine with Teradata installed. Currently, the connection to Teradata is made through the DBAPI implementation in the teradata library, which requires the Teradata ODBC driver to be installed. The driver is available for Linux, Mac, and Windows.
from sqlalchemy import create_engine
# <password> and <hostname> are placeholders; the original values are not recoverable
td_engine = create_engine('teradata://dbuser:<password>@<hostname>')
The engine is the interface to the database. It defers decisions to the TeradataDialect, which provides access to connection objects provided by the DBAPI implementation. The database URL determines which database to connect to.
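As a rough illustration of how an engine selects a dialect and hands out connections, here is a minimal sketch. It assumes an in-memory SQLite URL as a stand-in for the Teradata URL, so that it runs without a Teradata server or ODBC driver; that substitution is not part of the original example.

```python
from sqlalchemy import create_engine, text

# Assumption: SQLite stands in for 'teradata://...' so the sketch runs anywhere.
engine = create_engine("sqlite://")

# The URL scheme decides which dialect the engine defers to.
dialect_name = engine.dialect.name

# Connections come from the engine; the context manager closes them again.
with engine.connect() as conn:
    one = conn.execute(text("SELECT 1")).scalar()

print(dialect_name, one)
```

With a Teradata URL the same pattern should apply, with the dialect name reported by the installed Teradata dialect instead.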
SQLAlchemy provides a core expression language that programmatically generates SQL. Applications are typically built on top of the Core Expressions API. Functions such as select, insert, and delete are used to generate the SQL string. In fact, the SQLAlchemy ORM is one such "application". The ORM provides a Query object that can also be used to query the database.
Select statements are among the most common SQL queries. We use the select function provided by SQLAlchemy to generate a select statement. The result of select is a Select object that represents a select statement.
- Simple select
- The FROM clause
- The WHERE clause
- LIMIT
- ORDER BY
- GROUP BY
- Scalar subqueries
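The snippets on this page refer to Table objects named dept, users, and addresses without defining them. The following is one possible set of definitions, inferred from the column names used in the examples. The column types, string lengths, and the foreign key are assumptions rather than the actual schema.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table

metadata = MetaData()

# Assumed schema; only the table and column names come from the examples.
dept = Table(
    "department", metadata,
    Column("deptno", Integer, primary_key=True),
    Column("name", String(50)),
    Column("location", String(50)),
)

users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
    Column("fullname", String(100)),
)

addresses = Table(
    "addresses", metadata,
    Column("id", Integer, primary_key=True),
    # The foreign key lets users.join(addresses) infer its ON clause.
    Column("user_id", Integer, ForeignKey("users.uid")),
    Column("email_address", String(100)),
)

print(users.join(addresses).onclause)
```

Without a foreign key between the two tables, users.join(addresses) would need an explicit join condition as its second argument.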
from sqlalchemy import select
# dept is a Table object
sel_stmt = select([dept])
The select function is given a list of table objects. In this case, just one is given.
Printing sel_stmt generates the following:
SELECT department.deptno, department.name, department.location
FROM department
department is the name of the Department table in the database. Notice that all the columns are listed, so this is equivalent to a select * from type of query.
Selecting certain columns is done as follows:
# Access columns as attributes of a mapped Department class
sel_stmt = select([Department.name, Department.deptno])
# Access columns through the .c collection of the dept Table object
sel_stmt = select([dept.c.name, dept.c.deptno])
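Both kinds of statement can be inspected without a database connection by converting them to strings. This is a minimal sketch: the column types are assumptions, and it uses the positional select(...) form accepted by SQLAlchemy 1.4 and later rather than the list form used elsewhere on this page.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select

metadata = MetaData()

# Assumed definition of the department table; column types are illustrative.
dept = Table(
    "department", metadata,
    Column("deptno", Integer, primary_key=True),
    Column("name", String(50)),
    Column("location", String(50)),
)

# Whole table: every column is listed, like SELECT * FROM department.
all_cols_sql = str(select(dept))

# Only the chosen columns, in the order given.
some_cols_sql = str(select(dept.c.name, dept.c.deptno))

print(all_cols_sql)
print(some_cols_sql)
```

The strings are produced by the default compiler, so a specific dialect may render some constructs differently.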
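# FROM clause: join users to addresses on an explicit condition
s = select([users.c.fullname]).select_from(
    users.join(addresses, addresses.c.email_address.like(users.c.name + '%')))
# WHERE clause: each column is passed on its own, not wrapped in a tuple
s = select([Users.name, Department.deptno]).where(Users.uid == Department.user_id)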
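To make the difference between an explicit join in the FROM clause and a join expressed through WHERE more concrete, the sketch below compiles one statement of each kind. The table definitions are assumed for illustration, and the positional select(...) form requires SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, ForeignKey, Integer, MetaData, String, Table, select

metadata = MetaData()

# Assumed table definitions; only the names come from the examples.
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
    Column("fullname", String(100)),
)
addresses = Table(
    "addresses", metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.uid")),
    Column("email_address", String(100)),
)

# Explicit join: the second argument to join() becomes the ON clause.
join_stmt = select(users.c.fullname).select_from(
    users.join(addresses, addresses.c.user_id == users.c.uid)
)

# Join through WHERE: both tables are listed in FROM and filtered afterwards.
where_stmt = select(users.c.name, addresses.c.email_address).where(
    users.c.uid == addresses.c.user_id
)

join_sql = " ".join(str(join_stmt).split())
where_sql = " ".join(str(where_stmt).split())
print(join_sql)
print(where_sql)
```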
Conjunctions can be written in more than one way. You can use the operators that SQLAlchemy objects overload, or you can explicitly call the functions and_, or_, not_, etc.
from sqlalchemy import and_, or_
s = select([(users.c.fullname +
             ", " +
             addresses.c.email_address).label('titles')]).where(
    and_(
        users.c.uid == addresses.c.user_id,
        users.c.name.between('m', 'z'),
        or_(
            addresses.c.email_address.like('%@aol.com'),
            addresses.c.email_address.like('%@msn.com')
        )
    )
)
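The overloaded operators mentioned above are &, | and ~ for AND, OR and NOT. The following sketch builds the same condition in both styles and compares the generated SQL. The users table definition is an assumption, and the positional select(...) form requires SQLAlchemy 1.4 or later.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy import and_, not_, or_, select

metadata = MetaData()

# Assumed table definition for illustration.
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)

# Function style.
func_style = select(users.c.uid).where(
    and_(
        users.c.name.between("m", "z"),
        or_(users.c.uid == 1, not_(users.c.name == "ed")),
    )
)

# Operator style. The parentheses matter: in Python, & and | bind more
# tightly than ==, so each comparison has to be wrapped.
op_style = select(users.c.uid).where(
    users.c.name.between("m", "z")
    & ((users.c.uid == 1) | ~(users.c.name == "ed"))
)

same_sql = str(func_style) == str(op_style)
print(str(op_style))
```

The function style tends to read better for long or nested conditions, while the operators are convenient for short ones.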
# limit
stmt = select([users.c.name, addresses.c.email_address]).\
    select_from(users.join(addresses)).\
    limit(1)
# order by asc
stmt = select([users.c.name]).order_by(users.c.name)
# order by desc
stmt = select([users.c.name]).order_by(users.c.name.desc())
# group by
from sqlalchemy import func
stmt = select([users.c.name, func.count(addresses.c.id)]).\
    select_from(users.join(addresses)).\
    group_by(users.c.name)
# group by having
stmt = select([users.c.name, func.count(addresses.c.id)]).\
    select_from(users.join(addresses)).\
    group_by(users.c.name).\
    having(func.length(users.c.name) > 4)
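To show what the GROUP BY ... HAVING statement returns, here is a small runnable sketch. It assumes an in-memory SQLite database in place of Teradata, invented sample rows, and assumed table definitions. It uses the positional select(...) form from SQLAlchemy 1.4 and later.

```python
from sqlalchemy import (Column, ForeignKey, Integer, MetaData, String, Table,
                        create_engine, func, select)

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)
addresses = Table(
    "addresses", metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.uid")),
    Column("email_address", String(100)),
)

# Assumption: SQLite stands in for Teradata so the sketch is self-contained.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    metadata.create_all(conn)
    # Invented sample data.
    conn.execute(users.insert(), [
        {"uid": 1, "name": "jack"},
        {"uid": 2, "name": "wendy"},
    ])
    conn.execute(addresses.insert(), [
        {"id": 1, "user_id": 1, "email_address": "jack@example.com"},
        {"id": 2, "user_id": 2, "email_address": "wendy@example.com"},
        {"id": 3, "user_id": 2, "email_address": "wendy@example.org"},
    ])

    # Count addresses per user, keeping only names longer than 4 characters.
    stmt = (
        select(users.c.name, func.count(addresses.c.id))
        .select_from(users.join(addresses))
        .group_by(users.c.name)
        .having(func.length(users.c.name) > 4)
    )
    counts = [tuple(row) for row in conn.execute(stmt)]

print(counts)
```

Here "jack" is filtered out by the HAVING clause because its length is exactly 4.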
# distinct
stmt = select([users.c.name]).\
    where(addresses.c.email_address.contains(users.c.name)).distinct()
# scalar subquery
stmt = select([func.count(addresses.c.id)]).where(users.c.uid == addresses.c.user_id).as_scalar()
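A scalar subquery is typically used as a column of an enclosing select. The sketch below does this against an in-memory SQLite database, which is an assumed stand-in for Teradata, with invented sample rows. It uses scalar_subquery(), the name SQLAlchemy 1.4 and later give to what older releases call as_scalar().

```python
from sqlalchemy import (Column, ForeignKey, Integer, MetaData, String, Table,
                        create_engine, func, select)

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)
addresses = Table(
    "addresses", metadata,
    Column("id", Integer, primary_key=True),
    Column("user_id", Integer, ForeignKey("users.uid")),
    Column("email_address", String(100)),
)

# Assumption: SQLite stands in for Teradata so the sketch is self-contained.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    metadata.create_all(conn)
    # Invented sample data.
    conn.execute(users.insert(), [
        {"uid": 1, "name": "jack"},
        {"uid": 2, "name": "wendy"},
    ])
    conn.execute(addresses.insert(), [
        {"id": 1, "user_id": 1, "email_address": "jack@example.com"},
        {"id": 2, "user_id": 2, "email_address": "wendy@example.com"},
        {"id": 3, "user_id": 2, "email_address": "wendy@example.org"},
    ])

    # Correlated subquery: counts the addresses of the user in the outer row.
    address_count = (
        select(func.count(addresses.c.id))
        .where(addresses.c.user_id == users.c.uid)
        .scalar_subquery()
    )
    stmt = select(users.c.name, address_count.label("n_addresses")).order_by(users.c.name)
    per_user = [tuple(row) for row in conn.execute(stmt)]

print(per_user)
```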
Passing several tables in the list given to select, without a join condition, places all of them in the FROM clause, so the database applies its default join.
# inserts
from sqlalchemy import bindparam
stmt = users.insert().values(name=bindparam('_name') + " .. name")
res = conn.execute(stmt, [{'uid': 4, '_name': 'name1'}, {'uid': 5, '_name': 'name2'}, {'uid': 6, '_name': 'name3'}])
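The insert above runs once per parameter dictionary, with the bound parameter combined into an expression. A minimal runnable sketch follows, assuming an in-memory SQLite database in place of Teradata and an assumed users table. The explicit type_=String on the bound parameter is an addition for this sketch, so that + is certain to render as string concatenation.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table, bindparam,
                        create_engine, select)

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)

# Assumption: SQLite stands in for Teradata so the sketch is self-contained.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    metadata.create_all(conn)

    # '_name' is a bound parameter; the suffix is appended in SQL.
    stmt = users.insert().values(
        name=bindparam("_name", type_=String) + " .. name"
    )

    # A list of dictionaries executes the statement once per dictionary.
    conn.execute(stmt, [
        {"uid": 4, "_name": "name1"},
        {"uid": 5, "_name": "name2"},
        {"uid": 6, "_name": "name3"},
    ])

    names = [row[0] for row in conn.execute(select(users.c.name).order_by(users.c.uid))]

print(names)
```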
# updates
# simple update
stmt = users.update().values(fullname="Fullname: " + users.c.name)
# update where
stmt = users.update().where(users.c.name == 'jack').values(name='ed')
# update many with bound params
stmt = users.update().where(users.c.name == bindparam('oldname')).\
    values(name=bindparam('newname'))
res = conn.execute(stmt, [
    {'oldname': 'jack', 'newname': 'ed'},
    {'oldname': 'wendy', 'newname': 'mary'}])
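The "update many" pattern can be checked end to end with a small sketch. It assumes an in-memory SQLite database in place of Teradata, an assumed users table, and invented sample rows.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table, bindparam,
                        create_engine, select)

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)

# Assumption: SQLite stands in for Teradata so the sketch is self-contained.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    metadata.create_all(conn)
    # Invented sample data.
    conn.execute(users.insert(), [
        {"uid": 1, "name": "jack"},
        {"uid": 2, "name": "wendy"},
    ])

    # The parameter names differ from the column name on purpose: in an
    # UPDATE, SQLAlchemy reserves the column's own name for the SET clause.
    stmt = (
        users.update()
        .where(users.c.name == bindparam("oldname"))
        .values(name=bindparam("newname"))
    )
    conn.execute(stmt, [
        {"oldname": "jack", "newname": "ed"},
        {"oldname": "wendy", "newname": "mary"},
    ])

    names = sorted(row[0] for row in conn.execute(select(users.c.name)))

print(names)
```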
# delete
# all rows in table
del_stmt = addresses.delete()
conn.execute(del_stmt)
# subset of rows in table
del_stmt = users.delete().where(users.c.name > 'm')
conn.execute(del_stmt)
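The result of a delete reports how many rows were affected. The sketch below runs the filtered delete, assuming an in-memory SQLite database in place of Teradata, an assumed users table, and invented sample rows. Row counts reported by other drivers may behave differently.

```python
from sqlalchemy import (Column, Integer, MetaData, String, Table,
                        create_engine, select)

metadata = MetaData()
users = Table(
    "users", metadata,
    Column("uid", Integer, primary_key=True),
    Column("name", String(50)),
)

# Assumption: SQLite stands in for Teradata so the sketch is self-contained.
engine = create_engine("sqlite://")

with engine.begin() as conn:
    metadata.create_all(conn)
    # Invented sample data.
    conn.execute(users.insert(), [
        {"uid": 1, "name": "jack"},
        {"uid": 2, "name": "ed"},
        {"uid": 3, "name": "wendy"},
    ])

    # Delete only the rows whose name sorts after 'm'.
    del_stmt = users.delete().where(users.c.name > "m")
    deleted = conn.execute(del_stmt).rowcount

    remaining = sorted(row[0] for row in conn.execute(select(users.c.name)))

print(deleted, remaining)
```

Calling users.delete() without a where clause would remove every row, as in the first delete example.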