Replication and cluster mode

Reindexer supports async logical leader-follower replication and a synchronous RAFT-cluster. Each node has to have a unique server ID, set via the #config namespace or the web interface. The maximum number of nodes in any replication setup is limited by the server ID value (the maximum value is 999).

Write ahead log (WAL)

Write ahead log is a combination of row updates and statement executions. WAL is stored as part of the namespace storage. Each WAL record has a unique 64-bit log sequence number (LSN), which also contains the server ID. WAL overhead is 18 bytes of RAM per row update record. WAL is a ring structure, therefore after N updates (4M by default) are recorded, the oldest WAL records are automatically removed. WAL in storage contains only records with statements (bulk updates, deletes and index structure changes). Row updates are not stored as dedicated WAL records; instead, each document contains its own LSN - this is enough to restore the complete WAL in RAM when the namespace is loaded from disk.

A side effect of this mechanic is the lack of an exact sequence of index/data updates; therefore, in case of an incompatible data migration (e.g. an indexed field type change) the follower will fail to apply the offline WAL and will fall back to force sync.

During force sync reindexer will try to transfer all of the namespace's data into a temporary in-memory table (its name starts with the @tmp_-prefix) and then perform an atomic swap at the end of synchronization. This process requires additional RAM for the full namespace copy.

When a namespace is large (or the network connection is not fast enough) force sync takes a long time. During the sync the leader node may still receive updates, which will be placed into the ring WAL buffer and the internal online updates queue. So, it is possible to face a situation when, right after force sync, the target namespace still has an outdated LSN (in cases when both the WAL buffer and the online updates queue overflow during the synchronization process), and this will lead to another attempt of force sync. To avoid such situations, you may try to set a larger WAL size (check the section below) and a larger online updates buffer size (it may be set via the --updatessize CLI flag or the net.maxupdatessize config option on reindexer_server startup).

Maximum WAL size configuration

WAL size (the maximum number of WAL records) may be configured via the #config namespace. For example, to set first_namespace's WAL size to 4000000 and second_namespace's to 100000, this command may be used:

Reindexer> \upsert #config { "type": "namespaces", "namespaces": [ { "namespace":"first_namespace", "wal_size": 4000000 }, { "namespace":"second_namespace", "wal_size": 100000 } ] }

The default WAL size is 4000000 records.
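
The currently configured values may be read back from the #config namespace with a regular select (a sketch using the same filtering syntax shown for other system namespaces in this document):

Reindexer> SELECT * FROM #config WHERE type='namespaces'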

Access namespace’s WAL with reindexer_tool

To view offline WAL contents from reindexer_tool, a SELECT statement with a special condition on the #lsn index is used:

Reindexer> SELECT * FROM epg LIMIT 2 where #lsn > 1000000

Data integrity check

Replication is a complex mechanism, and there are potential ways for data consistency between leader and follower to break. Reindexer calculates a lightweight incremental hash of all namespace data (DataHash). DataHash is used to quickly check that the follower's data is really up to date with the leader's.
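
The current DataHash of a namespace is reported in the data_hash field of #memstats (see the replication status section below), so comparing it between leader and follower is a quick consistency check:

Reindexer> SELECT name,replication FROM #memstats WHERE name='media_items'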

Async replication

In an async replication setup the leader may be a standalone server or a golang application with the builtin or builtinserver binding, while a follower must be a standalone server or a golang application with the builtinserver reindexer binding.

Replication uses 3 different mechanics: force sync, WAL sync and online updates.

Cascade replication setups

Cascade replication setups are also supported. In those setups only one leader exists, however, followers may have their own followers too:

             leader
           /        \
 follower1           follower2
     |                /     \
follower1.1   follower2.1 follower2.2

In the example above follower1 and follower2 replicate data to other followers, but in comparison with the leader, they don't permit write access for external clients.

Usage

Configuration

Replication has to be configured on the leader's side using special documents of the #config namespace.

First of all, general replication parameters must be set via the replication item (these parameters are common for both async and cluster replication):

{
	"type":"replication",
	"replication":{
		"server_id": 0,
		"cluster_id": 2,
		"admissible_replication_tokens":[
			{
				"token":"<some_token_1>",
				"namespaces":["ns1"]
			},
			{
				"token":"<some_token_2>",
				"namespaces":["ns2", "ns3"]
			},
			{
				"token":"<some_token_3>",
				"namespaces":["ns4"]
			}
		]
	}
}
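
The same document may be applied at runtime from reindexer_tool (a one-line sketch; the admissible_replication_tokens list is omitted for brevity):

Reindexer> \upsert #config { "type":"replication", "replication":{ "server_id": 0, "cluster_id": 2 } }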

Then you can configure async replication itself via the async_replication item:

{
	"type":"async_replication",
	"async_replication":{
		"role":"none",
		"replication_mode":"default",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 20,
		"online_updates_delay_msec": 100,
		"sync_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"log_level":"info",
		"max_wal_depth_on_force_sync": 1000,
		"namespaces":[],
		"self_replication_token": "some_token",
		"nodes":
		[
			{
				"dsn": "cproto://192.168.1.5:6534/mydb",
				"namespaces": []
			}
		]
	}
}

Follower settings: on each follower only the replication role has to be set via the async_replication item, as in the minimal sketch below.
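
{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}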

As a second option, replication can be configured via YAML config files, which have to be placed in the database directory. A sample async replication config file is here and a sample general replication config file is here.

If a config file is present, it overrides the settings from the #config namespace on reindexer startup.

Check replication status

Replication status is available in the system namespace #memstats. E.g., execution of this statement:

Reindexer> SELECT name,replication FROM #memstats WHERE name='media_items'

will return a JSON object with the replication status of the media_items namespace:

{
	"last_lsn_v2":{
		"server_id":20,
		"counter":5
	},
	"ns_version":{
		"server_id":20,
		"counter":5
	},
	"clusterization_status":{
		"leader_id": 20,
		"role": "simple_replica"
	},
	"temporary":false,
	"incarnation_counter":0,
	"data_hash":6,
	"data_count":4,
	"updated_unix_nano":1594041127153561000,
	"status":"none",
	"wal_count":6,
	"wal_size":311
}

Actions

There are also a few actions available to interact with async replication via reindexer_tool or the web interface.

Reindexer> \upsert #config { "type":"action","action":{ "command":"restart_replication" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"reset_replication_role", "namespace": "ns_name" } }
Reindexer> \upsert #config { "type":"action","action":{ "command":"set_log_level", "type": "async_replication", "level":"trace" } }

Possible types: async_replication (controls log level for asynchronous replication), cluster (controls log level for synchronous replication and RAFT-manager).

Possible levels: none, error, warning, info, trace.
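
For example, the log level of synchronous (cluster) replication may be switched to info with the same action (a sketch mirroring the command above):

Reindexer> \upsert #config { "type":"action","action":{ "command":"set_log_level", "type": "cluster", "level":"info" } }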

RAFT-cluster

Reindexer supports a RAFT-like algorithm for leader elections and synchronous replication with consensus awaiting. In a cluster setup each node may be a standalone server or a golang application with the builtinserver binding.

Node roles

While running in cluster mode a node may have one of 3 roles: Leader, Follower or Candidate. The role of each node is dynamic and defined by the election algorithm.

Base leader elections

At startup each node begins an elections loop. Each election iteration (election term) has the following steps:

Operating as follower

Once a node has become a follower, it starts checking whether the leader is available (via the timestamp of the last received ping message). If the leader becomes unavailable, the follower initiates a new election term.

In the follower state the node changes the role of each namespace from the cluster config to "cluster_replica" and sets the corresponding leader ID (i.e. the server ID of the current leader). From this moment all of the follower's namespaces are readonly for anyone except the cluster leader.

Any request which requires write access will be proxied to the leader and executed on the leader's side.

Requests which don't require write access (and requests to system namespaces) will be executed locally.

Operating as leader

Once a node has become the leader, it starts sending ping messages to all of the followers. If a follower does not respond to a ping message, it will be considered dead. If N/2 of the followers are dead (where N is the total number of nodes in the cluster), the leader has to initiate a new election term.

Right after the role switch the node begins the initial leader sync. During the initial sync the leader collects the latest data from at least N/2 other nodes. It cannot replicate any data until the initial sync is completed.

When the initial sync is done, the leader starts to synchronize the followers. The same as for asynchronous replication, there are 3 different mechanics for data synchronization: force sync, WAL sync and online updates. The leader continues to replicate data until it receives a request for manual re-election or an error in consensus. In both of these situations the node will initiate a new election term.

Guarantees

  1. Consensus for each data write.

Reindexer uses a consensus algorithm to provide data safety. Each data write on the leader generates one or a few online updates, and then each update awaits approval from N/2 followers. A follower cannot approve updates if its WAL/force sync is not done yet.

  2. Read after write is always safe.

If an operation was proxied from a follower to the leader, it won't be generally approved before it gets approval from the current node (the emitter node), so if you wrote something in the current thread on any node, you'll be able to read this data from the same node.

  3. Online updates are ordered.

For optimization purposes concurrent writes are available even if some of the write operations from other threads are awaiting consensus right now. Reindexer guarantees that all of these concurrent writes will be performed on the followers in the same order.

Configuration

Configuration via YAML file

On startup reindexer_server reads the replication and cluster configs from the files replication.conf (sample) and cluster.conf (sample), which have to be placed in the database directory.

replication.conf sets the general replication parameters and has to be unique for each node in the cluster (these parameters may also be configured via the #config namespace).

cluster.conf sets cluster-specific parameters (a description may be found in the sample). This file has to have the same content on each node of the cluster.

Examples

  1. An example script which creates a local cluster.

  2. A docker-compose config which creates 3 docker containers running a RAFT-cluster. Usage:

# Run from docker-compose.yml directory
docker-compose up

In both examples the default RPC ports are 6534, 6535 and 6536, and the default HTTP ports are 9088, 9089 and 9090.

Configuration via system namespace

// work in progress

Statistics

Cluster statistics may be received via a select request to the #replicationstats namespace:

Reindexer> SELECT * FROM #replicationstats where type="cluster"

This select has to have a filter on the type field with either the cluster or async value. When type="cluster" is used, the request will be proxied to the leader.

Example of JSON-response:

{
    "type": "cluster",
    "initial_sync": {
        "force_syncs": {
            "count": 0,
            "max_time_us": 0,
            "avg_time_us": 0
        },
        "wal_syncs": {
            "count": 0,
            "max_time_us": 0,
            "avg_time_us": 0
        },
        "total_time_us": 2806
    },
    "force_syncs": {
        "count": 0,
        "max_time_us": 0,
        "avg_time_us": 0
    },
    "wal_syncs": {
        "count": 0,
        "max_time_us": 0,
        "avg_time_us": 0
    },
    "update_drops": 0,
    "pending_updates_count": 275,
    "allocated_updates_count": 275,
    "allocated_updates_size": 43288,
    "nodes": [
        {
            "dsn": "cproto://127.0.0.1:14000/node0",
            "server_id": 0,
            "pending_updates_count": 60,
            "status": "offline",
            "sync_state": "awaiting_resync",
            "role": "none",
            "is_synchronized": false,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14002/node2",
            "server_id": 2,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14003/node3",
            "server_id": 3,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14004/node4",
            "server_id": 4,
            "pending_updates_count": 275,
            "status": "online",
            "sync_state": "online_replication",
            "role": "follower",
            "is_synchronized": true,
            "namespaces": []
        },
        {
            "dsn": "cproto://127.0.0.1:14001/node1",
            "server_id": 1,
            "pending_updates_count": 0,
            "status": "online",
            "sync_state": "online_replication",
            "role": "leader",
            "is_synchronized": true,
            "namespaces": []
        }
    ]
}
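
Async replication statistics may be requested the same way, using the other documented type value:

Reindexer> SELECT * FROM #replicationstats where type="async"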

Node statistics fields:

Manual leader selection

The leader of the cluster may be changed manually via the #config namespace. For example, this request will transfer leadership to the node with server ID 2 (if it exists):

Reindexer> \upsert #config { "type":"action","action":{ "command":"set_leader_node", "server_id": 2 } }

Async replication of RAFT-cluster namespaces

Async replication with “default” replication mode

It’s possible to combine async replication and RAFT-cluster in setups like this:

        cluster1 (ns1, ns2)         cluster2 (ns1)
updates -> cl10 - cl11               cl20 - cl21
              \    /  async repl(ns2)  \    /
               cl12 -------------------> cl22

In the setup above there are 2 independent RAFT-clusters: cluster1 (over ns1 and ns2) and cluster2 (over ns1). Also, one of the nodes of the first cluster replicates its data (ns2) asynchronously to one of the nodes of the second cluster.
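
A minimal async config for cl12 in this scheme might look like the sketch below (the role assignment, the namespace list and the DSN of cl22 are illustrative assumptions based on the general async_replication config described above):

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"replication_mode":"default",
		"nodes":
		[
			{
				"dsn": "cproto://192.168.1.22:6534/mydb",
				"namespaces": ["ns2"]
			}
		]
	}
}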

Take notice:

Async replication with “from_sync_leader” replication mode

It’s possible to combine async replication and RAFT-cluster in setups like this:

        cluster1 (ns1, ns2)               cluster2 (ns1)
updates -> cl10 - cl11   async repl(ns2)   cl20 - cl21
              \    /   ------------------->   \    /
               cl12                            cl22

In the setup above there are 2 independent RAFT-clusters: cluster1 (over ns1 and ns2) and cluster2 (over ns1). Also, each of the nodes of the first cluster has the same async replication config, like this:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"replication_mode":"from_sync_leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 20,
		"sync_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"max_wal_depth_on_force_sync": 1000,
		"nodes":
		[
			{
				"dsn": "cproto://192.168.1.5:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			},
			{
				"dsn": "cproto://192.168.1.6:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			},
			{
				"dsn": "cproto://192.168.1.7:6534/mydb",
				"namespaces": ["ns2"],
				"replication_mode":"from_sync_leader"
			}
		]
	}
}

With the replication_mode: "from_sync_leader" option only the current leader of cluster1 replicates its data (ns2) asynchronously to all the nodes of the second cluster (i.e. if one of the nodes from cluster1 goes down, asynchronous replication will still work via the new leader).

Take notice:

Known issues and constraints

Migration from Reindexer’s v3.x.x replication config

New asynchronous replication is incompatible with configs from v3.x.x, so you will have to migrate manually.

  1. Server IDs and cluster IDs don't require any changes (they use the same "replication" object in the #config namespace).
  2. The current scheme works as "push replication" while the legacy scheme was "pull replication", so you have to move all the information about namespaces and DSNs from the legacy slaves to the current leader (into "async_replication" in #config).
  3. Any additional configs (e.g. timeouts or appnames) should now also be configured on the leader's side.

Examples

Single-level setup

For example, we have the following replication scheme:

             leader(192.168.10.1)
             /                  \
   follower1(192.168.10.2)   follower2(192.168.10.3)

Legacy replication config before migration

In this case follower1 legacy replication config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":1,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":
		  - "ns1"
	}
}

follower2 config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":2,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":
		  - "ns2"
		  - "ns3"
	}
}

And leader config is:

{
	"type":"replication",
	"replication":{
		"role":"master",
		"master_dsn":"",
		"cluster_id":2,
		"server_id":0
	}
}

New replication config after migration

First of all, deprecated parameters must be removed from the old followers' configs.

New follower1 config is:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":1
	}
}

New follower2 config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":2
	}
}

Also, the follower role should be set for both follower1 and follower2:

{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}
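
This role document may be upserted on each follower from reindexer_tool (a sketch using the same command form as in the Actions section above):

Reindexer> \upsert #config { "type":"async_replication", "async_replication":{ "role":"follower" } }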

Then remove deprecated fields from leader’s replication config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":0
	}
}

And finally create new async replication config for leader:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.2:6534/mydb",
				"namespaces": ["ns1"]
			},
			{
				"dsn": "cproto://192.168.10.3:6534/mydb",
				"namespaces": ["ns2","ns3"]
			}
		]
	}
}
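
The same leader document may be applied from reindexer_tool in one line (a sketch mirroring the config above, with the tuning fields omitted):

Reindexer> \upsert #config { "type":"async_replication", "async_replication":{ "role":"leader", "namespaces":[], "nodes":[ { "dsn": "cproto://192.168.10.2:6534/mydb", "namespaces": ["ns1"] }, { "dsn": "cproto://192.168.10.3:6534/mydb", "namespaces": ["ns2","ns3"] } ] } }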

Cascade setup

Migration for a cascade replication setup doesn't differ much from migration for other setups. The initial scheme is:

    leader(192.168.10.1)
             |
   follower1(192.168.10.2)
             |
   follower2(192.168.10.3)

Legacy cascade replication config before migration

In this case follower1 legacy replication config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.1/db",
		"cluster_id":2,
		"server_id":1,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces": []
	}
}

follower2 config is:

{
	"type":"replication",
	"replication":{
		"role":"slave",
		"master_dsn":"cproto://192.168.10.2/db",
		"cluster_id":2,
		"server_id":2,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces": []
	}
}

And leader config is:

{
	"type":"replication",
	"replication":{
		"role":"master",
		"master_dsn":"",
		"cluster_id":2,
		"server_id":0
	}
}

New cascade replication config after migration

The same as for the simple single-level config, all deprecated parameters must be removed from the old followers' configs.

New follower1 config is:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":1
	}
}

New follower2 config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":2
	}
}

Then the follower role should be set for follower2:

{
	"type":"async_replication",
	"async_replication":{
		"role":"follower"
	}
}

follower1 in this case will still have the follower role, however it also gets a nodes config to replicate data:

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.3:6534/mydb"
			}
		]
	}
}

Note that there was no specific namespaces list for the followers, so there is no such field in the nodes config (the namespaces field from the top level is used instead).

When the followers' configuration is done, remove the deprecated fields from the leader's replication config:

{
	"type":"replication",
	"replication":{
		"cluster_id":2,
		"server_id":0
	}
}

And finally, create a new async replication config for the leader (it looks similar to the follower1 config, however with a different role and DSN):

{
	"type":"async_replication",
	"async_replication":{
		"role":"leader",
		"sync_threads": 4,
		"syncs_per_thread": 2,
		"online_updates_timeout_sec": 60,
		"retry_sync_interval_msec": 30000,
		"enable_compression": true,
		"batching_routines_count": 100,
		"force_sync_on_logic_error": false,
		"force_sync_on_wrong_data_hash": false,
		"namespaces":[],
		"nodes":
		[
			{
				"dsn": "cproto://192.168.10.2:6534/mydb"
			}
		]
	}
}