What might a true distributed graph look like?


Or, what if we wrote “the dream”? :thinking:

Obviously this is a huge topic. But while I’m writing down sketches, I’ve always wanted to try this one.

This would be a separate but sister repo/project to Cayley, but one made far easier by the fact Cayley exists: it’s easy to write a backend for Cayley, and so all the questions about query languages and shapes and what’s a quad/quadstore and so on are right there. We only need a gRPC call from one to the other.

The way all the distributed backends today do it, if we’re honest, is by sharding the quads. It doesn’t matter whether we’re on Elastic/Mongo/Cassandra/etc. – they all make the basic document a quad, and we distribute the quads across the whole cluster.

This lets you distribute the data very easily. It also means that a traversal is really hard: when we iterate over "all quads which have subject <foo>", we have to query every machine and take the union of the matching quads. That makes scaling hard, because you’re returning these large sets over the wire. As you traverse, you have to hit every machine – even if you batch things up – about once for every LinksTo. If your cluster is large, that means a lot of traffic on your network.

                                                   
        +---------------------+                    
        |  Query Engine       |                    
        |                     |                    
        +---------------------+                    
         /   ^   |  ^     \  ^                     
        /   /    \  |      -\ \     //Cycles here               
       v   /      v |        > \                   
  +--------+   +--------+    +--------+            
  |Node    |   | Node   |    |Node    |           
  |1       |   | 2      |    |3       |            
  |        |   |        |    |        |            
  +--------+   +--------+    +--------+ 

(Oftentimes it’s Cayley talking to the Query Engine, which may be the Mongo backend, for instance.)

That’s how it usually works today. However…

What if, instead, we sharded the nodes?

We’d need to be careful, of course. We may have to split a node across multiple machines if it gets too large (think of the degree of the node for </people/person>), but since the graph is often scale-free, those nodes are few and far between. The vast majority of nodes would fit in a single “node page”.

In some sense, you can think of the indices we currently have in KV as already being “node pages”: the “sub”, “pred”, and “obj” buckets are keyed by a node hash or ID. If we turned that inside out – a single bucket keyed by node hash, whose value is a proto with fields for sub, pred, and obj – that proto would be a “node page”.
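
To make that concrete, here’s a minimal Go sketch of what a node page might hold – the names and layout are mine, not Cayley’s actual KV schema – along with the two questions we’d want it to answer locally (only the outgoing, subject-position direction is shown):

package dgraph

// QuadRef identifies a quad by the hashes/IDs of its four positions.
type QuadRef struct {
    Subject, Predicate, Object, Label uint64
}

// NodePage is everything the cluster stores about one node: its ID and the
// quads it appears in, split by which position it occupies.
type NodePage struct {
    NodeID      uint64
    AsSubject   []QuadRef
    AsPredicate []QuadRef
    AsObject    []QuadRef
}

// HasQuad answers, locally, "am I the subject of a quad with this predicate
// and object?" (0 means "any").
func (p *NodePage) HasQuad(predicate, object uint64) bool {
    for _, q := range p.AsSubject {
        if (predicate == 0 || q.Predicate == predicate) &&
            (object == 0 || q.Object == object) {
            return true
        }
    }
    return false
}

// Neighbors returns the objects this node points at via the given predicate.
func (p *NodePage) Neighbors(predicate uint64) []uint64 {
    var out []uint64
    for _, q := range p.AsSubject {
        if q.Predicate == predicate {
            out = append(out, q.Object)
        }
    }
    return out
}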

This “node page” can then be assigned to a machine. For how to assign blocks of things to machines, we can look at the standard tricks I used in Torus: a consensus metadata server in etcd, hash rings, the usual. For nodes that are so big we do need to split them up, that split list can be kept in etcd as well.
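
For the assignment itself, a bare-bones consistent hash ring is enough to sketch the idea; the etcd metadata, replication, and the split-node list are all elided, and every name here is made up:

package dgraph

import (
    "fmt"
    "hash/fnv"
    "sort"
)

// Ring assigns node pages to machines with a consistent hash ring: each
// machine gets several virtual points, and a node page belongs to the first
// point at or after its hash.
type Ring struct {
    points []uint64          // sorted virtual points on the ring
    owner  map[uint64]string // virtual point -> machine address
}

func hash64(s string) uint64 {
    h := fnv.New64a()
    h.Write([]byte(s))
    return h.Sum64()
}

// NewRing places each machine at `replicas` virtual points on the ring.
func NewRing(machines []string, replicas int) *Ring {
    r := &Ring{owner: make(map[uint64]string)}
    for _, m := range machines {
        for i := 0; i < replicas; i++ {
            p := hash64(fmt.Sprintf("%s#%d", m, i))
            r.points = append(r.points, p)
            r.owner[p] = m
        }
    }
    sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
    return r
}

// Owner returns the machine responsible for a given node (page) ID.
func (r *Ring) Owner(nodeID uint64) string {
    i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= nodeID })
    if i == len(r.points) {
        i = 0 // wrap around the ring
    }
    return r.owner[r.points[i]]
}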

The TL;DR is, then, we can assign nodes to machines. An individual node knows the quads it’s mentioned in, but only that. However, we win on data locality.

That means the query engine, for a given traversal, might only need to talk to one node:

                                                   
        +---------------------+                    
        |  Query Engine       |                    
        |                     |                    
        +---------------------+                    
         /   ^                      
        /   /                     //Still cycles               
       v   /                   
  +--------+   +--------+    +--------+            
  |Node    |   | Node   |    |Node    |           
  |1       |   | 2      |    |3       |            
  |        |   |        |    |        |            
  +--------+   +--------+    +--------+ 

But I think we can do better.

Let’s take some inspiration from Google’s Pregel Paper – the mantra there is “think like a vertex”.

If we’re sitting on a vertex, there are only a few things we can do. We can check our connections (“yes, I have that quad; I’m connected to <foo> by <p>”), we can send messages to other nodes (“tell <foo> to process this message”), and we can vote to halt (a meta-action).
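
In Go terms, the entire per-vertex API could be as small as the interface below. The names are mine, and Message is the per-query state sketched with the message format further down:

package dgraph

// Context is the whole API a vertex gets while a query runs.
type Context interface {
    // HasQuad answers "yes, I have that quad; I'm connected to <foo> by <p>".
    HasQuad(predicate, object uint64) bool
    // Send asks another node (possibly on another machine) to process a message.
    Send(to uint64, m Message)
    // VoteToHalt is the meta-action: this vertex is done with the query.
    VoteToHalt(queryID int)
}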

This means evaluating a query can be done as message-passing. The query is done when all machines are finished processing it. Suppose we come up with a simple assembly language for a query – the exact semantics of which I am making up on the fly:

SEND </en/the_matrix>
CHECK predicate: </type/object/type> object: </film/film> // FAIL if false
SENDTO object WHERE predicate: </film/film/directed_by>
TAG "director_id"
SENDTO object WHERE predicate: <name>
RETURN
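
One way to make those steps concrete is a tiny instruction set; the opcodes below just mirror the pseudo-assembly and aren’t meant to be final:

package dgraph

// Op is one opcode of the tiny query assembly sketched above.
type Op int

const (
    OpSend   Op = iota // SEND <node>: start a walk at a node
    OpCheck            // CHECK: FAIL the walk if the quad isn't present
    OpSendTo           // SENDTO object WHERE predicate: forward to matching neighbors
    OpTag              // TAG "name": record the current node under a tag
    OpReturn           // RETURN: report the walk's tags to the query engine
)

// Instruction is a single step of a compiled query program.
type Instruction struct {
    Op        Op
    Node      uint64 // SEND target
    Predicate uint64 // CHECK / SENDTO predicate
    Object    uint64 // CHECK object
    Tag       string // TAG name
}

// Program is what the query engine compiles a shape into; it can be shipped
// to the cluster once and referenced by query ID afterwards.
type Program []Instruction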

These steps can be built, compiled and optimized from query shapes. But once built, they should try to execute faithfully. Now we’re passing messages between nodes to evaluate the query:

                                       
        +---------------------+        
        |  Query Engine       |        
        |                     |        
        +---------------------+        
         /          ^       <-         
        /           |         \--      
       v            |            \-    
  +--------+   +--------+    +--------+
  |Node    |   | Node   |    |Node    |
  |1       |   | 2      |    |3       |
  |        |   |        |    |        |
  +--------+   +--------+    +--------+
    |   -         ^              ^     
    |   +---------|              |     
    |                            |     
    +----------------------------|     

The messages are very short. Using the above example:

{
  query_id: 3,            // this tells us which query code block we're executing -- we don't have to send it every time
  program_counter: 4,     // what line we start from, in this case right after directed_by
  target_nodes: [34, 50], // the nodes of both the Wachowskis
  tags: {}
}

Tags can be part of the message or reported back to the Query Engine; I’m not sure which is better, but it’s easier to reason about tags in the message – then the entire program state is in the message.
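
Putting the pieces together, here’s a rough sketch of what a machine might do when a message arrives. It builds on the node-page and instruction-set sketches above, and everything in it is illustrative rather than a real implementation:

package dgraph

// Message is the entire in-flight state of one walk, matching the example
// message above.
type Message struct {
    QueryID        int               // which compiled query program we're running
    ProgramCounter int               // which instruction to resume at
    TargetNodes    []uint64          // the nodes that should process this
    Tags           map[string]uint64 // accumulated tag bindings
}

// NodeServer owns a shard of node pages and executes messages against them.
type NodeServer struct {
    programs map[int]Program      // compiled programs, shipped once per query
    pages    map[uint64]*NodePage // the node pages this machine owns
    forward  func(Message)        // route a message toward its targets' owners
    report   func(queryID int, node uint64, tags map[string]uint64)
}

// Process resumes each target node's walk at the message's program counter.
func (n *NodeServer) Process(m Message) {
    prog := n.programs[m.QueryID]
    for _, node := range m.TargetNodes {
        page, ok := n.pages[node]
        if !ok {
            continue // not ours; a real system would re-route instead
        }
        n.run(prog, page, node, m)
    }
}

func (n *NodeServer) run(prog Program, page *NodePage, node uint64, m Message) {
    tags := make(map[string]uint64, len(m.Tags))
    for k, v := range m.Tags {
        tags[k] = v // copy so parallel walks don't share state
    }
    for pc := m.ProgramCounter; pc < len(prog); pc++ {
        switch inst := prog[pc]; inst.Op {
        case OpCheck:
            if !page.HasQuad(inst.Predicate, inst.Object) {
                return // FAIL: this walk quietly dies
            }
        case OpTag:
            tags[inst.Tag] = node
        case OpSendTo:
            // Hand the rest of the program to the matching neighbors; they
            // resume at pc+1. We're done locally.
            n.forward(Message{
                QueryID:        m.QueryID,
                ProgramCounter: pc + 1,
                TargetNodes:    page.Neighbors(inst.Predicate),
                Tags:           tags,
            })
            return
        case OpReturn:
            n.report(m.QueryID, node, tags)
            return
        }
        // OpSend is handled by the query engine, which seeds the first message.
    }
}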

This looks a lot like a message queue; you could even use one to implement the same idea.

No idea if this works, but those are the interesting pieces of the sketch. Small messages, running around the network as they run around the graph. Individual steps as simple as possible. Messages are traversals, so the question becomes how many messages per second the cluster can handle. It seems likely that a bigger cluster could handle more, which is a good scaling property. As a rough estimate, we can ask how many messages per second, say, Kafka can handle.

You’d need replication for nodes, and a bunch of other things for reliability. Writes aren’t too bad – you send a new write to every machine that owns a node mentioned in the quad. There are probably some out-of-band things too – e.g. getting statistics from the cluster so the query engine replica set can compile the query program the best way.
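
In that picture, a write is just a small fan-out. A sketch, reusing the hash ring from above (replication and failure handling omitted):

package dgraph

// ApplyWrite fans a new quad out to every machine that owns one of the nodes
// mentioned in it; each owner appends the quad to its node page(s).
func ApplyWrite(r *Ring, q QuadRef, send func(machine string, q QuadRef)) {
    seen := make(map[string]bool)
    for _, node := range []uint64{q.Subject, q.Predicate, q.Object, q.Label} {
        if node == 0 {
            continue // e.g. a quad with no label
        }
        if m := r.Owner(node); !seen[m] {
            seen[m] = true
            send(m, q)
        }
    }
}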

There’s a lot I didn’t cover, but I wanted to write out my general sketch. It’s sort of been done before? But maybe not applied to graphs.