KvStore - Store and Sync¶
Introduction¶
`KvStore` is the in-memory key-value datastore module of Open/R. It provides self-contained storage that aims for eventual consistency across the whole network. The underlying implementation is based on a conflict-free replicated data type (CRDT). The stores are interconnected in a mesh and synchronize their contents in an eventually consistent fashion. The store is used to disseminate a set of key-value pairs to all nodes in the network/cluster. For example, a node may post information about its adjacent neighbors to its local store under the key `adj:<node-name>`, and this information will propagate to all other stores in the network under the same key name.
To limit the boundary of the flooding domain, `KvStore` spawns one `KvStoreDb` instance per configured `AREA` at initialization. Each `KvStoreDb` syncs and exchanges updates independently with its counterpart on the peer node. For more details on the `AREA` concept, see Area.md.
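The following is a minimal sketch of the one-database-per-area idea. It assumes a plain `std::map` keyed by area name. `AreaDb` and `MultiAreaStore` are hypothetical names and are not Open/R's `KvStore`/`KvStoreDb` classes. Each operation is routed to exactly one area's database, so keys and peers in one area stay separate from every other area.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <stdexcept>
#include <string>

// Hypothetical, simplified stand-in for one per-area database.
struct AreaDb {
  std::string area;
  std::map<std::string, std::string> keyVals;  // key -> opaque value
  std::set<std::string> peers;                 // peers synced in this area
};

// Hypothetical container: one AreaDb per configured area, created up front.
class MultiAreaStore {
 public:
  explicit MultiAreaStore(const std::set<std::string>& areas) {
    for (const auto& area : areas) {
      dbs_.emplace(area, AreaDb{area, {}, {}});
    }
  }

  // Every operation is scoped to exactly one area; unknown areas are rejected.
  AreaDb& db(const std::string& area) {
    auto it = dbs_.find(area);
    if (it == dbs_.end()) {
      throw std::invalid_argument("unknown area: " + area);
    }
    return it->second;
  }

  std::size_t numAreas() const { return dbs_.size(); }

 private:
  std::map<std::string, AreaDb> dbs_;
};
```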
Inter Module Communication¶
- [Producer] `ReplicateQueue<Publication>`: propagates `thrift::Publication` and the `kvStoreSynced` signal to local subscribers (i.e. `Decision`) for any delta update it notices. These updates can come either from local sources (e.g. a prefix change from `PrefixManager` or an adjacency change from `LinkMonitor`) or from remote peers.
- [Producer] `ReplicateQueue<KvStoreSyncEvent>`: publishes `KvStoreSyncEvent` to `LinkMonitor` to indicate the progress of the initial full sync between the node and its peers.
- [Consumer] `RQueue<thrift::PeerUpdateRequest>`: receives PEER SPEC information from `LinkMonitor` to know how to establish TCP connections with peers over the Thrift channel.
- [Consumer] `RQueue<KeyValueRequest>`: receives requests from `LinkMonitor` and `PrefixManager` to set and clear key-values in `KvStore`. Used for key-values originated by the local node.
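The producer queues above follow a fan-out pattern: one writer and several independent readers, each of which receives every message. The hypothetical `SimpleReplicateQueue` below is a single-threaded sketch of that idea only. Open/R's own `messaging::ReplicateQueue`/`RQueue` are thread-safe and asynchronous, and their API is not reproduced here.

```cpp
#include <cassert>
#include <deque>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, single-threaded illustration of a replicate queue:
// every value pushed by the producer is delivered to every reader.
template <typename T>
class SimpleReplicateQueue {
 public:
  class Reader {
   public:
    explicit Reader(std::shared_ptr<std::deque<T>> buf) : buf_(std::move(buf)) {}

    // Returns the next pending value, or std::nullopt if none is queued.
    std::optional<T> tryGet() {
      if (buf_->empty()) {
        return std::nullopt;
      }
      std::optional<T> value(std::move(buf_->front()));
      buf_->pop_front();
      return value;
    }

   private:
    std::shared_ptr<std::deque<T>> buf_;
  };

  // Each reader gets its own buffer; it only sees values pushed after it was created.
  Reader getReader() {
    auto buf = std::make_shared<std::deque<T>>();
    buffers_.push_back(buf);
    return Reader(buf);
  }

  // Copies the value into each reader's buffer.
  void push(const T& value) {
    for (auto& buf : buffers_) {
      buf->push_back(value);
    }
  }

 private:
  std::vector<std::shared_ptr<std::deque<T>>> buffers_;
};
```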
Operations¶
Typical workflow at a high level:
- `KvStore` spawns one or more `KvStoreDb` instances upon initialization, based on the number of AREAs configured in the system. It then waits for peer updates.
- Peer UP: `KvStore` receives PEER SPEC info (i.e. TCP port, link-local address, etc.) and tries to establish a TCP connection to perform the initial sync of the database. See the later section for details of the state transitions. For parallel adjacencies, `LinkMonitor` manages all of the complexity and makes it transparent to `KvStore`.
- Peer DOWN: `KvStore` receives the DOWN signal and closes the established TCP session. It cleans up all data structures for this peer and stops periodic syncing.
Deep Dive¶
KvStore Public APIs¶
`KvStore` supports multiple types of messages exchanged between peers over TCP. Message exchange is done through the public API, invoked between the Thrift client and server. Some examples are:
/*
* @params: area => single areaId to get K-V pairs from
* thrift::KeyGetParams => parameters to get specific K-V pairs
* @return: thrift::Publication
*/
folly::SemiFuture<std::unique_ptr<thrift::Publication>>
semifuture_getKvStoreKeyVals(std::string area, thrift::KeyGetParams keyGetParams);
/*
 * @params: area => single areaId to set K-V pairs
 * thrift::KeySetParams => parameters to set specific K-V pairs
 * @return: folly::Unit
 */
folly::SemiFuture<folly::Unit>
semifuture_setKvStoreKeyVals(std::string area, thrift::KeySetParams keySetParams);
/*
 * @params: area => single areaId to set K-V pairs
 * thrift::KeySetParams => parameters to set specific K-V pairs
 * @return: thrift::SetKvStoreKeyVals
 */
folly::SemiFuture<thrift::SetKvStoreKeyVals>
semifuture_setKvStoreKeyValues(std::string area, thrift::KeySetParams keySetParams);
/*
 * @params: area => single areaId to dump K-V hashes from
 * thrift::KeyDumpParams => parameters to dump ALL K-V pairs
 * @return: thrift::Publication WITHOUT `value`(serialized binary data)
 */
folly::SemiFuture<std::unique_ptr<std::vector<thrift::Publication>>>
semifuture_dumpKvStoreHashes(std::string area, thrift::KeyDumpParams keyDumpParams);
/*
 * @params: selectAreas => set of areas to dump keys from
 * thrift::KeyDumpParams => parameters to dump K-V pairs
* @return: thrift::Publication
*/
folly::SemiFuture<std::unique_ptr<std::vector<thrift::Publication>>>
semifuture_dumpKvStoreKeys(thrift::KeyDumpParams keyDumpParams,
std::set<std::string> selectAreas = {});
/*
* @params: area => area to dump self-originated K-V pairs from
* @return: SelfOriginatedKeyVals => map of K-V pairs
*/
folly::SemiFuture<std::unique_ptr<SelfOriginatedKeyVals>>
semifuture_dumpKvStoreSelfOriginatedKeys(std::string area);
/*
* @params: area => area to dump KvStore peers from
* @return: thrift::PeersMap => map of peers KvStore is subscribed to
*/
folly::SemiFuture<std::unique_ptr<thrift::PeersMap>>
semifuture_getKvStorePeers(std::string area);
/*
* @params: area => area to add/update KvStore peers to
* @return: folly::Unit
*/
folly::SemiFuture<folly::Unit> semifuture_addUpdateKvStorePeers(
std::string area, thrift::PeersMap peersToAdd);
/*
* @params: area => area to delete KvStore peers from
* @return: folly::Unit
*/
folly::SemiFuture<folly::Unit> semifuture_deleteKvStorePeers(
std::string area, std::vector<std::string> peersToDel);
/*
* @params: selectAreas => set of areas to retrieve area summary
* @return: thrift::KvStoreAreaSummary => counters, key-vals, peers, etc.
*/
folly::SemiFuture<std::unique_ptr<std::vector<thrift::KvStoreAreaSummary>>>
semifuture_getKvStoreAreaSummaryInternal(
std::set<std::string> selectAreas = {});
/*
* @return: std::map<std::string, int64_t> => num_keys, num_peers, etc.
*/
folly::SemiFuture<std::map<std::string, int64_t>> semifuture_getCounters();
/*
* @return: messaging::RQueue<Publication> => read-only interface for KvStore
* updates queue
*/
messaging::RQueue<Publication> getKvStoreUpdatesReader();
/*
* @params: area => area to retrieve current KvStore peer state from
* peerName => name of peer for which to fetch current peer state
* @return: thrift::KvStorePeerState
*/
folly::SemiFuture<std::optional<thrift::KvStorePeerState>>
semifuture_getKvStorePeerState(
std::string const& area, std::string const& peerName);
For more information about `KvStore` parameters, check out
KvStore Sync via Thrift¶
There are three types of synchronization messages exchanged inside `KvStore`:
- Initial Full Sync
- Incremental Updates, aka flooding
- Finalized Full Sync
Initial Full Sync - Finite State Machine (FSM)¶
Initial full sync with a peer is performed when the peer is added to the local store. `KvStore` uses a finite state machine (FSM) to track and update the initial DB full sync with each individual peer. The FSM gives a clean state representation of each peer and makes this event-driven syncing easy to track and control.
KvStorePeerState
- IDLE => fresh and not yet connected (default)
- SYNCING => synchronizing with peer
- INITIALIZED => initial 3-way sync done
KvStorePeerEvent
- PEER_ADD => new peer_spec received
- PEER_DEL => peer is removed
- SYNC_RESP_RCVD => initial db full-sync response received
- THRIFT_API_ERROR => error/timeout/failure for initial sync
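The states and events above can be wired into a transition function. The table below is an assumption reconstructed from the state and event descriptions and is not copied from the Open/R source. In it, `PEER_ADD` starts syncing, a sync response completes it, and a Thrift error sends the peer back to `IDLE`. `PEER_DEL` is assumed to simply erase the peer entry, so it has no transition.

```cpp
#include <cassert>
#include <optional>

enum class PeerState { IDLE, SYNCING, INITIALIZED };
enum class PeerEvent { PEER_ADD, PEER_DEL, SYNC_RESP_RCVD, THRIFT_API_ERROR };

// Assumed transition table (a sketch). Returns std::nullopt when the event
// is not a valid transition in the current state. PEER_DEL is not modeled
// as a state change: the caller is assumed to erase the peer instead.
std::optional<PeerState> nextState(PeerState state, PeerEvent event) {
  switch (state) {
    case PeerState::IDLE:
      if (event == PeerEvent::PEER_ADD) return PeerState::SYNCING;
      if (event == PeerEvent::THRIFT_API_ERROR) return PeerState::IDLE;
      break;
    case PeerState::SYNCING:
      if (event == PeerEvent::SYNC_RESP_RCVD) return PeerState::INITIALIZED;
      if (event == PeerEvent::THRIFT_API_ERROR) return PeerState::IDLE;
      break;
    case PeerState::INITIALIZED:
      // Later full-sync responses keep the peer initialized.
      if (event == PeerEvent::SYNC_RESP_RCVD) return PeerState::INITIALIZED;
      // An error returns the peer to IDLE, from where a new sync can start.
      if (event == PeerEvent::THRIFT_API_ERROR) return PeerState::IDLE;
      break;
  }
  return std::nullopt;
}
```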
Incremental Updates - Flooding Update¶
All incremental changes in the local `KvStore` are published as `thrift::Publication` messages containing the changes. All received incremental changes are processed, applied locally, and conditionally forwarded. Forwarding is done over the Thrift client-server channel. Whenever an update is received, it is applied locally and then forwarded to all neighbors. An update is ignored when it is echoed back, which avoids an infinite loop of control packets. This is done via `originatorId`.
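The following is a minimal, single-process sketch of apply-then-forward flooding. It assumes a hypothetical `Mesh` that delivers updates by direct recursive calls instead of a Thrift channel. An update is dropped in two cases: when its `originatorId` matches the receiving node (an echo), or when it is not newer than the local copy. Otherwise it is applied and forwarded to every neighbor except the sender.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified update: one key-value plus its version and originator.
struct Update {
  std::string key;
  std::string value;
  int64_t version = 0;
  std::string originatorId;
};

// Hypothetical in-process mesh: direct calls stand in for the Thrift channel.
class Mesh {
 public:
  std::map<std::string, std::map<std::string, Update>> stores;  // node -> key -> update
  std::map<std::string, int> received;  // node -> number of deliveries seen

  void link(const std::string& a, const std::string& b) {
    neighbors_[a].push_back(b);
    neighbors_[b].push_back(a);
  }

  // Local origination: store the update, then flood it to every neighbor.
  void originate(const std::string& node, const Update& u) {
    stores[node][u.key] = u;
    for (const auto& nbr : neighbors_[node]) {
      deliver(nbr, node, u);
    }
  }

 private:
  void deliver(const std::string& node, const std::string& from, const Update& u) {
    ++received[node];
    // Echo of this node's own update: drop it to avoid an infinite loop.
    if (u.originatorId == node) {
      return;
    }
    auto& store = stores[node];
    auto it = store.find(u.key);
    // Not newer than the local copy: nothing changed, so do not forward.
    if (it != store.end() && it->second.version >= u.version) {
      return;
    }
    store[u.key] = u;  // apply locally first...
    for (const auto& nbr : neighbors_[node]) {
      if (nbr != from) {
        deliver(nbr, node, u);  // ...then forward to all neighbors except the sender
      }
    }
  }

  std::map<std::string, std::vector<std::string>> neighbors_;
};
```

In a triangle A-B-C, the originatorId check drops the copy that loops back to A. The version check drops the duplicate that reaches C directly from A after C already received it via B.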
Finalized Full Sync - Part of 3 way sync¶
Regardless of which of the two peers initiates a sync request, `KvStore` makes sure both sides reach eventual consistency. To do so, it implements a finalized full-sync mechanism that sends the peer the keys it is missing or holds older versions of.
For example, let’s say we have K-V pairs in the format (key, value, version):
NodeA has: (k0, a, 1), (k1, a, 1), (k2, a, 2), (k3, a, 1)
NodeB has: (k1, a, 1), (k2, b, 1), (k3, b, 2), (k4, b, 1)
Two cases:
- A initiates a full sync with B:
  1. A requests a full dump of B’s K-V database, providing the hashes of its local store;
  2. B replies to A with thriftPub.keyVals: k3 (B has a higher version), k4 (A doesn’t have it), and thriftPub.toBeUpdatedKeys: k0 (B doesn’t have it), k2 (A has a higher version);
  3. A notices that B needs to be updated with k0 and k2, and sends them back with a finalized full sync.
- B initiates a full sync with A:
  The same 3-way sync logic applies, with the roles reversed.
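The comparison B performs in the first case can be sketched with the example data. The sketch compares plain versions, although the text above says the requester sends hashes. As a simplification, equal versions are treated as already in sync. `respondToFullSync` and `finalizeFullSync` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <set>
#include <string>

struct Val {
  std::string value;
  int64_t version = 0;
};
using KvMap = std::map<std::string, Val>;

// Hypothetical shape of B's reply to A's full-sync request.
struct FullSyncResponse {
  KvMap keyVals;                          // entries the requester lacks or has older
  std::set<std::string> toBeUpdatedKeys;  // keys the responder lacks or has older
};

// Step 2: the responder compares both stores (equal versions count as in sync).
FullSyncResponse respondToFullSync(const KvMap& requester, const KvMap& responder) {
  FullSyncResponse resp;
  for (const auto& [key, val] : responder) {
    auto it = requester.find(key);
    if (it == requester.end() || it->second.version < val.version) {
      resp.keyVals[key] = val;
    }
  }
  for (const auto& [key, val] : requester) {
    auto it = responder.find(key);
    if (it == responder.end() || it->second.version < val.version) {
      resp.toBeUpdatedKeys.insert(key);
    }
  }
  return resp;
}

// Step 3 (finalized full sync): the requester sends back the keys the responder needs.
KvMap finalizeFullSync(const KvMap& requester, const std::set<std::string>& toBeUpdatedKeys) {
  KvMap out;
  for (const auto& key : toBeUpdatedKeys) {
    out[key] = requester.at(key);
  }
  return out;
}
```

Running it on the NodeA/NodeB data yields `keyVals` = {k3, k4} and `toBeUpdatedKeys` = {k0, k2}, matching the walkthrough above.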
Implementation Details¶
Loop detection¶
To avoid blind flooding, the KV store implements loop detection logic similar to BGP. We assign every KV store an “originatorId” that is unique in the system. When a store receives a message, it checks the originatorId, and if it matches the store’s own ID, flooding stops. This allows one to design efficient flooding topologies and avoid excessive message duplication in the mesh.
Data Encoding¶
One prominent feature is that all values are opaquely encoded as Thrift objects using the client’s choice of protocol (though this is not strictly required). The data store itself does not care about the value contents; it only needs to be told whether two values differ when it propagates them. On the client side, this approach removes the burden of protocol encoding/decoding, because clients can use our standard Thrift libs.
Versioning of K-V pairs¶
We implement very simple versioning for merge conflict resolution. Every key has a 64-bit version value, which is compared with the version in the incoming update message. Only if the incoming version is greater do we update the local store and flood the original update message to our subscribers (peers). Note that the original version is preserved in the flooded message, so other nodes can compare their versions with the original submission.
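The version rule can be sketched as a merge function. The incoming entry wins only when its version is strictly greater, and the function returns exactly the accepted entries, with their original versions, for flooding. `Versioned` and `mergeKeyVals` are illustrative names, and the value is a plain string instead of a serialized Thrift object.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

struct Versioned {
  std::string value;
  int64_t version = 0;
};
using Store = std::map<std::string, Versioned>;

// Merge incoming key-values into the local store. Only strictly newer
// versions are applied; the returned map holds exactly those entries,
// unchanged (original version kept), to be flooded to peers.
Store mergeKeyVals(Store& local, const Store& incoming) {
  Store toFlood;
  for (const auto& [key, val] : incoming) {
    auto it = local.find(key);
    if (it == local.end() || val.version > it->second.version) {
      local[key] = val;
      toFlood[key] = val;
    }
  }
  return toFlood;
}
```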
Delete Operation¶
`KvStore` only supports `Add` or `Update` operations. Implementing a `Delete` operation in a CRDT is non-trivial and not well defined in an eventually consistent fashion. For all practical purposes, a `Delete` operation can be achieved via the optional time-to-live (aka `ttl`) field, which indicates the lifetime of the key-value.
Time-To-Live(ttl)¶
When advertising a key-value, set the `ttl` to a specified amount of time and submit it to the local store. On update, the local store floods it to all other stores in the network. Periodically, every store scans its locally stored keys and decrements each `ttl` by the elapsed time. If the `ttl` drops below 0, the key is removed from the local store. Since every node performs the same operation, the key is deleted from all stores. When a key dump is requested, an updated `ttl` (received ttl minus elapsed time) is sent, reflecting the remaining lifetime of the key-value since its origination.
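Below is a sketch of the periodic ttl scan. It assumes ttl is kept in milliseconds per key and that the scan is called with the time elapsed since the previous scan. `TtlEntry` and `ageAndExpire` are hypothetical. The returned keys are what a store could publish as expiry notifications.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct TtlEntry {
  std::string value;
  int64_t ttlMs = 0;  // remaining lifetime in milliseconds
};

// Hypothetical periodic scan: subtract the elapsed time from every ttl,
// remove entries whose ttl drops below 0, and return the expired keys.
std::vector<std::string> ageAndExpire(std::map<std::string, TtlEntry>& store,
                                      int64_t elapsedMs) {
  std::vector<std::string> expired;
  for (auto it = store.begin(); it != store.end();) {
    it->second.ttlMs -= elapsedMs;
    if (it->second.ttlMs < 0) {
      expired.push_back(it->first);
      it = store.erase(it);
    } else {
      ++it;  // the remaining ttl is what a key dump would report
    }
  }
  return expired;
}
```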
ttl updates¶
To keep key-values with a limited lifetime alive for long durations, originators emit `ttl updates` with new ttl values. On receipt of a `ttl update` with a higher `ttlVersion`, the ttl of the key in the local store is updated and the `ttl update` is flooded to the node’s neighbors. `KvStore` handles `ttl updates` in two different ways:
- Self-originated keys with link-state information: these `ttl updates` are completely managed by `KvStore`.
- Non-self-originated keys (e.g. key-vals injected by a user for a global view and eventual consistency): the user is responsible for refreshing `ttl updates` periodically and updating `ttlVersion` on their own.
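The following sketches both sides of a ttl update. It assumes each key carries `version`, `ttlMs` and `ttlVersion` fields; the struct and function names are hypothetical. The originator keeps the value and version unchanged and bumps `ttlVersion`. A receiver accepts the refresh only for the same version with a higher `ttlVersion`, which is an assumption consistent with the `ttlVersion` field mentioned above.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>

struct KeyVal {
  std::string value;
  int64_t version = 0;
  int64_t ttlMs = 0;
  int64_t ttlVersion = 0;
};

// Originator side: refresh the ttl without touching value or version.
KeyVal makeTtlUpdate(const KeyVal& current, int64_t newTtlMs) {
  KeyVal update = current;
  update.ttlMs = newTtlMs;
  update.ttlVersion = current.ttlVersion + 1;
  return update;
}

// Receiver side (assumed rule): accept only for the same version and a
// higher ttlVersion. Returns true if accepted, i.e. it should be flooded on.
bool applyTtlUpdate(std::map<std::string, KeyVal>& store, const std::string& key,
                    const KeyVal& update) {
  auto it = store.find(key);
  if (it == store.end() || it->second.version != update.version ||
      update.ttlVersion <= it->second.ttlVersion) {
    return false;
  }
  it->second.ttlMs = update.ttlMs;
  it->second.ttlVersion = update.ttlVersion;
  return true;
}
```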
Key Expiry Notifications¶
Whenever keys expire in a given KvStore, a notification is generated and published via the Thrift channel. All subscribers can take appropriate action to handle an expired key (e.g. Decision removes the node’s adj/prefix DB). Other KvStores ignore these notifications, since each of them generates the very same notifications on its own.
Self-originated key-values¶
All link-state-protocol-related key-values originated by the local node are sent to `KvStore` via key-value requests. There are 3 key-value request types:
- `Set` stores the key-value and adds it to a cache used to generate `ttl updates`, which keep the key-value in `KvStore` for longer durations.
- `Persist` does the same as `Set`, while also ensuring that the submitted key-value doesn’t get overridden by another node.
- `Clear` removes the key from the cache, which ends `ttl updates` and allows the key to expire.

`KvStore` stores self-originated key-values with newer versions, so that new or updated self-originated key-values make it to the network and are NOT overridden by other nodes.
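Below is a sketch of the three request types, using a hypothetical `SelfOriginatedCache` that is not the Open/R implementation. `set` and `persist` advertise the key with a version above any version seen so far. The periodic `refreshTtls` bumps `ttlVersion` for every cached key, and `clear` drops the key so it ages out. When another node overrides a key, only persisted keys are re-advertised with a higher version (`onOverride`).

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Hypothetical cached state for one self-originated key.
struct OwnKey {
  std::string value;
  int64_t version = 0;
  int64_t ttlVersion = 0;
  bool persist = false;
};

// Hypothetical cache illustrating Set / Persist / Clear.
class SelfOriginatedCache {
 public:
  // Set: cache the key so its ttl keeps being refreshed. The version is
  // bumped above anything seen so far so the new value wins in the network.
  int64_t set(const std::string& key, const std::string& value, int64_t seenVersion = 0) {
    return advertise(key, value, seenVersion, /*defend=*/false);
  }

  // Persist: same as Set, but the key is also defended against overrides.
  int64_t persist(const std::string& key, const std::string& value, int64_t seenVersion = 0) {
    return advertise(key, value, seenVersion, /*defend=*/true);
  }

  // Clear: drop the key from the cache; ttl refreshes stop and it expires.
  void clear(const std::string& key) { cache_.erase(key); }

  // Periodic tick: bump ttlVersion of every cached key (version unchanged)
  // and return the keys for which a ttl update should be sent.
  std::vector<std::string> refreshTtls() {
    std::vector<std::string> keys;
    for (auto& [key, own] : cache_) {
      ++own.ttlVersion;
      keys.push_back(key);
    }
    return keys;
  }

  // Another node advertised `key` with `version`. A persisted key is
  // re-advertised with a higher version; a plain Set key is left alone.
  std::optional<int64_t> onOverride(const std::string& key, int64_t version) {
    auto it = cache_.find(key);
    if (it == cache_.end() || !it->second.persist) {
      return std::nullopt;
    }
    it->second.version = std::max(it->second.version, version) + 1;
    it->second.ttlVersion = 0;
    return it->second.version;
  }

 private:
  int64_t advertise(const std::string& key, const std::string& value,
                    int64_t seenVersion, bool defend) {
    auto it = cache_.find(key);
    int64_t current = (it == cache_.end()) ? 0 : it->second.version;
    OwnKey own;
    own.value = value;
    own.version = std::max(current, seenVersion) + 1;
    own.persist = defend;
    cache_[key] = own;
    return own.version;
  }

  std::map<std::string, OwnKey> cache_;
};
```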
KvStoreClientInternal¶
`KvStore` is a core and heavily used module in Open/R. Interacting with `KvStore` involves sending and receiving proper Thrift objects on sockets, which was leading to a lot of complexity in the code. `KvStoreClientInternal` was added to address this concern. It provides APIs to subscribe/unsubscribe to key-value changes within `KvStore`. A client uses these APIs to register a callback function that is called when the value associated with a key changes.
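The callback pattern can be sketched with a hypothetical `KeySubscriber` class. It does not reproduce `KvStoreClientInternal`'s real method names or signatures, which also deal with areas and a live `KvStore`. Here the caller feeds changes in directly via `onKeyChange`, and an empty `std::optional` stands for an expired key.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <optional>
#include <string>
#include <utility>

// Hypothetical per-key subscription registry illustrating the callback pattern.
class KeySubscriber {
 public:
  // std::nullopt as value means the key expired or was removed.
  using Callback = std::function<void(const std::string& key,
                                      const std::optional<std::string>& value)>;

  void subscribe(const std::string& key, Callback cb) { callbacks_[key] = std::move(cb); }
  void unsubscribe(const std::string& key) { callbacks_.erase(key); }

  // Called for every change seen in the store; only subscribed keys fire.
  void onKeyChange(const std::string& key, const std::optional<std::string>& value) {
    auto it = callbacks_.find(key);
    if (it != callbacks_.end()) {
      it->second(key, value);
    }
  }

 private:
  std::map<std::string, Callback> callbacks_;
};
```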