A look at Flink's PartitionableListState
Published: 2019-06-29


This article takes a look at Flink's PartitionableListState.

PartitionableListState

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java

```java
    /**
     * Implementation of operator list state.
     *
     * @param <S> the type of an operator state partition.
     */
    static final class PartitionableListState<S> implements ListState<S> {

        /**
         * Meta information of the state, including state name, assignment mode, and serializer
         */
        private RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo;

        /**
         * The internal list the holds the elements of the state
         */
        private final ArrayList<S> internalList;

        /**
         * A serializer that allows to perform deep copies of internalList
         */
        private final ArrayListSerializer<S> internalListCopySerializer;

        PartitionableListState(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList<S>());
        }

        private PartitionableListState(
                RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo,
                ArrayList<S> internalList) {

            this.stateMetaInfo = Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {
            this(toCopy.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorStateBackendMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorStateBackendMetaInfo<S> getStateMetaInfo() {
            return stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<>(this);
        }

        @Override
        public void clear() {
            internalList.clear();
        }

        @Override
        public Iterable<S> get() {
            return internalList;
        }

        @Override
        public void add(S value) {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            internalList.add(value);
        }

        @Override
        public String toString() {
            return "PartitionableListState{" +
                    "stateMetaInfo=" + stateMetaInfo +
                    ", internalList=" + internalList +
                    '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {

            long[] partitionOffsets = new long[internalList.size()];

            DataOutputView dov = new DataOutputViewStreamWrapper(out);

            for (int i = 0; i < internalList.size(); ++i) {
                S element = internalList.get(i);
                partitionOffsets[i] = out.getPos();
                getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);
            }

            return partitionOffsets;
        }

        @Override
        public void update(List<S> values) {
            internalList.clear();

            addAll(values);
        }

        @Override
        public void addAll(List<S> values) {
            if (values != null && !values.isEmpty()) {
                internalList.addAll(values);
            }
        }
    }
```
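  • PartitionableListState is the ListState implementation used by DefaultOperatorStateBackend. It stores the state in an ArrayList (internalList) and keeps its meta information in a RegisteredOperatorStateBackendMetaInfo (stateMetaInfo). Its write method serializes every element of internalList into an FSDataOutputStream and returns an array (partitionOffsets) holding the stream offset at which each element starts.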
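To make the offset bookkeeping in write more concrete, here is a minimal plain-Java sketch with no Flink dependency. It is only an approximation under stated assumptions: the element type is String, DataOutputStream.writeUTF stands in for the partition state serializer, and DataOutputStream.size() stands in for FSDataOutputStream.getPos(). The class name OffsetWriter is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for PartitionableListState.write: the stream position is
// recorded before each element is serialized, so every element can later be
// located (and read back on its own) through its offset.
class OffsetWriter {

    static long[] write(List<String> internalList, DataOutputStream out) throws IOException {
        long[] partitionOffsets = new long[internalList.size()];
        for (int i = 0; i < internalList.size(); ++i) {
            // size() = bytes written so far; plays the role of out.getPos() (assumption)
            partitionOffsets[i] = out.size();
            // writeUTF replaces the state's TypeSerializer in this sketch (assumption)
            out.writeUTF(internalList.get(i));
        }
        out.flush();
        return partitionOffsets;
    }

    // Reads one element back by skipping straight to its recorded offset.
    static String readAt(byte[] bytes, long offset) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        in.skipBytes((int) offset);
        return in.readUTF();
    }

    public static void main(String[] args) throws IOException {
        List<String> state = new ArrayList<>(Arrays.asList("a", "bb", "ccc"));
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        long[] offsets = write(state, new DataOutputStream(bytes));
        System.out.println(Arrays.toString(offsets)); // [0, 3, 7]
        System.out.println(readAt(bytes.toByteArray(), offsets[2])); // ccc
    }
}
```

writeUTF prefixes each string with a 2-byte length, so "a" occupies 3 bytes and "bb" occupies 4, which is why the offsets come out as 0, 3 and 7. Keeping one offset per element is what lets the backend address individual list entries rather than treating the list as one opaque blob.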

ListState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/ListState.java

```java
/**
 * {@link State} interface for partitioned list state in Operations.
 * The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <T> Type of values that this list state keeps.
 */
@PublicEvolving
public interface ListState<T> extends MergingState<T, Iterable<T>> {

    /**
     * Updates the operator state accessible by {@link #get()} by updating existing values to
     * to the given list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value will be null.
     *
     * @param values The new values for the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void update(List<T> values) throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given values
     * to existing list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null or an empty list is passed in, the state value remains unchanged.
     *
     * @param values The new values to be added to the state.
     *
     * @throws Exception The method may forward exception thrown internally (by I/O or functions).
     */
    void addAll(List<T> values) throws Exception;
}
```

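  • ListState is the state interface for partitioned list state in operations. It extends MergingState with OUT bound to Iterable<T> and declares two methods: update replaces the whole list, and the state is cleared if the argument is null or empty; addAll appends incrementally, leaving the state unchanged if the argument is null or empty and otherwise appending the given values.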
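The difference between update and addAll is easy to check with a small plain-Java copy of the ArrayList-based logic from PartitionableListState. SimpleListState is a hypothetical name used only for this sketch; it is not a Flink class, and it drops serializers and meta info entirely.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical mirror of PartitionableListState's update/addAll/add logic,
// backed by an ArrayList just like internalList.
class SimpleListState<S> {

    private final ArrayList<S> internalList = new ArrayList<>();

    // Full replacement: clear first, so null or an empty list leaves the state empty.
    void update(List<S> values) {
        internalList.clear();
        addAll(values);
    }

    // Incremental append: null or an empty list is ignored.
    void addAll(List<S> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    // Single append: like PartitionableListState.add, null is rejected.
    void add(S value) {
        if (value == null) {
            throw new NullPointerException("You cannot add null to a ListState.");
        }
        internalList.add(value);
    }

    Iterable<S> get() {
        return internalList;
    }

    public static void main(String[] args) {
        SimpleListState<String> state = new SimpleListState<>();
        state.addAll(Arrays.asList("a", "b"));
        state.addAll(null);                  // ignored
        System.out.println(state.get());     // [a, b]
        state.update(Arrays.asList("c"));    // replaces everything
        System.out.println(state.get());     // [c]
        state.update(null);                  // clears
        System.out.println(state.get());     // []
    }
}
```

update is implemented as "clear, then addAll", which is why passing null or an empty list to update empties the state while the same argument to addAll is a no-op.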

MergingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/MergingState.java

```java
/**
 * Extension of {@link AppendingState} that allows merging of state. That is, two instances
 * of {@link MergingState} can be combined into a single instance that contains all the
 * information of the two merged states.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }
```
  • MergingState merely extends AppendingState and declares no methods of its own; its name signals that the state supports merging.
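Since MergingState adds no methods, the merging itself is carried out by Flink's runtime (for example when session windows merge). For list-like state, merging two instances amounts to concatenating their elements. The sketch below illustrates only that idea, assuming a hypothetical map from namespace (think: window) to list and a made-up helper mergeInto; none of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration: list state kept per namespace (e.g. per window).
// Merging several source namespaces concatenates their lists into the target.
class MergeSketch {

    static <T> void mergeInto(Map<String, List<T>> stateByNamespace,
                              String target, List<String> sources) {
        List<T> merged = new ArrayList<>();
        for (String source : sources) {
            // remove() both reads and drops the source namespace's list
            List<T> values = stateByNamespace.remove(source);
            if (values != null) {
                merged.addAll(values);
            }
        }
        if (!merged.isEmpty()) {
            stateByNamespace.computeIfAbsent(target, k -> new ArrayList<>()).addAll(merged);
        }
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> state = new TreeMap<>();
        state.put("w1", new ArrayList<>(Arrays.asList(1, 2)));
        state.put("w2", new ArrayList<>(Arrays.asList(3)));
        mergeInto(state, "w12", Arrays.asList("w1", "w2"));
        System.out.println(state); // {w12=[1, 2, 3]}
    }
}
```

After the merge only the target namespace remains and it holds all elements of both sources, which matches the javadoc's description of a merged instance containing "all the information of the two merged states".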

AppendingState

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/AppendingState.java

```java
/**
 * Base interface for partitioned state that supports adding elements and inspecting the current
 * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
 *
 * <p>The state is accessed and modified by user functions, and checkpointed consistently
 * by the system as part of the distributed snapshots.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 *
 * @param <IN> Type of the value that can be added to the state.
 * @param <OUT> Type of the value that can be retrieved from the state.
 */
@PublicEvolving
public interface AppendingState<IN, OUT> extends State {

    /**
     * Returns the current value for the state. When the state is not
     * partitioned the returned value is the same for all inputs in a given
     * operator instance. If state partitioning is applied, the value returned
     * depends on the current operator input, as the operator maintains an
     * independent state for each partition.
     *
     * <p><b>NOTE TO IMPLEMENTERS:</b> if the state is empty, then this method
     * should return {@code null}.
     *
     * @return The operator state value corresponding to the current input or {@code null}
     * if the state is empty.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    OUT get() throws Exception;

    /**
     * Updates the operator state accessible by {@link #get()} by adding the given value
     * to the list of values. The next time {@link #get()} is called (for the same state
     * partition) the returned state will represent the updated list.
     *
     * <p>If null is passed in, the state value will remain unchanged.
     *
     * @param value The new value for the state.
     *
     * @throws Exception Thrown if the system cannot access the state.
     */
    void add(IN value) throws Exception;
}
```

  • AppendingState is the base interface for partitioned state. It extends State and declares two methods: get returns the current value of the state (null if the state is empty), and add adds a value to the state.
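The AppendingState javadoc says elements can be kept in a buffer (list-like, which is what ListState does) or aggregated into one value. Here is a minimal sketch of the aggregating flavor, assuming a hypothetical SimpleAppendingState interface that copies the get/add/clear contract without Flink's types, and a hypothetical SumState that folds every added Long into a running sum.

```java
// Hypothetical plain-Java copy of the State/AppendingState contract (not Flink's types).
interface SimpleAppendingState<IN, OUT> {
    OUT get();          // contract: null when the state is empty
    void add(IN value); // contract: null leaves the state unchanged
    void clear();
}

// Aggregating flavor: IN and OUT are both Long, and every add is folded into one sum.
class SumState implements SimpleAppendingState<Long, Long> {

    private Long sum; // null represents "empty"

    @Override
    public Long get() {
        return sum;
    }

    @Override
    public void add(Long value) {
        if (value == null) {
            return; // unchanged, as the contract requires
        }
        sum = (sum == null) ? value : sum + value;
    }

    @Override
    public void clear() {
        sum = null;
    }

    public static void main(String[] args) {
        SumState state = new SumState();
        System.out.println(state.get()); // null
        state.add(3L);
        state.add(null);
        state.add(4L);
        System.out.println(state.get()); // 7
    }
}
```

For ListState the same contract is instantiated with OUT = Iterable<T>, so get hands back the buffered elements instead of an aggregate. Note that PartitionableListState.add, shown above, rejects null with checkNotNull instead of ignoring it.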

State

flink-core-1.7.0-sources.jar!/org/apache/flink/api/common/state/State.java

```java
/**
 * Interface that different types of partitioned state must implement.
 *
 * <p>The state is only accessible by functions applied on a {@code KeyedStream}. The key is
 * automatically supplied by the system, so the function always sees the value mapped to the
 * key of the current element. That way, the system can handle stream and state partitioning
 * consistently together.
 */
@PublicEvolving
public interface State {

    /**
     * Removes the value mapped under the current key.
     */
    void clear();
}
```

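  • State is the interface that every kind of partitioned state must implement. It declares only the clear method, which removes the value held by the state (for keyed state, the value mapped under the current key).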

RegisteredOperatorStateBackendMetaInfo

flink-runtime_2.11-1.7.0-sources.jar!/org/apache/flink/runtime/state/RegisteredOperatorStateBackendMetaInfo.java

```java
/**
 * Compound meta information for a registered state in an operator state backend.
 * This contains the state name, assignment mode, and state partition serializer.
 *
 * @param <S> Type of the state.
 */
public class RegisteredOperatorStateBackendMetaInfo<S> extends RegisteredStateMetaInfoBase {

    /**
     * The mode how elements in this state are assigned to tasks during restore
     */
    @Nonnull
    private final OperatorStateHandle.Mode assignmentMode;

    /**
     * The type serializer for the elements in the state list
     */
    @Nonnull
    private final TypeSerializer<S> partitionStateSerializer;

    public RegisteredOperatorStateBackendMetaInfo(
            @Nonnull String name,
            @Nonnull TypeSerializer<S> partitionStateSerializer,
            @Nonnull OperatorStateHandle.Mode assignmentMode) {
        super(name);
        this.partitionStateSerializer = partitionStateSerializer;
        this.assignmentMode = assignmentMode;
    }

    private RegisteredOperatorStateBackendMetaInfo(@Nonnull RegisteredOperatorStateBackendMetaInfo<S> copy) {
        this(
            Preconditions.checkNotNull(copy).name,
            copy.partitionStateSerializer.duplicate(),
            copy.assignmentMode);
    }

    @SuppressWarnings("unchecked")
    public RegisteredOperatorStateBackendMetaInfo(@Nonnull StateMetaInfoSnapshot snapshot) {
        this(
            snapshot.getName(),
            (TypeSerializer<S>) Preconditions.checkNotNull(
                snapshot.restoreTypeSerializer(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER)),
            OperatorStateHandle.Mode.valueOf(
                snapshot.getOption(StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE)));
        Preconditions.checkState(StateMetaInfoSnapshot.BackendStateType.OPERATOR == snapshot.getBackendStateType());
    }

    /**
     * Creates a deep copy of the itself.
     */
    @Nonnull
    public RegisteredOperatorStateBackendMetaInfo<S> deepCopy() {
        return new RegisteredOperatorStateBackendMetaInfo<>(this);
    }

    @Nonnull
    @Override
    public StateMetaInfoSnapshot snapshot() {
        return computeSnapshot();
    }

    //......

    @Nonnull
    private StateMetaInfoSnapshot computeSnapshot() {
        Map<String, String> optionsMap = Collections.singletonMap(
            StateMetaInfoSnapshot.CommonOptionsKeys.OPERATOR_STATE_DISTRIBUTION_MODE.toString(),
            assignmentMode.toString());
        String valueSerializerKey = StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER.toString();
        Map<String, TypeSerializer<?>> serializerMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.duplicate());
        Map<String, TypeSerializerSnapshot<?>> serializerConfigSnapshotsMap =
            Collections.singletonMap(valueSerializerKey, partitionStateSerializer.snapshotConfiguration());

        return new StateMetaInfoSnapshot(
            name,
            StateMetaInfoSnapshot.BackendStateType.OPERATOR,
            optionsMap,
            serializerConfigSnapshotsMap,
            serializerMap);
    }
}
```
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot, which builds the optionsMap, serializerConfigSnapshotsMap and serializerMap needed to construct a StateMetaInfoSnapshot.
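computeSnapshot flattens the meta info into string-keyed maps, and the snapshot constructor rebuilds it, turning the stored mode string back into an enum via Mode.valueOf. The round trip can be sketched in plain Java as follows. Everything here is a hypothetical stand-in: Mode copies the constants of OperatorStateHandle.Mode, the map keys are assumed to be the enum constant names, serializers are reduced to a name string, and the serializer config snapshot map is left out.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in for OperatorStateHandle.Mode.
enum Mode { SPLIT_DISTRIBUTE, UNION, BROADCAST }

// Hypothetical stand-in for StateMetaInfoSnapshot: a name plus string-keyed maps.
final class Snapshot {
    final String name;
    final Map<String, String> options;
    final Map<String, String> serializers; // serializer reduced to a name here

    Snapshot(String name, Map<String, String> options, Map<String, String> serializers) {
        this.name = name;
        this.options = options;
        this.serializers = serializers;
    }
}

// Hypothetical stand-in for RegisteredOperatorStateBackendMetaInfo.
final class MetaInfo {
    static final String MODE_KEY = "OPERATOR_STATE_DISTRIBUTION_MODE";
    static final String VALUE_SERIALIZER_KEY = "VALUE_SERIALIZER";

    final String name;
    final String serializer;
    final Mode assignmentMode;

    MetaInfo(String name, String serializer, Mode assignmentMode) {
        this.name = name;
        this.serializer = serializer;
        this.assignmentMode = assignmentMode;
    }

    // Restore path: the mode option string is parsed back with Mode.valueOf.
    MetaInfo(Snapshot snapshot) {
        this(snapshot.name,
             snapshot.serializers.get(VALUE_SERIALIZER_KEY),
             Mode.valueOf(snapshot.options.get(MODE_KEY)));
    }

    // Mirrors computeSnapshot: one option entry and one serializer entry.
    Snapshot snapshot() {
        Map<String, String> optionsMap =
            Collections.singletonMap(MODE_KEY, assignmentMode.toString());
        Map<String, String> serializerMap =
            Collections.singletonMap(VALUE_SERIALIZER_KEY, serializer);
        return new Snapshot(name, optionsMap, serializerMap);
    }

    public static void main(String[] args) {
        MetaInfo original = new MetaInfo("buffered-elements", "StringSerializer", Mode.UNION);
        Snapshot snapshot = original.snapshot();
        MetaInfo restored = new MetaInfo(snapshot);
        System.out.println(snapshot.options + " -> " + restored.assignmentMode);
    }
}
```

Storing the mode as a plain string is what makes the snapshot format independent of the in-memory class; the cost is that a renamed enum constant would no longer parse on restore.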

Summary

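  • Apart from broadcast state (BroadcastState), Flink's managed operator state only supports ListState. The ListState implementation used by DefaultOperatorStateBackend is PartitionableListState, which stores the state in an ArrayList (internalList) and uses a RegisteredOperatorStateBackendMetaInfo as its stateMetaInfo.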
  • PartitionableListState implements the ListState interface (update, addAll). ListState extends MergingState with OUT bound to Iterable<T>; MergingState declares no methods of its own and extends AppendingState; AppendingState extends State and declares get and add; State declares clear.
  • RegisteredOperatorStateBackendMetaInfo extends the abstract class RegisteredStateMetaInfoBase and implements its abstract snapshot method by delegating to computeSnapshot, which builds the optionsMap, serializerConfigSnapshotsMap and serializerMap needed to construct a StateMetaInfoSnapshot.
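The interface chain from the summary can be reproduced in plain Java to see how the generics line up, in particular how ListState<T> pins MergingState's OUT to Iterable<T>. All types prefixed with My are hypothetical copies reduced to the methods discussed here; they are not Flink's classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical copies of Flink's state interface chain, reduced to the declared methods.
interface MyState {
    void clear();
}

interface MyAppendingState<IN, OUT> extends MyState {
    OUT get() throws Exception;
    void add(IN value) throws Exception;
}

// Marker interface: adds no methods, only signals that merging is supported.
interface MyMergingState<IN, OUT> extends MyAppendingState<IN, OUT> { }

// OUT is fixed to Iterable<T>, so get() returns the buffered elements.
interface MyListState<T> extends MyMergingState<T, Iterable<T>> {
    void update(List<T> values) throws Exception;
    void addAll(List<T> values) throws Exception;
}

// ArrayList-backed implementation in the spirit of PartitionableListState.
class MyPartitionableListState<S> implements MyListState<S> {

    private final ArrayList<S> internalList = new ArrayList<>();

    @Override public void clear() { internalList.clear(); }

    @Override public Iterable<S> get() { return internalList; }

    @Override public void add(S value) {
        if (value == null) {
            throw new NullPointerException("You cannot add null to a ListState.");
        }
        internalList.add(value);
    }

    @Override public void update(List<S> values) {
        internalList.clear();
        addAll(values);
    }

    @Override public void addAll(List<S> values) {
        if (values != null && !values.isEmpty()) {
            internalList.addAll(values);
        }
    }

    public static void main(String[] args) throws Exception {
        MyPartitionableListState<String> state = new MyPartitionableListState<>();
        // The same object can be used through any interface in the chain.
        MyAppendingState<String, Iterable<String>> appending = state;
        appending.add("a");
        state.addAll(Arrays.asList("b", "c"));
        System.out.println(appending.get()); // [a, b, c]
    }
}
```

Because every level only narrows the generics or adds methods, code written against AppendingState (or even plain State) works unchanged with the list implementation.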

