reindexer

Replication and cluster mode

Reindexer supports async logical leader-follower replication and a synchronous RAFT-cluster. Each node must have a unique server ID, set via the #config namespace or the web interface. The maximum number of nodes in any replication setup is limited by the server ID range (the maximum value is 999).
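
For example, the server ID may be set through the replication item of the #config namespace (a minimal sketch composed from the commands and config fields shown later in this document):

Reindexer> \upsert #config { "type":"replication", "replication":{ "server_id": 1, "cluster_id": 2 } }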

Write-ahead log (WAL)

The write-ahead log is a combination of row updates and statement executions. The WAL is stored as part of the namespace storage. Each WAL record has a unique 64-bit log sequence number (LSN), which also encodes the server ID. WAL overhead is 18 bytes of RAM per row-update record, so at the default depth of 4,000,000 records this amounts to roughly 72 MB (18 B × 4,000,000) per namespace. The WAL is a ring structure, so after N updates (4M by default) are recorded, the oldest WAL records are automatically removed. The WAL in storage contains only statement records (bulk updates, deletes and index structure changes). Row updates are not stored as dedicated WAL records; instead, each document carries its own LSN, which is enough to restore the complete WAL in RAM when the namespace is loaded from disk.

A side effect of this mechanism is that the exact ordering of index updates relative to data updates is not preserved. Therefore, in case of an incompatible data migration (e.g. an indexed field's type changed), the follower will fail to apply the offline WAL and will fall back to force sync.

During force sync, Reindexer will try to transfer all the namespace’s data into a temporary in-memory table (its name starts with the @tmp_ prefix) and then perform an atomic swap at the end of synchronization. This process requires additional RAM for a full copy of the namespace.

When a namespace is large (or the network connection is not fast enough), force sync takes a long time. During the sync, the leader node may still receive updates, which will be placed into the ring WAL buffer and the internal online updates queue. So it is possible that right after force sync the target namespace will still have an outdated LSN (when both the WAL buffer and the online updates queue overflowed during synchronization), which will lead to another attempt of force sync. To avoid such situations, you may try to set a larger WAL size (check the section below) and a larger online updates buffer size (it may be set via the --updatessize CLI flag or the net.maxupdatessize config option on reindexer_server startup).
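
For instance, a hedged sketch of a server launch with an enlarged online updates buffer (the flag name comes from the text above; the database-path flag and the accepted value format are assumptions and may differ between versions):

# 1 GB online updates buffer (value format assumed to be plain bytes)
reindexer_server --db /var/lib/reindexer --updatessize 1073741824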

Maximum WAL size configuration

WAL size (the maximum number of WAL records) may be configured via the #config namespace. For example, to set first_namespace’s WAL size to 4000000 and second_namespace’s to 100000, the following command may be used:

Reindexer> \upsert #config { "type": "namespaces", "namespaces": [ { "namespace":"first_namespace", "wal_size": 4000000 }, { "namespace":"second_namespace", "wal_size": 100000 } ] }

The default WAL size is 4000000 records.

Access namespace’s WAL with reindexer_tool

To view the offline WAL contents from reindexer_tool, a SELECT statement with a special condition on the #lsn index is used:

Reindexer> SELECT * FROM epg LIMIT 2 where #lsn > 1000000

Data integrity check

Replication is a complex mechanism, and there is a potential risk of breaking data consistency between leader and follower. To guard against this, Reindexer calculates a lightweight incremental hash of all namespace data (checksum/DataHash). The checksum is used to quickly check that the follower's data is really up to date with the leader's.
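
For example, you may run the same #memstats query (described in "Check replication status" below) on both nodes and compare the checksum and data_count fields of the results:

Reindexer> SELECT name, replication FROM #memstats WHERE name='epg'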

Async replication

In an async replication setup the leader may be a standalone server or a Go application with the builtin or builtinserver binding, and a follower must be a standalone server or a Go application with the builtinserver binding.

Replication uses 3 different mechanics:

  1. Force sync: a full transfer of the namespace’s data (described above).
  2. WAL sync: replay of the offline WAL records that the follower is missing.
  3. Online updates: streaming of current writes from the leader’s updates queue.

Cascade replication setups

Cascade replication setups are also supported. In those setups only one leader exists; however, followers may have their own followers too:

             leader
           /        \
 follower1           follower2
     |                /     \
follower1.1   follower2.1 follower2.2

In the example above, follower1 and follower2 replicate data to other followers but, unlike the leader, they do not permit write access for external clients.

Usage

Configuration

Replication has to be configured on the leader’s side using special documents of the #config namespace.

First of all, general replication parameters must be set via the replication item (these parameters are common to both async and cluster replication):

{
	"type":"replication",
	"replication":{
		"server_id": 0,
		"cluster_id": 2,
		"admissible_replication_tokens":[
			{
				"token":"<some_token_1>",
				"namespaces":["ns1"]
			},
			{
				"token":"<some_token_2>",
				"namespaces":["ns2", "ns3"]
			},
			{
				"token":"<some_token_3>",
				"namespaces":["ns4"]
			}
		]
	}
}

Then you are able to configure specific async replication via async_replication item:

{
	"type":"async_replication",
	"async_replication":{
		"role":"none",
		"replication_mode":"default",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 20,
		"online_updates_delay_msec": 100,
		"sync_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"log_level":"info",
		"max_wal_depth_on_force_sync": 1000,
		"namespaces":[],
		"self_replication_token": "some_token",
		"nodes":
		[
			{
				"dsn": "cproto://192.168.1.5:6534/mydb",
				"namespaces": []
			}
		]
	}
}

Follower settings:
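
In the simplest case a follower only needs its replication role set; the same document is used in the migration examples below:

{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}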

As a second option, replication can be configured via YAML config files, which have to be placed in the database directory. Samples of the async replication config file and of the general replication config file are available in the repository.

If a config file is present, it overrides settings from the #config namespace on Reindexer startup.
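
A hedged sketch of such an async replication file (the file name async_replication.conf and the exact key set are assumptions; the keys mirror the JSON config shown above):

# <db_path>/async_replication.conf
role: leader
replication_mode: default
sync_threads: 4
syncs_per_thread: 2
retry_sync_interval_msec: 30000
enable_compression: true
namespaces: []
nodes:
  - dsn: cproto://192.168.1.5:6534/mydb
    namespaces: []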

Check replication status

Replication status is available in the system namespace #memstats. For example, executing the statement:

Reindexer> SELECT name,replication FROM #memstats WHERE name='media_items'

will return a JSON object with the replication status of the namespace media_items:

{
	"last_lsn_v2":{
		"server_id":20,
		"counter":5
	},
	"ns_version":{
		"server_id":20,
		"counter":5
	},
	"clusterization_status":{
		"leader_id": 20,
		"role": "simple_replica"
	},
	"temporary":false,
	"incarnation_counter":0,
	"checksum":6,
	"data_count":4,
	"updated_unix_nano":1594041127153561000,
	"status":"none",
	"wal_count":6,
	"wal_size":311
}
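
The same check can be performed from a Go application. A minimal sketch using the Go binding (the v3 module path and the DSN are assumptions; the query mirrors the reindexer_tool statement above):

package main

import (
	"fmt"

	"github.com/restream/reindexer/v3"
	_ "github.com/restream/reindexer/v3/bindings/cproto" // required for cproto:// DSNs
)

func main() {
	// connect to a running reindexer_server over cproto
	db := reindexer.NewReindexer("cproto://127.0.0.1:6534/mydb")
	defer db.Close()

	// system namespaces can be queried like regular ones; ExecToJson returns
	// raw JSON, so no Go struct has to be registered for #memstats
	it := db.Query("#memstats").
		Select("name", "replication").
		Where("name", reindexer.EQ, "media_items").
		ExecToJson()
	defer it.Close()

	for it.Next() {
		fmt.Println(string(it.JSON()))
	}
	if err := it.Error(); err != nil {
		panic(err)
	}
}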

Actions

There are also a few actions available to interact with async replication via reindexer_tool or the web interface:

Reindexer> \upsert #config { "type":"action","action":{ "command":"restart_replication" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role", "namespace": "ns_name" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"set_log_level", "type": "async_replication", "level":"trace" } }

Possible types: async_replication (controls the log level of asynchronous replication) and cluster (controls the log level of synchronous replication and the RAFT manager).

Possible levels: none, error, warning, info, trace.
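
For example, to change the log level of the synchronous cluster instead, the same action is used with the cluster type:

Reindexer> \upsert #config { "type":"action","action":{ "command":"set_log_level", "type": "cluster", "level":"info" } }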

RAFT-cluster

Reindexer supports a RAFT-like algorithm for leader elections and synchronous replication with consensus awaiting. In a cluster setup, each node may be a standalone server or a Go application with the builtinserver binding.

Node roles

While running in cluster mode, a node may have one of 3 roles: Leader, Follower or Candidate. The role of each node is dynamic and defined by the election algorithm.

Base leader elections

At startup, each node begins an election loop. Each iteration of the loop (an election term) consists of the following steps:

Operating as follower

Once a node has become a follower, it starts checking whether the leader is available (via the timestamp of the last received ping message). If the leader becomes unavailable, the follower initiates a new election term.

In the follower state, the node changes the role of each namespace from the cluster config to “cluster_replica” and sets the corresponding leader ID (i.e. the server ID of the current leader). From this moment, all follower namespaces are read-only for anyone except the cluster leader.

Any request that requires write access will be proxied to the leader and executed on the leader’s side.

Requests that do not require write access (and requests to system namespaces) will be executed locally.

Operating as leader

Once a node has become the leader, it starts sending ping messages to all the followers. A follower that does not respond to ping messages is considered dead. If N/2 of the followers are dead (where N is the total number of nodes in the cluster), the leader has to initiate a new election term.

Right after the role switch, the node begins the initial leader sync, during which it collects the latest data from at least N/2 other nodes. The leader cannot replicate any data until the initial sync is completed.

When the initial sync is done, the leader starts to synchronize the followers. As with asynchronous replication, there are 3 different mechanics for data synchronization: force sync, WAL sync and online updates. The leader continues to replicate data until it receives a request for manual re-elections or encounters a consensus error; in either situation, the node initiates a new election term.

Guarantees

  1. Consensus for each data write.

Reindexer uses a consensus algorithm to provide data safety. Each data write on the leader generates one or more online updates, and each update then awaits approval from N/2 followers. A follower cannot approve updates until its WAL/force sync is done.

  2. Read after write is always safe.

If an operation was proxied from a follower to the leader, it generally won’t be approved until it gets approval from the current node (the emitter node). So if you wrote something in the current thread on any node, you’ll be able to read this data from the same node.

  3. Online updates are ordered.

For optimization purposes, concurrent writes are allowed even while write operations from other threads are awaiting consensus. Reindexer guarantees that all of these concurrent writes will be applied on the followers in the same order.

Configuration

Configuration via YAML files

On startup, reindexer_server reads the replication and cluster configs from the files replication.conf and cluster.conf (samples are available in the repository), which have to be placed in the database directory.

replication.conf sets general replication parameters and has to be unique for each node of the cluster (these parameters may also be configured via the #config namespace).

cluster.conf sets cluster-specific parameters (their descriptions may be found in the sample). This file must have the same content on each node of the cluster.
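
A hedged sketch of a cluster.conf (the key names are assumptions mirroring the async replication options above; consult the repository sample for the authoritative list):

# <db_path>/cluster.conf — must be identical on every node of the cluster
namespaces: []            # empty list means "all namespaces"
sync_threads: 4
enable_compression: true
nodes:
  - dsn: cproto://192.168.1.5:6534/mydb
  - dsn: cproto://192.168.1.6:6534/mydb
  - dsn: cproto://192.168.1.7:6534/mydb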

Examples

  1. An example script that creates a local cluster.

  2. A docker-compose config that creates 3 Docker containers running a RAFT cluster. Usage:

# Run from docker-compose.yml directory
docker-compose up

In both examples the default RPC ports are 6534, 6535 and 6536, and the default HTTP ports are 9088, 9089 and 9090.

Configuration via system namespace

// work in progress

Statistics

Cluster statistics may be obtained via a SELECT request to the #replicationstats namespace:

Reindexer> SELECT * FROM #replicationstats where type="cluster"

This SELECT must contain a filter on the type field with either the cluster or async value. When type="cluster" is used, the request will be proxied to the leader.

Example of JSON-response:

{
    "type": "cluster",
    "initial_sync": {
        "force_syncs": {
            "count": 0,
            "max_time_us": 0,
            "avg_time_us": 0
        },
        "wal_syncs": {
            "count": 0,
            "max_time_us": 0,
            "avg_time_us": 0
        },
        "total_time_us": 2806
    },
    "force_syncs": {
        "count": 0,
        "max_time_us": 0,
        "avg_time_us": 0
    },
    "wal_syncs": {
        "count": 0,
        "max_time_us": 0,
        "avg_time_us": 0
    },
    "update_drops": 0,
    "pending_updates_count": 275,
    "allocated_updates_count": 275,
    "allocated_updates_size": 43288,
    "nodes": [
        {
            "dsn": "cproto://127.0.0.1:14000/node0",
            "server_id": 0,
            "pending_updates_count": 60,
            "status": "offline",
            "sync_state": "awaiting_resync",
            "role": "none",
            "is_synchronized": false,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14002/node2",
            "server_id": 2,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14003/node3",
            "server_id": 3,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14004/node4",
            "server_id": 4,
            "pending_updates_count": 275,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14001/node1",
            "server_id": 1,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "leader",
            "is_synchronized": true,
            "namespaces": []
        }
    ]
}
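
Statistics for asynchronous replication are served by the same namespace with the other allowed filter value:

Reindexer> SELECT * FROM #replicationstats where type="async"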

Node statistics fields:

Manual leader selection

The leader of the cluster may be changed manually via the #config namespace. For example, this request will transfer leadership to the node with server ID 2 (if it exists):

Reindexer> \upsert #config { "type":"action","action":{ "command":"set_leader_node", "server_id": 2 } }

Async replication of RAFT-cluster namespaces

Async replication with “default” replication mode

It’s possible to combine async replication and RAFT-cluster in setups like this:

        cluster1 (ns1, ns2)         cluster2 (ns1)
updates -> cl10 - cl11               cl20 - cl21
              \    /  async repl(ns2)  \    /
               cl12 -------------------> cl22

In the setup above there are 2 independent RAFT-clusters: cluster1 (over ns1 and ns2) and cluster2 (over ns1). In addition, one of the nodes of the first cluster replicates its data (ns2) asynchronously to one of the nodes of the second cluster.
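
A hedged sketch of cl12’s async replication config in this setup (the DSN is a placeholder; the fields reuse the async_replication schema shown in the Configuration section):

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"replication_mode":"default",
		"namespaces":["ns2"],
		"nodes":
		[
			{
				"dsn": "cproto://<cl22_host>:6534/mydb"
			}
		]
	}
}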

Note:

Async replication with “from_sync_leader” replication mode

It’s possible to combine async replication and RAFT-cluster in setups like this:

        cluster1 (ns1, ns2)               cluster2 (ns1)
updates -> cl10 - cl11   async repl(ns2)   cl20 - cl21
              \    /   ------------------->   \    /
               cl12                            cl22

In the setup above there are 2 independent RAFT-clusters: cluster1 (over ns1 and ns2) and cluster2 (over ns1). In addition, each node of the first cluster has the same async replication config, like this:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"replication_mode":"from_sync_leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 20,
		"sync_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"max_wal_depth_on_force_sync": 1000,
		"nodes":
		[
			{
				"dsn": "cproto://192.168.1.5:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			},
			{
				"dsn": "cproto://192.168.1.6:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			},
			{
				"dsn": "cproto://192.168.1.7:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			}
		]
	}
}

With the replication_mode: "from_sync_leader" option, only the current leader of cluster1 replicates its data (ns2) asynchronously to all the nodes of the second cluster (i.e. if one of the nodes of cluster1 goes down, asynchronous replication will continue to work via the new leader).

Note:

Known issues and constraints

Migration from Reindexer’s v3.x.x replication config

The current asynchronous replication implementation is incompatible with configs from v3.x.x, so you will have to migrate manually.

  1. Server IDs and cluster IDs don’t require any changes (they use the same “replication” object in the #config namespace).
  2. The current scheme works as “push replication”, while the legacy scheme was “pull replication”, so you have to move all information about namespaces and DSNs from the legacy slaves to the current leader (into “async_replication” in #config).
  3. Any additional configs (i.e. timeouts or appnames) should now also be configured on the leader’s side.

Examples

Single-level setup

For example, suppose we have the following replication scheme:

             leader(192.168.10.1)
             /                  \
   follower1(192.168.10.2)   follower2(192.168.10.3)

Legacy replication config before migration

In this case, follower1’s legacy replication config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":1,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":
		  - "ns1"
	}
}

follower2’s config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":2,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":
		  - "ns2"
		  - "ns3"
	}
}

And the leader’s config is:

{
	"type":"replication",
	"replication":{
		"role":"master",
		"master_dsn":"",
		"cluster_id":2,
		"server_id":0
	}
}

New replication config after migration

First of all, deprecated parameters must be removed from the old followers’ configs.

New follower1 config is:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":1
	}
}

New follower2 config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":2
	}
}

Also, the follower role should be set for both follower1 and follower2:

{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}

Then remove the deprecated fields from the leader’s replication config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":0
	}
}

And finally, create a new async replication config for the leader:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.2:6534/mydb",
				"namespaces": ["ns1"]
			},
			{
				"dsn": "cproto://192.168.10.3:6534/mydb",
				"namespaces": ["ns2","ns3"]
			}
		]
	}
}

Cascade setup

Migration for a cascade replication setup doesn’t differ much from migration for any other setup. The initial scheme is:

    leader(192.168.10.1)
             |
   follower1(192.168.10.2)
             |
   follower2(192.168.10.3)

Legacy cascade replication config before migration

In this case, follower1’s legacy replication config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":1,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces": []
	}
}

follower2’s config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.2/db",
		"cluster_id":2,
		"server_id":2,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces": []
	}
}

And the leader’s config is:

{
	"type":"replication",
	"replication":{
		"role":"master",
		"master_dsn":"",
		"cluster_id":2,
		"server_id":0
	}
}

New cascade replication config after migration

As with the simple single-level config, all deprecated parameters must be removed from the old followers’ configs.

New follower1 config is:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":1
	}
}

New follower2 config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":2
	}
}

Then the follower role should be set for follower2:

{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}

follower1 in this case still has the follower role; however, it also gets a nodes config to replicate data onward:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.3:6534/mydb"
			}
		]
	}
}

Note that there was no specific namespaces list for the followers, so there is no such field in the nodes config (the top-level namespaces field is used instead).

When the followers’ configuration is done, remove the deprecated fields from the leader’s replication config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":0
	}
}

And finally, create a new async replication config for the leader (it looks similar to follower1’s config, but with a different role and DSN):

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.2:6534/mydb"
			}
		]
	}
}