Showing posts with label distributed computing. Show all posts

Message Locker

Problem

Extreme requirements call for interesting solutions. Sometimes the answer is a hybrid solution that does not look beautiful at first sight. One example is the Message Locker.

In a service-oriented architecture (SOA), an application consists of many services that interact with each other.

Inevitably, there comes a need to fight the high latencies caused by remote calls and the necessary serialization/deserialization. In a long chain of dependent service calls, each call is a network hop with its own fees, such as marshaling the data and passing it through the network, and each adds at least a few milliseconds.

A service that has to gather output from multiple dependencies to do its job is an aggregating service.

Such a service needs to be smart about how it calls its dependencies. If they are called one by one, their latencies accumulate. The most obvious way to prevent this is to call the dependencies in parallel. The service's own latency is then defined mostly by its slowest dependency. In this case, we say that the slowest dependency is on the critical path.
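
To make the difference concrete, here is a minimal Python sketch that compares one-by-one calls with parallel calls. The function names and the latencies are made up for illustration; asyncio.sleep stands in for a real remote call.

```python
import asyncio
import time

async def call_dependency(name: str, latency_s: float) -> str:
    # Stand-in for a remote call; the sleep models network and processing time.
    await asyncio.sleep(latency_s)
    return f"{name}-reply"

async def call_sequentially(deps: dict) -> list:
    # Latencies accumulate: total time is roughly the sum of all latencies.
    return [await call_dependency(name, latency) for name, latency in deps.items()]

async def call_in_parallel(deps: dict) -> list:
    # Total time is roughly the latency of the slowest dependency.
    replies = await asyncio.gather(
        *(call_dependency(name, latency) for name, latency in deps.items())
    )
    return list(replies)

def timed(coro):
    # Runs the coroutine and returns (result, elapsed seconds).
    start = time.perf_counter()
    result = asyncio.run(coro)
    return result, time.perf_counter() - start
```

With two hypothetical dependencies of 50 ms and 100 ms, the sequential version takes about 150 ms, while the parallel one takes about 100 ms, the latency of the slowest dependency.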

An aggregating service isn’t complex merely because it calls multiple services in parallel. And usually there is a simple way to avoid creating another service if the only business value it adds is aggregating the output of multiple dependencies.

But an aggregating service becomes complex when:
  1. It adds complex logic on top of the data returned by dependencies
  2. It has to be sophisticated at orchestrating calls to many dependencies.
The need to orchestrate comes from the nature of SOA: sometimes a service needs to call one or more dependencies first to gather the data necessary to call another dependency. Often it’s not possible to call all dependencies in parallel and simply aggregate the replies once all are available. In many cases, the service needs to call dependency A to get the data necessary to call dependency B, whose results are required to decide whether the service needs to call dependency C or D, and so on.

Calling dependencies optimally is often the most important thing when fighting high latencies. Thus, eventually there comes a need for an aggregating service that can call multiple dependencies in a savvy way.

But even when an aggregating service is already in use, the need to fight high latencies inevitably returns. And there are only so many ways this can be done:
  1. decrease the latency of each dependency on the critical path (often by pulling up the dependencies of one's own dependency and calling them first)
  2. call dependencies in an even smarter way.

This post focuses on the 2nd way. If the aggregating service already parallelizes calls to its dependencies as much as possible and there is no way to make it even better, then, to be honest, not much can be done anymore.

Seriously, when service A needs to call dependency B so that it can call dependency C later, what else can be done to save the extra 10 ms you need so much?

That’s where Message Locker comes in useful. It goes into somewhat nasty territory to save additional milliseconds in an aggregating service.

Message Locker

"Message Locker" means a locker for a message. The service stores a message in a kind of locker so that only a specific client can grab it. If the message is not received within a certain period, it becomes unavailable.

Message Locker is a distributed service that stores all its data in memory. The client that sends a message into the locker is called the sender. The client that receives a message from the locker is called the receiver.

Each message is stored in the locker under a unique random key. When the sender puts a message into the locker, it also provides additional attributes, like:
  1. TTL - time to store the message in the locker,
  2. Reads - number of times the message can be received.

A message is removed from the locker once it has been received the defined number of times or once its TTL has expired. These rules prevent Message Locker from becoming bloated with obsolete messages.

Even after a message has been removed, Message Locker is still aware of its previous presence. Whenever a receiver tries to get an evicted message, it gets an error immediately.

If a receiver tries to get a message that has not been evicted yet, the message is returned to the receiver and its read count is increased. This approach doesn’t handle retries properly, though.

If a receiver tries to get a message that has not been sent yet, Message Locker holds the request until the message becomes available or a timeout occurs.
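
These rules can be sketched as a small in-memory, single-process Python class. This is only an illustration under my own assumptions, not a distributed implementation: the class name, the put/get signatures and the MessageEvicted error are all hypothetical, and the set of evicted keys grows without bound.

```python
import threading
import time

class MessageEvicted(Exception):
    """Raised when the message was already removed from the locker."""

class MessageLocker:
    def __init__(self):
        self._cond = threading.Condition()
        self._messages = {}    # key -> [payload, reads_left, expires_at]
        self._evicted = set()  # remembered so that receivers fail fast

    def put(self, key, payload, ttl_s, reads):
        with self._cond:
            self._messages[key] = [payload, reads, time.monotonic() + ttl_s]
            self._cond.notify_all()  # wake up receivers that are already waiting

    def _evict_if_expired(self, key):
        entry = self._messages.get(key)
        if entry is not None and time.monotonic() >= entry[2]:
            del self._messages[key]
            self._evicted.add(key)

    def get(self, key, timeout_s):
        deadline = time.monotonic() + timeout_s
        with self._cond:
            while True:
                self._evict_if_expired(key)
                if key in self._evicted:
                    raise MessageEvicted(key)  # error immediately, no waiting
                entry = self._messages.get(key)
                if entry is not None:
                    entry[1] -= 1              # one allowed read is used up
                    if entry[1] == 0:
                        del self._messages[key]
                        self._evicted.add(key)
                    return entry[0]
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(key)
                # Message not sent yet: hold the request until put() or timeout.
                self._cond.wait(remaining)
```

A receiver that calls get before the sender calls put simply blocks on the condition variable, which mirrors the "hold the request" behavior described above.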

How to use Message Locker?

Consider three services A, B and C. Service A is an aggregator service that calls multiple other services, among them B and C. Service B has to be called before service C, as its output is part of the input for service C. Service A also uses the output of service B for its own needs later.

Normally, service A would call service B, wait for the reply and then call service C. In this workflow, service A has to do the following work before it can call C, and this extra work becomes part of the critical path:
  1. wait for the reply from service B
  2. read the reply from service B
  3. construct the request and call service C.
Network transfer and the extra serialization/deserialization are often expensive operations and, when working with large amounts of data, can take 5-10 ms. Constructing the request and making the remote call to service C can add another 5-10 ms.

Without Message Locker

This is where Message Locker becomes helpful. The workflow now changes: service A calls service B with key K and, in parallel, calls service C with the same key K. B puts its reply into Message Locker under key K, and service C receives this reply using key K. Service A also receives service B’s reply from the locker using key K, and does this in parallel with its call to service C.

With Message Locker

In this case, there are the following notable changes:
  1. the time to construct the request and call service C is spent in parallel with the call to service B, and as such is removed from the critical path
  2. the time for service C to deserialize the request and do its initial work is also spent in parallel with the call to service B, and as such is removed from the critical path
  3. the time for service A to deserialize the reply from service B is spent in parallel with the call to service C, and as such is removed from the critical path
  4. the time for service C to call Message Locker, then receive and deserialize the data, is added to the critical path. This would offset the savings gained in #2.
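
The effect on the critical path can be estimated with a rough model. The sketch below is only back-of-the-envelope arithmetic: all parameter names and millisecond values are made up, and the model ignores the time service B spends putting its reply into the locker.

```python
def critical_path_without_locker(b_call, read_b_reply, construct_c, c_init, c_rest):
    # A waits for B, reads the reply, builds the request for C,
    # then C does its initial work and the rest of its work.
    return b_call + read_b_reply + construct_c + c_init + c_rest

def critical_path_with_locker(b_call, construct_c, c_init, locker_get, c_rest):
    # A calls B and C at the same time, so building C's request and C's
    # initial work overlap with B's call. C then fetches B's reply from
    # the locker before doing the rest of its work.
    return max(b_call, construct_c + c_init) + locker_get + c_rest

# Hypothetical latencies in milliseconds, for illustration only.
without = critical_path_without_locker(
    b_call=20, read_b_reply=5, construct_c=5, c_init=2, c_rest=15)
with_locker = critical_path_with_locker(
    b_call=20, construct_c=5, c_init=2, locker_get=4, c_rest=15)
print(without, with_locker)  # 47 39
```

In this made-up case the locker read (4 ms) costs more than the overlapped initial work in C (2 ms), so the net gain comes from taking the reading of B's reply and the construction of C's request off the critical path.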

Using Message Locker also adds complexity:
  1. Services A, B and C need to be integrated with Message Locker
  2. Service A or B needs to know how many times the message will be received from the locker, or what timeout to use, so as not to overload Message Locker with messages that are no longer needed and not to cause issues by removing a message too fast.

Why not use existing solutions like...

Message Locker by itself is very similar to well-known existing solutions: the message broker and the distributed cache. Although the similarities are strong, there are a few differences that make Message Locker stand out for its own very specific use case.

Message Broker?

A message broker usually has a predefined list of queues. Producers send messages to a queue and consumers consume them. It is possible to create a temporary queue, but that is usually an expensive operation. A message broker also usually assumes that processing latency is less important than other traits, such as persistence, durability or transactionality.

For these reasons, a message broker can’t be a good replacement for Message Locker.

Distributed Cache?

Message Locker is more like a distributed cache with additional limitations. A message is stored only for one or a few reads, or for a very limited amount of time. A message is removed from the locker as soon as it counts as “received”.

In ordinary caching, content is expected to stay available for a much longer period than it does in Message Locker.

Summary

Message Locker is a way to decrease latencies in aggregating services by enabling additional parallelization. This is possible because the dependency between services is organized through a proxy, the Message Locker. It holds the replies from dependencies and provides them to the receiver as soon as they are available. This makes it possible to further hide expensive operations: network calls and serialization/deserialization.

This comes with additional complexities:
  1. The right values for the timeout and the number of reads before eviction can be error-prone to define,
  2. Message Locker development and support can be cumbersome as well,
  3. Services need to be restructured to benefit from Message Locker.

But when there is no other choice and latencies have to be decreased, Message Locker could be a solution.

Simple Multiple BloomFilters data structure implementation on Java

As a follow-up to my previous post about using the Multiple Bloom Filters data structure to identify hot values, I decided to write a simple, dumb implementation in Java and open-source it.

The project can be found on GitHub and is proudly named multi-bloom-filter.

It comes with a basic little class called MultiBloomFilter, which does most of the job. It accepts the number of enclosed Bloom filters, the capacity of each BF and the duration before the head BF is reset. One can also specify which hash function is used and how many times it should be applied to each value.

Simple example:

This short example shows that MBF resets only one of the internal BFs. This means that whenever a reset happens, only part of the data is removed, and when the hot key is added again, it is identified as hot.
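
The reset behavior can be illustrated with a minimal Python sketch. It is not the API of the multi-bloom-filter project: the class and method names (BloomFilter, MultiBloomFilter, add, is_hot, reset_head) are hypothetical, the rule "add a value to the first filter that has not seen it, and treat it as hot once all filters contain it" is my assumption, and the reset is triggered by hand instead of by a timer.

```python
import hashlib

class BloomFilter:
    def __init__(self, size_bits=1024, hashes=3):
        self.size_bits = size_bits
        self.hashes = hashes
        self.bits = bytearray(size_bits)  # one byte per bit, for readability

    def _positions(self, value):
        # Derive several hash positions by salting the value with a seed.
        for seed in range(self.hashes):
            digest = hashlib.sha256(f"{seed}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos] = 1

    def __contains__(self, value):
        return all(self.bits[pos] for pos in self._positions(value))

class MultiBloomFilter:
    def __init__(self, filters=3, size_bits=1024, hashes=3):
        self._new_filter = lambda: BloomFilter(size_bits, hashes)
        self.filters = [self._new_filter() for _ in range(filters)]

    def add(self, value):
        # Record one more sighting in the first filter that has not seen the value.
        for bf in self.filters:
            if value not in bf:
                bf.add(value)
                return

    def is_hot(self, value):
        # Hot means the value was seen at least as many times as there are filters.
        return all(value in bf for bf in self.filters)

    def reset_head(self):
        # Only the head filter is dropped; the other filters keep their data.
        self.filters.pop(0)
        self.filters.append(self._new_filter())
```

After reset_head a hot value loses only one of its recorded sightings, so a single additional add makes it hot again, while a value that was seen only once is forgotten completely.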

Once again, MBF is a great solution if you need to find the set of hot values for some period of time. In particular, it helps to put only hot values into the cache. If many hosts use a single distributed cache service, MBF can save the redundant traffic of putting cold data into the cache, where it would be evicted pretty fast. Also, if a hot key is in the MBF, there is a high chance that it is in the distributed cache as well. Thus the application gets a kind of "bloom filter" for checking how likely it is that the value for a given key can be found in the cache.

There are many more use cases for the MBF data structure. Being able to work in a concurrent, scalable environment is another "feature" that I love about Bloom filters, and MultiBloomFilter in particular. To me, a good Bloom filter implementation that is able to grow and scale correctly, and that has different mechanisms to evict data and fight false positives, sounds like a very useful service.

SLA

An SLA (Service Level Agreement) is very important for large distributed software. It plays the role of a contract between the application and its clients. It's also not very straightforward to achieve, especially if many components participate in the process. In that case each component should have an even stricter SLA, because the effects of the individual component SLAs add up. For example, if a single call to some service A results in multiple calls to other services, those services need a better SLA than A's in order for A to keep its SLA promise.

There are different types of SLA: resources, availability, durability and performance.
Many systems provide a contract on how many resources are available to the client: memory, CPU, disk space etc.
Some websites and services say that their availability SLA is 99.9%, which means that they are going to be available 99.9% of the time. Actually, that is not very much at all: it still allows almost nine hours of downtime per year. There is a nice table on Wikipedia that converts availability percentages into actual time.
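
A small Python sketch makes both points concrete: how an availability percentage converts into downtime, and why dependencies need a stricter SLA than the service that calls them. The function names are mine, and the second function assumes that every dependency is required and that failures are independent, which is a simplification.

```python
import math

HOURS_PER_YEAR = 365 * 24

def downtime_hours_per_year(availability):
    # Fraction of the year during which the service may be unavailable.
    return (1 - availability) * HOURS_PER_YEAR

def chained_availability(availabilities):
    # Assumes the request needs every dependency and failures are independent,
    # so the individual availabilities multiply.
    return math.prod(availabilities)

print(round(downtime_hours_per_year(0.999), 2))      # 8.76
print(round(chained_availability([0.999] * 5), 5))   # 0.99501
```

So a service that depends on five components with 99.9% availability each cannot promise 99.9% itself under these assumptions; it ends up at roughly 99.5%.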

Some services, especially storage services like S3, also have a durability SLA. It states how rarely the service might lose data.

A performance SLA is common for running services that need not only to be available but also to return a response within a specified period of time. A performance SLA is usually expressed as a percentile of requests that are handled within the limit. For instance, the SLA might be to return a response in 10 ms or less for 99.9% of requests.
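
Checking such a percentile-based SLA against measured latencies could look like the Python sketch below. The function name and its parameters are hypothetical. The target percentage is passed as a string so that it converts to an exact fraction and the comparison is not affected by floating-point rounding.

```python
from fractions import Fraction

def meets_performance_sla(latencies_ms, threshold_ms, target_percent):
    # target_percent is a string such as "99.9" so that it converts exactly.
    target = Fraction(target_percent) / 100
    within = sum(1 for latency in latencies_ms if latency <= threshold_ms)
    return Fraction(within, len(latencies_ms)) >= target

# 999 fast requests and one slow request: exactly 99.9% are within 10 ms.
samples = [5] * 999 + [50]
print(meets_performance_sla(samples, threshold_ms=10, target_percent="99.9"))  # True
```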

Leader Election: Gallager-Humblet-Spira (GHS) algorithm

GHS is an algorithm to elect a leader in an arbitrary network. It is based on building a minimum spanning tree (MST) of the network and then electing a leader based on it.

The algorithm for building the MST is very similar to Kruskal's algorithm, although it has some specifics for nodes distributed in an arbitrary network. For example, the nodes have to communicate with each other in order to detect connected components, merge them etc. After the MST is built, one of the nodes is chosen to be the leader, and a notification is sent to all other nodes in the MST.
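
For intuition only, here is a centralized Python sketch of Kruskal's algorithm followed by a leader choice. It is not the distributed GHS algorithm: there is no message passing, a single process sees the whole graph, and picking the node with the highest id as the leader is my assumption.

```python
def kruskal_mst(nodes, edges):
    """edges is an iterable of (weight, u, v); returns the MST edges."""
    parent = {node: node for node in nodes}

    def find(node):
        # Find the representative of the component that contains node.
        while parent[node] != node:
            parent[node] = parent[parent[node]]  # path halving
            node = parent[node]
        return node

    mst = []
    for weight, u, v in sorted(edges):
        root_u, root_v = find(u), find(v)
        if root_u != root_v:          # the edge joins two different components
            parent[root_u] = root_v   # merge the components
            mst.append((weight, u, v))
    return mst

def elect_leader(nodes, edges):
    mst = kruskal_mst(nodes, edges)
    if len(mst) != len(nodes) - 1:
        raise ValueError("the network must be connected")
    # Assumption for this sketch: the node with the highest id becomes leader.
    return max(nodes), mst
```

In GHS the same "merge components along the cheapest outgoing edge" idea is carried out by the nodes themselves through messages, since no node knows the whole graph.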

Basically, the goal of this algorithm is to identify the nodes in the network and then elect a leader among the known nodes.

Identify hot data to cache using Multiple Bloom Filters

Recently I was thinking about how to support caching of hot data in a highly loaded application with huge throughput. This is a note about the idea, based on Bloom filters, that I ended up with. It is far from ideal, and I haven't tested it yet, but at this moment it seems optimal in terms of CPU and memory usage. Also, after some research on the internet, I found that the idea is not new (see the link at the end of the post).

A Bloom filter (BF) gives a nice way to check whether some value has been seen before. Of course, this is a probabilistic data structure, so the result always comes with some level of error.

Using multiple hash functions within a single BF makes it better at avoiding collisions, but it also requires the Bloom filter to be larger. As an alternative to one large BF with multiple hash functions, it is possible to use multiple BFs, each relying on a single hash function and possibly each with a different size.

Multiple BFs can also be used to figure out whether a given value has been seen some number of times. As an example, imagine an application that serves reads of data from storage. There is a need to keep some data in an LRU cache, but because the number of different values is so large, it is impossible to just add every value to the cache. The cache limit plus the LRU policy mean that even hot data can easily be evicted from the cache. For example, assume we have a cache of N elements and we read M elements from storage, where M is much, much larger than N. If we added every element we read to the cache, we would quickly fill it with values that are read only once, and the hot data would be evicted.
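
The eviction problem is easy to reproduce with a tiny LRU cache. The Python sketch below is only an illustration with a hypothetical LRUCache class built on OrderedDict; the capacity and the keys are made up.

```python
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = OrderedDict()

    def get(self, key):
        if key not in self.items:
            return None
        self.items.move_to_end(key)          # mark as most recently used
        return self.items[key]

    def put(self, key, value):
        self.items[key] = value
        self.items.move_to_end(key)
        if len(self.items) > self.capacity:
            self.items.popitem(last=False)   # evict the least recently used

cache = LRUCache(capacity=3)
cache.put("hot", "value")
for _ in range(10):
    cache.get("hot")                         # the hot key is read many times
for i in range(3):
    cache.put(f"one-time-{i}", i)            # a burst of values read only once
print(cache.get("hot"))  # None
```

Three one-time values are enough to push the hot key out of a cache of size three, no matter how often it was read before. Admitting only values that were already seen several times avoids this.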

Leader Election: LeLann, Chang and Roberts algorithm (LCR)

This algorithm is for ring networks. Each message travels through the network from one process to the next, i.e. there is no broadcasting. This means that each process knows about exactly one other process: its neighbor. This can be pictured as a linked list.

The algorithm's complexity is better than that of the bully algorithm: its worst case is O(N^2) messages, and that happens only if the hosts are arranged around the ring in increasing (or decreasing, depending on which extreme is used) order of their UIDs.
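
The message counts can be checked with a small Python simulation. This is a sketch under my own assumptions: the largest UID wins, messages travel from position i to position i + 1, each process forwards only UIDs larger than its own, and the final "leader elected" announcement is left out.

```python
from collections import deque

def lcr_elect(ring_uids):
    """Simulates LCR on a one-directional ring; returns (leader, messages sent)."""
    n = len(ring_uids)
    # Each entry is (position of the receiving process, UID carried by the message).
    in_flight = deque(((i + 1) % n, uid) for i, uid in enumerate(ring_uids))
    messages = n  # every process starts by sending its own UID to its neighbor
    while in_flight:
        pos, uid = in_flight.popleft()
        own = ring_uids[pos]
        if uid == own:
            return uid, messages  # own UID made it around the ring: leader
        if uid > own:
            in_flight.append(((pos + 1) % n, uid))  # forward the larger UID
            messages += 1
        # Smaller UIDs are dropped.
    raise ValueError("UIDs must be unique and the ring must not be empty")

print(lcr_elect([4, 3, 2, 1]))  # (4, 10)
print(lcr_elect([1, 2, 3, 4]))  # (4, 7)
```

With UIDs decreasing along the direction of travel, every UID travels as far as it can, which gives N(N+1)/2 messages. With the opposite order almost every UID is dropped at the first hop, which gives 2N - 1 messages.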

Leader Election: Bully Algorithm

Only the process with the largest (or smallest) UID becomes the leader. If some process sees that its UID is more extreme than the current leader's UID, it initiates a leader election.
There are 3 types of messages:
  • election message - sent to announce election
  • answer message - response to the election message
  • coordinator message - sent to announce the identity of the elected leader
The algorithm is as follows:
  1. Some process P finds that the leader is not available (the coordinator is down).
  2. P initiates a new leader election by broadcasting an election message.
  3. Each process replies with its own UID.
  4. If P finds a process with a higher UID, it waits a bit, and if no reply with an even higher UID arrives, it promotes that host to leader.
  5. However, if P receives a coordinator message announcing a new leader whose UID is lower than P's UID, it initiates another election.
The algorithm requires that there be a coordinator. It also requires N^2 + N messages, as at any moment any process can try to start an election, and thus send election messages, handle N answer messages and send a coordinator message. As an optimization, a process can send messages only to the processes with larger UIDs, which gives N^2 / 2 + N messages; that is better, but not ideal.
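
A single election round as described in the steps above can be sketched in Python as follows. This is a simplified, single-initiator model with hypothetical names: it assumes the largest UID wins, that only live processes answer, and it does not model the repeated election from step 5.

```python
def bully_election(uids, alive, initiator, only_higher=False):
    """Returns (leader, number of messages) for one election started by initiator."""
    others = [uid for uid in uids if uid != initiator]
    # Optimization from the text: contact only processes with a larger UID.
    targets = [uid for uid in others if uid > initiator] if only_higher else others
    election_messages = len(targets)
    answers = [uid for uid in targets if uid in alive]  # dead processes stay silent
    leader = max(answers + [initiator])
    coordinator_messages = len(others)  # the result is announced to everyone
    return leader, election_messages + len(answers) + coordinator_messages

uids = [1, 2, 3, 4, 5]
alive = {1, 2, 3, 4}  # process 5, the old leader, is down
print(bully_election(uids, alive, initiator=2))                    # (4, 11)
print(bully_election(uids, alive, initiator=2, only_higher=True))  # (4, 9)
```

The counts above are for one initiator only. The quadratic totals in the text come from many processes starting elections at about the same time.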

Leader Election in Distributed System

One of the important problems of distributed computing is electing a leader process. A leader process is needed for multiple reasons: it can be used to make a final decision, it can run any task that has to be run by only a single process at a time (like finding the next task to execute and spreading execution across other hosts, acquiring a lock etc.), it may hold authoritative data and provide it to other nodes, etc.

There are multiple leader election algorithms. Still, for most tasks it's almost always possible to find a solution that does not need a single leader:
  • for task scheduling, the list of tasks could be partitioned and, with locking, spread over multiple hosts
  • for making final decision, it's possible to come up with some consensus (like paxos algorithm)
  • distributed locking could help other systems to avoid having a leader (again it might be based on some consensus algorithm like paxos or raft)

Amazon Dynamo paper notes

These notes are based on the paper about Amazon Dynamo. There is a difference between Dynamo and AWS DynamoDB (at least in name), but it is not discussed in this document.

Dynamo is designed to be an eventually consistent storage system. At the same time, it allows clients to specify the number of hosts in the read (R) and write (W) quorums. The number of hosts N on which each piece of data is replicated is also configurable. A good rule of thumb is to keep N >= 3 and R + W > N. In this case the read and write quorums overlap, so a read reaches at least one replica that has the latest write.
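
Why R + W > N matters can be verified by brute force for small N. The Python sketch below uses hypothetical function names and simply checks every possible read set against every possible write set.

```python
from itertools import combinations

def quorums_overlap(n, r, w):
    # The rule of thumb from the text.
    return r + w > n

def every_read_meets_every_write(n, r, w):
    # Brute force: does each possible read set share a replica with each write set?
    replicas = range(n)
    return all(
        set(read_set) & set(write_set)
        for read_set in combinations(replicas, r)
        for write_set in combinations(replicas, w)
    )

print(quorums_overlap(3, 2, 2), every_read_meets_every_write(3, 2, 2))  # True True
print(quorums_overlap(3, 1, 2), every_read_meets_every_write(3, 1, 2))  # False False
```

With N = 3, R = 2 and W = 2 every read touches at least one replica that took part in the latest write. With R = 1 and W = 2 a read can land on the one replica that missed it.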

Many systems use synchronous data replication to keep consistency strong (e.g. MySQL). This causes issues with availability, because if a replica fails, the system becomes unavailable to its clients. Only a single master host can accept writes in such systems. In contrast, Dynamo uses asynchronous data replication. This helps to build partition-tolerant systems, as Dynamo accepts reads and writes even during a network partition and resolves update conflicts using different algorithms.

Dynamo is a P2P system that uses a gossip-based protocol to share membership and failure detection information with peers. The membership information contains the list of key ranges that each node is responsible for. Nodes also use the gossip protocol to announce themselves to other hosts. Basically, when a new Dynamo host starts, it already knows a list of seed hosts, which it connects to in order to tell them about its own availability. Those seed hosts help to deliver the information to other peers via gossiping.

Dynamo uses consistent hashing for partitioning. This allows adding new nodes to the hash space without the need to re-hash existing values. Instead, only some portion of the values is moved to the new node, after which it becomes available for reads and writes.
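
A minimal Python sketch of a consistent hash ring shows that property. It is a simplification with hypothetical names: each node gets a single point on the ring, there are no virtual nodes and no replication, and SHA-256 is just a convenient hash choice.

```python
import bisect
import hashlib

def ring_hash(value):
    # Maps a string to a position on the ring.
    return int.from_bytes(hashlib.sha256(value.encode()).digest()[:8], "big")

class HashRing:
    def __init__(self, nodes=()):
        self._points = []   # sorted ring positions
        self._owners = {}   # ring position -> node name
        for node in nodes:
            self.add_node(node)

    def add_node(self, node):
        point = ring_hash(node)
        bisect.insort(self._points, point)
        self._owners[point] = node

    def node_for(self, key):
        # The first node clockwise from the key's position, wrapping around.
        index = bisect.bisect_right(self._points, ring_hash(key)) % len(self._points)
        return self._owners[self._points[index]]
```

When a node is added, the only keys that change owner are the ones that fall between the new node and its predecessor on the ring, and all of them move to the new node. Every other key stays where it was.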

Merkle trees

A Merkle tree is a hash tree whose leaves are hashes of the data blocks. Each intermediate node is a hash of its children, and so is the root node. Thus, to know whether two hash trees differ, it is enough to compare their root nodes. It's also easy to find the actual difference between two trees: just traverse them top to bottom and find the nodes with different hashes. This is how a Merkle tree helps to reduce the amount of data passed between two sources.
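
The top-to-bottom comparison can be sketched in Python as follows. The tuple layout and the function names are my own choices, and the sketch assumes that both sides hold the same non-zero number of blocks.

```python
import hashlib

def sha(data):
    return hashlib.sha256(data).digest()

def build_tree(blocks, lo=0, hi=None):
    """Returns a node (hash, lo, hi, left, right) covering blocks[lo:hi]."""
    if hi is None:
        hi = len(blocks)
    if hi - lo == 1:
        return (sha(blocks[lo]), lo, hi, None, None)  # leaf: hash of one data block
    mid = (lo + hi) // 2
    left = build_tree(blocks, lo, mid)
    right = build_tree(blocks, mid, hi)
    # An inner node is the hash of its children's hashes.
    return (sha(left[0] + right[0]), lo, hi, left, right)

def diff(a, b):
    """Returns the indices of the blocks that differ between two trees."""
    if a[0] == b[0]:
        return []        # equal hashes: the whole subtree is identical, skip it
    if a[3] is None:
        return [a[1]]    # a leaf with a different hash
    return diff(a[3], b[3]) + diff(a[4], b[4])

ours = build_tree([b"a", b"b", b"c", b"d", b"e"])
theirs = build_tree([b"a", b"b", b"c", b"X", b"e"])
print(diff(ours, theirs))  # [3]
```

Only the hashes along the path to the changed block have to be compared, so most of the tree and all of the unchanged data never need to be exchanged.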

Merkle trees are used as an anti-entropy mechanism that helps to detect divergence between data stored in multiple locations. For example, Dynamo and Cassandra use Merkle trees to find cases where replicas hold different versions of data. Merkle trees are also an awesome way to detect changes before syncing data, which is one of the primary tasks for Dropbox, Chrome Backup, Google Drive, BitTorrent Sync etc.

Merkle trees do not rely on timestamps but on actual data differences. This makes hash trees a wonderful structure for synchronizing data between source and target with minimal effort spent on detecting the actual changes.

For example, assume there is a backend B and N clients, and each client maintains a copy of the data (like Dropbox, BitTorrent Sync, Google Drive etc.). Some of the clients change the data locally and upload it to backend B. Each client can then determine the actual change and exchange only the necessary data with the backend.

Hash trees are used by Git and Bitcoin. As mentioned before, Dynamo, Cassandra and Riak use Merkle trees for anti-entropy as well. Merkle trees are also used in cryptography for message signatures (search for Lamport signature).


Onion Routing

I've been interested in how Tor works for a long time. Just recently I got to reading more about it, and especially about the technique at the heart of Tor. This technique is called Onion Routing, and this random-random-note (RRN) is about it.

Before, I couldn't quite understand how anonymity could be achieved in modern networks. However, onion routing is actually a quite simple but powerful way to do that. All genius is simple!

Gossip Protocol

A gossip protocol is a style of inter-process communication in a network, inspired by the gossip that happens in human social groups. Basically, nodes (peers) exchange some information. This information may contain knowledge that a node gained by itself (e.g. some events), received from a user (e.g. search queries) or received from other nodes while talking to them.

Peers connect to each other in a random manner to ensure that information is shared quickly. For example, a peer might connect to another peer and exchange information 10 times per second.
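
How quickly random connections spread information can be seen in a small Python simulation. It is a simplified push-only model with made-up names: in every round each informed node tells one randomly chosen node, which may be itself or a node that already knows.

```python
import random

def rounds_to_spread(node_count, seed=0, max_rounds=1000):
    """Returns the number of rounds until every node is informed, or None."""
    rng = random.Random(seed)  # seeded so that runs are repeatable
    informed = {0}             # node 0 learns the news first
    for round_no in range(1, max_rounds + 1):
        # Each node that was informed at the start of the round tells one random node.
        for _ in range(len(informed)):
            informed.add(rng.randrange(node_count))
        if len(informed) == node_count:
            return round_no
    return None
```

The number of informed nodes can at most double per round, so 1000 nodes need at least 10 rounds. In practice the tail is slower, because late in the process most random picks hit nodes that already have the information.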

It's also possible to organize hosts into groups where each node shares information only with nodes within its group. This simplifies finding the next random node to communicate with and limits the number of useless random connections between nodes. Some nodes belong to multiple groups, to ensure that information spreads over the whole network. As an alternative, nodes could communicate only with their nearest neighbors. Another alternative is to define hierarchical levels, which minimizes the need to send a message to everyone; instead, it goes only to some hosts at a given level.

Unfortunately, a gossip protocol does not guarantee that knowledge will be shared with all nodes. Another issue is that the received information might no longer be correct or valid (well, that's in the nature of gossip, right?). It is also said that a gossip protocol maintains relaxed consistency.

Another good use of gossip is to multicast information over a network whose structure is not known. For example, it's not always possible to multicast information using IP multicast, because hosts might be organized into different restricted networks.

Gossip protocols are at the core of the internet.

When using gossip protocols it's better to share information about the fact than the fact itself, i.e. "configuration was changed" instead of "new value for property A is B". This is also true of many distributed systems.

Usages

There are multiple usages for gossip protocol, some of them are:

  • propagate events between multiple hosts, so each node can react if needed
  • could be used to route data using different paths (get the data, split it into packets, deliver them to the target using different routes)
  • could be used to aggregate values among different hosts (each node receives the data, checks whether it has seen it, and if not, aggregates it with its own data and sends it on to the next hosts)
  • search among large amount of nodes (used in unstructured P2P networks)
  • finding the list of nodes or just counting nodes in the network of unknown size and structure
  • electing leaders
  • sorting nodes in the network, detecting failures and errors, recovering the network structure
  • used for monitoring and gathering data, for example for load balancing
  • can help with detecting network partitions and dynamic insertions of new services
  • heartbeat implementation
  • implementing epidemic algorithms in the network
  • gather information about other nodes to make local decision, like load balancing, accessing data located in specific nodes etc.; do this in scalable way
  • detecting new nodes and adding them to the cluster or load balancer, updating DNS records etc.

But gossip protocols are usually not the most efficient ones. It's quite often possible to find a specific protocol that does the job better. However, some other protocols may not work for large networks (e.g. Paxos), while others are centralized and thus might have failure issues (e.g. those based on a centralized DB).

Other downsides of gossip protocols are:
  • the same information can be delivered to the same host twice
  • message latency is usually high, especially for large networks

Human Memory

I recently read an awesome book: The Design of Everyday Things by Donald Norman. I must say that I didn't expect much of this book. My thought was "just another book for designers on how to create usable things". There was, however, something pushing me to read it, maybe because I had read about it in In the Plex, where the book was referenced as one that influenced Google's founders. Well, now I can understand why. The book seems to be mostly about trivial things that everyone should understand and know. In fact, that's not true. Not everyone understands and knows them. I didn't. There are so many discoveries about ordinary things, views from different perspectives, inspirational rules etc.

There are many topics that I liked in this book, but the chapter named "To Err is Human" may be my favorite. Not only because I make so many mistakes and errors all the time myself, and it's nice to understand how this works (and how I work), but also because the author gives a very good explanation of how human memory and the brain work.

I made some notes while reading this book and decided to share the ones related to how human memory works. I was also thinking about how this applies to AI, and made some interesting discoveries for myself too.

So here are my notes...