How Discord Indexes Trillions of Messages

Back in 2017, we shared how we built our message search system to index billions of messages. We designed our search infrastructure to be performant, cost-effective, scalable, and easy to operate. We chose Elasticsearch, with Discord messages sharded over indices (the logical namespace for Elasticsearch documents) on two Elasticsearch clusters. Messages were sharded either by Discord server (which we’ll refer to as a guild from here on out) or by direct message (DM). This allowed us to store all of a guild’s messages together for fast querying and to run smaller, more manageable clusters. Since not everyone uses search, messages were lazily indexed into Elasticsearch, and we built a message queue that allowed workers to pull chunks of messages for indexing, leveraging Elasticsearch’s bulk-indexing capabilities.

But as Discord grew, our search infrastructure began to exhibit a few cracks…

Our Redis message indexing queue would drop messages

We used Redis to back our realtime message indexing queue, which held up great at first. However, whenever the indexing queue got backed up for any reason, which happened often on Elasticsearch node failure, the Redis cluster became a point of failure: once too many messages piled up in the queue and CPU maxed out, it began dropping messages.

Our bulk indexing was fault-intolerant on Elasticsearch index or node failure

Part of the reason our indexing queue was so impacted by Elasticsearch node failures is how we optimized our bulk message indexing. The realtime index workers pulled batches of messages off the queue to index into Elasticsearch. However, the messages in a batch all belonged to different indices and Elasticsearch nodes in our cluster; one bulk index operation with a batch of 50 messages might fan out to 50 Elasticsearch nodes! If a single operation in the batch failed, the entire bulk index operation was considered failed, re-enqueueing all of its messages to be retried.

To put it into perspective, let's say our Elasticsearch cluster had 100 nodes, and we were indexing batches of 50 messages. If one node fails, assuming an equal distribution, the odds of a given batch having at least one message going to that failed node are ~40%. This means that a single-node failure leads to ~40% of our bulk index operations failing!
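
To make that concrete: with messages spread uniformly over 100 nodes, each message has a 99/100 chance of landing somewhere other than the failed node, so a batch of 50 messages avoids the failed node only if all 50 do.

P(batch touches the failed node) = 1 − (99/100)^50 ≈ 1 − 0.605 ≈ 0.395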

On the left, a diagram of 4-node Elasticsearch cluster with a batch of messages touching all 4 nodes successfully indexing. On the right, a diagram of a 4-node Elasticsearch cluster where Node 4 is down, causing the same batch of messages to fail.

Large Elasticsearch clusters had large overhead, impacting performance and operability

As our message volume grew, we scaled up our Elasticsearch clusters by adding additional nodes. This allowed us to horizontally scale quite easily, adding more indices and nodes to serve more Discord servers and their messages. However, it also meant that each of our bulk index operations was spread across a much larger number of indices and nodes. Operations fanned out more, slowing down our indexing performance. And the more nodes a cluster had, the more likely it was that some node in it would be failing at any given point.

No good path to software upgrades or rolling restarts

Given the system’s lack of resilience to single-node unavailability, we didn’t have a good solution for performing rolling restarts or software upgrades. Our clusters had also grown really large and unwieldy, with over 200 nodes and terabytes of data total. Any strategy to gracefully drain nodes before restarting them to ensure service availability would’ve taken a prohibitively long time. We were forced to keep our clusters running legacy OS and Elasticsearch versions, missing both critical updates and performance improvements.

Most notably, patching the log4shell vulnerability by disabling JNDI lookups and setting log4j2.formatMsgNoLookups to true required taking our search system fully offline for a maintenance period while we restarted every Elasticsearch node with the new configuration.

Indices with large guilds grew too large

Over time, some of our Elasticsearch indices came to house the data for some very large Discord guilds. Naturally, large guilds post a LOT of messages, causing those indices to grow very large. Each of our Elasticsearch indices is a single Lucene index under the hood, and Lucene has a MAX_DOCS limit of roughly 2 billion documents per index. As we learned the hard way, once you hit this limit, all further indexing operations against that index will fail.

When this happened, our only path to recovery was to work with our Safety team to find guilds that existed solely to spam as many messages as possible, and delete them. This worked for the time being, but we knew we would eventually have to support search on legitimate communities that had posted more than 2 billion messages.

The Solutions

While our search infrastructure had served its purpose for many years, we began to experience more and more incidents as our clusters struggled to handle increased message throughput and query volume. And given how brittle the system was to rolling restarts and software upgrades, our outdated software badly needed an update too.

Since this was going to be a big undertaking anyway, we figured we might as well take the opportunity to learn from and evolve our existing infrastructure.

Deploy Elasticsearch on Kubernetes

By now, Discord was deploying many of our stateless services on Kubernetes. This had significantly improved operability and allowed us to optimize costs with more granular compute and memory resources. We didn’t yet run any stateful services on Kubernetes, but the Elastic Cloud on Kubernetes (ECK) operator seemed like a promising solution for orchestrating and managing an Elasticsearch cluster deployed on Kubernetes.

With the Elasticsearch Operator, we would easily be able to define our cluster topology and configuration, and deploy the Elasticsearch cluster onto our Kubernetes nodepool. OS upgrades would happen automatically, and the ECK operator exposes ergonomic tools for safely performing rolling restarts and upgrades.

Multi-cluster “cell” architecture to run smaller Elasticsearch clusters

Now that we had committed to running on Kubernetes and using the ECK operator, we envisioned a more expansive architecture where we ran a larger number of smaller Elasticsearch clusters.

In our legacy system, our clusters had grown to more than 200 nodes per cluster. This meant that a given node in the cluster could fail often, and the coordination overhead for the cluster itself was very high.

Elasticsearch cluster state doesn’t scale: as a cluster grows to contain more nodes and more indices, the overhead on the cluster’s master node also increases. We began to see our clusters’ master nodes frequently OOM, which would cause indexing failures, a growing backlog of messages that failed to index into Elasticsearch, degraded query performance, and frequent timeouts.

In our new search infrastructure, we would remedy this by creating many more Elasticsearch indices, so that each index could stay within the recommended bounds of roughly 200M messages and 50GB of data. These indices would be spread across many more, smaller clusters, helping keep cluster state and coordination overhead down for better performance.
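
For a rough sense of scale (a back-of-the-envelope estimate, not an exact count), keeping every index under that 200M-message ceiling means each trillion messages needs on the order of 1×10^12 / 2×10^8 = 5,000 indices, which is why the new system ends up running thousands of indices spread over many clusters.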

We introduced the concept of a logical “cell” of multiple Elasticsearch clusters, giving us a higher-level grouping over a set of smaller Elasticsearch clusters. We’ll discuss our usage of cells, and how they unlocked new search features and the ability to handle large guilds, later in this blog post.

Left: diagram showing a cluster made of 3 ingest nodes, 3 master-eligible nodes, and 5 data nodes. Right: diagram showing a cell composed of 4 clusters.

Within each of these smaller Elasticsearch clusters in a cell, we run dedicated ingest nodes, master-eligible nodes, and data nodes. Having these dedicated node roles ensures that:

  • Master-eligible nodes always have sufficient resources to perform cluster coordination
  • Ingest nodes performing pre-processing and routing can be run as a stateless deployment since they own no data and can scale up and down to handle volume spikes
  • Data nodes can be given sufficient heap resources to handle indexing and query operations

Given the different resource requirements for our different node roles, their respective pods are scheduled onto different machine types and nodepools.

We designed each cluster to be resilient to zonal failure with the following:

  • 3 master-eligible nodes: one per zone
  • At least 3 ingest nodes: one per zone
  • Data nodes: the primary and replica of an index are in different zones with shard allocation awareness, and forced awareness balances indices across all zones

PubSub message queue

We migrated our indexing message queue from Redis to PubSub, which allowed us to have guaranteed message delivery and tolerate large backlogs of messages. This meant that Elasticsearch failures now could lead to a slowdown in indexing, but we would no longer drop messages — phew!

In fact, the convenience of PubSub’s guaranteed message delivery led us to adopt PubSub for other use cases across Discord like task scheduling—but that’s a blog post for another day.

Batch messages by cluster and index before bulk indexing

We still want to index messages in bulk for performance, but our workers should be smarter, collecting and indexing batches of messages housed on the same cluster and index. This ensures that each bulk index operation talks to a single Elasticsearch index and node, so if a node does fail, it only impacts index operations for messages on that node. This matters even more now that we have a larger number of clusters and indices.

To improve our bulk indexing strategy, we implemented a PubSub message router that streams messages from PubSub and collects them into batches grouped by their Destination, which in our case is the Elasticsearch cluster and index the message is mapped to. The message router creates a channel and spawns a tokio task for each Destination it encounters among the messages being pulled: the router sends each message to the matching channel, and the Destination’s task collects the messages it receives into chunks for indexing.

Our router streams messages from PubSub and extracts a Destination, or key, for each message. In our case, the Destination is the Elasticsearch cluster and index the message will live on. When the router encounters a message with a new Destination, it creates an unbounded channel and spawns a tokio task that receives messages on that channel.

As the router continues to receive messages, it forwards them on to the appropriate channel, keyed by Destination. Each destination task collects chunks of messages, now all keyed by the same Destination, and bulk indexes to Elasticsearch.

// Imports assume the anyhow, parking_lot, and tokio crates for Result,
// RwLock, and channels respectively.
use std::collections::hash_map::{Entry, HashMap};
use std::hash::Hash;

use anyhow::Result;
use parking_lot::RwLock;
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

/// MessageRouter routes messages to a set of dynamically spawned destinations.
/// This is a simplified representation of the MessageRouter used to index
/// new Discord messages into Elasticsearch.
struct MessageRouter<DestinationKeyT, MessageT> {
    destinations: RwLock<HashMap<DestinationKeyT, UnboundedSender<MessageT>>>,
}

impl<DestinationKeyT, MessageT> MessageRouter<DestinationKeyT, MessageT>
where
    DestinationKeyT: Eq + Hash,
    MessageT: Send + 'static,
{
    /// Attempts to send the message to the given destination, spawning it
    /// if the destination does not exist.
    fn send_message(
        &self,
        destination_key: DestinationKeyT,
        message: MessageT,
    ) -> Result<()> {
        let mut destinations = self.destinations.write();
        match destinations.entry(destination_key) {
            Entry::Occupied(ent) => {
                // Send the message to the existing destination.
                ent.get().send(message).ok();
            }
            Entry::Vacant(ent) => {
                // Spawn a new destination task and hand it the receiving half
                // of a fresh channel.
                let (destination_sender, mut destination_receiver) = unbounded_channel();
                tokio::task::spawn(async move {
                    // The destination task receives messages with the same
                    // destination key on destination_receiver. In our case, it
                    // groups messages into chunks and bulk indexes them into
                    // Elasticsearch.
                    while let Some(_message) = destination_receiver.recv().await {
                        // ... collect into chunks and bulk index ...
                    }
                });
                ent.insert(destination_sender).send(message).ok();
            }
        }
        Ok(())
    }
}
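
For illustration, here is a minimal sketch of what the receiving side of a destination task might look like. The IndexedMessage type, the bulk_index helper, and the chunk size of 50 are hypothetical stand-ins rather than our actual implementation, and a production worker would likely also flush partial chunks on a timer.

use tokio::sync::mpsc::UnboundedReceiver;

/// Placeholder for an indexed Discord message (hypothetical).
struct IndexedMessage;

/// Hypothetical helper that issues one Elasticsearch _bulk request for a
/// chunk of messages that all live on the same cluster and index.
async fn bulk_index(chunk: &[IndexedMessage]) {
    // ... build and send the _bulk request for this destination ...
}

/// Receives messages for a single Destination and bulk indexes them in chunks.
async fn run_destination_task(mut receiver: UnboundedReceiver<IndexedMessage>) {
    const CHUNK_SIZE: usize = 50;
    let mut chunk = Vec::with_capacity(CHUNK_SIZE);

    while let Some(message) = receiver.recv().await {
        chunk.push(message);
        if chunk.len() >= CHUNK_SIZE {
            // Every message in this chunk maps to the same cluster and index,
            // so the bulk operation only touches a single Elasticsearch node.
            bulk_index(&chunk).await;
            chunk.clear();
        }
    }

    // Flush whatever is left once the channel closes.
    if !chunk.is_empty() {
        bulk_index(&chunk).await;
    }
}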

Supporting new search use cases

Defining “cells” of Elasticsearch clusters gave us a useful abstraction that allowed us to shard and index messages along different dimensions. The ability to search across all your DMs had long been a requested feature, but one we were unable to support since all messages were sharded and indexed by guild or DM. Fanning a search query out over all of a user’s DMs would be far too expensive.

In order to efficiently search across all DMs, we would need to shard the messages by user, rather than by channel, storing all of a user’s DMs together on a given Elasticsearch index. This would also mean that we'd store twice as much data, as each message in a DM would be indexed into recipient_a’s index and recipient_b’s index.

Given that we were already planning to re-index all Discord messages as part of our migration to this new architecture, this provided a rare chance to shard and index some messages differently. Guild messages are still sharded by guild_id, but DM messages are now sharded by user_id and indexed into a separate user-dm-messages Elasticsearch cell.
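
As a rough sketch of that routing decision (the types and function below are illustrative, not our actual schema), a guild message maps to a single destination keyed by guild_id, while a DM maps to one destination per participant, keyed by user_id:

/// Illustrative destination: which cell a message goes to, and the key it is
/// sharded by within that cell.
enum Destination {
    GuildMessages { guild_id: u64 },
    UserDmMessages { user_id: u64 },
}

/// Illustrative message shape.
enum Message {
    Guild { guild_id: u64 },
    Dm { recipient_a: u64, recipient_b: u64 },
}

/// A guild message is indexed once, by guild; a DM is indexed twice, once per
/// participant, so either user can search across all of their own DMs.
fn destinations_for(message: &Message) -> Vec<Destination> {
    match message {
        Message::Guild { guild_id } => {
            vec![Destination::GuildMessages { guild_id: *guild_id }]
        }
        Message::Dm { recipient_a, recipient_b } => vec![
            Destination::UserDmMessages { user_id: *recipient_a },
            Destination::UserDmMessages { user_id: *recipient_b },
        ],
    }
}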

This is what powers the new cross-DM search functionality, now available on mobile. All DM messages and search queries are served by the user-dm-messages Elasticsearch cell, and guild messages and queries are served by the guild-messages cell. You won’t need to remember who sent you that one message six years ago!

Give “Big Freaking Guilds” dedicated Elasticsearch clusters, with multiple shards

As guilds on Discord grow larger with longer histories, more and more of them bump up against Lucene’s MAX_DOCS limit of ~2 billion messages. We needed a solution to scale search for these special cases, which we call BFGs, or Big Freaking Guilds.

We wanted to retain the performance gains from storing all messages for a given guild on the same Elasticsearch shard, since that still works for the vast majority of guilds, but we needed a solution to scale search for BFGs as well.

Elasticsearch indices can have multiple primary shards, with each shard living on a node in the cluster. Having multiple primary shards means data for an index will be spread across multiple nodes in the cluster, backed by multiple underlying Lucene indices.

To optimize query performance, we normally create Elasticsearch indices with a single primary shard. This guarantees that all messages in that index are colocated on a single Elasticsearch node, so there’s no fanout and coordination overhead at query time. For almost all Discord guilds, this results in far more performant search queries, since each query only talks to a single Elasticsearch node.

In the case of BFGs, however, we are dealing with BF-indices and large message volume: billions of messages on a single guild! Having more shards in an index can speed up search queries, but only when the cost of coordination does not exceed the cost of querying the index. For BFGs, we can realize query performance gains from the parallelism we get by fanning out a search query to multiple shards, and the coordination cost is worth it.

Indexing BFG messages onto indices with multiple primary shards lets us scale our search functionality to guilds with many billions of messages, and allows for more performant queries for these special-case guilds. It also improves query performance for other Discord guilds that previously would’ve had their resources hogged by the expensive queries to BFGs on the same index.

We built a new system that allows us to reindex a BFG’s messages onto a new index with more primary shards, raising the ceiling on how many messages it can hold. Importantly, we need to continue indexing new messages and serving queries while this migration is happening.

The BFG reindexing flow goes as follows (a simplified sketch of these migration phases appears after the list):

  1. We identify a BFG: a guild whose message count is approaching Lucene’s MAX_DOCS limit. This guild is currently indexed to index-a.
  2. We create a new index, new-bfg-index, in our Elasticsearch cell dedicated to BFGs, with 2x the primary shard count of the previous index.
  3. New messages are now dual-indexed to both index-a and new-bfg-index.
  4. We kick off a job to historically index all of the guild’s messages into new-bfg-index. Ongoing search queries are still served from index-a.
  5. When the historical indexing is complete, we switch query traffic from index-a to new-bfg-index.
  6. Once we’re confident that new-bfg-index is performant and reliable, we stop indexing new incoming messages to index-a and kick off a cleanup job to remove all of the BFG’s messages from it.
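
To make the phases easier to follow, here is an illustrative sketch of the flow as a state machine. The enum, its variants, and the write_targets helper are hypothetical, not our actual implementation; the index names match the steps above.

/// Illustrative phases of a BFG reindex. Writes and reads move independently,
/// so index-a keeps serving queries until new-bfg-index is fully backfilled.
enum BfgReindexPhase {
    /// Steady state: index-a receives writes and serves queries.
    SingleIndex,
    /// Steps 3-4: new messages are dual-indexed while a backfill job
    /// historically indexes the guild into new-bfg-index.
    DualWriteBackfill,
    /// Step 5: queries move to new-bfg-index; dual writes continue as a
    /// safety net in case we need to roll back.
    QueriesOnNewIndex,
    /// Step 6: writes to index-a stop and its BFG messages are cleaned up.
    CleanupOldIndex,
}

/// Which indices should receive a newly posted message in each phase.
fn write_targets(phase: &BfgReindexPhase) -> &'static [&'static str] {
    match phase {
        BfgReindexPhase::SingleIndex => &["index-a"],
        BfgReindexPhase::DualWriteBackfill | BfgReindexPhase::QueriesOnNewIndex => {
            &["index-a", "new-bfg-index"]
        }
        BfgReindexPhase::CleanupOldIndex => &["new-bfg-index"],
    }
}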

By running a dedicated Elasticsearch cell for BFGs, we can keep the cluster and index configurations standard for most guilds, while retaining the ability to adjust resources as needed for these outlier, but important, BFGs.

So... how are things going?

Now, with our new search infrastructure, we:

  • Index trillions of messages, with double the indexing throughput of our legacy search
  • Serve queries with median latency under 100ms (down from 500ms) and p99 under 500ms (down from 1s)
  • Run 40 Elasticsearch clusters with thousands of indices
  • Perform cluster upgrades and rolling restarts automatically and with no service impact

We’re excited to keep powering fast and efficient search for Discord guilds, even as our communities grow to each have billions of messages and beyond!

If these sorts of technical challenges tickle your brain as well, come join us!
