Skip to content

Commit

Permalink
Rename shared storage to shared objects and change according to comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Fanoid committed May 12, 2023
1 parent 0885794 commit 3a1f57a
Show file tree
Hide file tree
Showing 21 changed files with 372 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
Expand Down Expand Up @@ -56,12 +56,12 @@
import java.util.Objects;
import java.util.Optional;

/** Base class for the shared storage wrapper operators. */
abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<T>>
/** Base class for the shared objects wrapper operators. */
abstract class AbstractSharedObjectsWrapperOperator<T, S extends StreamOperator<T>>
implements StreamOperator<T>, IterationListener<T>, CheckpointedStreamOperator {

private static final Logger LOG =
LoggerFactory.getLogger(AbstractSharedStorageWrapperOperator.class);
LoggerFactory.getLogger(AbstractSharedObjectsWrapperOperator.class);

protected final StreamOperatorParameters<T> parameters;

Expand All @@ -72,18 +72,18 @@ abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<
protected final Output<StreamRecord<T>> output;

protected final StreamOperatorFactory<T> operatorFactory;
private final SharedStorageContextImpl context;
private final SharedObjectsContextImpl context;
protected final OperatorMetricGroup metrics;
protected final S wrappedOperator;
protected transient StreamOperatorStateHandler stateHandler;

protected transient InternalTimeServiceManager<?> timeServiceManager;

@SuppressWarnings({"unchecked", "rawtypes"})
AbstractSharedStorageWrapperOperator(
AbstractSharedObjectsWrapperOperator(
StreamOperatorParameters<T> parameters,
StreamOperatorFactory<T> operatorFactory,
SharedStorageContextImpl context) {
SharedObjectsContextImpl context) {
this.parameters = Objects.requireNonNull(parameters);
this.streamConfig = Objects.requireNonNull(parameters.getStreamConfig());
this.containingTask = Objects.requireNonNull(parameters.getContainingTask());
Expand All @@ -101,11 +101,11 @@ abstract class AbstractSharedStorageWrapperOperator<T, S extends StreamOperator<
parameters.getOperatorEventDispatcher())
.f0;
Preconditions.checkArgument(
wrappedOperator instanceof SharedStorageStreamOperator,
wrappedOperator instanceof SharedObjectsStreamOperator,
String.format(
"The wrapped operator is not an instance of %s.",
SharedStorageStreamOperator.class.getSimpleName()));
((SharedStorageStreamOperator) wrappedOperator).onSharedStorageContextSet(context);
SharedObjectsStreamOperator.class.getSimpleName()));
((SharedObjectsStreamOperator) wrappedOperator).onSharedObjectsContextSet(context);
}

private OperatorMetricGroup createOperatorMetricGroup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.util.Preconditions;

import java.io.Serializable;

Expand All @@ -32,27 +33,29 @@
public class ItemDescriptor<T> implements Serializable {

/** Name of the item. */
public String key;
public final String name;

/** Type serializer. */
public TypeSerializer<T> serializer;
public final TypeSerializer<T> serializer;

/** Initial value. */
public T initVal;
public final T initVal;

private ItemDescriptor(String key, TypeSerializer<T> serializer, T initVal) {
this.key = key;
private ItemDescriptor(String name, TypeSerializer<T> serializer, T initVal) {
Preconditions.checkNotNull(
initVal, "Cannot use `null` as the initial value of a shared item.");
this.name = name;
this.serializer = serializer;
this.initVal = initVal;
}

public static <T> ItemDescriptor<T> of(String key, TypeSerializer<T> serializer, T initVal) {
return new ItemDescriptor<>(key, serializer, initVal);
public static <T> ItemDescriptor<T> of(String name, TypeSerializer<T> serializer, T initVal) {
return new ItemDescriptor<>(name, serializer, initVal);
}

@Override
public int hashCode() {
return key.hashCode();
return name.hashCode();
}

@Override
Expand All @@ -64,12 +67,12 @@ public boolean equals(Object o) {
return false;
}
ItemDescriptor<?> that = (ItemDescriptor<?>) o;
return key.equals(that.key);
return name.equals(that.name);
}

@Override
public String toString() {
return String.format(
"ItemDescriptor{key='%s', serializer=%s, initVal=%s}", key, serializer, initVal);
"ItemDescriptor{name='%s', serializer=%s, initVal=%s}", name, serializer, initVal);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.iteration.operator.OperatorUtils;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
Expand All @@ -29,14 +29,14 @@
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

/** Wrapper for {@link OneInputStreamOperator}. */
class OneInputSharedStorageWrapperOperator<IN, OUT>
extends AbstractSharedStorageWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>>
class OneInputSharedObjectsWrapperOperator<IN, OUT>
extends AbstractSharedObjectsWrapperOperator<OUT, OneInputStreamOperator<IN, OUT>>
implements OneInputStreamOperator<IN, OUT>, BoundedOneInput {

OneInputSharedStorageWrapperOperator(
OneInputSharedObjectsWrapperOperator(
StreamOperatorParameters<OUT> parameters,
StreamOperatorFactory<OUT> operatorFactory,
SharedStorageContextImpl context) {
SharedObjectsContextImpl context) {
super(parameters, operatorFactory, context);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.util.AbstractID;

/** ID of a shared storage. */
class StorageID extends AbstractID {
/** ID of a pool for shared objects. */
class PoolID extends AbstractID {
private static final long serialVersionUID = 1L;

public StorageID(byte[] bytes) {
public PoolID(byte[] bytes) {
super(bytes);
}

public StorageID() {}
public PoolID() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,76 +16,75 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.io.Serializable;
import java.util.List;
import java.util.Map;

/**
* The builder of the subgraph that will be executed with a common shared storage. Users can only
* The builder of the subgraph that will be executed with common shared objects. Users can only
* create data streams from {@code inputs}. Users can not refer to data streams outside, and can not
* add sources/sinks.
*
* <p>The shared storage body requires all streams accessing the shared storage, i.e., {@link
* SharedStorageBodyResult#accessors} have same parallelism and can be co-located.
* <p>The shared objects body requires all transformations accessing the shared objects, i.e.,
* {@link SharedObjectsBodyResult#coLocatedTransformations}, to have the same parallelism and to
* be co-locatable.
*/
@Experimental
@FunctionalInterface
public interface SharedStorageBody extends Serializable {
public interface SharedObjectsBody extends Serializable {

/**
* This method creates the subgraph for the shared storage body.
* This method creates the subgraph for the shared objects body.
*
* @param inputs Input data streams.
* @return Result of the subgraph, including output data streams, data streams with access to
* the shared storage, and a mapping from share items to their owners.
* the shared objects, and a mapping from shared items to their owners.
*/
SharedStorageBodyResult process(List<DataStream<?>> inputs);
SharedObjectsBodyResult process(List<DataStream<?>> inputs);

/**
* The result of a {@link SharedStorageBody}, including output data streams, data streams with
* access to the shared storage, and a mapping from descriptors of share items to their owners.
* The result of a {@link SharedObjectsBody}, including output data streams, transformations
* that should be co-located, and a mapping from descriptors of shared items to their owners.
*/
@Experimental
class SharedStorageBodyResult {
class SharedObjectsBodyResult {
/** A list of output streams. */
private final List<DataStream<?>> outputs;

/**
* A list of data streams which access to the shared storage. All data streams in the list
* should implement {@link SharedStorageStreamOperator}.
*/
private final List<DataStream<?>> accessors;
/** A list of {@link Transformation}s that should be co-located. */
private final List<Transformation<?>> coLocatedTransformations;

/**
* A mapping from descriptors of shared items to their owners. The owner is specified by
* {@link SharedStorageStreamOperator#getSharedStorageAccessorID()}, which must be kept
* unchanged for an instance of {@link SharedStorageStreamOperator}.
* {@link SharedObjectsStreamOperator#getSharedObjectsAccessorID()}, which must be kept
* unchanged for an instance of {@link SharedObjectsStreamOperator}.
*/
private final Map<ItemDescriptor<?>, SharedStorageStreamOperator> ownerMap;
private final Map<ItemDescriptor<?>, SharedObjectsStreamOperator> ownerMap;

public SharedStorageBodyResult(
public SharedObjectsBodyResult(
List<DataStream<?>> outputs,
List<DataStream<?>> accessors,
Map<ItemDescriptor<?>, SharedStorageStreamOperator> ownerMap) {
List<Transformation<?>> coLocatedTransformations,
Map<ItemDescriptor<?>, SharedObjectsStreamOperator> ownerMap) {
this.outputs = outputs;
this.accessors = accessors;
this.coLocatedTransformations = coLocatedTransformations;
this.ownerMap = ownerMap;
}

public List<DataStream<?>> getOutputs() {
return outputs;
}

public List<DataStream<?>> getAccessors() {
return accessors;
public List<Transformation<?>> getCoLocatedTransformations() {
return coLocatedTransformations;
}

public Map<ItemDescriptor<?>, SharedStorageStreamOperator> getOwnerMap() {
public Map<ItemDescriptor<?>, SharedObjectsStreamOperator> getOwnerMap() {
return ownerMap;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@
* limitations under the License.
*/

package org.apache.flink.ml.common.sharedstorage;
package org.apache.flink.ml.common.sharedobjects;

import org.apache.flink.annotation.Experimental;
import org.apache.flink.util.function.BiConsumerWithException;

/**
* Context for shared storage. Every operator implementing {@link SharedStorageStreamOperator} will
* have an instance of this context set by {@link
* SharedStorageStreamOperator#onSharedStorageContextSet} in runtime. User defined logic can be
* Context for shared objects. Every operator implementing {@link SharedObjectsStreamOperator} will
* get an instance of this context set by {@link
* SharedObjectsStreamOperator#onSharedObjectsContextSet} at runtime. User-defined logic can be
* invoked through {@link #invoke} with access to shared items.
*/
@Experimental
public interface SharedStorageContext {
public interface SharedObjectsContext {

/**
* Invoke user defined function with provided getters/setters of the shared storage.
* Invokes a user-defined function with the provided getters/setters of the shared objects.
*
* @param func User-defined function where shared items can be accessed through getters/setters.
* @throws Exception Possible exception.
Expand Down
Loading

0 comments on commit 3a1f57a

Please sign in to comment.