Sharding Stateful Services: From Modular Hashing To Consistent Hashing (Implementation)



GitHub link to the code in this article: https://github.com/hamdaankhalid/ConsistentHashing


In my previous article "Scaling Horizontally With Load Balancing" I went over techniques to scale stateless services, in particular web services. In this article I will cover a technique used for scaling stateful services: sharding.


A quick refresher on stateful vs stateless services can be found in another article I wrote, "Stateful vs Stateless Services". To summarize: stateless services do not persist context between multiple runs/calls/requests, whereas stateful services carry context across multiple executions. Scaling stateless services is almost trivial horizontal scaling compared to the effort spent scaling stateful services horizontally. I'd like to focus in particular on the idea of "Sharding".


During a lecture on virtual memory I once heard a professor say, "Most problems in computer science can be solved by adding a layer of indirection." Sharding is, for the most part, such a layer of indirection: data is consistently directed to a particular server in a cluster. You split your monolithic storage into logical or even physical segments, and each incoming request/call is mapped to a segment based on one or more attributes of the incoming data.



Bedis (A cousin of Redis)

For the sake of this article let's assume the stateful service we are trying to scale is a database server; let's call it Bedis, a distant cousin of Redis. Over the course of the article we will take Bedis from being a single-node server to running on a cluster. As we shard Bedis we will work through different problems and solutions and evolve our DB.


Bedis is a simple key-value store that exposes certain operations as HTTP endpoints for a user's application to call. Bedis exposes the following endpoints:

  • POST /item accepts a key-value object in JSON format as the request body and persists it.
  • GET /item/:key expects a key (the :key segment) and returns the key-value object as the response.



Can We Still Round Robin?

Bedis is stateful, so we can't just whip up N servers, put them behind a load balancer, and round robin requests. Why? Imagine you make a POST /item request storing object A and the round robin algorithm on the load balancer routes it to server A. You follow it up with a GET /item/(objectA's key) request, but the load balancer, since it follows round robin, routes you to server B. Your data was persisted on server A, but we are trying to access it from server B where it doesn't exist. We are going to get a response indicating that no such object exists. This violates our most basic functionality (insert and retrieve).


Let's dissect why the above strategy fails. The core issue is that requests previously made to a server must be routed back to the same server. This can be achieved by adding a layer of indirection which maintains such a mapping. This could be a smarter client library that is aware of all the servers in the cluster, or a separate proxy server that maintains the mapping and that the user's application actually interacts with. Let's not choose either route right now and focus on the fact that either method needs a reliable way to maintain a mapping of what data lives where.



Maintaining Data And Server Mapping


Let's explore a few options on how we can have our proxy component maintain a mapping.

  • In-Memory Map: The most naive approach to maintaining a mapping would be a map (ba dum tss). We could maintain a hash map or even a tree-based map of the format <Key, Server Address>. Whenever there is an insert, a server is chosen (with round robin or even at random) and a new entry is created in the map. When a retrieve comes in for an item, we find the server which corresponds to the key in the query and forward the request to that server. There is a very obvious issue with this solution: our map will consume more and more memory as requests come in. We are looking at memory usage on the proxy server growing linearly with every insert in Bedis. This is not acceptable for the use case. We need an algorithm that lets us maintain a mapping without creating an entry for every single item!
  • Modular Hashing: Given a cluster of server addresses as a list [Server Address A, Server Address B, Server Address C], every key can be mapped to a server address using the modulo operator %. An incoming key is first mapped to an integer value using a string hash function. This integer then needs to be translated into an index into our server address list, which is where the modulo operator comes in: SERVER_IDX = MAPPED_INT % LEN(server addresses). This index points to a server in our server address list, and that server is now mapped to the key (see the sketch after this list). This mapping does not incur the cost of linearly increasing memory usage. However, we have not yet considered adding or removing servers from our cluster. How does our modular hashing algorithm keep up with changes in cluster membership? This is where it takes a hit. Because we use the length of our server address list, the mapping is coupled to the number of servers: any change to the number of elements in this list affects our mappings. We could stop all operations on the cluster, query every key-value pair from every server, rehash each key, and move each object to the new server it maps to. This is, however, a lengthy operation; you have to shuffle a lot of data around, and in most systems running on a cluster, membership changes are inevitable. There will be additions and removals, and we cannot realistically reshuffle across hundreds of servers with every change. This is where we find the need for a more sophisticated algorithm known as Consistent Hashing. This algorithm is truly beautiful and solves the problem of shuffling data on cluster membership changes. We will now discuss Consistent Hashing in painful detail.
  • Consistent Hashing: Too much to discuss, so this one gets its own section.
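
Before moving on, here is a minimal sketch of the modular hashing idea in Go, assuming an FNV-1a string hash (the hash function and server addresses are illustrative, not part of the Bedis code):

package main

import (
	"fmt"
	"hash/fnv"
)

// hashString maps a string key to a non-negative integer. FNV-1a is just one
// reasonable choice of string hash; the mask keeps the value non-negative
// even where int is 32 bits.
func hashString(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff)
}

// modularShard picks a server with key_hash % number_of_servers. Note how the
// result is coupled to len(servers): changing the cluster size changes the
// owner of most keys.
func modularShard(key string, servers []string) string {
	return servers[hashString(key)%len(servers)]
}

func main() {
	servers := []string{"serverA:8081", "serverB:8082", "serverC:8083"}
	fmt.Println(modularShard("user-42", servers))     // owner with 3 servers
	fmt.Println(modularShard("user-42", servers[:2])) // likely a different owner with 2
}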



Consistent Hashing

Consistent hashing is an alternative to our modular hashing approach. Although it still uses the modulo operator, it differs in that the modulus is independent of the number of servers. Because the sharding logic does not depend on the number of servers in our cluster, we avoid rehashing and redistributing the bulk of our items when membership changes occur.

Consistent hashing works on the idea of an abstract ring along whose boundary our servers are placed. To place a server we first hash the server's id (or any field that is consistent and unique to the server) into a number, then apply the modulo operator with the ring size (an arbitrary number we initialize our ring with). The result of the modulo operation is a number between 0 and ring size - 1; this is the placement of our server on the ring. When visualizing, it helps to think that 0 marks the 12 o'clock position on the ring and ring size - 1 marks the 11:59 position. So let's say our ring size is 12. If we place a server with the id "madagascar" -> hash function -> 200 -> 200 % 12 -> 8, then in our wall-clock-like consistent hash ring our server is placed at the 8 o'clock mark.

Servers that join our cluster are spread along the boundary of the ring in the same way. A good hash function is responsible for spreading our servers somewhat evenly around the ring.

Alright, now that we know how to add our servers, let's try to add and route data. Incoming data is sharded based on a sharding field. This sharding field goes through the exact same transformations as our server id and produces a number between 0 and ring size - 1. This number decides which server will hold the incoming data: we travel in a clockwise direction from the sharding field's mapped value and find the first server we encounter. Encountering the first server in the clockwise direction can also be expressed as finding the first server whose id was mapped to a value greater than or equal to our incoming data's sharding field value. This is the server responsible for holding the data associated with the given sharding field. The next time we have a request for that data, we take the requested data's sharding field (attached to the request), follow the same transformation, and find the first server clockwise along the ring from the sharding field's mapped value. We can then forward the request to that server. That explains consistent hashing in a static cluster, but clusters are not static; servers are removed and added as needed.
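
To make the static-cluster case concrete, here is a tiny in-memory sketch of the placement and clockwise lookup described above (the hash function, ring size, and server ids are illustrative; the full, HTTP-backed implementation follows later in the article):

package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

const ringSize = 360 // arbitrary, like the clock analogy above

// position hashes an id and maps it onto the ring.
func position(id string) int {
	h := fnv.New32a()
	h.Write([]byte(id))
	return int(h.Sum32()&0x7fffffff) % ringSize
}

// owner walks clockwise: the first server position >= the key's position owns
// the key, wrapping back to the first server if we fall off the end.
func owner(keyPos int, serverPositions []int) int {
	for _, p := range serverPositions {
		if p >= keyPos {
			return p
		}
	}
	return serverPositions[0]
}

func main() {
	byPos := map[int]string{}
	var positions []int
	for _, s := range []string{"madagascar:8081", "fiji:8082", "iceland:8083"} {
		p := position(s)
		byPos[p] = s
		positions = append(positions, p)
	}
	sort.Ints(positions) // the ring laid out clockwise

	keyPos := position("some-sharding-field")
	ownerPos := owner(keyPos, positions)
	fmt.Printf("key at %d is owned by %s (position %d)\n", keyPos, byPos[ownerPos], ownerPos)
}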

With the addition of another server we follow the same protocol as adding the first server, but now we have a new arc in our circle. This arc represents a data partition and the server that holds its data. A newly added server would, however, not hold any data yet, so we need to shuffle some data; shuffling here is a much smaller problem than what we saw with modular hashing. In order to bring the new server up to date, we find the first server placed clockwise after our newly added server on the ring, then take from that server every item whose sharding field maps to a value less than or equal to the newly added server's mapped position and move it to the new server. After the data has been moved, our cluster can continue to be queried and will produce the expected results.

Removal of a server from our ring is slightly simpler: we find the server in our ring that needs to be removed, take all the data that lives on this server, and dump it into the next server clockwise along the ring. After dumping the data we remove the reference to the server from our ring.
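
As a rough sketch of the join-time rebalancing rule described above (conceptual only; the article's real, HTTP-based version is the redistribute method shown later), assuming hypothetical keyPos and ownerAddr helpers:

// keysToMoveOnJoin is a conceptual helper: given the keys currently held by
// the new server's clockwise successor, keep only the ones whose ring
// position now maps to the newly added server.
func keysToMoveOnJoin(successorKeys []string, newServerAddr string,
	keyPos func(key string) int, ownerAddr func(pos int) string) []string {
	var move []string
	for _, key := range successorKeys {
		if ownerAddr(keyPos(key)) == newServerAddr {
			move = append(move, key)
		}
	}
	return move
}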

[Figure: Consistent hashing maps nodes and data items onto the same ring.]


Now let's go ahead and build Bedis with some Go Code!

The overall architecture of Bedis is a router-and-workers architecture. Workers are single-node API servers that each expose a key-value store.

The router is our proxy server that a user sends GET and POST requests to. The proxy implements the consistent hashing algorithm and maintains a ring composed of the worker nodes.


Here is what our worker node (single-node API server) looks like.

Two of the endpoints interface with the top-level hash table and are called by an actual user: upload key-value and get by key.

The rest of the endpoints are used by the proxy to maintain cluster membership and move data around from server to server.

package servers


import (
	"encoding/json"
	"github.com/gorilla/mux"
	"log"
	"net/http"
	"sync"
)


var store = map[string]string{}


var mu sync.Mutex


type uploadReq struct {
	Key   string `json:"key"`
	Value string `json:"value"`
}


func GetApp() *mux.Router {
	// allKeysRoute, removeKeyRoute, addKeyRoute, getKeyRoute
	r := mux.NewRouter()


	// UPLOAD KEY VAL
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		mu.Lock()
		defer mu.Unlock()
		data := uploadReq{}
		if err := json.NewDecoder(request.Body).Decode(&data); err != nil {
			writer.WriteHeader(http.StatusBadRequest)
			return
		}


		key := data.Key
		value := data.Value


		log.Println("Upload Req: ", data)
		store[key] = value


		writer.WriteHeader(http.StatusCreated)
	}).Methods(http.MethodPost)


	// GET ALL KEYS
	r.HandleFunc("/keys", func(writer http.ResponseWriter, request *http.Request) {
		mu.Lock()
		defer mu.Unlock()


		data := make(map[string][]string)
		data["keys"] = []string{}
		for key := range store {
			data["keys"] = append(data["keys"], key)
		}
		body, _ := json.Marshal(data)
		writer.WriteHeader(http.StatusOK)
		writer.Write(body)
	}).Methods(http.MethodGet)


	// GET BY KEY
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		mu.Lock()
		defer mu.Unlock()


		log.Println("RequestURL: ", request.URL.Query()["key"])


		key := request.URL.Query()["key"][0]


		val, found := store[key]


		if !found {
			log.Println("Val not found")
			writer.WriteHeader(http.StatusNotFound)
			return
		}


		data := make(map[string]string)
		data["key"] = key
		data["value"] = val


		log.Print("Write back ", data)


		resp, _ := json.Marshal(data)


		writer.WriteHeader(http.StatusOK)
		_, _ = writer.Write(resp)
	}).Methods(http.MethodGet)


	// DELETE BY KEY
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		mu.Lock()
		defer mu.Unlock()


		log.Println("RequestURL: ", request.URL.Query()["key"])


		key := request.URL.Query()["key"][0]
		log.Printf("Deleting key %s \n", key)
		delete(store, key)


		writer.WriteHeader(http.StatusOK)
	}).Methods(http.MethodDelete)
	return r
}
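
The snippet above only builds the router; a minimal sketch of actually serving a worker might look like this (the port and import path are assumptions based on the module path used elsewhere in the article):

package main

import (
	"log"
	"net/http"

	"github.com/hamdaankhalid/consistenthashing/servers"
)

func main() {
	// Each worker is just the mux returned by GetApp, served on its own port.
	log.Fatal(http.ListenAndServe(":8081", servers.GetApp()))
}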


Our proxy server is where we run the consistent hashing logic and handle request routing.

It exposes the same two endpoints for application logic or users to use: 'Get By Key' and 'Upload Key Val'.

The other endpoints are used when adding members to or removing members from our cluster. The proxy server is composed of a struct which maintains the ring state and exposes methods that tell us the right server to interact with. This is the server a user interacts with for uploading and getting key-value pairs, as well as for adding or removing servers from the cluster dynamically. It makes heavy use of the ConsistentHashing struct exposed by the consistenthashing package, which provides three simple methods: AddMember, RemoveMember, and GetShard. The first two maintain the ring of server and key-value pair placements, just like the diagram above showed us. GetShard uses the ring to map an incoming key to the right server, exactly as described by the algorithm.

package proxy


import (
	"bytes"
	"encoding/json"
	"fmt"
	"github.com/gorilla/mux"
	"github.com/hamdaankhalid/consistenthashing/consistenthashing"
	"io"
	"log"
	"net/http"
)


func New(hmp *consistenthashing.ConsistentHashing) *mux.Router {
	r := mux.NewRouter()


	// UPLOAD KEY VAL
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		log.Println("Upload Key Request")


		buf, err := io.ReadAll(request.Body)
		if err != nil {
			writer.WriteHeader(http.StatusInternalServerError)
			return
		}


		reqCopy := io.NopCloser(bytes.NewBuffer(buf))
		request.Body = reqCopy


		data := make(map[string]string)


		_ = json.Unmarshal(buf, &data)


		shard, err := hmp.GetShard(data["Key"])
		if err != nil {
			writer.WriteHeader(http.StatusInternalServerError)
			return
		}


		log.Printf("Upload for key %s \n", data["Key"])


		// Proxy
		url := fmt.Sprintf("%s://%s%s", "http", shard, request.RequestURI)
		proxyRequest(writer, request, url)
	}).Methods(http.MethodPost)


	// GET BY KEY
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		log.Println("Get Key Request")
		relayForKeyBasedRequest(writer, request, hmp)
	}).Methods(http.MethodGet)


	// DELETE BY KEY
	r.HandleFunc("/key", func(writer http.ResponseWriter, request *http.Request) {
		log.Println("Delete Key Request")
		relayForKeyBasedRequest(writer, request, hmp)
	}).Methods(http.MethodDelete)


	// Cluster management APIs are all GET requests that let you inspect and mutate cluster membership


	// Add cluster member
	r.HandleFunc("/add-member", func(writer http.ResponseWriter, request *http.Request) {
		log.Println("Add member Request")


		servers := request.URL.Query()["srv"]
		for _, server := range servers {
			err := hmp.AddMember(server)
			if err != nil {
				writer.WriteHeader(http.StatusInternalServerError)
				return
			}
		}


		hmp.PrintTopology()
		writer.WriteHeader(http.StatusOK)
	}).Methods(http.MethodGet)


	// Remove cluster member
	r.HandleFunc("/remove-member", func(writer http.ResponseWriter, request *http.Request) {
		log.Println("Remove member Request")


		servers := request.URL.Query()["srv"]


		for _, server := range servers {
			err := hmp.RemoveMember(server)
			if err != nil {
				log.Println("Failed to remove member")
			}
		}


		hmp.PrintTopology()


		writer.WriteHeader(http.StatusOK)
	}).Methods(http.MethodGet)


	return r
}


func relayForKeyBasedRequest(writer http.ResponseWriter, request *http.Request, hmp *consistenthashing.ConsistentHashing) {
	key := request.URL.Query()["key"][0]


	shard, err := hmp.GetShard(key)
	if err != nil {
		writer.WriteHeader(http.StatusInternalServerError)
		return
	}


	// Proxy
	url := fmt.Sprintf("%s://%s%s", "http", shard, request.RequestURI)
	proxyRequest(writer, request, url)
}


func proxyRequest(w http.ResponseWriter, req *http.Request, newUrl string) {
	log.Println("Proxying to ", newUrl)


	proxyReq, err := http.NewRequest(req.Method, newUrl, req.Body)
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	// forward the original request headers (e.g. Content-Type) to the worker
	proxyReq.Header = req.Header.Clone()


	resp, err := http.DefaultClient.Do(proxyReq)
	if err != nil {
		http.Error(w, err.Error(), http.StatusBadGateway)
		return
	}


	copyHeader(w.Header(), resp.Header)
	w.WriteHeader(resp.StatusCode)
	io.Copy(w, resp.Body)
	resp.Body.Close()
}


func copyHeader(dst, src http.Header) {
	for k, vv := range src {
		for _, v := range vv {
			dst.Add(k, v)
		}
	}
}


package consistenthashing


import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"sync"
)


// HashingFunc lets me compose ConsistentHashing struct object with a plethora of different hashing algorithms
type HashingFunc func(string) int


type ConsistentHashing struct {
	sync.Mutex


	// These routes are the endpoints exposed by every server in cluster to move data around during redistribution
	allKeysRoute, removeKeyRoute, addKeyRoute, getKeyRoute string
	hashFunc                                               HashingFunc
	ringSize                                               int
	ring                                                   *ring
}


func New(allKeysRoute string,
	removeKeyRoute string,
	addKeyRoute string,
	getKeyRoute string,
	hashFunc HashingFunc,
	ringSize int,
) *ConsistentHashing {
	return &ConsistentHashing{
		allKeysRoute:   allKeysRoute,
		removeKeyRoute: removeKeyRoute,
		getKeyRoute:    getKeyRoute,
		addKeyRoute:    addKeyRoute,
		hashFunc:       hashFunc,
		ringSize:       ringSize,
		ring:           &ring{size: ringSize},
	}
}


/*
GetShard maps shardKey onto the ring and returns the address of the first
server clockwise from that position, wrapping around the ring if needed.
*/
func (ch *ConsistentHashing) GetShard(shardKey string) (string, error) {
	ch.Lock()
	defer ch.Unlock()
	keyPos := ch.hashFunc(shardKey) % ch.ringSize
	log.Printf("Getting owning server for key with pos: %d \n", keyPos)
	owner, err := ch.ring.getOwner(keyPos)
	if err != nil {
		return "", err
	}
	log.Printf("Owner: %v \n", owner)


	return owner.address, nil
}


/*
AddMember adds a server to our cluster while preserving the consistent hashing
constraints. The new server is hashed onto the ring, inserted in sorted order,
and the keys it now owns are redistributed from its clockwise successor.
*/
func (ch *ConsistentHashing) AddMember(serverAddr string) error {
	ch.Lock()
	defer ch.Unlock()


	log.Println("Adding new server to cluster members")


	nodePos := ch.hashFunc(serverAddr) % ch.ringSize
	newNode := &ringMember{address: serverAddr, position: nodePos}


	log.Println("Adding new server into ring, new server pos: ", nodePos)


	// Insert in ring, and get the next node after it post-insertion
	newInsertedAt := ch.ring.insert(newNode)


	if ch.ring.numServers() == 1 {
		return nil
	}
	next := ch.ring.getNextRingMember(newInsertedAt)


	log.Printf("%v inserted at %d redistributing from server %v \n", newNode, newInsertedAt, next)


	err := ch.redistribute(next, newNode, false)


	if err != nil {
		log.Println(err)
		return err
	}


	return nil
}


func (ch *ConsistentHashing) RemoveMember(serverAddr string) error {
	ch.Lock()
	defer ch.Unlock()


	removeIdx := ch.ring.find(serverAddr)


	log.Printf("Removing %s server from idx %d \n", serverAddr, removeIdx)


	if removeIdx == -1 {
		return errors.New("no server with address in cluster")
	}


	// move everything from current node into the next node
	currNode, err := ch.ring.get(removeIdx)
	if err != nil {
		return err
	}


	successor := ch.ring.getNextRingMember(removeIdx)


	err = ch.redistribute(currNode, successor, true)
	if err != nil {
		log.Printf("Error redistributing %s \n", err.Error())
		return err
	}


	// now remove currNode from the ring
	err = ch.ring.remove(removeIdx)
	if err != nil {
		return err
	}


	return nil
}


func (ch *ConsistentHashing) PrintTopology() {
	ch.Lock()
	defer ch.Unlock()
	log.Println("----Topology----")
	for idx, member := range ch.ring.partitionsRing {
		log.Printf("idx %d: server %s with pos %d\n", idx, member.address, member.position)
	}
	log.Println("---------------")
}


func (ch *ConsistentHashing) redistribute(from *ringMember, to *ringMember, isRemoval bool) error {
	resp, err := http.Get("http://" + from.address + ch.allKeysRoute)
	if err != nil {
		return err
	}
	var decodedResp allKeysResponse
	err = json.NewDecoder(resp.Body).Decode(&decodedResp)
	if err != nil {
		return err
	}
	fmt.Println("redistributing from ", from, ", to ", to)


	var wg sync.WaitGroup
	for _, key := range decodedResp.Keys {
		keyId := ch.hashFunc(key) % ch.ringSize
		correctPlacement, err := ch.ring.getOwner(keyId)
		if err != nil {
			log.Println(err)
			continue
		}
		if (correctPlacement.address == to.address) || isRemoval {
			log.Println("Moving key ", key, " from ", from, ", to ", to)
			wg.Add(1)
			go func(wg *sync.WaitGroup, key string, removeKeyRoute string, addKeyRoute string, getKeyRoute string) {
				defer wg.Done()


				client := &http.Client{}


				// Get Key Val from fromMem
				getKeyUrl := "http://" + from.address + getKeyRoute + "?key=" + key
				resp, err := client.Get(getKeyUrl)
				if err != nil {
					log.Println("Error getting key")
					log.Println(err)
					return
				}
				if resp.StatusCode != http.StatusOK {
					log.Printf("Get key response unsuccessful got %d for request to %s \n", resp.StatusCode, getKeyUrl)
					return
				}
				buf, err := io.ReadAll(resp.Body)
				resp.Body.Close()
				if err != nil {
					log.Println("Error reading get key response body")
					log.Println(err)
					return
				}
				respBody := bytes.NewBuffer(buf)


				// Add key val to toMem
				resp, err = client.Post("http://"+to.address+addKeyRoute, resp.Header.Get("Content-Type"), respBody)
				if err != nil {
					log.Println("Error adding key")
					log.Println(err)
					return
				}
				if resp.StatusCode != http.StatusCreated {
					log.Print("Post key val response unsuccessful")
					return
				}


				// remove key val from fromMem
				removeUrl := "http://" + from.address + removeKeyRoute + "?key=" + key
				req, err := http.NewRequest(http.MethodDelete, removeUrl, nil)
				if err != nil {
					log.Println(err)
					return
				}
				resp, err = client.Do(req)
				if err != nil {
					log.Println(err)
					return
				}
				if resp.StatusCode != http.StatusOK {
					log.Printf("Delete response unsuccessful got %d on request to %s \n", resp.StatusCode, removeUrl)
					return
				}
			}(&wg, key, ch.removeKeyRoute, ch.addKeyRoute, ch.getKeyRoute)
		}
	}
	wg.Wait()
	return nil
}


type allKeysResponse struct {
	Keys []string `json:"keys"`
}


type ringMember struct {
	address string
	// position is decided by hashing address
	position int
}


type ring struct {
	size           int
	partitionsRing []*ringMember
}


func (r *ring) insert(newNode *ringMember) int {
	insertionIdx := 0
	for idx, member := range r.partitionsRing {
		if newNode.position > member.position {
			insertionIdx = idx + 1
		}
	}


	leftPart := r.partitionsRing[:insertionIdx]
	rightPart := r.partitionsRing[insertionIdx:]


	r.partitionsRing = append(leftPart, append([]*ringMember{newNode}, rightPart...)...)


	return insertionIdx
}


func (r *ring) getNextRingMember(idx int) *ringMember {
	return r.partitionsRing[(idx+1)%len(r.partitionsRing)]
}


func (r *ring) get(idx int) (*ringMember, error) {
	if idx >= len(r.partitionsRing) {
		return nil, errors.New("out of range")
	}
	return r.partitionsRing[idx], nil
}


func (r *ring) find(nodeAddr string) int {
	for idx, member := range r.partitionsRing {
		if nodeAddr == member.address {
			return idx
		}
	}
	return -1
}


func (r *ring) remove(idx int) error {
	if idx < 0 || idx >= len(r.partitionsRing) {
		return errors.New("invalid argument range")
	}


	r.partitionsRing = append(r.partitionsRing[:idx], r.partitionsRing[idx+1:]...)
	return nil
}


func (r *ring) getOwner(dataPos int) (*ringMember, error) {
	if len(r.partitionsRing) == 0 {
		return nil, errors.New("no servers")
	}


	var pre int
	for pre < len(r.partitionsRing) && dataPos > r.partitionsRing[pre].position {
		pre++
	}


	return r.partitionsRing[pre%len(r.partitionsRing)], nil
}


func (r *ring) numServers() int {
	return len(r.partitionsRing)
}
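
The repository's entry point is responsible for wiring these pieces together; here is a minimal sketch of what that wiring could look like, based on the constructors shown above (the hash function, ring size, and port are illustrative assumptions):

package main

import (
	"hash/fnv"
	"log"
	"net/http"

	"github.com/hamdaankhalid/consistenthashing/consistenthashing"
	"github.com/hamdaankhalid/consistenthashing/proxy"
)

func main() {
	// A HashingFunc is any func(string) int; FNV-1a masked to stay non-negative.
	hashFunc := func(s string) int {
		h := fnv.New32a()
		h.Write([]byte(s))
		return int(h.Sum32() & 0x7fffffff)
	}

	// The routes are the worker endpoints from the servers package:
	// all keys, delete by key, upload key-value, and get by key.
	ch := consistenthashing.New("/keys", "/key", "/key", "/key", hashFunc, 1000)

	// The proxy router exposes /key plus the cluster-management endpoints.
	log.Fatal(http.ListenAndServe(":8080", proxy.New(ch)))
}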



Testing With A Simulation

To test this little distributed system of ours I wrote up a small simulation test. It exercises all the capabilities and functional requirements of our system under realistic scenarios.

The test requires the user to spin up a master (proxy) node and N worker nodes, and provide their addresses to the test function Run.

The test setup initializes the proxy with a random subset of the nodes passed in by the user, ranging from 1 node up to all of them. The test then iterates a user-defined number of times; on each iteration a key-value pair is created using a random sequence generator and uploaded, and any failure to upload is logged to the console. After the upload it will probabilistically add a server to the cluster (if there are any inactive nodes), probabilistically remove a server from the cluster (if enough servers are active), and probabilistically query a random value that was uploaded before. At any point, if the response from the master/proxy server is not as expected, we log the failure.

package loadtesting


import (
	"bytes"
	"encoding/json"
	"errors"
	"log"
	"math/rand"
	"net/http"
	"reflect"
)


type keyVal struct {
	Key, Value string
}


/*
Run must be preceded by a static pool of node servers, and the master server being up and running.
*/
func Run(dataPoints int, master string, nodes []string) {
	activeNodes, inactiveNodes := setup(master, nodes)
	log.Println("Initial Active Nodes: ", activeNodes)
	log.Println("Initial In-active Nodes: ", inactiveNodes)
	var uploaded []keyVal
	for i := 0; i < dataPoints; i++ {
		kv := keyVal{randSeq(8), randSeq(8)}
		err := upload(master, &kv)
		if err != nil {
			log.Printf("Fail: error during upload key val %s \n", err.Error())
			continue
		}
		uploaded = append(uploaded, kv)


		// randomly remove a server
		if len(activeNodes) > 1 && rand.Intn(100) > 50 {
			removeIdx := rand.Intn(len(activeNodes))
			removedSrv := activeNodes[removeIdx]
			err := removeServer(master, removedSrv)
			if err != nil {
				log.Printf("Fail: error during remove server %s \n", err.Error())
			}
			inactiveNodes = append(inactiveNodes, removedSrv)
			activeNodes = append(activeNodes[:removeIdx], activeNodes[removeIdx+1:]...)
		}


		// randomly add back a server
		if len(inactiveNodes) > 1 && rand.Intn(100) > 50 {
			addIdx := rand.Intn(len(inactiveNodes))
			addedSrv := inactiveNodes[addIdx]
			err := addServer(master, addedSrv)
			if err != nil {
				log.Printf("Fail: error during add server %s \n", err.Error())
			}
			activeNodes = append(activeNodes, addedSrv)
			inactiveNodes = append(inactiveNodes[:addIdx], inactiveNodes[addIdx+1:]...)
		}


		// randomly query uploaded data
		if len(uploaded) > 0 && rand.Intn(100) > 50 {
			keyValIdx := rand.Intn(len(uploaded))
			candidate := uploaded[keyValIdx]


			result, err := getKey(master, candidate.Key)
			if err != nil {
				log.Printf("Fail: error during get key %s \n", err.Error())
				continue
			}


			if !reflect.DeepEqual(candidate, *result) {
				log.Printf("Fail: Expect %v, Got %v \n", candidate, *result)
			}
		}


		log.Println("Active Nodes: ", activeNodes)
		log.Println("In-active Nodes: ", inactiveNodes)
	}
}


func setup(master string, nodes []string) ([]string, []string) {
	var activeNodes []string
	// active nodes start with an arbitrary number of nodes being added
	// num between 1 and len(nodes)
	initNumServers := rand.Intn(len(nodes)) + 1
	for i := 0; i < initNumServers; i++ {
		// select random number in range 0, to last idx of nodes that have not been added already
		candidateIdx := rand.Intn(len(nodes))
		selectedNode := nodes[candidateIdx]
		err := addServer(master, selectedNode)
		if err != nil {
			log.Fatal("Failed to add server during setup")
		}
		activeNodes = append(activeNodes, selectedNode)
		// remove selectedNode from nodes
		nodes = append(nodes[:candidateIdx], nodes[candidateIdx+1:]...)
	}
	// nodes that remain in initial nodes list are inactive/unselected for initial configuration
	inactiveNodes := nodes
	return activeNodes, inactiveNodes
}


func addServer(master string, addr string) error {
	// addr example localhost:6969
	url := "http://" + master + "/add-member?srv=" + addr
	resp, err := http.Get(url)
	if err != nil {
		return err
	}


	log.Println("Add server status ", resp.StatusCode)
	if resp.StatusCode != http.StatusOK {
		return errors.New("failed to add server got")
	}


	return nil
}


func removeServer(master string, addr string) error {
	url := "http://" + master + "/remove-member?srv=" + addr
	resp, err := http.Get(url)
	if err != nil {
		return err
	}


	log.Println("Remove server status ", resp.StatusCode)
	if resp.StatusCode != http.StatusOK {
		return errors.New("failed to remove server data got")
	}


	return nil
}


func upload(master string, kv *keyVal) error {
	data, err := json.Marshal(kv)
	if err != nil {
		return err
	}


	url := "http://" + master + "/key"
	resp, err := http.Post(url, "application/json", bytes.NewBuffer(data))


	if err != nil {
		return err
	}
	log.Println("Upload status ", resp.StatusCode)
	if resp.StatusCode != http.StatusCreated {
		return errors.New("upload request failed")
	}


	return nil
}


func getKey(master string, key string) (*keyVal, error) {
	log.Printf("Trying to get %s \n", key)
	result := &keyVal{}
	url := "http://" + master + "/key?key=" + key
	resp, err := http.Get(url)
	if err != nil {
		return nil, err
	}
	log.Println("Get status ", resp.StatusCode)
	if resp.StatusCode != http.StatusOK {
		return nil, errors.New("get request failed")
	}
	defer resp.Body.Close()


	err = json.NewDecoder(resp.Body).Decode(result)
	if err != nil {
		return nil, err
	}


	return result, nil
}


func randSeq(n int) string {
	letters := []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
	b := make([]rune, n)
	for i := range b {
		b[i] = letters[rand.Intn(len(letters))]
	}
	return string(b)
}
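
Kicking the simulation off is then just a call to Run against an already-running proxy and worker pool; a sketch with illustrative addresses (matching the 5-node, 150-pair run mentioned below):

package main

import "github.com/hamdaankhalid/consistenthashing/loadtesting"

func main() {
	// The proxy and all worker processes must already be listening on these
	// addresses before Run is invoked.
	workers := []string{
		"localhost:8081", "localhost:8082", "localhost:8083",
		"localhost:8084", "localhost:8085",
	}
	loadtesting.Run(150, "localhost:8080", workers)
}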




Far from truly scalable, but I tested this little distributed system called Bedis with 5 nodes and 150 key-value pairs and was able to assert success... great success.

Consistent hashing is just one way to go about scaling a stateful service. It has been used in systems such as DynamoDB and Riak to achieve horizontal scalability. To provide better availability, real systems often replicate the key-value pairs held by one server onto the next server(s) in the ring.
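
As a flavor of how such replication could hook into the ring structure from this article, here is a hypothetical helper (not part of the repository) that returns the primary owner plus the next few members clockwise as replica targets:

// getReplicaOwners is a hypothetical extension to the ring type shown above:
// it returns the owner of dataPos plus the next n-1 members clockwise, which
// is where replicas of the key could live.
func (r *ring) getReplicaOwners(dataPos, n int) []*ringMember {
	owners := []*ringMember{}
	if len(r.partitionsRing) == 0 {
		return owners
	}
	// find the primary owner exactly like getOwner does
	var pre int
	for pre < len(r.partitionsRing) && dataPos > r.partitionsRing[pre].position {
		pre++
	}
	// walk clockwise collecting up to n members (at most one full lap)
	for i := 0; i < n && i < len(r.partitionsRing); i++ {
		owners = append(owners, r.partitionsRing[(pre+i)%len(r.partitionsRing)])
	}
	return owners
}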


