Skip to content

Latest commit

 

History

History
522 lines (419 loc) · 22 KB

File metadata and controls

522 lines (419 loc) · 22 KB

State(1): 状态的实现(上)

2020/10/26

注:源代码为Flink1.11.0版本

相关概念

While many operations in a dataflow simply look at one individual event at a time (for example an event parser), some operations remember information across multiple events (for example window operators). These operations are called stateful. [1]

当一个算子需要处理多个事件、并需要记住之前处理过的事件的结果时,被称为有状态的算子。在Flink中,状态除了记录过去计算的结果,还是进行容错和故障恢复的关键要素。

Keyed vs Non-keyed

在Flink中,数据流可以分为keyed数据流和non-keyed数据流,其区别在于:keyed数据流将数据进行了逻辑分片,从逻辑上每个key对应的partition只包含该key的数据流。当一个算子的计算资源变化时(DataStream#rescale、DataStream#rebalance等,见Physical Partitioning),keyed数据流能够保证一个key对应的partition中的数据全都分配到同一个task slot中。因此,keyed数据流在进行资源重分配时,其状态能够随着key一同被迁移到新的task slot中;而non-keyed数据流原本在运行时,上游算子就不知道会发送到哪个下游partition中,因此在重分配状态时,难以进行状态的迁移、合并和拆分(只有重放计算才能保证一定正确)。因此,keyed数据流可以使用Keyed State进行元素级的状态更新(支持所有的State类型),而non-keyed数据流只能依赖算子级的状态Operator State来管理状态(只支持List、Union、Broadcast三种)。

Keyed State

状态的初始化

从获取状态的代码中可以看到,在获取状态时,实际是通过KeyedStateStore#getxxState方法获取状态。KeyedStateStoreStreamOperatorStateHandler初始化,初始化过程位于AbstractStreamOperator#initializeState

// AbstractStreamOperator.class第237行
public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception {

	final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader());

	final StreamTask<?, ?> containingTask =
		Preconditions.checkNotNull(getContainingTask());
	final CloseableRegistry streamTaskCloseableRegistry =
		Preconditions.checkNotNull(containingTask.getCancelables());

	final StreamOperatorStateContext context =
		streamTaskStateManager.streamOperatorStateContext(
			getOperatorID(),
			getClass().getSimpleName(),
			getProcessingTimeService(),
			this,
			keySerializer,
			streamTaskCloseableRegistry,
			metrics);

	stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), streamTaskCloseableRegistry);
	timeServiceManager = context.internalTimerServiceManager();
	stateHandler.initializeOperatorState(this);
	runtimeContext.setKeyedStateStore(stateHandler.getKeyedStateStore().orElse(null));
}

StreamOperatorStateHandler在构造时,会初始化算子状态后端operatorStateBackend(上面Keyed vs Non-keyed节中提到的Operator State的实际状态存储部分,会在下一章介绍)、keyed状态后端keyedStateBackend和keyed状态存储keyedStateStore(对non-keyed数据流,keyedStateBackendnull,所以keyedStateStore也是null):

// StreamOperatorStateHandler.class第77行
public StreamOperatorStateHandler(
		StreamOperatorStateContext context,
		ExecutionConfig executionConfig,
		CloseableRegistry closeableRegistry) {
	this.context = context;
	operatorStateBackend = context.operatorStateBackend();
	keyedStateBackend = context.keyedStateBackend();
	this.closeableRegistry = closeableRegistry;

	if (keyedStateBackend != null) {
		keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, executionConfig);
	}
	else {
		keyedStateStore = null;
	}
}

keyedStateBackend是实际状态存储的位置,根据配置可以分为Heap后端和Ttl后端(Ttl状态后端在下一节介绍);keyedStateStore被注册到runtimeContext中,负责在计算方法中提供状态的创建和获取。

获取的状态和创建

要在算子中管理计算状态,算子的计算方法需要实现RichFunction接口,RichFunction提供了获取运行时上下文的方法getRuntimeContext()

// RichFunction.class
public interface RichFunction extends Function {

	void open(Configuration parameters) throws Exception;

	void close() throws Exception;

	RuntimeContext getRuntimeContext();

	IterationRuntimeContext getIterationRuntimeContext();

	void setRuntimeContext(RuntimeContext t);
}

常用的如RichMapFunction等类名带有Rich的UDF类都是RichFunction接口的实现,注意ProcessFunction类和KeyedProcessFunction类也是一个RichFunction。这些UDF可以通过AbstractRichFunction#getRuntimeContext获取运行时上下文StreamingRuntimeContext,然后从中获取任意一种类型的状态。其实际获取状态的调用栈如下:

  • StreamingRuntimeContext#getState
  • DefaultKeyedStateStore#getState
  • DefaultKeyedStateStore#getPartitionedState
  • AbstractKeyedStateBackend#getPartitionedState(HeapKeyedStateBackend#getPartitionedState)
// AbstractKeyedStateBackend.class第308行
public <N, S extends State> S getPartitionedState(
		final N namespace,
		final TypeSerializer<N> namespaceSerializer,
		final StateDescriptor<S, ?> stateDescriptor) throws Exception {

	checkNotNull(namespace, "Namespace");

	if (lastName != null && lastName.equals(stateDescriptor.getName())) {
		lastState.setCurrentNamespace(namespace);
		return (S) lastState;
	}

	InternalKvState<K, ?, ?> previous = keyValueStatesByName.get(stateDescriptor.getName());
	if (previous != null) {
		lastState = previous;
		lastState.setCurrentNamespace(namespace);
		lastName = stateDescriptor.getName();
		return (S) previous;
	}

	final S state = getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
	final InternalKvState<K, N, ?> kvState = (InternalKvState<K, N, ?>) state;

	lastName = stateDescriptor.getName();
	lastState = kvState;
	kvState.setCurrentNamespace(namespace);

	return state;
}

AbstractKeyedStateBackend#getPartitionedState中做了大量的缓存工作,首先查询上一次获取状态时的返回值lastState,然后查询历史记录中所有获取过的状态keyValueStatesByName(是一个以<name,state>为键值对的map)。当两个缓存中都没有查询到所需的状态,则调用AbstractKeyedStateBackend#getOrCreateKeyedState创建一个新的状态(这个方法由于会被StreamOperatorStateHandler直接调用,其内部又做了一次缓存的检查):

// AbstractKeyedStateBackend.class第267行
@Override
@SuppressWarnings("unchecked")
public <N, S extends State, V> S getOrCreateKeyedState(
		final TypeSerializer<N> namespaceSerializer,
		StateDescriptor<S, V> stateDescriptor) throws Exception {
	checkNotNull(namespaceSerializer, "Namespace serializer");
	checkNotNull(keySerializer, "State key serializer has not been configured in the config. " +
			"This operation cannot use partitioned state.");

	InternalKvState<K, ?, ?> kvState = keyValueStatesByName.get(stateDescriptor.getName());
	if (kvState == null) {
		if (!stateDescriptor.isSerializerInitialized()) {
			stateDescriptor.initializeSerializerUnlessSet(executionConfig);
		}
		kvState = TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
			namespaceSerializer, stateDescriptor, this, ttlTimeProvider);
		keyValueStatesByName.put(stateDescriptor.getName(), kvState);
		publishQueryableStateIfEnabled(stateDescriptor, kvState);
	}
	return (S) kvState;
}

状态是通过TtlStateFactory#createStateAndWrapWithTtlIfEnabled创建得到的,并在创建完成后,新创建的状态会加入keyed状态后端的缓存中。其中参数this对应的是AbstractKeyedStateBackend的实现类HeapKeyedStateBackend实例。当不启用Ttl时,实际创建状态的过程位于HeapKeyedStateBackend#createInternalState方法:

// HeapKeyedStateBackend.class第263行
@Override
@Nonnull
public <N, SV, SEV, S extends State, IS extends S> IS createInternalState(
	@Nonnull TypeSerializer<N> namespaceSerializer,
	@Nonnull StateDescriptor<S, SV> stateDesc,
	@Nonnull StateSnapshotTransformFactory<SEV> snapshotTransformFactory) throws Exception {
	StateFactory stateFactory = STATE_FACTORIES.get(stateDesc.getClass());
	if (stateFactory == null) {
		String message = String.format("State %s is not supported by %s",
			stateDesc.getClass(), this.getClass());
		throw new FlinkRuntimeException(message);
	}
	StateTable<K, N, SV> stateTable = tryRegisterStateTable(
		namespaceSerializer, stateDesc, getStateSnapshotTransformFactory(stateDesc, snapshotTransformFactory));
	return stateFactory.createState(stateDesc, stateTable, getKeySerializer());
}

在创建一个新的状态实例前,向状态表中注册一个新的状态,其主要作用是用于快照,这部分将在后续介绍snapshot、checkpoint等相关机制时介绍。需要注意的是,在调用栈中向AbstractKeyedStateBackend#getPartitionedState中传入的参数为VoidNamespace.INSTANCEVoidNamespaceSerializer.INSTANCEstateDescriptor,这就意味着keyedState是没有namespace的(对比windowState,其namespace标识了窗口)。

Time-To-Live

TtlState是HeapState的一个装饰器,提供了根据时间使状态过期的功能(目前只有Processing Time)。TTL的具体使用方法在官方文档Working With State中的State Time-To-Live (TTL)章节能够看到,在这里就不复述了。TtlState实际上是用TtlValue对HeapState保存的值进行了一次装饰,添加了timestamp属性:

// TtlValue.class
public class TtlValue<T> implements Serializable {
	private static final long serialVersionUID = 5221129704201125020L;

	@Nullable
	private final T userValue;
	private final long lastAccessTimestamp;

	public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
		this.userValue = userValue;
		this.lastAccessTimestamp = lastAccessTimestamp;
	}

	@Nullable
	public T getUserValue() {
		return userValue;
	}

	public long getLastAccessTimestamp() {
		return lastAccessTimestamp;
	}
}

可以看到lastAccessTimestamp是不可变的,每次更新TTL时,都会创建一个新的TtlValue替换原有的状态值。

TTL状态

TtlState与HeapState创建过程的分支点位于TtlStateFactory#createStateAndWrapWithTtlIfEnabled处:当设置了Ttl后,这一方法返回了TtlStateFactory#createState的结果:

// TtlStateFactory.class第119行
private IS createState() throws Exception {
	SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getClass());
	if (stateFactory == null) {
		String message = String.format("State %s is not supported by %s",
			stateDesc.getClass(), TtlStateFactory.class);
		throw new FlinkRuntimeException(message);
	}
	IS state = stateFactory.get();
	if (incrementalCleanup != null) {
		incrementalCleanup.setTtlState((AbstractTtlState<K, N, ?, TTLSV, ?>) state);
	}
	return state;
}

其中stateFactory.get()即对应状态类型的create方法,如ValueState对应createValueState()方法:

// TtlStateFactory.class第134行
private IS createValueState() throws Exception {
	ValueStateDescriptor<TtlValue<SV>> ttlDescriptor = new ValueStateDescriptor<>(
		stateDesc.getName(), new TtlSerializer<>(LongSerializer.INSTANCE, stateDesc.getSerializer()));
	return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
}

创建一个TtlState的构造参数是TtlStateContext

private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V>
	createTtlStateContext(StateDescriptor<TTLS, TTLV> ttlDescriptor) throws Exception {

	ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig()); // also used by RocksDB backend for TTL compaction filter config
	OIS originalState = (OIS) stateBackend.createInternalState(
		namespaceSerializer, ttlDescriptor, getSnapshotTransformFactory());
	return new TtlStateContext<>(
		originalState, ttlConfig, timeProvider, (TypeSerializer<V>) stateDesc.getSerializer(),
		registerTtlIncrementalCleanupCallback((InternalKvState<?, ?, ?>) originalState));
}

TtlStateContext提供了TtlState所需的所有信息:需要装饰的原始HeapState、Ttl配置信息、时间供应器(实际上就是System::currentTimeMillis,目前只支持Processing Time)、状态描述的序列化器以及TTL增量清理回调方法。清理回调方法只在获取状态和更新状态前异步执行,对状态提供的功能而言是透明的。此处以ValueState为例,TtlValueState由3层结构组成,分别是提供基本TTL功能的AbstractTtlDecorator、提供State基本功能的AbstractTtlState(namespace、serializer等)、以及提供状态获取、更新功能的TtlValueState

TtlValueState
// TtlValueState.class
class TtlValueState<K, N, T>
	extends AbstractTtlState<K, N, T, TtlValue<T>, InternalValueState<K, N, TtlValue<T>>>
	implements InternalValueState<K, N, T> {
	TtlValueState(TtlStateContext<InternalValueState<K, N, TtlValue<T>>, T> tTtlStateContext) {
		super(tTtlStateContext);
	}

	@Override
	public T value() throws IOException {
		accessCallback.run();
		return getWithTtlCheckAndUpdate(original::value, original::update);
	}

	@Override
	public void update(T value) throws IOException {
		accessCallback.run();
		original.update(wrapWithTs(value));
	}

	@Nullable
	@Override
	public TtlValue<T> getUnexpiredOrNull(@Nonnull TtlValue<T> ttlValue) {
		return expired(ttlValue) ? null : ttlValue;
	}
}
AbstractTtlState
// AbstractTtlState.class
abstract class AbstractTtlState<K, N, SV, TTLSV, S extends InternalKvState<K, N, TTLSV>>
	extends AbstractTtlDecorator<S>
	implements InternalKvState<K, N, SV> {
	private final TypeSerializer<SV> valueSerializer;

	/** This registered callback is to be called whenever state is accessed for read or write. */
	final Runnable accessCallback;

	AbstractTtlState(TtlStateContext<S, SV> ttlStateContext) {
		super(ttlStateContext.original, ttlStateContext.config, ttlStateContext.timeProvider);
		this.valueSerializer = ttlStateContext.valueSerializer;
		this.accessCallback = ttlStateContext.accessCallback;
	}

	<SE extends Throwable, CE extends Throwable, T> T getWithTtlCheckAndUpdate(
		SupplierWithException<TtlValue<T>, SE> getter,
		ThrowingConsumer<TtlValue<T>, CE> updater) throws SE, CE {
		return getWithTtlCheckAndUpdate(getter, updater, original::clear);
	}

	@Override
	public TypeSerializer<K> getKeySerializer() {
		return original.getKeySerializer();
	}

	@Override
	public TypeSerializer<N> getNamespaceSerializer() {
		return original.getNamespaceSerializer();
	}

	@Override
	public TypeSerializer<SV> getValueSerializer() {
		return valueSerializer;
	}

	@Override
	public void setCurrentNamespace(N namespace) {
		original.setCurrentNamespace(namespace);
	}

	@Override
	public byte[] getSerializedValue(
		byte[] serializedKeyAndNamespace,
		TypeSerializer<K> safeKeySerializer,
		TypeSerializer<N> safeNamespaceSerializer,
		TypeSerializer<SV> safeValueSerializer) {
		throw new FlinkRuntimeException("Queryable state is not currently supported with TTL.");
	}

	@Override
	public void clear() {
		original.clear();
		accessCallback.run();
	}

	/**
	 * Check if state has expired or not and update it if it has partially expired.
	 *
	 * @return either non expired (possibly updated) state or null if the state has expired.
	 */
	@Nullable
	public abstract TTLSV getUnexpiredOrNull(@Nonnull TTLSV ttlValue);

	@Override
	public StateIncrementalVisitor<K, N, SV> getStateIncrementalVisitor(int recommendedMaxNumberOfReturnedRecords) {
		throw new UnsupportedOperationException();
	}
}
AbstractTtlDecorator
abstract class AbstractTtlDecorator<T> {
	/** Wrapped original state handler. */
	final T original;

	final StateTtlConfig config;

	final TtlTimeProvider timeProvider;

	/** Whether to renew expiration timestamp on state read access. */
	final boolean updateTsOnRead;

	/** Whether to renew expiration timestamp on state read access. */
	final boolean returnExpired;

	/** State value time to live in milliseconds. */
	final long ttl;

	AbstractTtlDecorator(
		T original,
		StateTtlConfig config,
		TtlTimeProvider timeProvider) {
		Preconditions.checkNotNull(original);
		Preconditions.checkNotNull(config);
		Preconditions.checkNotNull(timeProvider);
		this.original = original;
		this.config = config;
		this.timeProvider = timeProvider;
		this.updateTsOnRead = config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite;
		this.returnExpired = config.getStateVisibility() == StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp;
		this.ttl = config.getTtl().toMilliseconds();
	}

	<V> V getUnexpired(TtlValue<V> ttlValue) {
		return ttlValue == null || (!returnExpired && expired(ttlValue)) ? null : ttlValue.getUserValue();
	}

	<V> boolean expired(TtlValue<V> ttlValue) {
		return TtlUtils.expired(ttlValue, ttl, timeProvider);
	}

	<V> TtlValue<V> wrapWithTs(V value) {
		return TtlUtils.wrapWithTs(value, timeProvider.currentTimestamp());
	}

	<V> TtlValue<V> rewrapWithNewTs(TtlValue<V> ttlValue) {
		return wrapWithTs(ttlValue.getUserValue());
	}

	<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> V getWithTtlCheckAndUpdate(
		SupplierWithException<TtlValue<V>, SE> getter,
		ThrowingConsumer<TtlValue<V>, CE> updater,
		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
		TtlValue<V> ttlValue = getWrappedWithTtlCheckAndUpdate(getter, updater, stateClear);
		return ttlValue == null ? null : ttlValue.getUserValue();
	}

	<SE extends Throwable, CE extends Throwable, CLE extends Throwable, V> TtlValue<V> getWrappedWithTtlCheckAndUpdate(
		SupplierWithException<TtlValue<V>, SE> getter,
		ThrowingConsumer<TtlValue<V>, CE> updater,
		ThrowingRunnable<CLE> stateClear) throws SE, CE, CLE {
		TtlValue<V> ttlValue = getter.get();
		if (ttlValue == null) {
			return null;
		} else if (expired(ttlValue)) {
			stateClear.run();
			if (!returnExpired) {
				return null;
			}
		} else if (updateTsOnRead) {
			updater.accept(rewrapWithNewTs(ttlValue));
		}
		return ttlValue;
	}
}

获取状态最终通过AbstractTtlDecorator#getWrappedWithTtlCheckAndUpdate方法返回状态值。其中可以看到,只要状态过期,就会执行HeapValueState#clear方法清除状态的值(如同官方文档中对TTL状态的描述“在获取状态时清除过期状态”),然后根据TTL设置的ReturnExpiredIfNotCleanedUp决定是否要返回该过期值。如果设置了OnReadAndWrite的TTL刷新策略,则会使用当前时间戳创建一个新的TtlValue并更新到其装饰的HeapValueState中。

其他的TtlState的处理方式和TtlValueState基本一致:TtlReducingStateTtlAggregatingState是丢弃整个过期的结果(本身状态只保存了reduce和aggregate的结果);TtlListState是遍历状态中的列表,并将未过期的值重新组成一个新的列表,代替原来的状态列表;TtlMapState直接将过期的<key,value>丢弃。

增量清理

增量清理,即通过设置TtlConfig.BuildercleanupIncrementally(int,boolean)方法增加的后台状态清理功能。增量清理的实现位于TtlIncrementalCleanup类中,由TtlStateFactory#registerTtlIncrementalCleanupCallback构造回调函数,在每次访问状态和更新状态时执行。

// TtlStateFactory.class第210行
private Runnable registerTtlIncrementalCleanupCallback(InternalKvState<?, ?, ?> originalState) {
	StateTtlConfig.IncrementalCleanupStrategy config =
		ttlConfig.getCleanupStrategies().getIncrementalCleanupStrategy();
	boolean cleanupConfigured = config != null && incrementalCleanup != null;
	boolean isCleanupActive = cleanupConfigured &&
		isStateIteratorSupported(originalState, incrementalCleanup.getCleanupSize());
	Runnable callback = isCleanupActive ? incrementalCleanup::stateAccessed : () -> { };
	if (isCleanupActive && config.runCleanupForEveryRecord()) {
		stateBackend.registerKeySelectionListener(stub -> callback.run());
	}
	return callback;
}

// TtlIncrementalCleanup.class第55行
void stateAccessed() {
	initIteratorIfNot();
	try {
		runCleanup();
	} catch (Throwable t) {
		throw new FlinkRuntimeException("Failed to incrementally clean up state with TTL", t);
	}
}

private void initIteratorIfNot() {
	if (stateIterator == null || !stateIterator.hasNext()) {
		stateIterator = ttlState.original.getStateIncrementalVisitor(cleanupSize);
	}
}

private void runCleanup() {
	int entryNum = 0;
	Collection<StateEntry<K, N, S>> nextEntries;
	while (
		entryNum < cleanupSize &&
		stateIterator.hasNext() &&
		!(nextEntries = stateIterator.nextEntries()).isEmpty()) {

		for (StateEntry<K, N, S> state : nextEntries) {
			S cleanState = ttlState.getUnexpiredOrNull(state.getState());
			if (cleanState == null) {
				stateIterator.remove(state);
			} else if (cleanState != state.getState()) {
				stateIterator.update(state, cleanState);
			}
		}

		entryNum += nextEntries.size();
	}
}

TtlStateFactory在注册增量清理回调函数时,将stateAccessed方法注册为清理方法:首先从状态的stateTable中获取至多cleanupSize个状态,然后逐一检查这些状态是否过期,并直接将过期的状态从tateTable中移除。

参考文献

  1. Stateful Stream Processing
  2. Streaming 102: The world beyond batch
  3. 流式计算系统系列(4):状态