-
Notifications
You must be signed in to change notification settings - Fork 128
(WIP) Quick start guide for developers
Requirements for installing MADlib:
- gcc
- flex (>= 2.5.35)
- bison (>= 2.4) -- (verify this)
- An installed version of Pivotal HAWQ, Pivotal Greenplum Database 4.2+ or PostgreSQL (64-bit) 9.2+ with plpython support enabled. Note: plpython may not be enabled in PostgreSQL by default.
Steps to follow:
- Set environment variables for database. Particularly, set
$PGHOST
,PGDATA
,$PGPORT_94
and$PGDATABASE
- In the project root run the following commands
mkdir build
cd build
cmake ..
make
- Install madlib in the database. In the build folder run
./src/bin/madpack -p postgres -c $USER@$PGHOST:$PGPORT_94/$PGDATABASE install
Let us add a new module called hello_world. Inside this module we implement a UDA, or User-Defined SQL Aggregate function, called avg_var
which computes the mean and variance for a given numerical column of a table. Unlike ordinary UDA of PostgreSQL, avg_var
will also work on a distributed database and take advantage of the underlying distributed network for parallel computations. The usage of avg_var
is very simple -- users simply run the following command in psql
:
select avg_var(bath) from houses
which will print three numbers on the screen, the mean, variance and number of rows in column bath
of table houses
.
Below are the main steps we will go through in this guide:
- register the module
- define the sql functions
- implement the functions in C++
- register the C++ header files
Add the following line to the file called Modules.yml
under ./src/config/
- name: hello_world
and create two folders:
./src/ports/postgres/modules/hello_world
and
./src/modules/hello_world
. The names of the folders need to match the name of the module specified in Modules.yml
.
Create file avg_var.sql_in
under folder ./src/ports/postgres/modules/hello_world
. Inside this file we define the aggregate function and other helper functions for computing mean and variance. The actual implementations of those functions will be in separate C++ files which we will describe in the next section.
At the beginning of file avg_var.sql_in
the command m4_include(
SQLCommon.m4')` is necessary to run the m4 macro processor. M4 is used to add platform-specific commands in the SQL definitions.
We define the aggregate function avg_var
using built-in PostgreSQL command CREATE AGGREGATE
.
DROP AGGREGATE IF EXISTS MADLIB_SCHEMA.avg_var(DOUBLE PRECISION);
CREATE AGGREGATE MADLIB_SCHEMA.avg_var(
DOUBLE PRECISION) (
SFUNC=MADLIB_SCHEMA.avg_var_transition,
STYPE=double precision[],
FINALFUNC=MADLIB_SCHEMA.avg_var_final,
m4_ifdef(`__POSTGRESQL__', `', `PREFUNC=MADLIB_SCHEMA.avg_var_merge_states,')
INITCOND='{0, 0, 0}'
);
We also define parameters passed to CREATE AGGREGATE
:
-
SFUNC
- the name of the state transition function to be called for each input row. The state transition function, or
avg_var_transition
in this example, is defined in the same fileavg_var.sql_in
and implemented later in C++.
- the name of the state transition function to be called for each input row. The state transition function, or
-
FINALFUNC
- the name of the final function called to compute the aggregate's result after all input rows have been traversed. The final function, or
avg_var_final
in this example, is defined in the same fileavg_var.sql_in
and implemented later in C++.
- the name of the final function called to compute the aggregate's result after all input rows have been traversed. The final function, or
-
PREFUNC
- the name of the merge function called to combine the aggregate's state values after each segment, or partition, of data has been traversed. The merge function is needed for distributed datasets on Greenplum and HAWQ. For PostgreSQL, the data is stored locally and the merge function is not necessary. For completeness we implement a merge function called
avg_var_merge_states
in this guide.
- the name of the merge function called to combine the aggregate's state values after each segment, or partition, of data has been traversed. The merge function is needed for distributed datasets on Greenplum and HAWQ. For PostgreSQL, the data is stored locally and the merge function is not necessary. For completeness we implement a merge function called
-
INITCOND
- the initial condition for the state value. In this example it is an all-zero double array corresponding to the values of mean, variance, and the number of rows, respectively.
The transition, merge, and final functions are defined in the same file avg_var.sql_in
as the aggregate function. More details about those functions can be found in the PostgreSQL documentation.
Create the header and the source files, avg_var.hpp
and avg_var.cpp
, under the folder ./src/modules/hello_world
. In the header file we declare the transition, merge and final functions using the macro DECLARE_UDF(MODULE, NAME)
. For example, the transition function avg_var_transition
is declared as DECLARE_UDF(hello_world, avg_var_transition)
. The macro DECLARE_UDF
is defined in the file dbconnector.hpp
under ./src/ports/postgres/dbconnector
.
Under the hood, each of the three UDFs is declared as a subclass of dbconnector::postgres::UDF
. The behavior of those UDFs is solely determined by its member function
AnyType run(AnyType &args);
In other words, we only need to implement the following methods in the avg_var.cpp
file:
AnyType avg_var_transition::run(AnyType& args);
AnyType avg_var_merge_states::run(AnyType& args);
AnyType avg_var_final::run(AnyType& args);
Here the AnyType
class works for both passing data from the DBMS to the C++ function, as well as returning values back from C++. Refer to TypeTraits_impl.hpp
for more details.
AnyType
avg_var_transition::run(AnyType& args) {
// get current state value
AvgVarTransitionState<MutableArrayHandle<double> > state = args[0];
// get current row value
double x = args[1].getAs<double>();
double d_ = (x - state.avg);
// online update mean
state.avg += d_ / static_cast<double>(state.numRows + 1);
double d = (x - state.avg);
double a = static_cast<double>(state.numRows) / static_cast<double>(state.numRows + 1);
// online update variance
state.var = state.var * a + d_ * d / static_cast<double>(state.numRows + 1);
state.numRows ++;
return state;
}
- there are two arguments for
avg_var_transition
, as specified inavg_var.sql_in
. The first one is an array of SQL double type, corresponding to the current mean, variance, and number of rows traversed and the second one is a double representing the current row value. - we will describe class
AvgVarTransitionState
later. Basically it takesargs[0]
, a SQL double array, passes the data to the appropriate C++ types and store them in thestate
instance. - both the mean and the variance are updated in an on-line fashion to avoid accumulating intermediate large sum.
AnyType
avg_var_merge_states::run(AnyType& args) {
AvgVarTransitionState<MutableArrayHandle<double> > stateLeft = args[0];
AvgVarTransitionState<ArrayHandle<double> > stateRight = args[1];
// Merge states together and return
stateLeft += stateRight;
return stateLeft;
}
- again, the arguments contained in
AnyType& args
are defined inavg_var.sql_in
. - the details are hidden in method of class
AvgVarTransitionState
which overloads the operator+=
AnyType
avg_var_final::run(AnyType& args) {
AvgVarTransitionState<MutableArrayHandle<double> > state = args[0];
// If we haven't seen any data, just return Null. This is the standard
// behavior of aggregate function on empty data sets (compare, e.g.,
// how PostgreSQL handles sum or avg on empty inputs)
if (state.numRows == 0)
return Null();
return state;
}
- class
AvgVarTransitionState
overloads theAnyType()
operator such that we can directly return state, an instance ofAvgVarTransitionState
, while the function is expected to return aAnyType
.
Below is the method that overloads the operator +=
for the class AvgVarTransitionState
:
/**
* @brief Merge with another State object
*
* We update mean and variance in a online fashion
* to avoid intermediate large sum.
*/
template <class OtherHandle>
AvgVarTransitionState &operator+=(
const AvgVarTransitionState<OtherHandle> &inOtherState) {
if (mStorage.size() != inOtherState.mStorage.size())
throw std::logic_error("Internal error: Incompatible transition "
"states");
double avg_ = inOtherState.avg;
double var_ = inOtherState.var;
uint16_t numRows_ = static_cast<uint16_t>(inOtherState.numRows);
double totalNumRows = static_cast<double>(numRows + numRows_);
double p = static_cast<double>(numRows) / totalNumRows;
double p_ = static_cast<double>(numRows_) / totalNumRows;
double totalAvg = avg * p + avg_ * p_;
double a = avg - totalAvg;
double a_ = avg_ - totalAvg;
numRows += numRows_;
var = p * var + p_ * var_ + p * a * a + p_ * a_ * a_;
avg = totalAvg;
return *this;
}
- Given the mean, variance and the size of two data sets, Welford’s method, computes the mean and variance of the two data sets combined.
The SQL functions defined in avg_var.sql_in
need to be able to locate the actual implementations from the C++ files. This is done by simply adding the following line to the file declarations.hpp
under ./src/modules/
#include "hello_world/avg_var.hpp"