Bryce Nyeggen's Blog

A "web presence" as they say

A/B Tests and the Kelly Criterion

When you’re testing a sequence of changes in an A/B test, the question naturally arises, how large of a group should I test on? Normal practice is to wing it, usually based on some function of the number of fingers you have. This is suboptimal; a 1% test group for a Facebook-sized organization, or in the context of millions of ad impressions, is likely to be incredibly overpowered.

You can deal more rigorously with the issue by recognizing that an A/B test is a wager: we take, say, 1% of our income stream for the time period, and we spin the wheel, seeing if the payoff is negative or not.

There is a “multi-armed bandit” problem that makes this analogy explicit. However, it’s dealing with a slightly different formulation: given a number of possible “plays”, which sequence do we test & how many spins do we give each one? The vanilla posing of the problem doesn’t give a direct answer to the question of how much money we should be dumping into the slot each time (although math composes nicely and one can get a coherent solution for all these problems combined with some effort).

The question of appropriate bet size has a claimed definitive answer in the Kelly criterion. It is remarkably simple:

Proportion to bet = (prob. of success) / (cost of failure) 
                  - (prob. of failure) / (payoff of success)

To pick some contrived numbers, if I have an experiment that I guesstimate to have a 70% chance of a 2% gain, and a 30% chance of a 4.5% loss, I should put ~55% of my bankroll into that wager. Essentially what this criterion gets you is the maximum long-run growth rate of your bankroll if you fold your winnings back into more wagers. This means that it’s not actually the appropriate criterion if you can’t continually reinvest your winnings: for instance, if you’re measuring the effect of switching from one JVM garbage collector to another, you get a choice of three and you’re done, and your “winnings” are something like having to deploy a constant proportion fewer machines or a constant reduction in latency (eg, a one-time static payoff). On the other hand, if you’re trying to turn ads into customers into ads into customers, the analogy is pretty apt.
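To make the arithmetic concrete, here's a minimal sketch (the helper name is mine; gains and losses are expressed as fractions of the amount wagered):

//Kelly fraction for a binary wager: p / (fractional loss) - q / (fractional gain)
public static double kellyFraction(double pWin, double gain, double loss){
    return pWin / loss - (1.0 - pWin) / gain;
}
//kellyFraction(0.70, 0.02, 0.045) comes out to ~0.556, i.e. the ~55% above.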

A few questions rear their ugly heads immediately:

  • How can I possibly know the expected payoffs if that’s what I’m trying to measure in the first place?
  • How does statistical significance play into this?

The first is more of an art than a science, but you can get an estimate by looking at the results of previous experiments. If all your previous futzing with your site’s fonts only shifted conversion by 0.5% in one direction or the other, your custom Lobster substitute is unlikely to change it by an order of magnitude. But still, it’s difficult to have reasoned estimates, especially at low probabilities of highly variable magnitudes from a fickle and ever-changing customer base. It might help if you thought of the bet as not “shift N% of our traffic to an A/B test of button color”, but as “shift N% of our traffic to an A/B testing team with this track record”.

The second is trickier. As I mentioned above, it is possible to reconcile these ideas seamlessly, but it does take some math and some assumptions about what your “real” utility function and beliefs are. The Kelly criterion is fundamentally forward-looking, and statistical confidence is fundamentally backwards-looking, so they need not be in conflict, and one of the nice things about the Kelly criterion is that there is no explicit term for “variance”. In particular, because Kelly only depends on expected outcomes, you can use Bayesian updating of your expected payouts as results come in, and adjust proportions in real time.
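As a sketch of what that might look like (an assumption-laden toy, not a recipe: fixed payoff sizes, uncertainty only over the win probability, and a Beta prior standing in for your beliefs; all names are mine):

//Track a Beta posterior over the win probability; recompute the Kelly fraction as results arrive.
public class AdaptiveKelly {
    private double alpha, beta;       //pseudo-counts of wins and losses (the prior)
    private final double gain, loss;  //fractional payoff and fractional cost per wager

    public AdaptiveKelly(double priorWins, double priorLosses, double gain, double loss){
        this.alpha = priorWins; this.beta = priorLosses;
        this.gain = gain; this.loss = loss;
    }

    //Bayesian update: just bump the relevant count
    public void observe(boolean won){
        if(won){ alpha++; } else { beta++; }
    }

    //Kelly fraction at the posterior-mean win probability, floored at "don't bet"
    public double currentFraction(){
        double pWin = alpha / (alpha + beta);
        return Math.max(0.0, pWin / loss - (1.0 - pWin) / gain);
    }
}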

If this sounds like it might get a bit complicated, it’s because it does. Unfortunately there is no way around it: the more rigorously you try to model a complex system of probabilistic payoffs, the more elaborate your model has to be. That is why the usual solutions are to stick with ye olde A/B tests (which are for the most part predictably suboptimal), or to hire some consultants or in-house statisticians.

Designing a Persistent Bloom Filter

Bloom filters are handy data structures, particularly for applications where data sets regularly exceed RAM if stored literally. For instance, they’re useful as a way to implement an inner join or filter. You can stream the restrictor dataset into a Bloom filter, and then stream your restrictee through the Bloom filter, propagating only those elements that match, and taking only 1.44 * log2(1 / errorRate) bits per entry in your restrictor dataset. This is why databases like Cassandra use them extensively.

Usually they’re formulated as mutable data structures over a bit-array, which is in turn implemented (at least in Java) on top of an array of longs. But there’s no reason why we have to use a mutable array; persistent data structures are desirable for many reasons, not least because they can be used with the same idioms as builtin Clojure data structures if we’re operating in that environment. How do we implement them persistently? On top of a persistent vector / array, of course.

Standard Clojure persistent vectors have object overhead, and the point of a Bloom filter is to reduce memory usage, so they’re right out. You could implement it on top of a (vector-of :long) with less overhead, but there is a speed disadvantage; currently gvec / vector-of doesn’t support transients, so with K hash functions in your bloom filter, you’re doing K sequential modifications to your vector, resulting in many more allocations and copies than we’d like.

Basically, designing a persistent Bloom filter comes down to the problem of representing a persistent fixed-sized array in such a way that we can do K “modifications” per step in the minimum number of operations. Essentially all persistent data structures are built on top of a tree-like data structure, so we need to figure out the optimal tree layout. How do we do this?

When we’re constructing a Bloom filter, we provide the expected number of elements and the desired false-positive rate, which in turn gives us parameters for the optimal number of hash functions and number of total bits, so we know how many modifications per insert (K) and how many total elements we’re going to need to represent (number of bits / 64). In order to make this tree persistent, we need the guarantee that we always copy a whole node if we need to modify any of its members. So, if we have a fan-out of F, we basically accumulate a cost of F for each node along each of the K paths, without double-counting shared ancestors.
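The sizing itself is just the standard Bloom-filter formulas (sketched here with method names of my own): given an expected element count n and a target false-positive rate p, you need m = -n*ln(p)/(ln 2)^2 bits and k = (m/n)*ln 2 hash functions.

public static long optimalBits(long n, double p){
    return (long)Math.ceil(-n * Math.log(p) / (Math.log(2) * Math.log(2)));
}
public static int optimalHashes(long n, long m){
    return Math.max(1, (int)Math.round((double)m / n * Math.log(2)));
}
//e.g. n = 1,000,000 and p = 0.01 gives m ~ 9.59 million bits (~150k longs) and k = 7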

But how do we estimate the overlap? This is trickier than it seems; one of the assumptions of a Bloom filter is that the bit positions are randomly distributed. But “randomly distributed” doesn’t mean “evenly distributed”; if I drop 100 balls into 100 random buckets, I’ll end up with more than a third of buckets empty. Simulate it yourself and see:

(loop [v (vec (repeat 100 0)) ct 100] 
  (if (zero? ct) 
      (do (println "Buckets empty:" (count (filter zero? v)))
          (println "Final bucket counts:" v)) 
    (recur (update-in v [(rand-int 100)] inc) 
           (dec ct))))

The number of empty buckets (and conversely the number of buckets with at least one entry) is a classic balls-into-bins occupancy problem. Now, we could use that to figure out the number of nodes likely to be hit on each layer, or we could simulate it, which is more fun.

//Does a Monte Carlo simulation of the cost of an insert.
public static double cost(long m, int k, int f){
    final Random rng = new Random();
    final int trials = 250;
    //Each long holds 64 bits
    final int n = (int)Math.ceil(m / 64.0);
    double cost=0;
    //Simulate from the top of the tree down, averaging over all trials
    for(int trial=0; trial<trials; trial++){
        //Walk each level: widths f, f^2, ..., up to the smallest power of f covering the n leaves
        for(int width=f; width < n*f; width *= f){
            final int[] hit = new int[width];
            for(int i=0; i<k; i++){
                hit[rng.nextInt(hit.length)] = 1;
            }
            int localCost = 0;
            for(int i=0; i<hit.length; i++){
                localCost += hit[i];
            }
            //We may want to add a fudge factor to account for array header,
            //pointer access & function call overhead.
            cost += localCost * f;
        }
    }       
    cost /= trials;
    return cost;
}

It would be more than a bit overkill to run a Monte Carlo simulation in the constructor (although I’ve seen worse), and still a little overkill to estimate fresh parameters each time using statistics. The cost isn’t convex due to integer issues (sometimes a higher branching factor lets you get rid of another level, and sometimes it just makes each copy costlier), so we’d have to calculate the results of many different fanouts, and we’ll have some error in our estimate anyway since we’re running on a real machine in the physical world.
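The closed-form alternative is short enough to sketch, though: the expected number of distinct nodes hit at a level of W nodes by K uniformly random hashes is W * (1 - (1 - 1/W)^K), and each hit node costs roughly F words to copy.

//Analytic counterpart of the Monte Carlo simulation above (same parameters and level structure).
public static double expectedCost(long m, int k, int f){
    final int n = (int)Math.ceil(m / 64.0);
    double cost = 0;
    for(long width = f; width < (long)n * f; width *= f){
        double nodesHit = width * (1.0 - Math.pow(1.0 - 1.0 / width, k));
        cost += nodesHit * f;
    }
    return cost;
}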

It turns out that a fanout factor of 8 has the advantage of being a power of 2, corresponds roughly to a cache line (of course Java object headers mess this up), and gives reasonable simulated costs for large Bloom filters. I’m building a persistent Bloom filter implementation into SOAC and benchmarking, so we can see if the real world agrees with the back-of-the-envelope calculations. A fixed-size primitive array with inherent support for multi-node edits would also be a good base for a different hash table implementation that doesn’t depend on gvec.

Full Disk Encryption With Btrfs and Multiple Drives in Ubuntu

At this point, encryption is an issue of social responsibility. It is important to establish a norm that data should not live or travel in plaintext without an affirmative reason, especially if you have nothing to hide, because it provides cover to the people who do. Besides the normative aspect, if you intend on doing any international travel, have any interesting friends, or do any interesting or profitable work on your machine, you owe it to yourself to secure your environment.

Ubuntu makes the single-disk encryption scenario relatively easy, but it doesn’t allow a lot of customization at install time, and has no GUI for extending encryption to multiple disks. Fortunately it’s only a bit more CLI work to set it up to work transparently with multiple disks, so you only need to enter the one passphrase. I’ve tested this in a VM with the latest Ubuntu 14.04 beta, but it should work for other versions of Ubuntu, or any distro with support for cryptsetup.

The default Ubuntu “encrypt my whole hard drive” installer layers like so:

  1. The physical disk
  2. An extended physical partition
  3. A LUKS wrapper
  4. A LVM physical volume
  5. Two LVM logical volumes: one for swap, and one EXT4 filesystem for your root directory.

Whew! This is probably fine for your system drive, if a little complex; it’s nice being able to use LVM to resize your swap partition if your needs change dramatically, and if your system drive is a different size / speed than those in your storage array (eg, a 32GB SSD vs. an array of 4TB spinny disks) it wouldn’t make sense to have it as part of the same filesystem anyway. We’ll accept that default for our root partition and swap, and focus on our secondary data drives.

We’ll assume your main system has been installed successfully on /dev/sda , and we have 2 other disks /dev/sdb and /dev/sdc that we want to set up as an encrypted, Btrfs-managed mirror.

First, let’s blow away the existing disks, and create some fresh partitions. You can do this graphically or through any partition editor. The key thing is to end up with one unformatted partition on each disk; /dev/sdb1 and /dev/sdc1 respectively.

# You’ll need to be superuser for all of this
sudo -i
# For these commands, select "o" to zero the partition table,
# "n" to create a new partition (follow the defaults for a single primary
# partition that fills all space), then "w" to write to disk.
fdisk /dev/sdb
fdisk /dev/sdc

We’re going to use a keyfile so we only have to enter the passphrase that unlocks our root partition. Let’s generate one.

# 512 bit / 64 byte keyfile
dd if=/dev/random of=/etc/keyfile bs=1 count=64

Create a couple of LUKS wrappers inside those partitions, using the keyfile we just generated.

cryptsetup --key-file /etc/keyfile -v luksFormat /dev/sdb1
cryptsetup --key-file /etc/keyfile -v luksFormat /dev/sdc1

Now we load the encrypted mapping, to /dev/mapper/enc1 and /dev/mapper/enc2 respectively, again using the keyfile. We write plaintext into the mapper, and it comes out encrypted on the raw device.

cryptsetup --key-file /etc/keyfile luksOpen /dev/sdb1 enc1
cryptsetup --key-file /etc/keyfile luksOpen /dev/sdc1 enc2

Now we make a choice. Btrfs has its own LVM-esque capabilities, so rather than layer in more complexity by using logical volumes, we use Btrfs directly inside the LUKS wrapper.

# Btrfs isn’t installed by default
apt-get install btrfs-tools
# A label makes management slightly easier.
mkfs.btrfs -L vol1 -m raid1 -d raid1 /dev/mapper/enc1 /dev/mapper/enc2
# The final mount point
mkdir /mnt/enc
# We can mount any of the component devices manually, and have access to the full array
mount /dev/mapper/enc1 /mnt/enc

OK, let’s modify our fstab and our crypttab so our system knows to decrypt and mount these drives. This should be added to your crypttab (optionally replacing the devices with their UUIDs, which you can get via “sudo blkid”):

# Optionally add "discard" to options to support TRIM on SSDs
enc1 /dev/sdb1   /etc/keyfile  luks
enc2 /dev/sdc1   /etc/keyfile  luks

And this should be added to your fstab (again optionally using UUID, or the label of the array):

/dev/mapper/enc1 /mnt/enc  btrfs  defaults 0 2
# Optionally, using label:
# LABEL=vol1     /mnt/enc  btrfs  defaults 0 2

Now, when you boot, you will be asked for your root passphrase first. The keyfile will be decrypted, and used to decrypt your Btrfs component drives. They will then be mounted, and your data will be secure.

Upgradable Locks in Go

Read-write locks are one of the basic means of coordinating access to a shared resource. As many readers can coexist as you’d like, but a writer requires exclusive access. For performance reasons, you’d like to have your write locks over as short a section as possible so you don’t unnecessarily block readers.

Sometimes, though, you have a long stretch of reads, and then a relatively short section of writes dependent on those reads. While you’re reading, you don’t care about other readers – but you do want to ensure that after your reads, you (and not some other caller) immediately acquire the write lock. This is known as “lock upgrading”.

An example of this is restructuring a file (eg, compacting a database file). You read it in, process it, write it to a temp file, and rename it to overwrite the original. While you’re reading the original and writing to a temp file, you don’t care about other readers. But after you’re done reading, you do want to adjust all the metadata so that subsequent readers are pointed at the right file handle / mmap’d buffer / whatever, and also to ensure that no writers bash the file between your reads and your writes.

Unfortunately, POSIX locks don’t support this. Java, which probably has the best shared-memory concurrency runtime available, explicitly forbids it. If you happen to be using Go, which has only the most basic POSIX-y concurrency constructs, how do you do it without holding a write lock the entire time?

The key insight is that the caller doesn’t actually need to ensure that it “upgrades the lock”, per se. It just needs to ensure that its code is run immediately the next time the write lock is acquired. It turns out this is not too difficult in a language that supports passing around functions:

import (
  "fmt"
  "runtime"
  "sync"
)

type UpgradableLock struct {
  uglMutex sync.RWMutex
  upgrades chan func()
}

func NewUpgradableLock() *UpgradableLock {
  return &UpgradableLock{sync.RWMutex{}, make(chan func(), 1)}
}

func (ugl *UpgradableLock) Lock() {
  ugl.uglMutex.Lock()
  select {
  case v, _ := <-ugl.upgrades:
      v()
  default:
  }
}

func (ugl *UpgradableLock) MaybeUpgrade(f func()) bool {
  select {
  case ugl.upgrades <- f:
      go func() {
          ugl.Lock()
          ugl.Unlock()
      }()
      return true
  default:
      return false
  }
}

// RLock, RUnlock, and Unlock just call the corresponding method on the underlying RWMutex.

The full example lives here. When we try to upgrade, we attempt to place the function containing our write-lock-requiring code into a channel that is always drained when the write lock is acquired. In case there are no writers trying to acquire that lock (and hence run our code), we fire off one with no additional side effects. In order to ensure we’re handing off to the particular write operation we’re placing on the channel, we set the capacity of the channel to 1. If we can’t place that function (ie, there is already another writer scheduled), the upgrade fails.

There are two important limitations to this approach:

  • It doesn’t support arbitrary upgrade-downgrade cycling. This is less than ideal if for instance we wanted to split our write operation into two parts, separated by a read. Currently it is a one-way street.
  • The upgrade is not guaranteed to succeed. This makes sense – if you have multiple readers wanting to upgrade, only one of them can be next. What is guaranteed is that if we have multiple upgraders, at least one succeeds. The rest can continue spinning in the interim, attempting to stay low-impact with time.Sleep() or runtime.Gosched() calls on each cycle until they all complete.

What’s nice in addition is that this approach works on any platform with some sort of passable function / callable construct, read/write locks, and a threadsafe queue (in fact, the queue here isn’t much of a queue – you could replace it with an atomic reference or a plain mutex and a pointer).
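For instance, here's a hedged sketch of the same trick on the JVM, using an AtomicReference as the one-slot "queue" and a ReentrantReadWriteLock underneath (the names are mine, and it's not a drop-in for the Go version):

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class UpgradableLock {
    private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
    private final AtomicReference<Runnable> upgrade = new AtomicReference<>();

    void lock() {
        rwl.writeLock().lock();
        //Run at most one pending upgrade, first thing under the write lock
        Runnable pending = upgrade.getAndSet(null);
        if (pending != null) { pending.run(); }
    }

    void unlock() { rwl.writeLock().unlock(); }
    void rLock() { rwl.readLock().lock(); }
    void rUnlock() { rwl.readLock().unlock(); }

    //Schedule f to run under the next write-lock acquisition; false if an upgrade is already pending.
    boolean maybeUpgrade(Runnable f) {
        if (!upgrade.compareAndSet(null, f)) { return false; }
        //Guarantee some writer shows up to drain the slot, even if nobody else wants the lock.
        new Thread(() -> { lock(); unlock(); }).start();
        return true;
    }
}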

Immutable SQL

I have strong opinions on data models. They’re not exactly novel opinions, which is good, because if they were truly novel it’d probably be difficult to reify them on top of existing data stores. As it is, if you want your very own immutable persistence layer, you can call Rich Hickey and he will hook you up.

(I’m going to attempt to use the word “immutable” to signify data, whether it lives on disk or in RAM, which is “updated” via returning new copies and which can be validly observed at any point in time, to avoid confusion between persistence as a synonym for durability, and persistence as non-mutability. In reality it’s a bit of a misnomer.)

But Datomic is kind of expensive; SQLite on the other hand is free, so let’s use that (or really, any other free RDBMS). A data model that is both relational and immutable has certain conceptual advantages anyway, because the immutability applies to both the entities themselves, and also the relations between the entities. A blog post is in some sense the “same” blog post after editing, just with two values for its content over time. Its relation to its comments hasn’t changed. If we remove a comment, the post’s relation with its comments up to that point also doesn’t change – it’s just that we record a new relation from that point on that omits the comment in question.

This is basically the Clojure idea of identity vs. state vs. value, expressed in a relational world.

How does this work? Basically like this:

create table posts(
  post_id integer not null,
  version integer not null,
  current integer not null,
  body text not null,
  primary key(post_id, version)
);

create table comments(
  comment_id integer not null,
  version integer not null,
  current integer not null,
  post_id integer,
  body text not null,
  deleted integer,
  primary key(comment_id, version)
);

--Unique on post_id and version prevents double-insert
insert into posts(post_id, version, current, body) values (1, 1, 1, "Valuable content!");

Notice that post_id alone does NOT form a primary key; only the combination of post_id and version does. We’ll potentially have many “versions” of a post, but they all have the same identity.

Here’s where things get interesting: how do I edit a post?

begin transaction;
insert into posts select 1, 2, 1, "On further reflection..." from posts where post_id=1 and version=1 and current=1;
update posts set current=0 where post_id=1 and version=1 and current=1; --Invalidate old data
commit;

The only part of a row that changes is the “current” flag, which is just an optimization to prevent us from having to attach a “having version=max(version)” clause to every query for the current state. Now, we live in a world where the present value for the same entity is different, but the history is preserved.

Now, here’s the really cool part: what happens if my blog co-author tries to edit the post at the same time, and submits after I do?

Absolutely nothing.

Those “where” clauses in the insert and update ensure my writes only happen if I’m working with the latest version of the data. If I am mistaken about that, no transaction happens. I can verify whether my write happened via the changes() function, or via a select after the fact if other clients are sharing my connection to the db.

Now: what if, instead of effectively overwriting the “current” state, I want to apply a functional transformation to it? For instance, I decide it’s important that I convert my double-spaces to single-spaces. I grab the current value, compute my changes, and try the quasi-update dance above. If it doesn’t work, I simply try again. Eventually my changes will go through, and it will be a valid transformation of the state – a transformation that does not need to be expressible in SQL, avoids the ABA problem, and doesn’t depend on verifying value equality in SQL (which might not be efficient or even possible).
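Here's a sketch of that retry loop in plain JDBC, assuming the schema above; the class and method are my own invention, and the double-space fix stands in for any pure transformation of the current value:

import java.sql.*;

public class PostEditor {
    //Keep retrying until our write lands on the exact version we read.
    public static void normalizeSpaces(Connection conn, long postId) throws SQLException {
        while (true) {
            long version = 0; String body = null;
            try (PreparedStatement read = conn.prepareStatement(
                    "select version, body from posts where post_id = ? and current = 1")) {
                read.setLong(1, postId);
                try (ResultSet rs = read.executeQuery()) {
                    if (!rs.next()) { return; }   //nothing current to edit
                    version = rs.getLong(1);
                    body = rs.getString(2);
                }
            }
            String newBody = body.replace("  ", " ");   //the "functional transformation"
            conn.setAutoCommit(false);
            try {
                int inserted, invalidated;
                try (PreparedStatement ins = conn.prepareStatement(
                        "insert into posts(post_id, version, current, body) "
                        + "select post_id, version + 1, 1, ? from posts "
                        + "where post_id = ? and version = ? and current = 1")) {
                    ins.setString(1, newBody);
                    ins.setLong(2, postId);
                    ins.setLong(3, version);
                    inserted = ins.executeUpdate();
                }
                try (PreparedStatement inv = conn.prepareStatement(
                        "update posts set current = 0 where post_id = ? and version = ? and current = 1")) {
                    inv.setLong(1, postId);
                    inv.setLong(2, version);
                    invalidated = inv.executeUpdate();
                }
                if (inserted == 1 && invalidated == 1) {
                    conn.commit();
                    return;                       //our transformation won the race
                }
                conn.rollback();                  //someone else edited first; re-read and retry
            } catch (SQLException e) {
                conn.rollback();
                throw e;
            } finally {
                conn.setAutoCommit(true);
            }
        }
    }
}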

That looks exactly like Clojure’s atom functionality, where you grab the current value, compute its transformation, and compare-and-swap the reference (which operates by checking pointer addresses, not the contents of the object pointed to), repeating if necessary until it commits.

And finally, let’s explore the relational aspect. How do I add and remove a comment?

--Adding
insert into comments values(1, 1, 1, 1, "You write so good.", 0);

--Removing
begin transaction;
insert into comments select 1,2,1,post_id,body,1 from comments where comment_id=1 and version=1 and current=1;
update comments set current=0 where comment_id=1 and version=1 and current=1;
commit;

Adding a comment is done in the same way as adding a post. Removing a comment is done by adding a value that has been “marked” in some way as deleted. In this case we have an actual column that records that information, but this is the same sort of optimization as having a “current” flag – we could just as easily signify deletion by having a NULL body, or a NULL post_id, and looking for the prior version if we wanted the deleted content. It depends on how we want to think about our data – when someone “deletes a comment”, are they more disassociating it from a post entirely, or “erasing” the content while maintaining the relation? Many sites actually display deleted comments as a regular comment with a “[deleted]” body, to preserve conversation threading – that’s something that’s easy to represent in this schema.

In fact, in a world where a comment was associated with multiple posts and vice versa, you’d typically represent it as a table for posts, a table for comments, and a join table for the relations. In that case, removing a comment from a post really does entail recording an updated relation and nothing else – nothing magical needs to happen when a comment reaches zero references, except perhaps when you “garbage-collect” your database by archiving old / unreferenced data.

UPDATE:

Let’s talk briefly about querying the data and making it efficient. This isn’t anything particularly magical; if you want to restrict yourself to current and non-deleted posts / comments, you just need to embed that restriction in a “where” clause, like so:

--All posts IDs & bodies x all of their comment bodies
select posts.post_id, posts.body, comments.body from 
(select * from posts where current = 1) as posts 
left outer join 
(select * from comments where current = 1 and deleted != 1) as comments
on posts.post_id = comments.post_id;

You’ll be doing a lot of this, so you’ll probably want to encode that as a view for each table, and have a compound index on the “ID” and “current” columns to make those queries efficient.

The point of this is to facilitate a more historical view of the world – if your production systems mostly care about the present state, you can ship the archived state to a completely different system (maybe Hive for storage and bulk queries, and Shark for iterative analysis):

begin transaction;
insert into archived_posts select * from posts where current = 0;
delete from posts where current = 0;
commit;

This only works to archive “dead” data – but the present state is probably something you want in your analysis stack as well. To handle this, just ship your present state as a transient table that’s overwritten, and your archived state as a strictly accumulating store. Query the union.

select * from posts
union all
select * from archived_posts;

And having snapshots isn’t helpful unless we can actually examine the state on a particular date. To do this, you’d need to have a “created_on” column that embeds the date:

-- What version was live on 2010-01-01?
select post_id, max(version) as version from posts
where created_on < "2010-01-01"
group by post_id;
-- Join this with whatever data you're actually interested in.
-- Or examine the subsequent state, if any; because version ID is
-- monotonically increasing by 1, it's trivial to examine deltas.

That requires a group-by over a potentially large amount of data per ID, depending on how many revisions you have. Depending on your particular RDBMS and indexing scheme, it may or may not be efficient to do that in the same system with the same cache that’s managing queries to the present state, which is why being able to ship historical data to a separate system is so handy.

12 Ways of Looking at Logistic Regression

Logistic regression is the most popular off-the-shelf classification technique. In fact, it’s so popular that every subfield has its own interpretation of the same technique, just so they have it in their toolkit while being able to compare it with the stuff they’re actually researching in the same theoretical framework. I’ve attempted to reproduce some of the many interpretations I’ve heard of below, bearing in mind I’m not an expert in all of these, so some of the nuance might be lost.

  • The classical statistical interpretation. Your labels come from a binomial distribution, conditional on their features. You want to estimate the distribution.

  • The Bayesian statistical interpretation. In addition to the above, your parameter estimates are themselves probabilistic beliefs with their own distributions. If you could encode a totally non-informative, zero-strength prior, this ends up being more or less the same as the frequentist interpretation.

  • The latent-variable interpretation, popular with social scientists and psychologists. There is some latent continuous variable that determines the outcome, depending on which side of the threshold it falls on, but we can only see the final outcome. Your goal is to estimate the parameters that determine this latent variable as closely as possible.

  • The Kentucky Derby interpretation. Your parameters represent multiplicative effects on the odds (as in, a 4:1 bet). Your goal is to calculate each feature’s multiplicative effect on the odds of the outcome.

  • The less-Naive-Bayes interpretation. Like Naive Bayes, but estimating the pairwise correlations/covariances instead of assuming uncorrelated variables.

  • The information-theory interpretation. Find parameters so that conditional on the features, the output label distribution has maximum entropy.

  • The warp-space interpretation. Perform a kind of quasi-linear-regression in a space where we transform the label dimension via an inverse sigmoid.

  • The loss minimization interpretation. You have a loss function that gives you a penalty for each misclassified example (a higher penalty the more extreme your prediction was), and you classify an example by dotting its features with your parameters and applying a sigmoid. Find parameters that minimize the loss.

  • The “minimum bias” interpretation, popular with actuaries. Plot your data as a tensor, with each feature being a dimension, and the outcomes for each feature combo being summed in the appropriate cell (this only works for categorical features). Try to find parameters for each dimension, so that when you sum them together, apply a sigmoid, and multiply by the cell population, you get minimal binomial loss.

  • The neural network interpretation. Your features constitute a stimulus, dot-product’d with your parameters and fed through a sigmoid activation function to get a predicted label. You’re maximizing the fidelity with which your neuron can “remember” the label for the data it has seen.

  • The support vector interpretation. Take your data, and try to segment it with a hyperplane. For each point, apply a “supporting vector force” to the plane, proportional to the logit of the distance. When the forces balance, your hyperplane gives you your parameters.

  • The feedback interpretation. We initialize our parameters to some garbage values. For each observation, we dot the features and our parameters. If the result is negative and the outcome in our data is positive, or vice versa, move the parameter vector “backwards”, in the reverse direction of the feature vector. If they’re both negative or positive, move the parameter vector “forwards”, in the direction of the feature vector. This corresponds to the stochastic gradient descent fitting procedure, sketched below.
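A minimal sketch of that last, feedback-style view (plain arrays, 0/1 labels, hypothetical names), which is just stochastic gradient descent on the logistic loss:

//One pass of SGD per epoch: nudge the weights toward or away from each feature vector
//in proportion to how wrong the sigmoid'd dot product was.
public static double[] fitLogistic(double[][] xs, int[] ys, int epochs, double learningRate){
    double[] w = new double[xs[0].length];   //"garbage" starting values (here, zeros)
    for(int epoch = 0; epoch < epochs; epoch++){
        for(int i = 0; i < xs.length; i++){
            double dot = 0.0;
            for(int j = 0; j < w.length; j++){ dot += w[j] * xs[i][j]; }
            double predicted = 1.0 / (1.0 + Math.exp(-dot));
            double error = ys[i] - predicted;   //positive: move "forwards", negative: "backwards"
            for(int j = 0; j < w.length; j++){ w[j] += learningRate * error * xs[i][j]; }
        }
    }
    return w;
}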

There are probably more I missed. Despite the fact that everyone has a similar basic toolkit, there seems to be a pretty low amount of cross-domain pollination on extensions, even between similar fields like machine learning and statistics, or statistics and actuarial science. Maybe that’s because everyone is speaking their own dialect, and content to restrict their conversations to native speakers.

Clownshoes: An ENTERPRISE GRADE Etc. Etc. DOCUMENT STORE

Everyone should write a database. It’s a core area of practical software engineering, it’s fun, and it turns out it’s not that hard.

(The real reason, of course, is to wind up rolling in dough. It’s basically the same thought process behind learning to play the guitar.)

So I threw together Clownshoes, the ADVANCED ENTERPRISE GRADE NOSQL BIG DATA HIGH PERFORMANCE SCALABLE DOCUMENT STORE FOR BIG DATA (capitalization is emphatically part of the description). It’s not quite as awesome as some other NoSQL databases, like IMS, but it’s fun in its own way. The core storage engine bears a strong resemblance to a certain other prominent document database, to wit: mmaped linked-lists of “documents”, where “documents” are in this case arbitrary bytestreams, which you can easily use to represent anything from JSON to BSON to entire web pages.

It supports atomic inserts, deletes, and updates (“supports” might not be a strong enough word; all updates are atomic since there’s a per-database write lock). It scales to collections over 180TB in size, and working sets over 1TB. It’s wicked fast on the storage layer. How does it do all this?

Clownshoes, being HIGH PERFORMANCE, knows that in order to scale to BIG DATA you need to run on BARE METAL. No network overhead for us! Instead it is embedded in your running Go application process. You can easily scale horizontally by buying a couple of LSI cards and plugging in a disk enclosure to the left or right of your application server, or inserting more RAM to either side of your CPU socket. If you find yourself bound on CPU throughput, you can scale horizontally as long as you built with some creative geometry which gives you room to grow (see the diagram below):

CPU HERE…….EMPTY SOCKET

EMPTY SOCKET…….CPU HERE

The other main performance trick is journaling. Journaling essentially converts random writes into linear writes by recording a running log, eg, “write N bytes at X location”, and ensures that those linear writes actually hit disk. That way if there’s a problem actually committing to your main database file, you can replay your journal against the last consistent snapshot and recover. Not a bad idea, but we need all the IOPs we can get – our journaling strategy is to not journal, and to let the user decide when they want to snapshot their database (Clownshoes believes in moving fast, and also in breaking things). In general, complete configurability about how much to care about data is the secret to our webscale sauce.

Now, you might say to yourself,

This is almost exactly a linked list of strings, plus some hashmaps for indexing, plus gob-based serialization, plus the OS’s paging algorithm to support data bigger than physical memory capacity. The only difference is that you’re managing the memory a bit more manually via mmap, but because Go’s GC is currently non-compacting, if you’re using the built-in structures and memory pressure is high enough the OS will swap out a lot of untouched data to disk anyway. The wheel has been reinvented here.

This is true. However:

  • There is an advantage to managing the mmap-ing, compaction, etc. yourself, rather than letting the OS manage the swapping. Go’s GC does have to traverse pointers on the heap when it GC’s, which can be avoided if you manage the pointers by embedding them all in a single array as Clownshoes does. Depending on how & where the “native” pointers are actually stored in memory, they might actually generate some swapping you don’t need as they bring in entire 4k pages, and traversal will of course add more time to your collections.
  • I said everyone should write a database, not use a database they wrote themselves. You probably shouldn’t use Clownshoes for important things; I am personally using it to manage some metadata about which movies I have watched. I have probably spent more time writing Clownshoes (a few hours cumulatively) than I have watching movies over the past month or so, so this is not mission-critical data. In any case, if you want an embedded data store, what’s wrong with SQLite? Just use SQLite already.

I guess that last one isn’t really a “however” so much as a “yup”.

Persistence + GC Saves Memory

One of the disadvantages of garbage-collected runtimes is potentially increased memory consumption. This usually comes from a couple of places: object headers used for GC metadata, and “extra” heap space kept around to manage the garbage collection process (for example, the space taken up by Java’s Eden pool, which is gradually filled up and then GC’d, rather than incrementally freeing objects as in a reference-counted environment like Python or a manually managed malloc/free environment like C).

But garbage collection can actually lead to vastly reduced memory consumption, because it allows you to use persistent data structures that share data which would otherwise have to be tracked separately. You can share the same backing data for things that will be used & modified in different ways, because it’s effectively a copy-on-write environment: if you conj data to a shared vector in Clojure, the original is unchanged and the caller gets a “new” vector, with most of its backing data shared with its parent.

Those kinds of persistent data structures really only work (at least performantly) in the context of garbage collection, where you’re automatically freeing objects that have ended up with no references to them. Otherwise, managing the object graph for the kind of complex relationships that persistent data structures generate is nigh impossible.

Here’s an example. You have a large number of short phrases you’re analyzing. If they’re in a Latin-alphabet language, you can easily segment them into lists of interned strings representing words (a poor man’s trie). A lot of phrases are duplicates, so you can also de-dupe the entire phrase-list – and it’s free as long as you’re using persistent data structures that guarantee you’re placing no restrictions on how future callers can use your data.

;Straightforward impl
(with-open [r (clojure.java.io/reader "short_cuts")]
  (->> (line-seq r)
       (map #(vec (clojure.string/split % #"\s")))
       (into [])))

;Intern the strings
(with-open [r (clojure.java.io/reader "short_cuts")]
  (->> (line-seq r)
       (map #(vec (for [token (clojure.string/split % #"\s")]
                    (.intern ^String token))))
       (into [])))

;Dedupe the vectors
(with-open [r (clojure.java.io/reader "short_cuts")]
  (loop [remaining (line-seq r)
         as-vec []
         as-set #{}]
    (if (empty? remaining)
      as-vec
      (let [this-line (vec (for [token (clojure.string/split (first remaining) #"\s")]
                             (.intern ^String token)))]
        (if (contains? as-set this-line)
          ;Reuse the copy already in the set, so duplicate phrases share one instance
          (recur (rest remaining) (conj as-vec (get as-set this-line)) as-set)
          (recur (rest remaining) (conj as-vec this-line) (conj as-set this-line)))))))

The result: Vectors of non-interned words take 21.5 gb. As vectors of interned words, they take 12.1 gb. And as de-duped vectors of interned words, they take 4.4 gb. And each version supports the exact same operations on the result – you can take one of those de-duped phrases and “modify” it with no side effects on its siblings, because you’ll get back a new reference.

But here’s the twist: most of that storage is overhead from the clojure.lang.PersistentVectors we’re storing the strings in. Each of those has a 32-long Object array for the leaf, plus another 32-long Object array for the root node, plus some miscellaneous primitive fields. Seems a bit excessive when the vast majority of our vectors are <=10 elements long. What happens if we replace them with something more efficient, like a PersistentArrayVector from SOAC?

(require 'soac.arrvec)
(with-open [r (clojure.java.io/reader "short_cuts")]
  (loop [remaining (line-seq r)
         as-vec []
         as-set #{}]
    (if (empty? remaining)
      as-vec
      (let [this-line (soac.arrvec/array-vec (for [token (clojure.string/split (first remaining) #"\s")]
                                               (.intern ^String token)))]
        (if (contains? as-set this-line)
          (recur (rest remaining) (conj as-vec (get as-set this-line)) as-set)
          (recur (rest remaining) (conj as-vec this-line) (conj as-set this-line)))))))

Memory usage goes down to 1.5 gb, or ~7% of what we started with.

The Genius and Folly of MongoDB

MongoDB is easy to make fun of. The global write lock (now just a database-level write lock, woo). The non-durable un-verifiable writes. The posts about how to scale to Big Data, where Big Data is 100gb.

It makes more sense when you look at how the underlying storage layer is implemented. Basically, MongoDB consists of a collection of mmap’d linked lists of BSON documents, with dead simple B-tree indexing, and basic journaling as the storage durability mechanism (issues with what the driver considers a “durable write” before data necessarily hits the storage layer are something others have dealt with in depth). Eventually writes get fsync’d to disk by the OS, and reads result in the page with the data being loaded into memory by the OS.

All the speed that was initially touted as the killer advantage is a result of using the page cache. When you realize “it’s just mmap”, all the BS about optimizing your working set to fit within RAM, the cratering of performance once you hit disk, the fragmentation if you routinely delete or grow records, etc., make perfect sense. The OS doesn’t know you’re running a database; it just knows you want to mmap some stuff and gives it its best shot. Fortunately, the algorithm was written by some very smart people, so it tends to work pretty well up front (as long as you’re always hitting the page cache) – but when the OS schedules writes it doesn’t know about your storage layout, or even the difference between your indexes and your data. It certainly can’t infer what data to keep in cache, or preload, because it doesn’t know what or where your data is – it’s just some bytes somewhere you wanted for some reason.

But actually, that’s the Tao-like genius of MongoDB – having absolutely nothing new. Most databases are built with some killer idea: the consistency protocol for Cassandra, the crazy data structures of Redis, or the data-processing abilities of Hadoop. MongoDB has mmap, and by “has”, I mean “uses” (but hey, possession is nine tenths of the law). Not having to design your own caching algorithms or write strategies, and using the simplest possible implementations of everything else, lets you get to market quickly and focus on marketing your benchmarks, consulting to bring your customers up to Web Scale, responding to haters, or learning about concurrency. You can get a fair amount of traction before your customers realize there’s very little “there” there, and by that time you might have either cashed out or written an actual database (they’re starting down that road with their page-fault prediction algos, and good for them). In any event, your customers are more or less locked in, as they’ve jumped through so many hoops to accommodate your design decisions (“oh, long field names take up more space in every record? I guess we’ll just use the first N unicode characters and map on the application layer”). It’s no coincidence that you’re being compared to Oracle and IBM.

Like I said, MongoDB is easy to make fun of.

There’s exactly one situation where you should look at MongoDB. Focusing on the storage engine and ignoring all the issues with their broader durability strategy, the killer application is something like user data for an online game: a situation where you have a fairly consistent working set for a given chunk of time, it’s small relative to the whole database, reads and writes are in the same working set, you have lots of reads relative to writes, clients do a lot of the computation for you, and you’d like schema flexibility… You could jam that into a relational model and use something like hstore or JSON columns to fill in the gaps, or store documents as blobs / text in something like HBase or Cassandra, but you probably won’t have that bad of a time with MongoDB (until you get a surge of traffic; it doesn’t exactly degrade gracefully).

But in that case, it also wouldn’t be crazy to pull a Viaweb and store it on the file system (ZFS has a heckuva caching strategy, and you know when it accepts a write). It’s obvious and uninformative to say “it depends” regardless of the question; of course some things are generally better than others.

Being able to walk with them doesn’t change the fact that as of right now, MongoDB is clown shoes.

I’ve Got 99 Features, Give Me a Random One

Say you have a large array of Stuff, and you need to get a random element. In fact, you need to get many random non-duplicated elements – i.e., sampling without replacement. And since we’re living in the future with quad-core supercomputers in our pants, it should be threadsafe and scalable.

The simple thing to do is to shuffle the elements, and distribute them to the callers. You can do this by shuffling with a lock on the array, and then having each caller use AtomicInteger.getAndIncrement() to get a unique index to take. You could even do this online, since shuffling is an incremental process, and distribute the elements with a concurrent queue.

But if you need some small-but-unknown number of elements, this isn’t the best fit. You’re forced to shuffle too many elements, or if you didn’t shuffle enough initially, to maintain a potentially high-contention lock (either on the array itself or a queue) to isolate the array as you select more random elements.

Instead, you can have each caller select a random element, and then check if it has been selected before – either by maintaining a common threadsafe set of selected elements (e.g. via a ConcurrentMap) or, even better, by marking selected elements themselves. As you reach saturation, you’re forced to re-check many times and it becomes more efficient to shuffle-and-iterate. But, for small subsets, it’s very efficient and avoids having to think too much about synchronizing between shuffling and selection.
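A hedged sketch of that mark-as-selected approach (the names are mine, and the bail-out threshold is an arbitrary stand-in for "switch to shuffle-and-iterate here"):

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicIntegerArray;

public class RandomSampler<T> {
    private final T[] items;
    private final AtomicIntegerArray taken;   //0 = available, 1 = already handed out

    public RandomSampler(T[] items){
        this.items = items;
        this.taken = new AtomicIntegerArray(items.length);
    }

    //Returns a random not-yet-seen element, or null once re-checks suggest we're near saturation.
    public T next(){
        final ThreadLocalRandom rng = ThreadLocalRandom.current();
        for(int attempt = 0; attempt < 8 * items.length; attempt++){
            int i = rng.nextInt(items.length);
            if(taken.compareAndSet(i, 0, 1)){ return items[i]; }
        }
        return null;
    }
}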

This kind of usage pattern is common in certain machine learning techniques that use random feature selection – for instance random forests. If you make your sampling parallelizable, you can see some pretty nice speedups.