[FLINK-31010] Add Transformer and Estimator for GBTClassifier and GBTRegressor #210
base: master
Thanks for the PR. Left some comments below.
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/gbtclassifier/GBTClassifier.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTModelParams.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTModelParams.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTModelParams.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/loss/LogLoss.java
flink-ml-lib/src/main/java/org/apache/flink/ml/classification/gbtclassifier/GBTClassifier.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTRunner.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/param/HasMinInfoGain.java
...-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/datastorage/IterationSharedStorage.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/defs/Split.java
Hi @lindong28, thanks for your valuable comments. I've updated the PR based on the comments and offline discussions. Please take a look.
Thanks for the update! Left some comments below.
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/BaseGBTParams.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/BaseGBTParams.java
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageBody.java
private transient SharedStorageContext sharedStorageContext;

public CalcLocalSplitsOperator() {
    sharedStorageAccessorID = getClass().getSimpleName() + "-" + UUID.randomUUID();
Would it be simpler and more reliable to use StreamOperator#getOperatorID as the accessor ID?
Operators in a given operator graph are guaranteed to have different operatorIDs. A given operator is guaranteed to have the same operatorID after the job is restarted as long as the job graph is the same. And users can manually specify the operatorID for operators in a job so that the operatorID stays the same even if the job graph is changed.
If we can re-use the operatorID as the accessorID, maybe we can remove the method SharedStorageStreamOperator#getSharedStorageAccessorID.
Actually, I've tried StreamOperator#getOperatorID before.
However, the operator ID cannot be obtained before execution, so we are unable to specify the owner map of shared data items when building the graph. Without the owner map, it is difficult to control access at runtime.
It seems that there is a way to remove sharedStorageAccessorID and still pass all tests. We can discuss the code change offline.
...ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageContextImpl.java
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorage.java
}

static class Reader<T> {
    protected final Tuple3<StorageID, Integer, String> t;
Can we use a name more readable than t (e.g. itemId)?
    Preconditions.checkState(owners.get(t).equals(ownerId));
}

void set(T value) {
Would it be a bit more consistent with the existing ListStateWithCache#update to name this method update(...)?
...ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/SharedStorageContextImpl.java
flink-ml-core/src/main/java/org/apache/flink/ml/common/sharedstorage/ItemDescriptor.java
OperatorStateUtils.getUniqueElement(histBuilderState, HIST_BUILDER_STATE_NAME)
        .orElse(null);

sharedStorageContext.initializeState(this, getRuntimeContext(), context);
Instead of passing the ownerMap to SharedStorageContextImpl and using ownerMap to determine the ItemDescriptors owned by this operator, would it be more straightforward to have this operator pass the list of ItemDescriptors it owns directly to this method?
Then we should be able to simplify the code by removing e.g. SharedStorageStreamOperator#getSharedStorageAccessorID and SharedStorageContextImpl#setOwnerMap.
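A minimal sketch of the suggestion above, written in plain Java rather than against the Flink classes in this PR. `OwnedItemsContext`, its methods, and the use of plain strings as item names are hypothetical stand-ins (not the PR's `SharedStorageContextImpl` or `ItemDescriptor`); the point is only that each operator declares what it owns, so no accessor ID or global owner map is needed:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical context: each operator hands over the items it owns when it initializes. */
final class OwnedItemsContext {
    // Stands in for the JVM-wide static storage shared by co-located operators.
    private static final Map<String, Object> STORAGE = new ConcurrentHashMap<>();

    private final Set<String> ownedItems;

    OwnedItemsContext(List<String> ownedItems) {
        this.ownedItems = new HashSet<>(ownedItems);
    }

    /** Only the declared owner may write an item. */
    void write(String item, Object value) {
        if (!ownedItems.contains(item)) {
            throw new IllegalStateException("Not the owner of item: " + item);
        }
        STORAGE.put(item, value);
    }

    /** Any operator may read an item; returns null if it has not been written yet. */
    Object read(String item) {
        return STORAGE.get(item);
    }
}

class OwnedItemsDemo {
    public static void main(String[] args) {
        OwnedItemsContext writer = new OwnedItemsContext(Arrays.asList("histogram"));
        OwnedItemsContext reader = new OwnedItemsContext(Collections.<String>emptyList());

        writer.write("histogram", new int[] {1, 2, 3});
        int[] seen = (int[]) reader.read("histogram");
        System.out.println("reader sees " + seen.length + " bins");

        try {
            reader.write("histogram", new int[0]);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The ownership check here is local to the context, which is what would allow the accessor ID and the owner map to be dropped. It does not detect two operators declaring the same item; that check would still need a shared registry.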
/** Default implementation of {@link SharedStorageContext} using {@link SharedStorage}. */
@SuppressWarnings("rawtypes")
class SharedStorageContextImpl implements SharedStorageContext, Serializable {
    private final StorageID storageID;
Instead of keeping the storageID here, an alternative approach is to generate the storageID once during the graph building phase and pass it to all operators. Then each operator can pass the storageID to initializeState.
The storageID can be passed to operators via either the constructor or the setGlobalJobParameters/getGlobalJobParameters of ExecutionConfig.
If we can do this, we might be able to simplify the code and remove e.g. SharedStorageWrapper.
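The constructor variant of this idea can be sketched in plain Java as follows. `StoragePool` and `SketchOperator` are hypothetical names, not classes from this PR, and the ID is a plain `String` rather than the PR's `StorageID`. The ID is created once where the graph is assembled and every operator receives it as a constructor argument:

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical JVM-wide pool: one independent item map per storage ID. */
final class StoragePool {
    private static final Map<String, Map<String, Object>> POOLS = new ConcurrentHashMap<>();

    static Map<String, Object> forId(String storageId) {
        return POOLS.computeIfAbsent(storageId, k -> new ConcurrentHashMap<>());
    }
}

/** Hypothetical operator that receives the storage ID through its constructor. */
final class SketchOperator {
    private final String storageId;

    SketchOperator(String storageId) {
        this.storageId = storageId;
    }

    void put(String item, Object value) {
        StoragePool.forId(storageId).put(item, value);
    }

    Object get(String item) {
        return StoragePool.forId(storageId).get(item);
    }
}

class StorageIdDemo {
    public static void main(String[] args) {
        // Generated once, at the point where the operators are wired together.
        String storageId = UUID.randomUUID().toString();
        SketchOperator a = new SketchOperator(storageId);
        SketchOperator b = new SketchOperator(storageId);
        // An operator built with a different ID sees a separate storage.
        SketchOperator other = new SketchOperator(UUID.randomUUID().toString());

        a.put("model", "tree-0");
        System.out.println("b sees: " + b.get("model"));
        System.out.println("other sees: " + other.get("model"));
    }
}
```

Because the ID is an ordinary serializable field, it would travel with the operator when the job graph is shipped, so no wrapper is needed to carry it.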
Thanks for the PR. I left some comments about the SharedObjects infra here.
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTRunner.java
flink-ml-lib/src/main/java/org/apache/flink/ml/common/gbt/GBTRunner.java
package org.apache.flink.ml.common.sharedobjects;

/** Interface for all operators that need to access the shared objects. */
public interface SharedObjectsStreamOperator {
As I understand, this PR tries to provide an infrastructure for sharing objects among multiple Flink operators through Java static variables.
To achieve this, it employs one specific Flink operator as the writer of each shared object and other operators as the readers. Based on this, the GBDT implementation relies on Flink events to guarantee the read/write order of each object.
However, can you explain some other machine learning algorithms that would use SharedObjects in the future? And is there a general way that developers can guarantee the order of read/writes is correct? If a reader of an object changes the value of that object, does it still follow the assumption of SharedObjects?
There is another possible solution [1] that we put all the computation logic into one operator (i.e., WorkerOperator) and all the computation logic into another operator (i.e., ServerOperator). In this case, we would not need shared objects anymore. Let's have a thorough comparison between these two options.
[1] #237
Thanks for your valuable comments. You mentioned several issues in your comments, and I will answer them one by one.
Q: "can you explain some other machine learning algorithms that would use SharedObjects in the future?"
One common pattern where SharedObjects can be used is when the same datasets are needed in operators before and after a reduce operator. Here are some algorithms that fit this pattern (correct me if I made a mistake):
‒ All distributed algorithms based on decision trees: both the model and the training data are required when splitting nodes after the intermediate data is reduced.
‒ Second-order gradient optimizers, e.g. Newton, L-BFGS: they require the old gradient data after the reduce operation.
‒ Algorithms that have two rounds of (unmergeable) reduce operations in each iteration: GBDT, ALS, GMM, and LDA.
‒ Evaluation metrics calculated every few iterations: the metrics have to be calculated with reduce after the model is updated.
Q: "is there a general way that developers can guarantee the order of read/writes is correct?"
A simple approach is to connect the reader and the writer with an additional dummy stream. Readers read the data only after receiving elements from the stream.
This approach assumes the reads and writes are not interleaved, which should be true in most algorithms. If not (please give some examples), multi-threading techniques like atomics can be used.
Q: "If a reader of an object changes the value of that object, does it still follow the assumption of SharedObjects?"
I think this is a common issue in Java, and one solution is to give readers a deep clone of the object, which can be expensive. Balancing efficiency and safety, I chose efficiency.
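The two answers above can be illustrated with a small plain-Java sketch. It is not code from this PR: a `BlockingQueue` stands in for the dummy stream, a static `AtomicReference` stands in for the shared object, and `SignalOrderingSketch` and `runOnce` are hypothetical names. The reader touches the shared object only after a signal arrives, and an optional defensive copy shows the safety/efficiency trade-off:

```java
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

class SignalOrderingSketch {
    // Stands in for a shared object held in a static variable.
    static final AtomicReference<double[]> SHARED = new AtomicReference<>();

    /** Writer publishes a value, then signals; the reader reads only after the signal. */
    static double[] runOnce(boolean defensiveCopy) {
        BlockingQueue<Integer> dummyStream = new LinkedBlockingQueue<>();
        Thread writer = new Thread(() -> {
            SHARED.set(new double[] {0.1, 0.2, 0.3});
            dummyStream.add(1); // the signal is emitted only after the write
        });
        writer.start();
        try {
            dummyStream.take(); // the reader blocks until the signal arrives
            double[] value = SHARED.get();
            writer.join();
            // Copying protects the shared object from reader mutations, at the cost of a copy.
            return defensiveCopy ? Arrays.copyOf(value, value.length) : value;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the signal", e);
        }
    }

    public static void main(String[] args) {
        double[] byReference = runOnce(false);
        System.out.println("same instance: " + (byReference == SHARED.get()));

        double[] copy = runOnce(true);
        copy[0] = 99.0; // mutating the copy leaves the shared object untouched
        System.out.println("shared[0] after mutating the copy: " + SHARED.get()[0]);
    }
}
```

Returning the reference directly is the cheap option chosen in the comment above. The copy variant is what a safer but more expensive design would do for every read.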
#237 proposes a brilliant solution for abstracting iterative computations, inspired by Parameter Server. I believe both solutions can work well for many algorithms/scenarios.
Before comparing the two solutions, there are two facts about the PS infra I must emphasize:
One is that the PS infra is built on the DataStream APIs, which means there is no performance improvement compared to implementing directly with the DataStream APIs. So we mainly discuss its usability for developers.
The other is that the functionality currently shown in #237 cannot fully meet the requirements of the GBDT implementation. MessageType, the model format, the reduce logic of messages, etc. are all fixed/hard-coded for gradient-based algorithms. Usability would drop significantly if the GBDT implementation were forced to use the current APIs.
Therefore, to make a reasonable comparison between the two solutions, I assume an extended version of the current PS infra that supports POJO message types, POJO model data, user-defined reduce functions, etc. Here are my thoughts under this assumption:
- Framework intrusiveness
Using PS infra means developers can no longer use DataStream APIs inside iterations. As a result, there are cases that PS infra cannot implement:
- side outputs: evaluation result streams; prediction and model streams in online cases.
- partition/join/coGroup of training data sets: AUC calculation after model update, ALS, SimRank.
SharedObjects, in contrast, is an augmentation of the DataStream APIs and places no extra limitation on developers.
The intrusiveness also affects the observability of operators while the job is running, e.g. in/out stats and checkpoint status, because in PS infra multiple computations are merged into one operator. This decreases usability for both developers and end users.
- Applicable scenarios
Besides the inapplicable cases mentioned above, PS infra cannot work in non-iteration cases, while SharedObjects can. One possible case is improving consecutive joins with the same dataset by avoiding an extra copy of the dataset.
- Learning curve
PS infra provides a whole set of concepts and interfaces such as Message, ModelUpdater, ProcessStage, TrainingUtils, etc., which are not related to the existing DataStream API and have a steeper learning curve.
SharedObjects provides two interfaces, SharedObjectsUtils and SharedObjectsContext, and can be used directly on top of existing DataStream API code, making it easier for developers to adopt.
Overall, I think both solutions can coexist because they are on different API levels and do not conflict. What do you think, @zhipeng93?
What is the purpose of the change
Add Transformer and Estimator for GBTClassifier and GBTRegressor.
Details about features compared to SparkML's implementation are as follows:
maxMemoryInMB, cacheNodeIds, and checkpointInterval.
[1] https://xgboost.readthedocs.io/en/stable/tutorials/model.html#the-structure-score
Brief change log
Does this pull request potentially affect one of the following parts:
@Public(Evolving): no
Documentation