At LinkedIn Search, we strive to serve results that are most relevant to a member’s query, from a wide variety of verticals such as jobs they may be interested in, people they may want to connect with, or posts that are trending within their industry. Post search saw strong organic growth in 2020, with a 35% year-over-year increase in user engagement. As we watched content continue to grow and diversify on the platform, the Flagship Search team saw an opportunity to improve the Post search tech stack’s agility, with a strategic priority to enable members to create, find, share, and have productive conversations around high-quality content on LinkedIn.
The Post search tech stack was originally built on top of the well-established Feed systems and services and leveraged the Feed platform. Removing the unnecessary dependency on Feed systems and simplifying our stack allowed us to successfully leverage several machine learning (ML) techniques and improve the relevance of Post search results while providing a superior member experience. This blog post outlines our journey as we re-architected the Post search stack and describes how we addressed many of the unique challenges faced along the way.
The life of a search query at LinkedIn begins in a search bar, which interacts with a frontend presentation layer that fetches the search results from a midtier service and decorates them for the best member experience.
The search midtier at LinkedIn follows the federated search pattern: it fans out calls to various search backends and blends the results together. The Post search stack, however, is different, as it was designed to be compatible with the Feed ecosystem at LinkedIn. For Post search, the fanout and blending logic depended on Feed services, including feed-mixer and interest-discovery.
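To make the federated pattern concrete, here is a minimal sketch of fanout and blending. The backend functions, their hard-coded results, and the blend-by-raw-score rule are all hypothetical placeholders for illustration, not the actual midtier implementation.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical backends: each returns (doc_id, score) pairs for a query.
def search_people(query):
    return [("person:1", 0.9), ("person:2", 0.4)]

def search_jobs(query):
    return [("job:7", 0.8)]

def search_posts(query):
    return [("post:3", 0.95), ("post:5", 0.6)]

BACKENDS = [search_people, search_jobs, search_posts]

def federated_search(query, top_k=3):
    # Fan out: call every backend in parallel.
    with ThreadPoolExecutor(max_workers=len(BACKENDS)) as pool:
        result_lists = list(pool.map(lambda backend: backend(query), BACKENDS))
    # Blend: merge all results and keep the highest-scoring ones.
    merged = [hit for hits in result_lists for hit in hits]
    merged.sort(key=lambda hit: hit[1], reverse=True)
    return merged[:top_k]
```

In a real federator the blending step would be a ranking model rather than a sort on raw scores, since scores from different backends are rarely comparable.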
Figure 1. View of the Search stack before simplification
Apart from the different service architecture, Post search also uses an intermediate language called Interest Query Language (IQL) to translate a user query into index-specific queries used to serve search results. At the time of its inception, Post search was served by two search indexes, one for posts that you see on the Feed and one for articles. However, as more content types were supported at LinkedIn over time, it was easier to augment the existing Post index instead of adding a new index each time.
Due to the complex architecture, increasing our development and experimentation velocity proved to be difficult, as any search relevance or feature improvements required changes to multiple points throughout the stack. It was challenging to address many of the unique needs of Post search, such as balancing multiple aspects of relevance, ensuring diversity of results, and supporting other product requirements.
Simplifying system architecture
We set out to simplify the system architecture to improve productivity and facilitate faster relevance iterations. To achieve this, we decided to decouple the Feed and Post search services in two phases. The first phase removed feed-mixer from the call stack and moved fanout and blending into the search federator. The second phase removed interest-discovery. This enabled us to get rid of all the cruft built up over the years and simplified the stack by removing additional layers of data manipulation.
Figure 2. Phased approach to simplify the stack
Improving Post search relevance
As we thought about ways to improve the relevance of results from Post search, we realized that the user’s perceived relevance of results is a delicate balance of several orthogonal aspects, such as:
- Query-document relevance
- Query-agnostic document quality (i.e., static rank)
- Personalization and user intent understanding
- Post engagement
- Post freshness/recency (especially for trending topics)
In addition to those aspects, we wanted to easily implement other requirements from our product partners to satisfy searcher expectations (e.g., ensuring diversity of results, promoting serendipitous discovery, etc.). To meet these goals for post relevance, we implemented a new ML-powered system; the high-level architecture is shown in Figure 3, and we’ll cover the details in the remainder of this section.
Figure 3. Design of our ML system
As a single, unified model did not scale well for our needs, we invested in modeling the First Pass Ranker (FPR) as a multi-aspect model, wherein each aspect is optimized through an independent ML model. As an FPR typically scans a large number of documents with the goal of optimizing for recall, latency constraints require the aspect models to be lightweight (e.g., gradient boosted decision trees). We combine the scores from all these aspect models in a separate layer to determine the final score for ranking. This approach enables us to:
- Have separation of concerns for each aspect
- Decouple modeling iterations for each aspect
- Add more explainability to our ranking
- Control the importance of each aspect based on product requirements
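As a rough illustration of the combination layer, the sketch below blends per-aspect scores with a weighted sum. The aspect names, the weights, and the choice of a linear combination are assumptions made for this example; the actual combination layer may work differently.

```python
# Hypothetical aspect weights; in practice these would be tuned to product requirements.
ASPECT_WEIGHTS = {"relevance": 0.5, "quality": 0.2, "engagement": 0.2, "freshness": 0.1}

def combine_aspect_scores(aspect_scores, weights=ASPECT_WEIGHTS):
    """Weighted sum of per-aspect scores; an aspect missing from the input counts as 0."""
    return sum(weight * aspect_scores.get(name, 0.0) for name, weight in weights.items())

def first_pass_rank(docs, weights=ASPECT_WEIGHTS):
    """Order documents by their combined score, highest first."""
    return sorted(docs, key=lambda doc: combine_aspect_scores(doc["aspects"], weights), reverse=True)
```

Because each aspect contributes a separate term, changing one weight shifts the importance of that aspect without retraining any of the aspect models.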
With the re-architecture of the stack, we were able to easily launch two additional layers of re-ranking on top of the FPR: Second Pass Ranker (SPR) and Diversity re-ranker. SPR, which resides in the federation layer and can support complex ML model architectures (e.g., neural nets), is geared toward improving precision and ranks the top k documents returned from the FPR. It enables personalization by leveraging deeper and real-time signals for members’ intent, interests, and affinities. The Diversity re-ranker forms our last layer and helps us inject diverse content in the top k positions. This includes increasing discovery of potentially viral content for trending queries, reducing duplication of similar content, etc.
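One simple way to picture a diversity re-ranker is a greedy pass that limits how many posts from the same author can occupy the top k positions. The sketch below is only an illustration under that assumption: the `author` field and the per-author cap are hypothetical, and the production re-ranker considers other signals as well.

```python
def diversity_rerank(ranked_docs, top_k, max_per_author=1):
    """Keep the input order, but admit at most max_per_author posts per author
    into the top_k slots; all other posts follow in their original order."""
    top, demoted, seen = [], [], {}
    for doc in ranked_docs:
        count = seen.get(doc["author"], 0)
        if len(top) < top_k and count < max_per_author:
            top.append(doc)
            seen[doc["author"]] = count + 1
        else:
            demoted.append(doc)
    return top + demoted
```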
To iterate quickly on a multi-layered, complex ML stack, testing and validation was a foundational piece we had to get right. We built a suite of internal tools to assess the quality of new candidate models and quantify how they differed from the current production model. This enabled us to have a principled approach to testing relevance changes and ensured we did not regress on the core functionality/user experience.
- Pre-ramp evaluation: This tool helps compute and plot descriptive statistics for the results of a collection of user queries to test and validate intended effects. For example, we are able to validate how we changed the distribution of recent posts in the results for our experiments. This helps us to understand our models better before ramping any change to members and to catch any unintended side effects early on.
- Validating table stakes: The Build Verification Tool (BVT) helps generate model reliability tests to assert that a specific expected document is correctly surfaced for certain queries and members. For example, using this tool, we can validate that certain table stakes, such as searching for an exact match using quoted phrase queries, are not regressing with any of our improvements.
- Human evaluation: To have a better understanding of basic query-document relevance, we invested heavily in crowdsourcing human ratings for the search results. We leverage this crowdsourced data to evaluate performance of offline ranking models with respect to search quality, to ensure it meets the basic quality bar. Crowdsourced human annotation data also provides valuable training data to improve the ranking of results.
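To give a flavor of a table-stakes check like the quoted-phrase example above, here is a toy sketch. The `search` function and the in-memory index are stand-ins invented for this example; they are not the BVT or the real search API.

```python
def search(query, index):
    """Toy search: a quoted query must match the phrase exactly; otherwise any term may match."""
    if len(query) >= 2 and query.startswith('"') and query.endswith('"'):
        phrase = query[1:-1].lower()
        return [doc_id for doc_id, text in index.items() if phrase in text.lower()]
    terms = query.lower().split()
    return [doc_id for doc_id, text in index.items()
            if any(term in text.lower().split() for term in terms)]

def verify_expected_document(query, expected_doc_id, index, top_n=10):
    """Build-verification style check: the expected document must appear in the top_n results."""
    return expected_doc_id in search(query, index)[:top_n]
```

A suite of such checks can run against every candidate model, so a regression in basic behavior is caught before any ramp.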
These changes to the system architecture have helped us unlock several wins, such as:
- Developer productivity: We reduced the developer effort to add new features from 6 weeks to 2 weeks. Additional productivity wins include faster developer onboarding time and lower maintenance costs.
- Latency: Already, we’ve seen P90 latency reduced by ~62ms for Android, ~34ms for iOS, and ~30ms for web. We expect additional latency reductions from future work as well.
- Leverage: Because search federation was already integrated with newer machine learning technologies, this migration has also empowered relevance engineers to iterate faster and run more experiments to improve the Post search results.
- End-to-end optimization: With the extra layers removed, search federator now has access to all the Post-related metadata from the index. This data is being used to improve the ranking of posts when blended with other types of results, help reduce duplication, and increase the diversity of content.
- Engaging and personalized results: Pertinent results, which are highly relevant to the user’s search query, have led to an aggregate click-through rate improvement of over 10%. Increased distribution of posts from within the searcher’s social network, their geographic location, and in their preferred language have led to a 20% increase in messaging within the searcher’s network.
- Superior navigational capabilities: Better navigation allows members to search for posts from a specific author, for posts that match quoted queries, for recently viewed posts, and more, leading to an increase in user satisfaction, reflected by a 20% increase in positive feedback rate.
Table 1. Impact of the first ML model using performant min-span features
Table 2. Impact of combined model including multiple aspects such as Engagement, Personalization (In-Network, Geo), and Quality
Limitations and future work
Although our current solution has been a huge improvement, we acknowledge its limitations and plan to address them in our future work. Some of the areas we’ll focus on include:
- Further simplify: Removing IQL from the stack will help remove two layers of query translation from the flow, making it much easier to add new, useful filters for Post search. To do this, we plan to merge all of the Post search backends and translate directly from the member query to the Galene query.
- Semantic understanding: We will be investing in Natural Language Processing (NLP) capabilities to understand the deeper semantic meaning of queries. We plan to implement Embedding Based Retrieval (EBR) and use DeText embeddings in our neural network-based SPR model.
- Detect trending content: To quickly detect trending, newsy, or viral content on our platform and surface fresh results for queries on trending topics, we plan to use a real-time platform for computing engagement features to reduce the feedback loop from hours to minutes.
- Promoting creators: Results are ranked today mainly by using viewer-side utility functions such as likelihood of a click, user action originating from search, etc. To support our creators, we will evolve this ranking, along with our experimentation and testing stack to also optimize for creator-side utilities, such as content creation or distribution for emerging creators.
- Multimedia understanding: Expanding our document understanding capabilities beyond text data to include handling multimedia content such as images, short-form videos, and audio, is another opportunity for future investment. As web content becomes increasingly diverse and newer content formats become popular, it will be important to ensure these content types are easily discoverable on Post search.
We would like to conclude with a reminder to our readers that in large complex systems, the existing state can be suboptimal due to incremental solutions to problems over time. By stepping back and taking a high-level view, it is possible to identify several areas of improvement, as we have shared in this blog. It takes an open mind and support from engineering, product, and leadership to entertain these improvements, but in the end, your users and fellow engineers will thank you. Please share any similar experiences from your work, and reach out to us if you have any questions or just want to chat about how we broke the status quo.
We would like to thank the following teams and individuals involved for their support: Search Apps Infra, Flagship Search AI, Search Federation, Feed Infra, Discover AI, Verticals and Platform, Search SRE, Bef Ayenew, Rashmi Jain, Abhimanyu Lad, Catherine Xu, Alice Xiong, Steven Chow, Lexi He, Margaret Sobota, Xin Yang, Justin Zhang, David Golland, Sean Henderson, Pete Merkouris, and many more.
Open Sourcing Venice – LinkedIn’s Derived Data Platform
We are proud to announce the open sourcing of Venice, LinkedIn’s derived data platform that powers more than 1800 of our datasets and is leveraged by over 300 distinct applications. Venice is a high-throughput, low-latency, highly-available, horizontally-scalable, eventually-consistent storage system with first-class support for ingesting the output of batch and stream processing jobs.
Venice entered production at the end of 2016 and has been scaling continuously ever since, gradually replacing many other systems, including Voldemort and several proprietary ones. It is used by the majority of our AI use cases, including People You May Know, Jobs You May Be Interested In, and many more.
The very first commit in the Venice codebase happened a little more than eight years ago, but development picked up in earnest with a fully staffed team in the beginning of 2016, and continues to this day. In total, 42 people have contributed more than 3700 commits to Venice, which currently equates to 67 years of development work.
Throughout all this, the project traversed many phases, increasing in both scope and scale. The team learned many lessons along the way, which led to a plethora of minor fixes and optimizations, along with several rounds of major architectural enhancements. The last such round brought active-active replication to Venice and served as a catalyst for clearing many layers of tech debt. Against this backdrop, we think Venice is in the best position it’s ever been in to propel the industry forward, beyond the boundaries of LinkedIn.
This blog post provides an overview of Venice and some of its use cases, divided across three sections:
- Writing to Venice
- Reading from Venice
- Operating Venice at scale
Writing data seems like a natural place to start, since anyone trying out a new database needs to write data first, before reading it.
How we write into Venice also happens to be one of the significant architectural characteristics that makes it different from most traditional databases. The main thing to keep in mind with Venice is that, since it is a derived data store that is fed from offline and nearline sources, all writes are asynchronous. In other words, there is no support for performing strongly consistent online write requests, as one would get in MySQL or Apache HBase. Venice is not the only system designed this way—another example of a derived data store with the same constraints is Apache Pinot. This architecture enables Venice to achieve very high write throughputs.
In this section, we will go over the various mechanisms provided by Venice to swap an entire dataset, as well as to insert and update rows within a dataset. Along the way, we will also mention some use cases that leverage Venice, and reference previously published information about them.
The most popular way to write data into Venice is using the Push Job, which takes data from an Apache Hadoop grid and writes it into Venice. Currently, LinkedIn has more than 1400 push jobs running daily that are completely automated.
By default, the Venice Push Job will swap the data, meaning that the new dataset version gets loaded in the background without affecting online reads, and when a given data center is done loading, then read requests within that data center get swapped over to the new version of the dataset. The previous dataset version is kept as a backup, and we can quickly roll back to it if necessary. Older backups get purged automatically according to configurable policies. Since this job performs a full swap, it doesn’t make sense to run multiple Full Pushes against the same dataset at the same time, and Venice guards against this, treating it as a user error. Full Pushes are ideal in cases where all or most of the data needs to change, whereas other write methods described in the next sections are better suited if changing only small portions of the data.
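The swap, backup, and rollback behavior can be pictured with a small in-memory model. This is a conceptual sketch only: the class and method names are hypothetical and do not correspond to Venice's APIs, and the backup policy is hard-coded to keep a single previous version.

```python
class VersionedDataset:
    """Toy model of a dataset with a current version and one backup version."""

    def __init__(self):
        self.versions = {}            # version number -> dict of rows
        self.current = None           # version currently serving reads
        self.backup = None            # previous version kept for rollback
        self.push_in_progress = False

    def start_full_push(self):
        # Concurrent Full Pushes against one dataset are treated as a user error.
        if self.push_in_progress:
            raise RuntimeError("a Full Push is already running for this dataset")
        self.push_in_progress = True

    def finish_full_push(self, rows):
        new_version = max(self.versions, default=0) + 1
        self.versions[new_version] = dict(rows)   # loaded without touching reads
        if self.backup is not None:
            del self.versions[self.backup]        # purge the older backup
        self.backup, self.current = self.current, new_version  # swap reads over
        self.push_in_progress = False

    def rollback(self):
        if self.backup is None:
            raise RuntimeError("no backup version to roll back to")
        self.current, self.backup = self.backup, self.current

    def get(self, key):
        return self.versions[self.current].get(key)
```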
By 2018, the roughly 500 Voldemort Read-Only use cases running in production had been fully migrated to Venice, leveraging its Full Push capability as a drop-in replacement.
A large portion of Full Push use cases are AI workflows that retrain daily or even several times a day. A prominent example of this is People You May Know. More generally, LinkedIn’s AI infrastructure has adopted a virtual feature store architecture where the underlying storage is pluggable, and Venice is the most popular choice of online storage to use with our feature store, Feathr, which was recently open sourced. Tight integration between Venice and Feathr is widely used within LinkedIn, and this integration will be available in a future release of Feathr.
Another way to leverage the Venice Push Job is in incremental mode, in which case the new data gets added to the existing dataset, rather than swapping it wholesale. Data from incremental pushes gets written to both the current version and the backup version (if any) of the dataset. Contrary to the default (Full Push) mode, Venice supports having multiple Incremental Push jobs run in parallel, which is useful in cases where many different offline sources need to be aggregated into a single dataset.
Besides writing from Hadoop, Venice also supports writing from a stream processor. At LinkedIn, we use Apache Samza for stream processing, and that is what Venice is currently integrated with. We are looking forward to hearing from the community about which other stream processors they would like to integrate with Venice. Because Venice has a sufficiently flexible architecture, integrating additional stream processors is possible.
Similar to incremental pushes, data written from stream processors gets applied both to the current and backup versions of a dataset, and multiple stream processing jobs can all write concurrently into the same dataset.
Since 2017, LinkedIn’s A/B testing experimentation platform has leveraged Venice with both full pushes and streaming writes for its member attribute stores. We will come back to this use case in the next section.
There is an alternative way to leverage a stream processor that is similar to a Full Push. If the stream processor is leveraging an input source such as Brooklin that supports bootstrapping the whole upstream dataset, then the owner of the stream processing job can choose to reprocess a snapshot of its whole input. The stream processor’s output is written to Venice as a new dataset version, loaded in the background, and, finally, the read traffic gets swapped over to the new version. This is conceptually similar to the Kappa Architecture.
We’ve described before on the blog our data standardization work, which helps us build taxonomies to power our Economic Graph. These standardization use cases started off using a different database, but since then have fully migrated their more than 100 datasets to Venice. Having built-in support for reprocessing allows standardization engineers to run a reprocessing job in hours instead of days, and to do more of them at the same time, both of which greatly increase their productivity.
For both incremental pushes and streaming writes, it is not only possible to write entire rows, but also to update specific columns of the affected rows. This is especially useful in cases where multiple sources need to be merged together into the same dataset, with each source contributing different columns.
Another supported flavor of partial updates is collection merging, which enables the user to declaratively add or remove certain elements from a set or a map.
The goal of partial updates is for the user to mutate some of the data within a row without needing to know the rest of that row. This is important because all writes to Venice are asynchronous, and it is therefore not possible to perform “read-modify-write” workloads. Compared to systems which provide direct support for read-modify-write workloads (e.g., via optimistic locking), Venice’s approach of declaratively pushing down the update into the servers is more efficient and better at dealing with large numbers of concurrent writers. It is also the only practical way to make hybrid workloads work (see below).
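The idea of a declarative update can be sketched as follows. The update format used here (`set`, `add`, `remove`) is made up for illustration and is not Venice's actual write API; the point is that the writer describes the change and the server applies it, so the writer never reads the row.

```python
def apply_partial_update(row, update):
    """Apply a declarative update to a row on the server side.

    update may contain:
      "set":    {column: new_value}             overwrite specific columns
      "add":    {column: iterable of elements}  add elements to a set column
      "remove": {column: iterable of elements}  remove elements from a set column
    """
    new_row = dict(row)
    for column, value in update.get("set", {}).items():
        new_row[column] = value
    for column, elements in update.get("add", {}).items():
        new_row[column] = set(new_row.get(column, set())) | set(elements)
    for column, elements in update.get("remove", {}).items():
        new_row[column] = set(new_row.get(column, set())) - set(elements)
    return new_row
```

Two writers that each set a different column can send their updates in any order and still arrive at the same row, which is what makes this approach friendly to many concurrent sources.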
Hybrid write workloads
Venice supports hybrid write workloads that include both swapping the entire dataset (via Full Push jobs or reprocessing jobs) and nearline writes (via incremental pushes or streaming). All of these data sources get merged together seamlessly by Venice. The way this works is that after having loaded a new dataset version in the background, but before swapping the reads over to it, there is a replay phase where recent nearline writes get written on top of the new dataset version. When the replay is caught up, then reads get swapped over. How far back the replay should start from is configurable.
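A simplified way to think about the replay phase is shown below. The function name, the tuple layout of the nearline log, and the use of wall-clock seconds for the rewind window are assumptions made for this sketch, not details of Venice's implementation.

```python
def build_new_version(batch_rows, nearline_log, now, rewind_seconds):
    """Load the batch data, then replay the nearline writes newer than the rewind point.

    nearline_log is a list of (timestamp, key, value) tuples.
    """
    version = dict(batch_rows)                 # new version loaded in the background
    replay_start = now - rewind_seconds        # configurable rewind point
    for timestamp, key, value in sorted(nearline_log, key=lambda entry: entry[0]):
        if timestamp >= replay_start:
            version[key] = value               # nearline write lands on top of batch data
    return version                             # once caught up, reads can be swapped over
```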
This previous blog post on Venice Hybrid goes into more detail. Of note, the bookkeeping implementation details described in that post have since been redesigned.
As of today, including both incremental pushes and streaming writes, there are more than 200 hybrid datasets in production.
Putting it all together
In summary, we can say that Venice supports two data sources: Hadoop and Samza (or eventually any stream processor, if the open source community wants it), which are the columns in the following table. In addition, Venice supports three writing patterns, which are the rows of the following table. This 2×3 matrix results in 6 possible combinations, all of which are supported.
| |Hadoop|Samza|
|---|---|---|
|Full dataset swap|Full Push job|Reprocessing job|
|Insertion of some rows into an existing dataset|Incremental push job|Streaming writes|
|Updates to some columns of some rows|Incremental push job doing partial updates|Streaming writes doing partial updates|
In addition to supporting these individual workloads, Venice also supports hybrid workloads, where any of the “full dataset swap” methods can be merged with any of the insertion and update methods, thanks to the built-in replay mechanism.
In terms of scale, if we add up all of the write methods together, Venice at LinkedIn continuously ingests an average of 14 GB per second, or 39M rows per second. But the average doesn’t tell the full story, because Full Pushes are by nature spiky. At peak throughput, Venice ingests 50 GB per second, or 113M rows per second. These figures add up to 1.2 PB daily and a bit more than three trillion rows daily.
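The daily totals follow directly from the average rates. The short calculation below reproduces them, assuming decimal units (1 PB = 1,000,000 GB).

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

avg_gb_per_second = 14
avg_rows_per_second = 39_000_000

daily_pb = avg_gb_per_second * SECONDS_PER_DAY / 1_000_000  # GB to PB, decimal units
daily_rows = avg_rows_per_second * SECONDS_PER_DAY

print(f"{daily_pb:.2f} PB per day")                       # 1.21 PB per day
print(f"{daily_rows / 1e12:.2f} trillion rows per day")   # 3.37 trillion rows per day
```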
On a per-server basis, we throttle the asynchronous ingestion of data by throughput of both bytes and rows. Currently, our servers with the most aggressive tunings are throttling at 200 MB per second, and 400K rows per second, all the while continuing to serve low latency online read requests without hiccups. Ingestion efficiency and performance is an area of the system that we continuously invest in, and we are committed to keep pushing the envelope on these limits.
Now that we know how to populate Venice from various sources, let’s explore how data can be read from it. We will go over the three main read APIs: Single Gets, Batch Gets, and Read Compute. We will then look at a special client mode called Da Vinci.
The simplest way to interact with Venice is as if it were a key-value store: for a given key, retrieve the associated value.
LinkedIn’s A/B testing platform, mentioned earlier, is one of the heavy users of this functionality. Including all use cases, Venice serves 6.25M Single Get queries per second at peak.
It is also possible to retrieve all values associated with a set of keys. This is a popular usage pattern across search and recommendation use cases, where we need to retrieve features associated with a large number of entities. We then score each entity by feeding its features into some ML model, and finally return the top K most relevant entities.
A prominent example of this is the LinkedIn Feed, which makes heavy use of batch gets. Including all Batch Get use cases, Venice serves 45M key lookups per second at peak.
Another way to get data out of Venice is via its declarative read computation DSL, which supports operations such as projecting a subset of columns, as well as executing vector math functions (e.g., dot product, cosine similarity, etc.) on the embeddings (i.e., arrays of floats) stored in the datasets.
Since 2019, these functions have been used by People You May Know (PYMK) to achieve horizontally scalable online deep learning. While it is common to see large scale deep learning get executed offline (e.g., via TonY), online deep learning tends to be constrained to a smaller scale (i.e., single instance). It is thus fairly novel, in our view, to achieve horizontal scalability in the context of online inference. The data stored in the PYMK dataset includes many large embeddings, and retrieving all of this data into the application is prohibitively expensive, given its latency budget. To solve this performance bottleneck, PYMK declares in its query what operations to perform on the data, and Venice performs it on PYMK’s behalf, returning only the very small scalar results. A previous talk given at QCon AI provides more details on this use case.
At peak, read compute use cases perform 46M key lookups per second, and every single one of these lookups also performs an average of 10 distinct arithmetic operations on various embeddings stored inside the looked up value.
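The benefit of pushing computation to the server can be illustrated with a toy version of the idea. The function below is not the Venice read compute DSL; its name, parameters, and result field names are invented for this sketch. It shows that only the projected columns and scalar results leave the server, while the embeddings stay where they are stored.

```python
import math

def dot_product(a, b):
    return sum(x * y for x, y in zip(a, b))

def cosine_similarity(a, b):
    norm = math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b))
    return dot_product(a, b) / norm if norm else 0.0

def read_compute(store, keys, projection, embedding_column, query_vector):
    """Server-side sketch: return projected columns and scalar scores, never the embedding."""
    results = {}
    for key in keys:
        row = store.get(key)
        if row is None:
            continue                              # unknown keys are simply skipped
        result = {column: row[column] for column in projection}
        result["dot_product"] = dot_product(row[embedding_column], query_vector)
        result["cosine_similarity"] = cosine_similarity(row[embedding_column], query_vector)
        results[key] = result
    return results
```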
So far, we have been exploring use cases that perform various kinds of remote queries against the Venice backend. These are the original and oldest use cases supported by Venice, and collectively we refer to them as Classical Venice. In 2020, we built an alternative client library called Da Vinci, which supports all of the same APIs we already looked at, but provides a very different cost-performance tradeoff, by serving all calls from local state rather than remote queries.
Da Vinci is a portion of the Venice server code, packaged as a client library. Besides the APIs of the Classical Venice client, it also offers APIs for subscribing to all or some partitions of the dataset. Subscribed partitions are fully loaded in the host application’s local state, and also eagerly updated by listening to Venice’s internal changelog of updates. In this way, the state inside Da Vinci mirrors that of the Venice server, and the two are eventually consistent with one another. All of the write methods described earlier are fully compatible with both clients.
The same dataset can be used in both Classical and Da Vinci fashion at the same time. The application can choose which client to use, and switch between them easily, if cost-performance requirements change over time.
Da Vinci is what we call an “eager cache,” in opposition to the more traditional read-through cache, which needs to be configured with a TTL. The difference between these kinds of caches is summarized in the following table:
As we can see, the main advantage of a read-through cache is that the size of the local state is configurable, but the drawback is needing to deal with the awkward tradeoff of tuning TTL, which pits freshness against hit rate. An eager cache provides the opposite tradeoff, not giving the ability to configure the size of the local state (besides the ability to control which partitions to subscribe to), but providing 100% hit rate and optimal freshness.
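The tradeoff can be made concrete with two toy caches. These classes are illustrative only and are unrelated to the Da Vinci client's real API; the backend is modeled as a plain dictionary and time is passed in explicitly.

```python
class ReadThroughCache:
    """Loads a key on first access and re-fetches it once the TTL has expired."""

    def __init__(self, backend, ttl_seconds):
        self.backend = backend
        self.ttl = ttl_seconds
        self.entries = {}     # key -> (value, fetched_at)
        self.misses = 0

    def get(self, key, now):
        entry = self.entries.get(key)
        if entry is None or now - entry[1] >= self.ttl:
            self.misses += 1                       # miss: pay for a remote fetch
            entry = (self.backend[key], now)
            self.entries[key] = entry
        return entry[0]                            # may be stale until the TTL expires


class EagerCache:
    """Holds a full local copy and applies every change from a changelog as it arrives."""

    def __init__(self, snapshot):
        self.state = dict(snapshot)                # whole dataset (or partition) up front

    def on_change(self, key, value):
        self.state[key] = value                    # pushed update keeps local state fresh

    def get(self, key):
        return self.state[key]                     # always served locally
```

With a 60-second TTL, the read-through cache keeps serving an old value for up to a minute after a write, whereas the eager cache reflects the write as soon as the change event is applied.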
The easiest way to leverage Da Vinci is to eagerly load all partitions of the dataset of interest. This works well if the total dataset size is small—let’s say a few GB. We have some online applications in the ads space that do this.
Alternatively, Da Vinci also supports custom partitioning. This enables an application to control which partition gets ingested in each instance, and thus reduces the size of the local state in each instance. This is a flexible mechanism, which is usable in a variety of ways, but also more advanced. It is appropriate for use within distributed systems that already have a built-in notion of partitioning. In this way, Da Vinci becomes a building block for other data systems.
Galene, our proprietary search stack, uses Da Vinci to load AI feature data inside its searchers, where it performs ranking on many documents at high throughput. It leverages Da Vinci with its all-in-RAM configuration*, since the state is not that large (e.g., 10 GB per partition) and the latency requirement is very tight. These use cases usually have few partitions (e.g., 16) but can have a very high replication factor (e.g., 60), adding up to a thousand instances all subscribing to the same Venice dataset.
Samza stream processing jobs also leverage Da Vinci to load AI feature data. In this case, the state can be very large, in the tens of TB in total, so many more partitions are used (e.g., one to two thousand) to spread it into smaller chunks. For stream processing, only one replica is needed, or two if a warm standby is desired, so it is the inverse of the Galene use cases. These use cases leverage the SSD-friendly configuration* to be more cost-effective.
* N.B.: Both the all-in-RAM and SSD-friendly configurations are described in more detail in the storage section.
Venice was designed from the ground up for massive scale and operability, and a large portion of development efforts is continuously invested in these important areas. As such, the system has built-in support for, among other things:
- Operator-driven configuration
- Elastic & linear scalability
In the subsequent sections, we’ll discuss each of these in turn.
It is a requirement to have our data systems deployed in all data centers. Not all data systems have the same level of integration between data centers, however. In the case of Venice, the system is globally integrated in such a way that keeping data in-sync across regions is easy, while also leveraging the isolation of each data center to better manage risks.
The Push Job writes to all data centers, and we have Hadoop grids in more than one region, all of which can serve as the source of a push. Nearline writes are also replicated to all regions in an active-active manner, with deterministic conflict resolution in case of concurrent writes.
These are important characteristics for our users, who want data to keep getting refreshed even if the data sources are having availability issues in some of the regions.
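To illustrate what deterministic conflict resolution means, the sketch below uses a last-writer-wins rule with a region identifier as a tiebreaker. This particular rule is an assumption chosen for the example, not a description of Venice's actual algorithm. The property that matters is that every region picks the same winner regardless of the order in which it sees the writes.

```python
def resolve(existing, incoming):
    """Each write is (timestamp, region_id, value).

    The highest timestamp wins; equal timestamps are broken by region id,
    so the outcome does not depend on arrival order.
    """
    return max(existing, incoming, key=lambda write: (write[0], write[1]))

def apply_writes(writes):
    """Fold a sequence of concurrent writes into the single surviving write."""
    winner = None
    for write in writes:
        winner = write if winner is None else resolve(winner, write)
    return winner
```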
Administrative operations, such as dataset creation or schema evolution, are performed asynchronously by each region, so that if a region has availability issues, it can catch up to what it missed when it becomes healthy again. This may seem like a trivial detail, but at LinkedIn we have datasets getting created and dropped every day, from users leveraging our self-service console and also from various systems doing these operations dynamically. As such, it is infeasible for Venice operators to supervise these procedures.
Operators also have the ability to repair the data in an unhealthy region by copying it from a healthy region. In rare cases of correlated hardware failures, this last resort mechanism can save the day.
Each region contains many Venice clusters, some of which are dedicated to very sensitive use cases, while others are heavily multi-tenant. Currently, there are 17 production Venice clusters per region, spanning a total of 4000 physical hosts.
Dataset to cluster assignment is fully operator-driven and invisible to users. Users writing to and reading from Venice need not know what cluster their dataset is hosted on. Moreover, operators can migrate datasets between clusters, also invisibly for users. Migration is used to optimize resource utilization, or to leverage special tunings that some clusters have.
Each Venice cluster may contain many datasets; our most densely populated cluster currently holds more than 500 datasets. In such an environment, it is critical to have mechanisms for preventing noisy neighbors from affecting each other. This is implemented in the form of quotas, of which there are three kinds, all configurable on a per-dataset basis.
There is a storage quota, which is the total amount of persistent storage a dataset is allowed to take. For Full Push jobs, this is verified prior to the push taking place, so that an oversized push doesn’t even begin to enter the system. For other write methods, there is an ongoing verification that can cause ingestion to stall if a dataset is going over its limit, in which case dataset owners are alerted and can request more quota.
There are two kinds of read quotas: one for the maximum number of keys queryable in a Batch Get query, and the other for the maximum rate of keys looked up per second. In other words, N Single Get queries cost the same amount of quota as one Batch Get query of N keys.
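The key-based accounting described above can be sketched as a token bucket whose tokens are keys rather than requests. This is only an illustrative sketch: the `ReadQuota` class, its parameters, and the token-bucket mechanism itself are assumptions made for the example, not Venice's actual implementation.

```python
import time


class ReadQuota:
    """Hypothetical per-dataset read quota, counted in keys (not requests)."""

    def __init__(self, keys_per_second, max_batch_keys, clock=time.monotonic):
        self.rate = keys_per_second           # max keys looked up per second
        self.max_batch_keys = max_batch_keys  # max keys in one Batch Get
        self.clock = clock
        self.tokens = float(keys_per_second)  # start with a full bucket
        self.last = clock()

    def try_acquire(self, num_keys):
        """Return True if a query touching num_keys keys is within quota."""
        # First quota: a single Batch Get may not exceed the key limit.
        if num_keys > self.max_batch_keys:
            return False
        # Second quota: refill the bucket based on elapsed time, then charge
        # one token per key, so N Single Gets cost the same as one
        # Batch Get of N keys.
        now = self.clock()
        self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if num_keys > self.tokens:
            return False
        self.tokens -= num_keys
        return True
```

With this accounting, a Single Get is simply `try_acquire(1)`, and a Batch Get of N keys is `try_acquire(N)`.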
One of the core principles of LinkedIn’s data infrastructure teams is Invisible Infra, which means that users should be empowered to achieve their business goals without needing to know how the infrastructure helps them fulfill those goals. An example of this is given in the previous multi-cluster section, in the form of operator-driven cluster migration. There are many more such examples.
Venice supports end-to-end compression, which helps minimize storage and transmission costs. Various compression schemes are supported, and operators can choose to enable them on behalf of users. This configuration change takes effect on the next Full Push, which can be triggered by the user’s own schedule, or triggered by the operator. The latest compression scheme we integrated is Zstandard, which we use to build a shared dictionary as part of the Push Job. This dictionary is stored just once per dataset version, thus greatly reducing total storage space in cases where some bits of data are repeated across records. In the most extreme case, we’ve seen a dataset deflate by 40x!
Venice also supports repartitioning datasets, which is another type of configuration change taking effect on the next Full Push. Partitions are the unit of scale in the system, and as datasets grow, it is sometimes useful to be able to change this parameter. For a given dataset size, having a greater number of smaller partitions enables faster push and rebalancing (more on this later).
Venice has a pluggable storage engine architecture. Previously, Venice leveraged BDB-JE as its storage engine, but since then, we have been able to fully swap this for RocksDB, without needing any user intervention. Currently, we use two different RocksDB formats: block-based (optimized for larger-than-RAM datasets, backed by SSD) and plain table (optimized for all-in-RAM datasets). As usual, changing between these formats is operator-driven. A previous blog post chronicles some of our recent RocksDB-related challenges and how we solved them.
We are looking forward to seeing what the open source community will do with this abstraction, and whether new kinds of use cases may be achievable within the scope of Venice by plugging in new storage engines.
Venice leverages Apache Helix for partition-replica placements in each cluster, which enables the system to react readily to hardware failures. If servers crash, Helix automatically rebalances the partitions they hosted onto healthy servers. In addition, we leverage Helix’s support for rack-awareness, which ensures that at most one replica of a given partition is in the same rack. This is useful so that network switch maintenance or failures don’t cause us to lose many replicas of a partition at once. Rack-awareness is also useful in that it allows us to bounce all Venice servers in a rack simultaneously, thus greatly reducing the time required to perform a rolling restart of a full cluster. In these scenarios, Helix is smart enough to avoid triggering a surge of self-healing rebalance operations. A previous post provides more details on Venice’s usage of Helix, although the state model has been redesigned since the post was published.
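To make the rack-awareness property concrete, here is a minimal sketch of a placement that puts at most one replica of each partition in any rack, plus a helper that counts how many replicas a partition loses when a whole rack is bounced. The function names and the round-robin strategy are assumptions made for illustration; this is not Helix's rebalancing algorithm.

```python
from collections import defaultdict


def assign_replicas(num_partitions, replication_factor, servers_by_rack):
    """Place each partition's replicas on distinct racks (illustrative only)."""
    racks = sorted(servers_by_rack)
    if replication_factor > len(racks):
        raise ValueError("need at least one rack per replica")
    load = defaultdict(int)  # replicas hosted per server
    placement = {}
    for partition in range(num_partitions):
        chosen = []
        for i in range(replication_factor):
            # Consecutive rack indices are distinct because
            # replication_factor <= len(racks).
            rack = racks[(partition + i) % len(racks)]
            # Pick the least-loaded server in that rack.
            server = min(servers_by_rack[rack], key=lambda s: (load[s], s))
            load[server] += 1
            chosen.append(server)
        placement[partition] = chosen
    return placement


def replicas_lost_if_rack_down(placement, servers_by_rack, rack):
    """Count, per partition, the replicas hosted in the given rack."""
    down = set(servers_by_rack[rack])
    return {p: sum(s in down for s in servers) for p, servers in placement.items()}
```

Because no partition ever has two replicas in one rack, taking a full rack offline costs each partition at most one replica, which is what makes it safe to restart a rack's servers together.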
Elastic & linear scalability
Again thanks to Helix, it is easy to inject extra hardware into a cluster and to rebalance the data into it. The exact same code path that is leveraged more than 1400 times a day to rewrite entire datasets is also leveraged during rebalance operations, thus making it very well-tested and robust. This important design choice also means that whenever we further optimize the write path (which we are more or less continuously doing), it improves both push and rebalance performance, thus giving more elasticity to the system.
Beyond this, another important aspect of scalability is to ensure that the capacity to serve read traffic increases linearly, in proportion to the added hardware. For the more intense use cases described in this post—Feed and PYMK—the traffic is high enough that we benefit from over-replicating these clusters beyond the number of replicas needed for reliability purposes. Furthermore, we group replicas into various sets, so that a given Batch Get or Read Compute query can be guaranteed to hit all partitions within a bounded number of servers. This is important, because otherwise the query fanout size could increase to arbitrarily high amounts and introduce tail latency issues, which would mean the system’s capacity scales sublinearly as the hardware footprint increases. This and many other optimizations are described in more detail in a previous blog post dedicated to supporting large fanout use cases.
It would be difficult to exhaustively cover all aspects of a system as large as Venice in a single blog post. Hopefully, this overview of functionalities and use cases gives a glimpse of the range of possibilities it enables. Below is a quick recap to put things in a broader context.
What is Venice for?
We have found Venice to be a useful component in almost every user-facing AI use case at LinkedIn, from the most basic to cutting edge ones like PYMK. The user-facing aspect is significant, since AI can also be leveraged in offline-only ways, in which case there may be no need for a scalable storage system optimized for serving ML features for online inference workloads.
Besides AI, there are other use cases where we have found it useful to have a derived data store with first-class support for ingesting lots of data in various ways. One example is our use of Venice as the stateful backbone of our A/B testing platform. There are other examples as well, such as caching a massaged view of source-of-truth storage systems, when cost and performance matter, but strong consistency does not.
At the end of the day, Venice is a specialized solution, and there is no intent for it to be anyone’s one and only data system, nor even their first one. Most likely, all organizations need to have some source of truth database, like an RDBMS, a key-value store, or a document store. It is also likely that any organization would already have some form of batch or stream processing infrastructure (or both) before they feel the need to store the output of these processing jobs in a derived data store. Once these other components are in place, Venice may be a worthwhile addition to complement an organization’s data ecosystem.
The road ahead
In this post, we have focused on the larger scale and most mature capabilities of Venice, but we also have a packed roadmap of exciting projects that we will share more details about soon.
We hope this upcoming chapter of the project’s life in the open source community will bring value to our peers and propel the project to the next level.
Real-time analytics on network flow data with Apache Pinot
The LinkedIn infrastructure has thousands of services serving millions of queries per second. At this scale, having tools that provide observability into the LinkedIn infrastructure is imperative to ensure that issues in our infrastructure are quickly detected, diagnosed, and remediated. This level of visibility helps prevent the occurrence of outages so we can deliver the best experience for our members. To provide observability, there are various data points that need to be collected, such as metrics, events, logs, and flows. Once collected, the data points can then be processed and made available, in real-time, for engineers to use for alerting, troubleshooting, capacity planning, and other operations.
At LinkedIn, we developed InFlow to provide observability into network flows. A network flow describes the movement of a packet through a network and is the metadata of a packet sampled at a network device that describes the packet in terms of the 5-tuple: source IP, source port, destination IP, destination port, and protocol. It may also contain source and destination autonomous system numbers (ASNs), the IP address of the network device that has captured this flow, input and output interface indices of the network device where the traffic was sampled, and the number of bytes transferred.
How LinkedIn leverages flow data
InFlow provides a rich set of time-series network data having over 50 dimensions such as source and destination sites, security zones, ASNs, IP address type, and protocol. With this data, various types of analytical queries can be run to get meaningful insights about network health and characteristics.
Figure 1. A screenshot from InFlow UI’s Top Services tab which shows the 5 services consuming the most network bandwidth and the variation of this traffic over the last 2 hours
Most commonly, InFlow is used for operational troubleshooting to get complete visibility into the traffic. For example, if there is an outage due to network link capacity exhaustion, InFlow can be used to find the top talkers for that link, that is, the hosts or services consuming the most bandwidth (Figure 1). Based on the nature of the service, further steps can then be taken to remediate the issue.
Flow data also provides source and destination ASN information, which can be used for optimizing cost, based on bandwidth consumption of different kinds of peering with external networks. It can also be used for analyzing data along several dimensions for network operations, for example, to find the distribution of traffic by IPv4 or IPv6 flows or the distribution of traffic based on Type of Service (ToS) bits.
InFlow architecture overview
Figure 2. InFlow architecture
Figure 2 shows the overall InFlow architecture. The platform is divided into 3 main components: flow collector, flow enricher, and InFlow API with Pinot as a storage system. Each component has been modeled as an independent microservice to provide the following benefits:
- It enforces the single responsibility principle and prevents the system from becoming a monolith.
- Each of the components has different requirements in terms of scaling. Separate microservices ensure that each can be scaled independently.
- This architecture creates loosely coupled pluggable services which can be reused for other scenarios.
InFlow receives 50k flows per second from over 100 different network devices on the LinkedIn backbone and edge devices. InFlow supports sFlow and IPFIX as protocols for collecting flows from network devices. This is based on the device’s vendor support for the protocols and minimal impact of flow export on the device’s performance. The InFlow collector receives and parses these incoming flows, aggregates the data into unique flows for a minute, and pushes them to a Kafka topic for raw flows.
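The collector's per-minute aggregation step can be pictured roughly as follows. This is a minimal sketch under stated assumptions: the `FlowKey` type, the sample tuple layout, and the choice to sum bytes per unique 5-tuple are illustrative and not taken from the InFlow code base.

```python
from collections import defaultdict
from typing import NamedTuple


class FlowKey(NamedTuple):
    """The 5-tuple that identifies a flow."""
    src_ip: str
    src_port: int
    dst_ip: str
    dst_port: int
    protocol: str


def aggregate_per_minute(samples):
    """Collapse sampled flows into one record per (minute, 5-tuple).

    samples: iterable of (epoch_seconds, FlowKey, num_bytes).
    Returns a dict mapping (minute_start, FlowKey) to total bytes.
    """
    totals = defaultdict(int)
    for ts, key, num_bytes in samples:
        minute_start = ts - ts % 60  # truncate to the start of the minute
        totals[(minute_start, key)] += num_bytes
    return dict(totals)
```

Each resulting record would then be serialized and published to the raw-flows Kafka topic.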
The data processing pipeline for InFlow leverages Apache Kafka and Apache Samza for stream processing of incoming flow events. Our streaming pipeline processes 50k messages per second, enriching the data with 40 additional fields (like service, source and destination sites, security zones, ASNs, and IP address type), which are fetched from various internal services at LinkedIn. For example, our data center infrastructure management system, InOps, provides information on the site, security zone, security domain of the source, and destination IPs for a flow. The incoming raw flow messages are consumed by a stream processing job on Samza and after adding the additional enriched fields, the result is pushed to an enriched Kafka topic.
InFlow requires storage of tens of TBs of data with a retention of 30 days. To support its real-time troubleshooting use case, the data must be queryable in real-time with sub-second latency so that engineers can query the data without any hassles during outages. For the storage layer, InFlow leverages Apache Pinot.
Figure 3. A screenshot from InFlow UI’s Explore tab which provides a self-service interface for users to visualize flow data by grouping and filtering on different dimensions
The InFlow UI is a dashboard that comes pre-populated with some of the most commonly used visualizations of flow data and provides a rich interface where the data can be filtered or grouped by any of the 40 different dimension fields. The UI also has an Explore section, which allows for the creation of ad-hoc queries. The UI is built on top of the InFlow API, a middleware layer responsible for translating user input into Pinot queries and issuing them to the Pinot cluster.
Pinot as a storage layer
In the first version of InFlow, data was ingested from the enriched Kafka topic to HDFS. We leveraged Trino for facilitating user queries on the data present in HDFS. However, the ETL and aggregation pipeline added a 15-20 minute delay to the pipeline, reducing the freshness of the data. Additionally, query latencies to HDFS using Presto were on the order of 15-30 seconds. This latency and delay were acceptable for historical data analytics; for real-time troubleshooting, however, the data needs to be available in real-time with a maximum delay of 1 minute.
Based on the query latency and data freshness requirements, we explored several storage solutions available at LinkedIn (like Espresso, Kusto, and Pinot) and decided on onboarding our data to Apache Pinot. When looking for solutions, we needed a reliable system providing real-time ingestion and sub-second query latencies. Pinot’s support for Lambda and Lambda-less architectures, real-time ingestion, and low latency at high throughput could help us achieve optimal results. Additionally, the Pinot team at LinkedIn is experimenting with supporting a new use case called Real-time Operational Metrics Analysis (ROMA), which enables engineers to slice and dice metrics along different combinations of dimensions to monitor infrastructure in near real-time, analyze the last few weeks, months, or years of data to discover trends and patterns for forecasting and capacity planning, and find the root cause of outages quickly to reduce the time to recovery. These objectives aligned well with our problem statement of processing large numbers of metrics in real-time.
The Pinot ingestion pipeline consumes directly from the enriched Kafka topic and creates the segments on the Pinot servers, which improves the freshness of the data in the system to less than a minute. User requests from InFlow UI are converted to Pinot SQL queries and sent to the Pinot broker for processing. Since Pinot servers keep data and indices in cache-friendly data structures, the query latencies are a huge improvement from the previous version where data was queried from disk (HDFS).
Several optimizations were needed to reach this query latency and to tune the ingestion parameters. Because the data volume on the input Kafka topic is high, we ran several experiments with the ingestion parameters to decide on the number of topic partitions that would best allow for parallel consumption into Pinot segments. Most of our queries involved a regexp_like condition on the devicehostname column, which holds the name of the network device that exported the flow and is used to narrow a query down to a specific plane of the network. regexp_like is inefficient because it cannot leverage any index. To resolve this, we set up an ingestion transformation, a Pinot feature that applies transformation functions to the data before it is ingested. The transformation creates a derived column, flowType, which classifies a flow into a specific plane of the network based on the name of the exporting device. For example, if the exporting device is at the edge of our network, the flow is classified as an Internet-facing flow. The flowType column is indexed and is now used in equality comparisons instead of regexp_like, which improved query latency by 50%.
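The idea behind the derived column can be illustrated with a small sketch: classify each row once at ingestion time, then answer queries with an exact-match lookup instead of scanning every hostname with a regular expression. The hostname prefixes (`edge-`, `bb-`), the category names, and the helper functions below are hypothetical; the real naming convention and the actual Pinot transformation function are not shown in this post.

```python
import re
from collections import defaultdict

# Hypothetical hostname conventions, used only for illustration.
EDGE_PATTERN = re.compile(r"^edge-")
BACKBONE_PATTERN = re.compile(r"^bb-")


def derive_flow_type(devicehostname):
    """Classify a flow by the plane of the device that exported it."""
    if EDGE_PATTERN.search(devicehostname):
        return "internet"
    if BACKBONE_PATTERN.search(devicehostname):
        return "backbone"
    return "other"


def ingest(rows):
    """Add the derived flowType column and build an exact-match index on it."""
    enriched, index = [], defaultdict(list)
    for row_id, row in enumerate(rows):
        flow_type = derive_flow_type(row["devicehostname"])
        enriched.append({**row, "flowType": flow_type})
        index[flow_type].append(row_id)  # the regex runs once, at ingestion
    return enriched, dict(index)


def query_by_flow_type(enriched, index, flow_type):
    """Equality filter served from the index, with no per-row regex."""
    return [enriched[i] for i in index.get(flow_type, [])]
```

At query time, the filter changes shape from something like `REGEXP_LIKE(devicehostname, '^edge-')` to `flowType = 'internet'`, where the pattern and value are again the hypothetical ones used above.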
Queries from InFlow always request data for a specific time range. To improve query performance, timestamp-based pruning was enabled on Pinot. This improved query latencies because only the segments that can match the query’s filter conditions on the timestamp column are selected for processing. Based on the Pinot team’s input, indexes on the different dimension columns were also set up to aid query performance.
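Conceptually, timestamp-based pruning works by comparing the query's time range against the time range covered by each segment and skipping segments that cannot contain matching rows. The sketch below shows that overlap test; the `Segment` type and its fields are assumptions for illustration and do not reflect Pinot's internal segment metadata format.

```python
from typing import NamedTuple


class Segment(NamedTuple):
    """Hypothetical segment metadata: name plus covered time range."""
    name: str
    min_ts: int  # earliest timestamp stored in the segment
    max_ts: int  # latest timestamp stored in the segment


def prune_segments(segments, query_start, query_end):
    """Keep only segments whose time range overlaps [query_start, query_end]."""
    return [
        s for s in segments
        if s.max_ts >= query_start and s.min_ts <= query_end
    ]
```

A query for the last two hours against 30 days of retained data would, under this scheme, touch only the handful of segments covering those two hours.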
Figure 4. Latency metric for InFlow API query for top flows in the last 12 hours before and after onboarding to Pinot
Following the successful onboarding of flow data to a real-time table on Pinot, data freshness improved from 15 minutes to 1 minute and query latencies were reduced by as much as 95%. Some of the more expensive queries, which took as much as 6 minutes using Presto, now take 4 seconds using Pinot. This has made it easier for network engineers at LinkedIn to get the data they need for troubleshooting or running real-time analytics on network flow data.
The current network flow data only provides us with sampled flows from the LinkedIn backbone and edge network. Skyfall is an eBPF-based agent, developed at LinkedIn, that collects flow data and network metrics from the host’s kernel with minimal overhead. The agent captures all flows for the host without sampling and will be deployed across all servers in the LinkedIn fleet. This would provide us with 100% coverage of flows across our data centers and enable us to support more use cases that require unsampled flow data, such as security audit and validation. Because the agent collects more data and from more devices, the scale of data collected by Skyfall is expected to be 100 times that of InFlow. We are looking forward to leveraging the InFlow architecture to support this scale and provide real-time analytics on top of the rich set of metrics exported by the Skyfall agent. Another upcoming feature that we are excited about is leveraging InFlow data for anomaly detection and more traffic analytics.
Onboarding our data to Pinot was a collaborative effort and we would like to express our gratitude to Subbu Subramaniam, Sajjad Moradi, Florence Zhang, and the Pinot team at LinkedIn for their patience and efforts in understanding our requirements and working on the optimizations required for getting us to the optimal performance.
Thanks to Prashanth Kumar for the continuous dialogue in helping us understand the network engineering perspective on flow data. Thanks to Varoun P and Vishwa Mohan for their leadership and continued support.
Feathr joins LF AI & Data Foundation
In April 2022, Feathr was released under the Apache 2.0 license and we announced, in close conjunction with our Microsoft Azure partners, native integration and support for Feathr on Azure. Since being open sourced, Feathr has achieved substantial popularity among the machine learning operations (MLOps) community. It has been adopted by companies of various sizes across multiple industries and the community continues to grow rapidly. Most excitingly, more and more open-source enthusiasts are contributing code to Feathr.
It’s clear that many others experience the same pain points that Feathr aims to address. That’s why we are excited to share it with a broader audience and for Feathr to be adopted by a broader open-source community with help from LF AI & Data.
Donating Feathr to LF AI & Data will help ensure that it continues to grow and evolve across various dimensions, including visibility, user base, and contributor base. Also, the Feathr development team will have more opportunities to collaborate with other member companies and projects, such as achieving richer online store support via integration with Milvus and JanusGraph, and adopting the open data lineage standard from OpenLineage. As a result, we hope Feathr helps AI engineers build and scale feature pipelines and feature applications in ways that push MLOps tech stacks and the industry forward for years to come.
The Feathr feature store provides an abstraction layer between raw data and ML models. This abstraction layer standardizes and simplifies feature definition, transformation, serving, storage, and access from within ML workflows or applications. Feathr empowers AI engineers to focus on feature engineering while it takes care of data serialization format, connecting to various databases, performance optimization, and credential management. More specifically, Feathr helps:
- Define features once and use them in different scenarios, like model training and model serving
- Create training datasets with point-in-time correct semantics
- Connect to various offline data sources (data lakes and data warehouses), and then transform source data into features
- Deliver feature data from the offline system to the online store for faster online serving
- Discover features or share features among colleagues or teams with ease
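To illustrate what point-in-time correct semantics means in the list above, here is a minimal sketch of a point-in-time join: each training observation is paired with the latest feature value known at or before the observation's timestamp, so no future information leaks into training. The function name and data layout are assumptions made for this example and are not Feathr's API.

```python
from bisect import bisect_right


def point_in_time_join(observations, feature_history):
    """Attach to each observation the feature value valid at its timestamp.

    observations: list of (entity_id, obs_ts, label).
    feature_history: dict mapping entity_id to a list of (ts, value)
        tuples sorted by ts in ascending order.
    Returns a list of (entity_id, obs_ts, feature_value, label); the
    feature value is None when nothing was known yet at obs_ts.
    """
    rows = []
    for entity_id, obs_ts, label in observations:
        history = feature_history.get(entity_id, [])
        timestamps = [ts for ts, _ in history]
        # Index of the last feature update with ts <= obs_ts.
        idx = bisect_right(timestamps, obs_ts) - 1
        value = history[idx][1] if idx >= 0 else None
        rows.append((entity_id, obs_ts, value, label))
    return rows
```

A naive join on entity ID alone would instead attach the most recent feature value to every observation, including values computed after the observed event took place.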
To learn more, please visit Feathr’s GitHub page and our April 2022 blog post, Open sourcing Feathr – LinkedIn’s feature store for productive machine learning.
Acceptance into LF AI & Data is an important recognition from the Linux Foundation. We believe a large, diverse, healthy, and self-sustained Feathr open-source community is important. We’re excited for the new chapter of Feathr and to welcome more people into the Feathr community.