FokosDB: A strongly consistent bottomless storage database on top of Cloudflare Durable Objects

In this post I do a deep dive on a database I built on top of Cloudflare Durable Objects (DOs) and the rest of Cloudflare’s Developer Platform.

Let’s clarify a few things first.

  • Is FokosDB production ready? Not yet, but it will be soon.
  • Is FokosDB going to be fast and suitable for all use cases? No, read the Tenets and Goals sections below.
  • Is FokosDB going to be enough for many applications on Cloudflare Workers? Absolutely, most of them actually.
  • Is FokosDB really bottomless storage? Yes, as long as Durable Objects can be created in your account.
  • Is FokosDB an official Cloudflare product? No, hence not officially supported by Cloudflare.

Is this yet another vibe-slop? No. I did a lot of iterations on the design, and researched a lot of similar databases before I settled on the following tenets and goals. A lot of the code is AI generated (see AI productivity boost section) but I reviewed every single line, and most likely I modified every single line in some way or another.

FokosDB is the database I want to use in production for my projects on Cloudflare’s Developer Platform. My plan is for it to be correct, reliable, and sufficiently performant.

Tenets

Applications, especially data products, deployed on serverless platforms have to make trade-offs due to the different constraints of the platform, and Cloudflare’s Developer Platform is no different.

These tenets drive the trade-offs in the architecture and characteristics of the database.

  • Bottomless storage.
    • Within practical limits I don’t want to be constrained on the total overall storage used by the database. I should be able to store TBs of data without issues.
  • Strong consistency.
    • Eventual consistency makes everything harder to reason about. If I update item A and immediately read item A, I want to see its latest version.
    • Eventual consistency could be optional if it gives significant performance benefits, but should be opt-in.
  • Throughput should scale across different items.
    • Even if the per-item throughput will be constrained, throughput across thousands or millions of items should scale up linearly to the number of items.
  • Good enough performance.
    • Network round-trips are inevitable when running at the edge and wanting to reach durable storage, but we should reduce them as much as possible.
  • Managed scaling.
    • Scaling should be automatic without manual intervention. I just want to write and read data.

Goals and API

These goals drive the implementation and API provided.

  • Each item is identified by a hash key and an optional sort key.
    • When items have the same hash key they should have different sort keys, and items having the same hash key are sorted alphabetically based on their sort key allowing range queries within the hash key.
  • Each item is less than 1MB (default limit is 500KB). For larger blobs I will implement integration with R2 (object storage).
  • Scalability is achieved by sharding across many different items, ideally with different hash keys to eliminate hot partitions as much as possible.
    • The system architecture optimizes for hash key sharding, even though you can have an unlimited number of sort keys within a hash key.
  • Core operations: putItem, getItem, deleteItem, queryItems (soon).
    • All write operations support conditional checks.
  • Distributed transactions across items: transactWriteItems, transactReadItems.
    • Restrictions on number of items and total request size apply.
  • I decide about trade-offs based on these use-cases: a) 100-500 GB total storage, and b) 100 TB+ total storage, both assuming well spread out hash keys.
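
To make the key model concrete, here is a minimal in-memory sketch. The class `TinyTable`, the `ifNotExists` option, and modelling a missing sort key as an empty string are all assumptions made for illustration; this is not FokosDB’s actual API or storage layout.

```typescript
// Hypothetical in-memory model of the hash key + sort key scheme.
type Item = { hashKey: string; sortKey: string; data: unknown };

class TinyTable {
  // hashKey -> (sortKey -> item)
  private readonly rows = new Map<string, Map<string, Item>>();

  // Conditional write: with `ifNotExists`, the put fails if the item is already present.
  putItem(item: Item, opts: { ifNotExists?: boolean } = {}): boolean {
    const bucket = this.rows.get(item.hashKey) ?? new Map<string, Item>();
    if (opts.ifNotExists && bucket.has(item.sortKey)) return false;
    bucket.set(item.sortKey, item);
    this.rows.set(item.hashKey, bucket);
    return true;
  }

  // An absent sort key is modelled as the empty string (an assumption).
  getItem(hashKey: string, sortKey = ""): Item | undefined {
    return this.rows.get(hashKey)?.get(sortKey);
  }

  // Range query within one hash key: sort keys in [from, to), returned in sorted order.
  queryItems(hashKey: string, from: string, to: string): Item[] {
    const bucket = this.rows.get(hashKey);
    if (!bucket) return [];
    return [...bucket.values()]
      .filter((it) => it.sortKey >= from && it.sortKey < to)
      .sort((a, b) => (a.sortKey < b.sortKey ? -1 : a.sortKey > b.sortKey ? 1 : 0));
  }
}
```

Items that share a hash key stay together and are ordered by sort key, which is what makes range queries inside a single hash key cheap.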

My favourite database is Amazon DynamoDB in case it wasn’t clear so far.

Why not xyz

Why did I build a new database instead of using something that already exists?

Cloudflare offers several storage products: D1, Durable Objects, Hyperdrive+Postgres/MySQL, R2, Workers KV

Each one of these comes with its own set of constraints and limitations, but none of them satisfies the tenets and goals above.

I also wanted to push Durable Objects and the Developer Platform as much as I can in order to discover what primitives we should go and build next natively into the platform.

ps. My day job is at Cloudflare’s Developer Platform.

Finally, it’s fun.

Platform constraints

Before we dive into the architecture, let’s recap our constraints.

  • Each Worker and Durable Object has a memory limit of 128MB, and that’s potentially reused across requests handled on the same server.
    • I want less than 5MB memory used, ideally less than 1MB, inside each Worker doing a database request.
  • Durable Objects are single-threaded, I like to call them tiny servers, and they have resource limits. Even though this documentation is about D1, the exact same performance characteristics apply to Durable Objects.
    • For easy back of the napkin calculations assume each Durable Object can handle 1000 requests per second doing reads and 300-500 requests per second doing writes, assuming a handful of rows involved.
  • Each SQLite Durable Object can store up to 10GB and each row up to 2MB.
    • I cap the item size to 500KB, and the Durable Object total size to be less than 1GB since many operations are more efficient with less storage used.
    • Migrations and splits are also significantly faster with smaller Durable Objects, at least until we get a native fork/clone API.
  • Durable Objects do not (yet) run in all of Cloudflare’s edge locations, so even if a Worker runs within 50ms of your location, the closest Durable Object location could be hundreds of milliseconds away. See https://where.durableobjects.live for a cool website showing Durable Object locations by region and city.
    • I cannot do anything about this for now, but this is the main latency overhead we will observe and try to optimize for.

I love constraints because they force you to give up something in order to gain something else. We have to think about them up front to design a system that satisfies them as best as it can.

Let’s have some fun. 😉

System architecture overview

The following diagram contains everything that makes FokosDB work, except the distributed transaction coordinators that we will add later.

It’s simpler than it looks.

FokosDB high-level architecture

flowchart TD
    A([User]) -->|operation+data| B(Worker)

    C{{Workers KV Cache}} -.->|topology encoded| B

    B ==>|BY_HashKey\ndirectly| DD[(PartitionDO_roots_1)]
    B ==>|BY_HashKey\ndirectly| D[(PartitionDO_roots_1_N)]
    D -->|BY_HashKey| E[(PartitionDO\nhash_level_1_1)]
    D -->|BY_HashKey| F[(PartitionDO\nhash_level_1_N)]

    B ==>|BY_HashKey\ndirectly| FF
    E -->|BY_HashKey| FF[(PartitionDO\nhash_level_2_1)]
    E -->|BY_HashKey| FFN[(PartitionDO\nhash_level_2_N)]

    E -->|HashKey_ABC| G[(PartitionDO\nrange_root_HK_ABC)]
    G -->|BY_SortKey| H[(PartitionDO\nrange_HK_ABC.SK_MIN.SK_ghf)]
    G -->|BY_SortKey| I[(PartitionDO\nrange_HK_ABC.SK_ghf.SK_MAX)]

    TK(TopologyKeeperDO) -->|topology encoded| C
    G -->|partition split| TK
    D -->|partition split| TK

Life of a FokosDB request

Each FokosDB data partition is a single Durable Object, and each partition can act as a “hash” partition or a “range” partition. The only difference between the two is how they execute their splits to child partitions and how they forward requests to those child partitions. Hash partitions route based on the hash key and range partitions based on the sort key.

Let’s first examine the life of a request assuming that there is no information cached about the data partitions layout (topology).

  1. A user of the application in London (LHR) makes a request that ultimately needs to write an item {hashKey: "lambros", sortKey: "post_001", data}.
  2. The user request is processed in a Worker that will run close to the user (LHR) unless there is explicit placement configured.
  3. The Worker will extract the hashKey: "lambros" from the request and hash it across N root hash partitions to pick the one handling this hashKey.
  4. The request will be sent to the selected root hash partition, in this case PartitionDO_roots_1_N from the diagram above.
  5. The hash partition will then check if it has already been split or not, in this case it did split, so it will repeat the hashing of hashKey: "lambros" across its child partitions and forward the request to the picked partition. The hashing function includes the “tree level” to randomize picked partitions across levels.
  6. The request is forwarded to hash partition PartitionDO hash_level_2_1 that will repeat the same split check, and since it hasn’t been split, it will process the request locally, and the response will travel all the way back to the Worker.
  7. Worker prepares the application response to the user.

When there is topology information in the cache, the Worker will use the topology and pick the partition that owns the item and send the request directly to it, skipping the partition tree traversal. See the arrow directly to partition PartitionDO hash_level_2_1 in the diagram above.

Keep in mind that a topology in the cache could be stale. However, due to our tree-based partitioning we will route the request to whichever partition owned the item we want as of the cached topology’s information, and then that partition can continue forwarding the requests down the tree as necessary.

The topology cache is simply a performance optimization, not a correctness mechanism.
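
The routing described above can be sketched roughly as follows. The FNV-1a hash, the way the tree level is mixed into the hashed string, and the path-style partition ids are stand-ins chosen for this example; the post does not specify how FokosDB does any of these.

```typescript
// FNV-1a (32-bit) as a stand-in hash function (assumption).
function fnv1a(text: string): number {
  let h = 0x811c9dc5;
  for (let i = 0; i < text.length; i++) {
    h ^= text.charCodeAt(i);
    h = Math.imul(h, 0x01000193);
  }
  return h >>> 0;
}

// The tree level is part of the hashed input, so keys that collide at one
// level are spread out again across the children at the next level.
function pickChild(hashKey: string, level: number, buckets: number): number {
  return fnv1a(`${level}:${hashKey}`) % buckets;
}

// Hypothetical partition id: the path of child indexes from the root, e.g. "3/0/2".
type PartitionId = string;

// Walk down from `start` for as long as the partitions on the path have split.
function forward(
  hashKey: string,
  start: PartitionId,
  splits: ReadonlySet<PartitionId>,
  fanout: number,
): { owner: PartitionId; hops: number } {
  let id = start;
  let hops = 0;
  while (splits.has(id)) {
    const level = id.split("/").length; // a root id has level 1 for its children
    id = `${id}/${pickChild(hashKey, level, fanout)}`;
    hops++;
  }
  return { owner: id, hops };
}

// Worker side: the cached topology (possibly stale or empty) picks the entry
// partition; the partitions themselves forward the rest of the way.
function route(
  hashKey: string,
  cachedSplits: ReadonlySet<PartitionId>,
  realSplits: ReadonlySet<PartitionId>,
  roots: number,
  fanout: number,
) {
  const root = String(pickChild(hashKey, 0, roots));
  const entry = forward(hashKey, root, cachedSplits, fanout).owner;
  const { owner, hops } = forward(hashKey, entry, realSplits, fanout);
  return { entry, owner, forwardHops: hops };
}
```

As long as the cached set of splits is a subset of the real one, which holds here because partitions are never merged back, `route` ends at the same owner for an empty, stale, or fresh cache. Only `forwardHops` changes.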

Data partitions

Each data partition is a single SQLite Durable Object and holds <1GB of data, my default threshold is 500MB. We won’t examine the SQL schemas at this point, but there is a table holding the items, and then some auxiliary tables for tracking distributed transactions and split information.

Each partition acts as a “hash” partition or a “range” partition. Everything in the schema and behavior is the same, except that hash partitions route requests to their children using the hash key, and the range partitions using the sort key.

A hash partition can hold items with different hash keys and sort keys, whereas a range partition can only hold items of the same hash key but many sort keys within that hash key.

When the total storage of a single hash partition exceeds the configured threshold we schedule a “split”, and the partition will create a fixed number of child partitions. From that point on the hash partition only acts as request forwarder and never modifies its local storage for item operations. Asynchronously the child partitions will fetch the items that hash into them and start accepting item operations.

When a single hash key is responsible for more than a configured percentage of storage (like 30-50%) in a hash partition due to many items with different sort keys, that hash key is promoted into its own range root partition and a similar data migration process is followed as a partition split.
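
A rough sketch of the two thresholds described above. The names `maxSizeMb` and the promotion fraction mirror labels in the promotion sequence diagram later in the post; the default values, the `PartitionStats` shape, and checking promotion before a split are assumptions made for this example.

```typescript
const MB = 1024 * 1024;

// Hypothetical view of the bookkeeping a hash partition would need.
interface PartitionStats {
  totalBytes: number;                     // estimated storage used by the partition
  keySizeEstimates: Map<string, number>;  // estimated bytes per hash key
  promotedKeys: Set<string>;              // hash keys already promoted (or queued)
}

type Decision =
  | { kind: "none" }
  | { kind: "promote"; hashKey: string }
  | { kind: "split" };

function decideAfterWrite(
  stats: PartitionStats,
  writtenHashKey: string,
  maxSizeMb = 500,          // default threshold mentioned in the post
  promotionFraction = 0.4,  // assumption, within the 30-50% range mentioned
): Decision {
  const limitBytes = maxSizeMb * MB;
  const keyBytes = stats.keySizeEstimates.get(writtenHashKey) ?? 0;

  // One hash key dominating the partition: promote it to its own range root.
  if (keyBytes > limitBytes * promotionFraction && !stats.promotedKeys.has(writtenHashKey)) {
    return { kind: "promote", hashKey: writtenHashKey };
  }
  // Otherwise the partition as a whole is too large: schedule a hash split.
  if (stats.totalBytes > limitBytes) return { kind: "split" };
  return { kind: "none" };
}
```

Promotion is checked first in this sketch because a hash split would not help when a single hash key carries most of the data.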

In the diagram above the partition PartitionDO range_root_HK_ABC represents the range root partition for items with hash key HK_ABC.

Once the max storage threshold is breached in a range root partition, we schedule a split into child range partitions. The process for the range split partitions is the same as with hash partitions, with the difference that child partitions own a continuous segment of the items based on the sort keys instead of hashing items by hash key. See child range partitions PartitionDO range_HK_ABC.SK_MIN.SK_ghf and PartitionDO range_HK_ABC.SK_ghf.SK_MAX in the diagram above.
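
Routing inside a range root could look roughly like this. The `RangeChild` shape, the half-open `[start, end)` ownership, and using `null` for the `SK_MIN` / `SK_MAX` ends are assumptions for illustration, not FokosDB’s actual representation.

```typescript
// A child range partition owns sort keys in [start, end).
// null stands for the open ends (SK_MIN / SK_MAX in the diagram).
interface RangeChild {
  start: string | null;
  end: string | null;
  doName: string;
}

// `children` must be sorted by `start` and cover the key space without gaps.
// Binary search for the last child whose start is <= sortKey.
function findRangeChild(children: RangeChild[], sortKey: string): RangeChild {
  if (children.length === 0) throw new Error("range root has no children");
  let lo = 0;
  let hi = children.length - 1;
  while (lo < hi) {
    const mid = (lo + hi + 1) >> 1;
    const start = children[mid].start;
    if (start === null || start <= sortKey) lo = mid;
    else hi = mid - 1;
  }
  return children[lo];
}

// The two child partitions from the architecture diagram.
const exampleChildren: RangeChild[] = [
  { start: null, end: "ghf", doName: "range_HK_ABC.SK_MIN.SK_ghf" },
  { start: "ghf", end: null, doName: "range_HK_ABC.SK_ghf.SK_MAX" },
];
```

Unlike hash routing, the boundaries here depend on the actual keys, which is why range partitions need more topology information than a single bit.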

To recap, we have one Durable Object per data partition holding a number of items. Hash partitions and range partitions are the same except when it comes to splits and forwarding of requests.

Topology invariants

A nice property I like with this architecture is that even without any knowledge of the topology like how many partitions have been split or which keys have been promoted to their own range partitions, we can still process requests.

The topology we implement has the following invariants / rules:

  1. A partition either processes operations locally on the items it holds or always forwards requests to its child partitions, but never both. The exception to this rule is promoted hash keys, for which we forward requests to their root range partition.
  2. Data partitions are split, but never merged back, therefore the topology tree always grows.
  3. Each item is owned by a single partition at any point in time and its ownership can only be passed to a single child partition, or a single root range partition.
  4. There is a fixed number of root hash partitions and that number is constant for the lifetime of the database. Once an item is written to the database, the number of root partitions can never change.
  5. Each hash partition is split into a fixed N number of child partitions. The number N is constant across all hash partitions and once the first split occurs it can never be changed.
  6. Each range partition can split into an arbitrary number of child range partitions.
  7. Any version of the topology can route requests to the correct partition. A fully updated topology should always be a single hop to the correct data partition.

The fixed number for root partitions and the fixed number of hash split child partitions allow efficient topology encoding as we will see in the Topology encoding section.

Splits and hash key promotions

As mentioned in the tenets section, the throughput of the database should grow as we add more items with sufficiently spread out hash keys, and we should provide bottomless storage.

We achieve this by using many Durable Objects.

At one extreme someone would put each item into its own Durable Object and get its full capabilities per item. This would be a big waste of resources though, and it would lead to many cold starts for items not accessed often.

We still use many Durable Object partitions to scale out the throughput and storage of the database, but we pack many items in each partition to exploit warm live DOs allowing latencies of single digit milliseconds (same colo).

Once enough items are written breaching the storage threshold for a split, we schedule a split, initialize child partitions, migrate data to them, and only forward requests to them from now on.

NOTE: Before diving into the actual implementation flows, I want to point out that the data migration process is the biggest complexity of the entire thing.

It’s a few hundred lines of code and a state machine to avoid races while handling item requests and transactions at the same time as data is being migrated. I am hoping that a platform native API for forking/cloning DOs will land soon and allow me to remove all this complexity.

The following sequence diagram showcases the full hash partition split.

FokosDB hash split sequence diagram

sequenceDiagram
      participant C as Client
      participant P as PartitionDO
      participant BG as Alarm / setTimeout (PartitionDO)
      participant Ch as Child PartitionDO(s)

      C->>P: putItem(hk, sk, data)
      activate P
      Note over P: Write item to items table
      P->>P: checkSplits(hk, sk)
      P->>P: maybeQueueSplit(), status → split_queued
      P->>P: setAlarm(+5s) + scheduleBackgroundWork(10ms)
      P-->>C: PutItemResult
      deactivate P

      Note over P,BG: ── PHASE 1: Split Init (background, ~10ms) ──

      activate BG
      BG->>BG: detect split_queued → startSplit()
      Note over BG: Initialize child DOs (idempotent)
      BG->>Ch: initFromSplit(..., type='hash')
      activate Ch
      Note over Ch: Persist parent/split ctx<br/>migration_initialized → KV
      Ch-->>BG: ok
      deactivate Ch
      Note over BG: All Child DOs initialized, status → split_started
      Note over P,Ch: Parent DO only forwards requests going forward<br/>Writes rejected (503) while migrating<br/>Reads re-routed back to parent via getItemDirect()
      BG->>Ch: triggerMigration()
      activate Ch
      Note over Ch: migration_initialized → migration_migrating<br/>scheduleBackgroundWork(0)
      Ch-->>BG: ok
      deactivate Ch
      deactivate BG

      Note over P,Ch: ── PHASE 2: Data Migration (child background) ──

      activate Ch
      loop Item batches — cursor-based, resumable across crashes
          Ch->>P: getItemsBatch(childCtx, cursor)
          P-->>Ch: {items[], nextCursor}
          Note over Ch: INSERT OR IGNORE into local items<br/>Checkpoint cursor in KV after each batch
      end

      loop Tx metadata batches
          Ch->>P: getPartitionTransactionMetadata(childCtx, cursor)
          P-->>Ch: {maxDeletedTs, pendingTxRows[], nextCursor}
          Note over Ch: Copy pending locks (per-child slice)<br/>Sync deletion high-water mark
      end

      loop Promoted-key pointers (hash split only)
          Ch->>P: getPromotedKeysBatch(childCtx, cursor)
          P-->>Ch: {rows[], nextCursor}
          Note over Ch: Inherit promoted_keys forward pointers<br/>(data stays in range structure)
      end

      Note over Ch: ── PHASE 3: Cutover ──<br/>Processes reads and writes locally
      Note over Ch: Recompute key_size_estimates<br/>migration_completed, delete cursor KV

      Ch->>P: acknowledgeChildMigrationComplete(childDoName)
      deactivate Ch

      activate P
      P->>P: acknowledgeChildMigration(childDoName)
      alt All children acknowledged
          Note over P: status → split_completed<br/>DELETE pending_transactions
      end
      P-->>Ch: ok
      deactivate P

A very similar flow happens when a range partition is split into child partitions.
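
The data migration phase in the diagram relies on two properties: the cursor is checkpointed after each batch, and inserts are idempotent. A minimal in-memory sketch of that idea follows. The `ChildMigration` class, the numeric cursor, and the array standing in for the parent’s items table are assumptions; the real flow runs over RPC between Durable Objects and only copies the items that belong to each child.

```typescript
type Item = { hk: string; sk: string; data: string };
type Batch = { items: Item[]; nextCursor: number | null };

// Parent side: return one page of items starting at the cursor.
function getItemsBatch(parentItems: Item[], cursor: number, size: number): Batch {
  const items = parentItems.slice(cursor, cursor + size);
  const next = cursor + items.length;
  return { items, nextCursor: next < parentItems.length ? next : null };
}

class ChildMigration {
  readonly local = new Map<string, Item>();
  // Persisted cursor; null once the migration has completed.
  checkpoint: number | null = 0;

  // Copy batches until finished, or until `maxBatches` is reached
  // (used here to simulate a crash part-way through).
  run(parentItems: Item[], batchSize: number, maxBatches = Infinity): boolean {
    let done = 0;
    while (this.checkpoint !== null && done < maxBatches) {
      const batch = getItemsBatch(parentItems, this.checkpoint, batchSize);
      for (const item of batch.items) {
        const key = `${item.hk}\u0000${item.sk}`;
        // Same effect as INSERT OR IGNORE: replaying a batch is harmless.
        if (!this.local.has(key)) this.local.set(key, item);
      }
      this.checkpoint = batch.nextCursor; // checkpoint after each batch
      done++;
    }
    return this.checkpoint === null;
  }
}
```

If the child crashes after inserting a batch but before persisting the new cursor, it resumes from the older checkpoint and the replayed rows are ignored, so the end state is the same.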

Hash key promotions

The hash key promotion is a special kind of split where we move all the items of a specific hash key into its own Durable Object. We do this when that hash key consumes a big chunk of storage in a hash partition: if it continues to grow, even a hash split will not help, since all those items move together with their hash key. Therefore we make a root range partition for it, so that it can execute range partition splits over its sort keys.

The process for the hash key promotion shares some aspects with the normal split flow shown above, but with some key differences. Firstly, it only involves a single data partition. In addition, it requires some local state in the hash partition tracking which hash keys have been promoted into their own root range partitions used for forwarding requests for that hash key.

The full sequence diagram for the hash key promotion is shown below:

FokosDB hash key promotion sequence diagram

sequenceDiagram
      participant C as Client
      participant H as PartitionDO (Hash)
      participant BG as Alarm / setTimeout PartitionDO (Hash)
      participant RR as Root PartitionDO (Range)

      C->>H: putItem(hk, sk, data)
      activate H
      Note over H: Write item locally<br/>UPDATE key_size_estimates for hk
      H->>H: maybeQueuePromotion(hk, newKeyEst)
      Note over H: est_bytes > maxSizeMb × RANGE_PROMOTION_FRACTION<br/>AND hk not already in promoted_keys
      H->>H: INSERT promoted_keys (hk, status='queued')<br/>+ scheduleBackgroundWork(10ms)
      H-->>C: PutItemResult
      deactivate H

      Note over H,BG: ── PHASE 1: Promotion Init (background, ~10ms) ──

      activate BG
      BG->>BG: detect status='queued' → startPromotion(hk)
      Note over BG: Skip if a hash split is already split_queued/split_started

      Note over BG: A. Resolve range root identity<br/>resolveRangePartitionContext(hk, start=null, end=null)
      BG->>RR: initFromSplit(..., splitType='range')
      activate RR
      Note over RR: Persist parent hash ctx + splitType=range<br/>migration_initialized → KV
      RR-->>BG: ok
      deactivate RR

      BG->>BG: transactionSync: check lock count
      Note over BG: B. Atomic cutover queued → promoting<br/>ONLY if pending_transactions WHERE hk=? COUNT = 0
      alt Pending locks exist → defer
          Note over BG: retry on next background cycle
      else No pending locks
          BG->>BG: UPDATE promoted_keys SET status='promoting'<br/>#_promotedKeys.set(hk, 'promoting')
          Note over H,RR: Parent DO only forwards requests going forward<br/>Writes rejected (503) while migrating<br/>Reads re-routed back to parent via getItemDirect()
          Note over BG: C. Fire-and-forget
          BG->>RR: triggerMigration()
          activate RR
          Note over RR: migration_initialized → migration_migrating<br/>scheduleBackgroundWork(0)
          RR-->>BG: ok
          deactivate RR
      end
      deactivate BG

      Note over H,RR: ── PHASE 2: Data Migration (range root background) ──

      activate RR
      loop Item batches — all items for hk (start=null, end=null), cursor-based
          RR->>H: getItemsBatch(childCtx=rangeCtx, cursor)
          Note over H: promoted_keys[hk] must be 'promoting'<br/>getItemsBatchForRange(hk, null, null, cursor)
          H-->>RR: {items[], nextCursor}
          Note over RR: INSERT OR IGNORE into local items<br/>Checkpoint cursor in KV after each batch
      end

      loop Deletion watermark sync (no pending locks to copy — lock-free cutover)
          RR->>H: getPartitionTransactionMetadata(childCtx=rangeCtx, cursor)
          Note over H: Hash parent returns pendingTransactions=[]<br/>Only maxDeletedTs is synced
          H-->>RR: {maxDeletedTs, pendingTransactions=[], nextCursor=null}
          Note over RR: Sync deletion high-water mark only
      end

      Note over RR: ── PHASE 3: Cutover ──<br/>Processes reads and writes locally
      Note over RR: Recompute key_size_estimates<br/>migration_completed, delete cursor KV

      RR->>H: acknowledgePromotionComplete(hk)
      deactivate RR

      activate H
      Note over H: UPDATE promoted_keys SET status='promoted'<br/>#_promotedKeys.set(hk, 'promoted')<br/>scheduleBackgroundWork(1000ms)
      H-->>RR: ok
      deactivate H

      Note over H,RR: ── PHASE 4: GC (hash DO background) ──

      activate BG
      loop Batches of 1000 rows until drained
          BG->>BG: DELETE FROM items WHERE hk=? LIMIT 1000<br/>DELETE FROM pending_transactions WHERE hk=?
      end
      BG->>BG: DELETE FROM key_size_estimates WHERE hk=?<br/>(once items fully drained)
      deactivate BG

The sequence diagrams provide a good overview but ultimately the source code is the source of truth and with AI/LLM agents these days you can just navigate and explore the code directly to fully internalize how this works.
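
Both diagrams note that while a migration is in flight, writes are rejected with a 503 and reads are re-routed back to the parent via `getItemDirect()`. A small sketch of that decision on the child side, using the status names from the diagrams; the `Outcome` type and the function name are assumptions for this example.

```typescript
type MigrationStatus =
  | "migration_initialized"
  | "migration_migrating"
  | "migration_completed";

type Op = "read" | "write";

type Outcome =
  | { kind: "serveLocally" }
  | { kind: "readFromParent" } // getItemDirect() on the parent in the diagrams
  | { kind: "reject"; httpStatus: 503 };

// Decide how a child partition handles a request forwarded by its parent.
function handleOnChild(status: MigrationStatus, op: Op): Outcome {
  // After cutover the child owns its items and serves everything locally.
  if (status === "migration_completed") return { kind: "serveLocally" };
  // Until then the parent still holds the authoritative copy: reads go back
  // to it, and writes are refused so the caller can retry later.
  return op === "read"
    ? { kind: "readFromParent" }
    : { kind: "reject", httpStatus: 503 };
}
```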

Distributed transactions

Distributed transactions are operations involving multiple items potentially across several data partitions. See Amazon DynamoDB’s Transactions API (specifically transactWriteItems and transactGetItems) which is the main influence for FokosDB.

The distributed transactions I implemented follow Amazon DynamoDB’s distributed transactions, which are based on timestamp ordering (TSO) and a two-phase protocol.

I will not go into detail in this blog post for explaining how transactions work, not because it’s not important, it absolutely is, but because it’s already nicely explained in the resources below.

Read this material for DynamoDB transactions:

To be clear and set expectations right, there are some deviations from DynamoDB’s transactions architecture, mostly just because I did not implement everything yet, but will be done over time.

  • Most notably the optimizations described in Section “4. Adapting timestamp ordering for key-value operations” of the ATC 2023 paper are still not implemented.
  • There isn’t any tie breaking on transaction timestamps, so at the moment an item can only accept one transaction/write per millisecond, and clock skew in Cloudflare’s network worsens this to be per few milliseconds.
  • The implementation doesn’t have stateless transaction coordinators backed by an external durable transaction state ledger. DynamoDB uses a DynamoDB table to store the state for each transaction, and we could do it too by using a FokosDB instance. However, for now I randomly pick a Durable Object from a dedicated cluster of DOs to coordinate and drive each transaction.
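
To illustrate the second limitation, here is a minimal sketch of a per-item timestamp check with millisecond timestamps and no tie-breaking. The class name and the strict “newer than” comparison are assumptions about how such a check could look, not FokosDB’s actual code.

```typescript
// Per-item guard for timestamp ordering without tie-breaking.
class ItemTimestampGuard {
  private lastAcceptedMs = Number.NEGATIVE_INFINITY;

  // Accept a write only if its timestamp is strictly newer than the last
  // accepted one. Two writes stamped with the same millisecond cannot both pass.
  tryAccept(txTimestampMs: number): boolean {
    if (txTimestampMs <= this.lastAcceptedMs) return false;
    this.lastAcceptedMs = txTimestampMs;
    return true;
  }
}
```

A second write stamped in the same millisecond is rejected, and so is a write from a coordinator whose clock runs a couple of milliseconds behind, which is how clock skew widens the effective window.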

Still plenty of things to implement.

Topology encoding and caching

As mentioned in the goals section I wanted FokosDB to handle both small applications and also scale out to terabytes of storage or more for larger applications.

The two reference use-cases: 1_000 partitions (~500GB), 1_000_000 partitions (~500TB)

These numbers influence the topology encoding and overall architecture since we need to be able to route requests to any of these partitions directly with as few network hops as possible and without consuming a lot of memory in our client Worker.
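
The arithmetic behind the two reference use-cases, as a back-of-the-napkin sketch. It uses the 500MB default threshold and the rough per-DO request rates quoted earlier; the aggregate throughput figures are optimistic upper bounds that assume perfectly spread hash keys, not measurements.

```typescript
const PARTITION_THRESHOLD_MB = 500;  // default split threshold from this post
const READS_PER_DO_PER_SEC = 1_000;  // rough per-DO read rate quoted earlier
const WRITES_PER_DO_PER_SEC = 300;   // lower end of the quoted 300-500 range

// Estimate partition count and aggregate throughput for a given storage size.
function estimateCapacity(totalStorageGb: number) {
  const partitions = Math.ceil((totalStorageGb * 1_000) / PARTITION_THRESHOLD_MB);
  return {
    partitions,
    readsPerSec: partitions * READS_PER_DO_PER_SEC,
    writesPerSec: partitions * WRITES_PER_DO_PER_SEC,
  };
}

console.log(estimateCapacity(500));      // the ~500GB use-case
console.log(estimateCapacity(500_000));  // the ~500TB use-case
```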

I will probably write a full follow-up article on the thinking behind my decisions about the topology and the partitions layout, since I consider it one of the most interesting aspects of the project that influences everything else.

First a very short recap of how other databases in the industry do partitioning:

  • Google BigTable
    • Three-level hierarchy: ROOT METADATA tablet (128MB) -> METADATA tablets (each 128MB) -> USER data tablets (100-200MB) -> storage
    • Each row within a metadata tablet is about 1KB and acts as a pointer to a tablet in the next level.
    • The location of the ROOT tablet is stored in Chubby, a highly-available lock service based on Paxos, and every client resolves tablet locations starting from the ROOT tablet.
    • The actual user data is an ordered list of keys to values.
  • Amazon DynamoDB
    • Each data partition is 10GB and holds items with a hash key, a sort key, and the rest is arbitrary data.
    • A partition can split in child partitions once it grows and all splits are registered in a Metadata service (MemDS).
    • Request routers heavily cache the partition topology from MemDS and send requests directly to partitions, and they return redirection responses if the cached topology was out of date (a split occurred).
    • The topology is based on hash-range boundaries (start, end) encoded in a Perkle tree (hybrid of Patricia/Trie and Merkle trees). The hash key is hashed into a fixed-length text and gets sorted in the overall hash-range ordering, and the global order is split into boundaries and partitions.
  • CockroachDB (CRDB)
    • CRDB uses range-partitioning on the internal key-value item keys to divide the data into contiguous ordered chunks of size 64MB (Ranges), and these are the chunks replicated across storage nodes.
    • There is a two-level index on top of the Ranges that allows quick key lookups to find which Range to pick, and it is heavily cached on clients.
  • more of the same patterns…

Most battle-tested databases use similar patterns, either simulating a tree of partitions and then heavily caching and optimizing access to an index of the tree, or they organize their items in order based on the keys and then split the global range into chunks each assigned to a storage partition and again heavily caching and encoding the range boundaries for fast lookups.

My first instinct was to do something like BigTable: each tablet would be an item in Workers KV, and each request would start by reading the root item, which should in theory be heavily cached in the request’s colo location, and then read more items from Workers KV as necessary to resolve the final target Durable Object partition. For a database with 1000+ partitions this would require several MBs to be fetched from KV on every request (or every couple of requests if we use an in-memory cache too), and it would eventually be quite slow for larger databases.

Any other range based splitting also has a memory issue in our case since even though encoding text into Perkle/Patricia trees would save a lot of storage, with thousands of these keys it would be several MBs.

FokosDB encoded topology

NOTE: This is still work-in-progress and not merged in the code. I might also change how I encode the range partition trees. I do like the simplicity of the hash partition encoding, though.

FokosDB uses a hybrid topology as described in the architecture section, with both a tree based partitioning based on the hash keys, and then a range based partitioning based on the sort key.

This is intentional, I want the constraints and API of the database to influence usage as close to ideal as possible, that is sharding your data across millions of hash keys, while still providing the ability to scale across unlimited sort keys for the cases you need that.

As mentioned in the topology invariants section there are two properties of the hash partitioning that allow optimization.

Every FokosDB database has N root hash partitions out of the box, with no initialization needed; N is hardcoded in the user code and cannot change after use. This means that every incoming request can immediately be hashed across these N partitions and sent to the corresponding Durable Object, even if we don’t know anything about child partitions. This already covers lots of small-scale applications (N=10 => 5GB, N=100 => 50GB, N=1000 => 500GB) both in terms of storage and throughput.

The second property, even more important, is that each hash partition splits into a fixed number of partitions, always the same number, also hardcoded in the user code and cannot change after use.

The fixed fanout degree means that we don’t need to track any other information about each hash partition other than a single boolean “did it split?”.

At this stage, ignoring for a minute the range partitions, we have a tree of hash partitions with a fixed fanout degree, and a single boolean per partition. This is everything we need to fully figure out which partition in the tree owns a given hash key. Encoding this data structure saves lots of data since we don’t need any information about the keys themselves.

As it turns out, there is a group of data structures called “Succinct data structures” and specifically the “Level-Ordered Unary Degree Sequence (LOUDS)” that can encode the above tree in just (2N+1) bits where N is number of total partitions. The original paper is Space-efficient Static Trees and Graphs - Guy Jacobson - 1989 but I also found Succinct Data Structures - Exploring succinct trees in theory and practice - Sam Heilbron to be an easy read.
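
To show why a fixed fanout plus one “did it split?” bit per partition is enough, here is a simplified sketch. It is not LOUDS proper: it stores one bit per partition in level order and uses a rank query to find children, and it scans linearly where a real succinct structure would answer rank in constant time. The class name and the `pick` callback (which stands for the per-level hashing of the hash key) are assumptions for this example.

```typescript
// Level-order bit vector: bit i is 1 when partition i has split.
// Partitions 0..roots-1 are the roots; every split partition contributes
// `fanout` consecutive children, appended in breadth-first order.
class HashTreeTopology {
  private readonly bits: Uint8Array;
  private readonly nodeCount: number;
  private readonly roots: number;
  private readonly fanout: number;

  constructor(bits: Uint8Array, nodeCount: number, roots: number, fanout: number) {
    this.bits = bits;
    this.nodeCount = nodeCount;
    this.roots = roots;
    this.fanout = fanout;
  }

  isSplit(i: number): boolean {
    return i < this.nodeCount && ((this.bits[i >> 3] >> (i & 7)) & 1) === 1;
  }

  // Number of split partitions with an index lower than i (linear scan here).
  private rank1(i: number): number {
    let count = 0;
    for (let j = 0; j < i; j++) if (this.isSplit(j)) count++;
    return count;
  }

  // Children of the r-th split partition start right after the roots and
  // after the children of the r earlier split partitions.
  firstChild(i: number): number {
    return this.roots + this.rank1(i) * this.fanout;
  }

  // Level-order index of the partition owning a key. `pick(level, buckets)`
  // returns the bucket the key hashes to at that tree level.
  locate(pick: (level: number, buckets: number) => number): number {
    let node = pick(0, this.roots);
    let level = 1;
    while (this.isSplit(node)) {
      node = this.firstChild(node) + pick(level, this.fanout);
      level++;
    }
    return node;
  }
}
```

No key material is stored anywhere in this structure. For scale, the (2N+1) bits of LOUDS come to about 250KB for 1_000_000 partitions.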

We also maintain a Bloom filter where we insert all the hash keys promoted to their own range partitions. With rough calculations, for a 1% false positive rate and 7 hash functions, we need about 1.14MB for 1_000_000 promoted hash keys. Then, we can grow it linearly as we deem necessary depending on the promoted hash keys, or pay the penalty of an extra hop or an extra Workers KV item read.
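
The rough calculation above follows from the standard Bloom filter sizing formulas, m = -n·ln(p) / (ln 2)² bits and k = (m/n)·ln 2 hash functions. A small sketch to reproduce it; the function name is made up for this example.

```typescript
// Standard Bloom filter sizing for n keys and target false positive rate p.
function bloomSizing(n: number, p: number) {
  const bits = Math.ceil((-n * Math.log(p)) / (Math.LN2 * Math.LN2));
  const optimalHashes = (bits / n) * Math.LN2;
  return {
    bits,
    bytes: Math.ceil(bits / 8),
    mib: bits / 8 / (1024 * 1024),
    hashes: Math.round(optimalHashes),
  };
}

console.log(bloomSizing(1_000_000, 0.01));
```

For 1_000_000 keys at a 1% false positive rate this gives roughly 9.59 million bits, which is about 1.2MB or 1.14MiB, and the optimal number of hash functions rounds to 7.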

Sidenote: While exploring succinct data structures, I found out about “Succinct Range Filters (SuRF)” that unlike Bloom filters support both single-key lookups and common range queries. It’s not immediately helpful in the way Bloom filters are used here, but it might enable a different approach that will be more optimized overall.

Let’s now explore how the above information is made available to a Worker processing a request in an arbitrary location.

There is one Workers KV item with the following content:

{
    // 2N+1 bits for N number of partitions.
    louds: Uint8Array,
    // X bits to allow 1% error rate for Z number of promoted hash keys.
    promotedHashKeys: BloomFilter,
}

The above item is fetched in each Worker request and is the main index we need, given that the entire database design optimizes for sharded hash keys. This item will be cached in-memory, and also in Tiered Cache which Workers KV uses.

It resembles the ROOT tablet in BigTable’s architecture in some way.

For the range partitions we do keep a Patricia/Trie tree per hash key stored separately. Therefore, a second lookup in Workers KV is needed if a hash key is promoted into a root range partition in order to fetch its range tree topology which encodes the range boundaries of each range partition.

There are more optimizations and smart things we could do for this second lookup, but for now I kept it simple until I get more feedback on how large these range boundary trees can get in real-world scenarios.

Flow for processing an incoming request:

  1. Extract hash key HK and sort key SK from incoming request.
  2. Fetch the Workers KV item above if not already in-memory (cacheTtl).
  3. Check if HK is in the promotedHashKeys Bloom filter.
  4. If not:
    1. Repeatedly hash the HK key and traverse the louds structure until we reach the first non-split partition.
    2. Send the request to the hash partition DO.
  5. If yes:
    1. Fetch from Workers KV the encoded range partition tree for HK.
    2. If not found, it was a false positive, so follow step 4.
    3. If found, search for the range partition owning SK in the encoded range tree.
    4. Send the request to the range partition DO.

The false positive rate might lead to extra fetches from Workers KV, but that’s acceptable to me, and we can do other optimizations later.
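The routing flow above can be sketched as follows. All the names here (`Topology`, `FetchRangeTree`, `routeRequest`) are hypothetical stand-ins: the Bloom filter check, the LOUDS traversal, and the second Workers KV read are injected as plain functions so the control flow, including the false positive fallback, is visible on its own.

```typescript
// Hypothetical stand-in for the decoded root Workers KV item.
interface Topology {
  // Step 3: Bloom filter membership test; may return false positives.
  mightBePromoted(hashKey: string): boolean;
  // Step 4.1: walk the hash tree down to the first non-split partition.
  findHashPartition(hashKey: string): string;
}

// Range boundaries for one promoted hash key, sorted by lowerBound.
type RangeTree = { lowerBound: string; partitionId: string }[];

// Stand-in for the second Workers KV lookup (step 5.1). Resolves to null when
// the hash key has no range tree, meaning the Bloom filter hit was a false positive.
type FetchRangeTree = (hashKey: string) => Promise<RangeTree | null>;

// Returns the id of the partition DO that should receive the request.
async function routeRequest(
  topology: Topology,
  fetchRangeTree: FetchRangeTree,
  hashKey: string,
  sortKey: string,
): Promise<string> {
  if (topology.mightBePromoted(hashKey)) {
    const rangeTree = await fetchRangeTree(hashKey);
    if (rangeTree !== null) {
      // Step 5.3: the last range whose lower bound is <= sortKey owns it.
      // Linear scan for brevity.
      let owner: RangeTree[number] | undefined;
      for (const range of rangeTree) {
        if (range.lowerBound <= sortKey) owner = range;
      }
      if (owner !== undefined) return owner.partitionId; // step 5.4
    }
    // Step 5.2: false positive, fall through to the hash partition path.
  }
  return topology.findHashPartition(hashKey); // steps 4.1 and 4.2
}
```

The only extra cost of a false positive in this sketch is the wasted `fetchRangeTree` call, which matches the trade-off described above.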

Each partition DO also periodically fetches the topology from Workers KV: the root item, plus the range partition trees for its own promoted hash keys.

With each partition DO having the topology, we can even skip the second KV item fetch and just send the request to the hash partition owning that hash key and it will forward the request as necessary at the cost of lower throughput.

The vast majority of requests should really be flowing through step 4, since the use-case I am optimizing for is well spread out hash keys.

Future work

My plan is for FokosDB to be production ready in terms of correctness and reliability, so testing will be a big focus over the next couple of weeks.

Other features I plan to implement:

  • Analytics out of the box published into Workers Analytics Engine, with a basic UI to track your database operations.
  • More of the DynamoDB API I find useful like batch non-transaction operations.
  • Smarter heuristics for splitting data partitions, like requests per second per item.
  • Condition checks on arbitrary item properties.
  • Global secondary indexes. In most applications where I use DynamoDB, I design them with a Global Secondary Index (GSI) that inverts the keys, with the table's sort key as the index's hash key and the table's hash key as the index's sort key, so I want to support that out of the box too.

Performance optimizations

  • Transaction optimizations from section 4 of the Amazon DynamoDB ATC2023 paper.
  • Implement the ability for any transaction coordinator to be able to recover a transaction, or have the data partitions themselves recover without external coordinators.
  • There is excessive storage access right now for everything. It was simpler to implement that way initially, and there are many optimizations to do by keeping some state in memory to avoid accessing SQLite all the time which also leads to billed rows read.
  • Data migration is done in chunks of 20MB. I could do it in bigger chunks with streaming and compression, but I postponed it hoping for a platform native fork API.

Wishlist for the platform

  • I want a fork/clone/snapshot API that, given a PITR bookmark, lets me create separate standalone Durable Objects starting with the storage as of that bookmark. This would hugely simplify the partition splits and also lead to better performance, since we would avoid the userland data migration.
  • Whenever there is data migration or coordination across multiple DOs, round trips get very expensive. I want a way to specify “these DOs should be close to each other”, “these DOs should be within 20ms of each other”, or “these DOs should have this placement constraint (same colo, same region, etc.)”.
  • Long-term it would be amazing if Durable Objects could “move” as traffic shifts across the world. At the moment, the location of the DO is determined on its first request. In our case, where partitions split, the traffic to each partition could be very different after the split. For example, if our hash key is the user's email address and we store their emails, once a hash key gets its own range partitions due to that user having a large inbox, it would be very nice if those specific range partitions were placed close to that specific user.

Show me the numbers

I wanted to publish the article and didn’t have time for elaborate benchmarks, so they have to be in a follow-up article.

For now the following are some indicative latencies.

TODO

AI productivity boost

I used AI a lot for this project.

I initially wrote a lot of the code by hand, to give a basic structure with the components I wanted, establish some patterns I like when using Durable Objects, and implemented the core operations as a bootstrapping step.

Then, it was time for the distributed transactions. Honestly, I probably spent 80% of the total time attributed to that feature just chatting with LLMs about the different tradeoffs and designs we could go with. I did thorough research on how other databases implement transactions and sharding, like Google’s Megastore, BigTable, Spanner, CockroachDB, Vitess, and others. We then went through more iterations on how we would port the DynamoDB transactions paper into something that would work well with Workers and Durable Objects.

From that point on I started using AI for coding as well since there was more code to mechanically write. It was heavily reviewed line by line in small chunks, with defined milestones, and often manually modified or refactored as follow-up. Same for the tests.

Similar story with the partition splitting flows, the topology encoding, and everything else. 80% of the time was spent on deep research and iterations to come up with a design I like, and then coding small parts at a time, reviewing and testing them as we went.

Overall, I definitely would need a lot more time to build the same project without AI. No matter what you think, there is some mechanical part of software that can be done by AI and get that productivity boost. Was this a 10x improvement like the CxOs with AI psychosis are trying to sell you? No. Was it at least a 2x? Absolutely.

Deep research and instant inline tiny UI apps are probably my favourite AI features right now.

Fun fact, it was surprisingly annoying at times trying to steer the AI/LLM away from the existing industry patterns it already knows, which would obviously not work on Workers due to the constraints I established. “I was absolutely right” many times in these discussions.😅

Conclusion

My goal with this article is to give another perspective on how to think about tradeoffs and constraints when building a product. This product being a database makes it a whole lot more fun.

Do NOT use this in production, yet. I do not promise backwards compatibility, yet. I have to test it thoroughly and implement a few more things. You have been warned.

I am hoping to get to a stable version in terms of backwards compatibility in the next few weeks.

Please try it out, go through the code, and get in touch if you have any feedback.

FokosDB source code: <….>