
Added Database class #97

Open

wants to merge 2 commits into base: main

Conversation

Real-Gecko
Contributor

Hey! Check this out: I've created a Database class which aggregates the module's functionality in a single place, making the code more maintainable. It renders db_params.py, etl.py and some parts of db_helper_factory.py obsolete. There is still some room for improvement, such as refactoring the DbHelper class, but for now I guess this is enough.
The module is now easier to use, like:

oracle = Database(dbtype='ORACLE', host="10.1.1.15", port=1521, dbname="ORADB", user="scott")
postgres = Database(dbtype='PG', host="localhost", port=5432, dbname="testing", user="postgres")
select = "select *..."
insert = "insert into ..."
with oracle.connect('ORAPASS'):
    with postgres.connect('PGPASS'):
        rows = oracle.iter_rows(select, row_factory=dict_row_factory, transform=transform)
        postgres.executemany(insert, rows)

@volcan01010
Collaborator

Thanks - this is very interesting. I'll think about this and reply properly later.

@Real-Gecko
Contributor Author

Check my second commit. I've implemented connection pooling and a MySQL DbHelper. There are no backward-incompatible changes, so the module can still be used as before, but now there's more :D
You can even work with the Database class like this now:

with oracle:
    with sqlite:
        rows = oracle.iter_rows(select)
        sqlite.executemany(selectlite, rows)

There are some caveats though:

  • Pooled connections do not accept extra parameters such as connect_timeout or client_encoding in the case of PostgreSQL. You can still connect with the desired parameters by calling connect_directly from the Database class, but the connection will not be pooled (except for pyodbc, I guess).
  • There are still places where the code can be improved and cleaned up, so I think I'll provide some more patches.

@volcan01010
Collaborator

volcan01010 commented Nov 19, 2020

Thanks again for all this. There is a lot of good code in here that would improve etlhelper but there is a lot to understand so it may take some time to incorporate it. @metazool, @ximenesuk, @dvalters and @kerberpolis may have comments, too.


The current merge request covers many different concepts and it may be easier to incorporate if it were divided into smaller requests (MySQL, connection pooling, Database class). Each of these should have tests, too.

MySQL connection

The simplest one is the MySQL DbHelper. We would be very grateful for this. Please have a look at the open merge request for the Informix driver (#25) for an example of what is required. That request stalled because I couldn't get the Docker container working to run the tests. If you know how to set up a MySQL Docker container and can add instructions to CONTRIBUTING.md, that would be excellent.

Note that most of our integration test suite can be run locally with just a PostgreSQL docker container. Only the Oracle and MSSQL Server tests require those databases. We run those inside BGS against our own servers. See CONTRIBUTING.md for details.

Connection pooling

We have an open ticket on connection pooling (#94). The ticket isn't fully defined yet and we haven't set acceptance criteria. We would welcome your thoughts on this.

Database class

The code in this merge request represents quite a big change to etlhelper. By handling the connections internally, it abstracts the user another level away from the raw DBAPI and provides a more object-oriented (rather than function-based) interface. On one hand, this is a step further than we had anticipated going - etlhelper does what it needs to already.

On the other hand, this Database class represents the logical next step from when we added the connect method to the DbParams class. Plus the syntax of oracle.get_rows(sql) is really nice.

The Database class, as defined here, repeats a lot of code from etl.py, db_params.py etc. That code is kept in separate modules by design as they apply to different concepts. I would not like to refactor those significantly and certainly not introduce any breaking changes to their interfaces at this point. I wonder if a Database class could be created that reused that code instead of copying/replacing it? A similar approach was taken when connect was added to DbParams - we just import the function and call it on self.

def connect(self, password_variable=None, **kwargs):

I like that you store the password_variable on the class, so that you don't have to add it again. That's a good call, but I think it's cleaner to add it separately rather than modify existing functions.

How about something like this?

class Database:
    def __init__(self, **kwargs):
        self.db_params = DbParams(**kwargs)
        self.connection_args = {}
        self._password_variable = None

    @property
    def password_variable(self):
        if not self._password_variable:
            raise SomeException("explain what this is and why you need it")
        return self._password_variable

    @password_variable.setter
    def password_variable(self, value):
        self._password_variable = value

    def connect(self):
        return self.db_params.connect(self.password_variable, **self.connection_args)

    def get_rows(self, sql, ...):
        with self.connect() as conn:
            return get_rows(sql, conn, ...)

This can be used as:

oracle = Database(dbtype="ORACLE", ...)
oracle.password_variable = "ORACLE_PASSWORD"
oracle.connection_args = {"encoding": "utf8"}

sql = "SELECT ..."
rows  = oracle.get_rows(sql)

This provides similar functionality without having to modify existing code. It keeps things simple by not using persistent connections and also provides a way to get a connection if you want to do something manual with the cursors. etlhelper deliberately uses the connection once then closes it. For many ETL processes this is fine, but we have started looking at connection pools since we have been using them in APIs.

Passing the kwargs down to connect each time is a bit ugly, though. Maybe they should be stored separately (e.g. self.connection_args)? (Fixed in edited version)

I also don't have a good sense of how iter_rows would work, though. copy_rows could take the target database as an argument and call its executemany. When does the connection close? Do you need to do anything special to handle errors? I can't really tell without trying.


Thanks again for your contributions so far. It is a very nice surprise when these helpful fixes appear out of nowhere. How did you find out about etlhelper and what are you using it for?

@Real-Gecko
Contributor Author

How did you find out about etlhelper and what are you using it for?

I'm currently working on a project where I need to periodically transfer data from Oracle to Postgres. Luckily for us, we have full control over the Postgres server, so the obvious choice was the Foreign Data Wrapper for Oracle. However, it requires manual compilation and installation, a luxury we do not always have in our projects. Also, the project is quite big and a need may arise to integrate with other DB types. So I decided to look for other ETL solutions, preferably written in Python. Pandas was an option, but it's unable to "upsert" data; I solved that issue with pangres. However, performance was not great: it was 5 times slower than the FDW solution. After googling a bit I found pyetl, which looks interesting but does not work with Postgres, and also ETLHelper, which worked flawlessly and was 2 times faster than the pandas solution (2.5 times slower than FDW), which I think is the best we can get while transferring data through Python.

So currently I do not use ETLHelper much, but I may start using it in the near future, as there's always a need to transfer data from one place to another. That's why I decided to improve it. It's always nice to have a ready-to-use tool which simply does the job.

@Real-Gecko
Contributor Author

MySQL connection

That's doable.

@Real-Gecko
Contributor Author

Connection pooling

Unfortunately we cannot get much flexibility here, because, for example in the case of psycopg2, *args and **kwargs are passed to the connect() function. This means that even if you connect to the same database but with different credentials, you'll need to create a new connection pool. The same happens with other DB types. That's why my current implementation creates pools based on DbParams (host, port etc.): it uses a sha256 hash of the connection string to differentiate between pools and does not support additional arguments. Not the best implementation out there, but it definitely works. In cases where you're constantly connecting and disconnecting, connection times are reduced dramatically: in my setup, connecting to an Oracle DB via VPN might take ~0.5 seconds, while reconnecting from a pool takes just fractions of a millisecond, which is a huge difference IMO.
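Keying pools by a hash of the connection string, as described above, could be sketched roughly like this (`pool_key` and `get_pool` are hypothetical names for illustration, not the actual implementation in the PR):

```python
import hashlib

# Module-level registry of pools, keyed by connection-string hash
_pools = {}


def pool_key(connection_string):
    """Derive a stable pool key from the full connection string, so that
    different credentials for the same host map to different pools."""
    return hashlib.sha256(connection_string.encode("utf-8")).hexdigest()


def get_pool(connection_string, create_pool):
    """Return the pool for this connection string, creating it on first use.
    create_pool is a zero-argument factory, e.g. a driver pool constructor."""
    key = pool_key(connection_string)
    if key not in _pools:
        _pools[key] = create_pool()
    return _pools[key]
```

Two calls with identical connection strings share one pool; changing only the user yields a different hash and therefore a separate pool, which matches the credentials caveat described above.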

@Real-Gecko
Contributor Author

I wonder if a Database class could be created that reused that code instead of copying/replacing it?

Actually, that's why the Database class was created. I did not want to change the existing module code, to avoid breaking anything, and instead wanted to provide a better interface to access its functionality.

A similar approach was taken when connect was added to DbParams - we just import the function and call it on self.

Well, actually it leads to barely maintainable code. I did not like the way DbParams and password_variable are tossed around from function to function. I wanted to improve that without breaking current functionality, and it took me some time to understand what's going on. For example, this part:

try:
    required_params = DB_HELPER_FACTORY.from_dbtype(self.dbtype).required_params
except ETLHelperHelperError:
where a new DbHelper is instantiated just to get its required_params. Or the already mentioned connect, where a class method simply calls an external function:

def connect(self, password_variable=None, **kwargs):
    """
    Return database connection.

    :param password_variable: str, name of environment variable with password
    :param kwargs: connection specific keyword arguments e.g. row_factory
    :return: Connection object
    """
    return connect(self, password_variable, **kwargs)

I understand that you are trying to keep backward compatibility, but sometimes it's better to refactor completely before the project turns into development hell :D

@Real-Gecko
Contributor Author

That's a good call, but I think it's cleaner to add it separately rather than modify existing functions.

Yep, there are places that need rework. I now think that connect should actually provide an unpooled connection just like before, while something like connect_pooled should return a connection from the pool. That'll make the transition from the current version of the module to the new functionality easier.
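That split could look roughly like this (a sketch only; connect_func and the pool object are stand-ins for however the class actually opens connections, not real etlhelper APIs):

```python
class Database:
    """Sketch: keep connect() unpooled for compatibility and add a
    separate connect_pooled() for the new behaviour."""

    def __init__(self, connect_func, pool=None):
        self._connect_func = connect_func  # opens a brand-new connection
        self._pool = pool                  # optional pre-created pool

    def connect(self):
        """Open a fresh, unpooled connection, exactly as before."""
        return self._connect_func()

    def connect_pooled(self):
        """Borrow a connection from the pool instead of opening a new one."""
        if self._pool is None:
            raise RuntimeError("No connection pool configured")
        return self._pool.getconn()
```

Existing code calling connect() is untouched, while new code can opt into pooling explicitly.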

@Real-Gecko
Contributor Author

I like that you store the password_variable on the class, so that you don't to add it again. That's a good call, but I think it's cleaner to add it separately rather than modify existing functions.

Yep, additional methods for dealing with password_variable can be added, but I also like having it in __init__, which is much simpler to use. The same goes for connection_args: we can use a separate method for setting them or allow passing them to the connect method. More options are always better.

@Real-Gecko
Contributor Author

When does the connection close?

Actually, in the current version it never closes unless you use with on DbParams.connect. That's why I have now implemented a separate close method, and also __enter__ and __exit__ methods, which allow calls like:

with oracle:
    do_something()

@Real-Gecko
Contributor Author

Overall: as I said, there are still places to improve the code, which are not here yet because I try not to change the existing source code at all, to avoid breaking anything, while trying to reuse parts of it and keep a familiar look. This Database class is not the final one, I think, because I have even more ideas for improving the module. For example, I'm thinking about adding something like a Datasource class that would represent some kind of data source and abstract away even the Database class. This way we'd be able to extend functionality even further, for example to "upsert" data into a DB from a CSV or XML file, or take data from a JSON-based REST service and feed it to a SOAP server. The need for such cases arises a lot, especially CSV to DB (Oracle, Postgres), and I always implement some custom solution for this, which is not good. I guess this project just gave me the inspiration to create one solution to fit them all :D

@volcan01010
Collaborator

volcan01010 commented Dec 16, 2020

Hi,

I had another chance to look through this. I do like the idea of the Database class providing my_db.fetchall(sql).
Maybe we can add that by the time we get to v2.0.

Our aim with etlhelper is not to build the one solution to fit them all, but just to make it a bit easier to work with databases. If it can be a component of yours, though, that would be great.

I still think the best way to implement a Database would be to re-use the existing functions. An updated implementation (taking into account your suggestions on adding passwords) is below.

This version solves the issue of closing connections: get_rows etc. close the connection before returning, while iter_rows etc. wrap it in a class that closes it when the iterator is exhausted. (The class is based on code that @spenny-liam wrote for a recent project). It would still need more testing.

class Database:
    def __init__(self, password_variable=None, connection_args=None, **kwargs):
        self.password_variable = password_variable
        self.connection_args = connection_args or {}
        self.db_params = DbParams(**kwargs)
    
    @classmethod
    def from_db_params(cls, db_params, password_variable=None, connection_args={}):
        return cls(password_variable=password_variable, connection_args=connection_args, **db_params)

    def connect(self):
        return self.db_params.connect(self.password_variable, **self.connection_args)

    def get_rows(self, sql, ...):
        with self.connect() as conn:
            rows = get_rows(sql, conn, ...)
        return rows
    
    def iter_rows(self, sql, ...):
        conn = self.connect()
        try:
            iterator = iter_rows(sql, conn, ...)
            # Connection will be closed when iterator is exhausted
            rows = ConnectionClosingIterator(iterator, conn)
        except Exception:
            # close connection if something goes wrong while preparing iterator
            conn.close()
            raise

        return rows


class ConnectionClosingIterator():
    # This class-level variable is shared by all instances
    _active_connections = set()

    def __init__(self, iterator, conn):
        self.iterator = iterator

        if conn in self._active_connections:
            # If same connection is used by two iterators then the
            # second will break when the first closes the connection
            raise Exception("Can't use same connection for two iterators")

        self.conn = conn
        self._active_connections.add(conn)
    
    def __iter__(self):
        try:
            yield from self.iterator
        finally:
            self.conn.close()
            self._active_connections.remove(self.conn)
    
    def __next__(self):
        try:
            return next(self.iterator)
        except StopIteration:
            self.conn.close()
            self._active_connections.remove(self.conn)
            raise
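The closing behaviour is easy to demonstrate with an in-memory SQLite connection; this standalone sketch uses a simplified generator in place of the full class above:

```python
import sqlite3


def closing_iter(iterator, conn):
    """Yield rows from iterator, closing conn once it is exhausted
    (a simplified, generator-based take on the same idea)."""
    try:
        yield from iterator
    finally:
        conn.close()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(1,), (2,), (3,)])

# Consuming the iterator drains the cursor, then closes the connection
rows = list(closing_iter(conn.execute("SELECT x FROM t ORDER BY x"), conn))
```

Once the list comprehension exhausts the iterator, any further use of the connection fails because it has been closed.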

For connection pooling, I'd like to add that incrementally, too. If we can get it working outside a Database class, it will be easy to add it in afterwards. With an implementation based on the above, connecting to a database as a different user would count as a different database and so would have a pool of its own.

    def __init__(self, ...):
        ...
        self.session = None

    def start_session(self, **kwargs):
        self.session = ...

    def end_session(self, **kwargs):
        self.session.close(...)

    def connect(self):
        if self.session:
            conn = self.db_params.session_connect(self.session)
        else:
            conn = self.db_params.connect(self.password_variable, **self.connection_args)
        return conn

It is likely to be next year before we can do significant work on this, but I'm making the notes here while they are fresh in my mind, particularly @spenny-liam's iterator that closes the connection.

@volcan01010 volcan01010 mentioned this pull request Oct 11, 2021