Multitask learning, a subfield of machine learning, aims to accomplish multiple learning tasks at the same time by exploiting commonalities and differences across tasks. Traditionally, engineers have built separate AI models to accomplish each task at hand. While this allows a simple boundary of ownership, the isolated models lack the ability to share any learnings gathered from training the models. For example, new machine learning (ML) tasks often suffer from the cold start problem due to a lack of training data.
At LinkedIn, we improve the performance of AI models by training them in groups of related tasks, an approach that has led to better model performance in our experiments. As ML models power important functions such as job recommendation, job search, and many other products on LinkedIn, the improvements from these models lead to a more effective and relevant jobs marketplace for our members.
To address the fundamental challenges in applying multitask learning to general modeling problems, we have designed a multitask training framework. Unlike confined modeling domains in which tasks share the same training data, model architecture, or loss function, building a multitask learning framework for LinkedIn’s heterogeneous use cases had several challenges.
The first challenge we faced was that different tasks may have vastly different label distributions, which is common between upstream and downstream tasks. For example, we may have one task to predict job views and another to predict job applications. While both tasks come from job search, perhaps only 10% of job views lead to a final job application, which means job application signals can easily be overwhelmed by job views. To solve this, we relax the constraints in training and allow different tasks to use separate datasets as input. Continuing the job search example, the job application task could use training data down-sampled from the job view training data, to prevent its sparsely populated signals from being overwhelmed.
With the relaxation of the input datasets, another challenge we faced was that different sets of training data may not share the same set of features, as different tasks may have different input features. This is especially the case when the tasks to be trained together come from different domains or products. At LinkedIn, we have a job search feature for members to find jobs, and we also have recruiter search for recruiters to find candidates for a job. These two tasks are related but vary in the features they require. By unifying the input data schema across tasks, we were able to train these heterogeneous tasks together.
Fig. 1 Unify schemas of different tasks for multi-task training
As shown in Figure 1, task A has two unique features (in red), task B has two unique features (in blue), and both tasks have one shared feature. Our framework can identify shared features and task-specific features. As a result, it will unify the input data schema of both tasks and fill the missing fields to a predefined default value (0 in the above figure).
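This unification step can be sketched in a few lines; the feature names and the default value below are illustrative, not the framework's actual schema or API.

```python
# Sketch of schema unification: take the union of all tasks' feature names
# and fill missing fields with a predefined default (0, as in Figure 1).
def unify_schemas(task_records, default=0.0):
    """Union the feature schemas of all tasks and fill missing fields."""
    # Collect the union of all feature names across all tasks' rows.
    unified = sorted({f for records in task_records.values()
                      for row in records for f in row})
    # Re-emit every row with the full schema, defaulting missing features.
    return {
        task: [{f: row.get(f, default) for f in unified} for row in records]
        for task, records in task_records.items()
    }

tasks = {
    "task_a": [{"f_a1": 1.0, "f_a2": 2.0, "f_shared": 0.5}],
    "task_b": [{"f_b1": 3.0, "f_b2": 4.0, "f_shared": 0.7}],
}
unified = unify_schemas(tasks)
# Every row now carries all five features; task A's rows get f_b1 == f_b2 == 0.
```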
The last challenge we faced was different tasks using different model architectures and having different input layers or loss functions. The solution we implemented was to allow model developers to specify shared model architecture and task-specific model architecture in the same way as in single-task learning models.
Fig. 2 From separate single-task training to multi-task training
As shown in Figure 2, model developers can define the model architecture of each task, similar to single-task learning. We automatically identify the model component that is shareable across tasks and as a result, there is a shared model for overlapping parts between tasks and task-specific models for non-overlapping parts.
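A minimal sketch of this shared/task-specific split, using plain NumPy with made-up layer sizes (the framework's actual components are not shown here): a shared bottom layer feeds one small head per task.

```python
import numpy as np

# Illustrative shared-bottom architecture: one sub-model shared by all
# tasks, plus a task-specific head per task, as in Figure 2.
rng = np.random.default_rng(0)

def relu(x):
    return np.maximum(x, 0.0)

dim_in, dim_shared = 8, 4
W_shared = rng.normal(size=(dim_in, dim_shared))  # shared across tasks
heads = {t: rng.normal(size=(dim_shared, 1)) for t in ("task_a", "task_b")}

def forward(task, x):
    h = relu(x @ W_shared)   # shared sub-model (overlapping part)
    return h @ heads[task]   # task-specific sub-model (non-overlapping part)

x = rng.normal(size=(2, dim_in))
scores_a = forward("task_a", x)  # shape (2, 1)
```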
How training works
As explained earlier, we may have different model architectures for different tasks, with different loss functions and different hyper-parameters such as learning rate. We support two different mechanisms for training: 1) iterative training, where each task and its associated model are trained separately in an alternating fashion, and 2) joint training, where training data from different tasks are combined and the task losses are combined and back-propagated to the shared model.
We demonstrate the difference between the two through a classical shared-bottom model structure in Figure 3.
Fig. 3 Left: Joint Training; Right: Iterative Training. Red arrows indicate task 1 data, blue arrows indicate task 2 data, black arrows indicate combined data
In summary, here are the key differences between the two approaches:
Joint training, because of the combined input batches, requires one single pre-defined shared sub-model among tasks. On the other hand, iterative training may be flexible with different kinds of hard or soft sharings.
The relative weighting of different tasks is a difficult hyper-parameter in multi-task training. In joint training, it can be learned by regularizing or balancing the losses/gradients from the combined loss and shared layers. In iterative training, because there is no combined loss, only the tasks’ gradients from the shared layers can be leveraged.
Joint training is more suitable for cases where task losses interact. One example comes from knowledge distillation use cases: alongside the two task losses (a teacher loss and a student loss), a third hinge loss that depends on both can also be included. Calculating the hinge loss requires the teacher and the student to be trained on the same training batches.
Either one of the two approaches may be selected depending on the use case.
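To make the distinction concrete, here is a toy comparison of the two mechanisms on a single shared parameter with two quadratic task losses; it is purely illustrative and not the framework's code.

```python
# Two tasks sharing one parameter w, with losses (w - 1)^2 and (w + 1)^2.
def grad_task1(w):  # d/dw (w - 1)^2
    return 2.0 * (w - 1.0)

def grad_task2(w):  # d/dw (w + 1)^2
    return 2.0 * (w + 1.0)

lr, steps = 0.1, 200

# Joint training: combined batches, one combined loss back-propagated.
w_joint = 5.0
for _ in range(steps):
    w_joint -= lr * (grad_task1(w_joint) + grad_task2(w_joint))

# Iterative training: tasks update the shared parameter in alternation.
w_iter = 5.0
for step in range(steps):
    g = grad_task1(w_iter) if step % 2 == 0 else grad_task2(w_iter)
    w_iter -= lr * g

# Both settle near the shared optimum w = 0 for this symmetric pair of
# tasks; iterative training oscillates slightly between the two targets.
```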
Cross-domain skill understanding
For job skill extraction, we heavily rely on a contextual skill model. With this model, we first tag skill terms using a skill entity tagger, then use the contextual skill model to consider each tagged term in the context of the job posting and determine whether it is a valid skill entity. For example, Spark is a data analytics tool, but in the context of the phrase “electric wires give off spark,” it is not a skill.
The contextual skill model takes in the sentence context and the tagged skill entities as input.
Fig. 4 Model architecture of the cross-domain skill extraction model
The training data from the job domain is in seven different languages. In production, we will need to be able to handle the same seven international languages well in both job and resume domains. As a result, for sentence context, we chose to use an in-house trained multilingual word embedding.
For skill entities, we chose to use an in-house trained skill entity embedding instead of skill raw term text. Skills are conceptual entities and are language-agnostic in nature. If we treat skill terms as text, the same skill concept will have different embedding representations in different languages, which introduces more noise to the model. We decided to leverage the entity embedding as it is a more stable representation across languages.
The new shared model outperformed the single-task models on both the job and resume tasks and resulted in more relevant recommendation matches between jobs and members. In production, it has led to statistically significant improvements in user engagement, such as more weekly active users (WAU), more sessions, and a higher click-through rate (CTR).
Cross-domain member company affinity learning
For the LinkedIn recruiter search service, we want to surface relevant members in the search results to garner more InMail acceptances and gains in confirmed hires. These metric gains are driven by a two-way selection between job candidates and recruiters and would benefit from a better characterization of recruiter-candidate affinity, informed by user activities across different but related sites in the LinkedIn ecosystem. Using only user history on the recruiter search service has limitations. For example, a potential candidate may not have interacted with any recruiters in the past few days, yet they have been actively applying to certain jobs. Incorporating user activities on LinkedIn’s job search and recommendation sites would help recruiters reach out to a broader pool of interested candidates, further driving the key InMail and hiring metrics. Compared to pure activity count features, user representations trained on multitask deep neural networks provide greater flexibility in encoding diverse activity signals optimized for given targets. Additionally, training on multitask deep neural networks provides an improved ability to generalize to relevant items that have not shown up in the search history.
For the LinkedIn recruiter search product, embeddings that represent the similarity between recruiters and general LinkedIn members, who are potential job candidates, would help improve search relevance. Besides data from recruiter search itself, members’ job preferences also affect key metrics such as InMail acceptance. We co-trained member and company embeddings using a multitask deep neural network with a shared bottom for transfer learning between recruiter InMail accept, job apply, and job view tasks.
Fig. 5 Model architecture of cross domain member company model
The output member and company embeddings represent members’ affinity for recruiters’ companies. The label data is obtained from user activity tracking and is split at a particular historical time into training and validation sets. We use a cross-entropy loss weighted over the different tasks. We experimented with two model variants: one with only member profile features, and the other with a host of additional member activity features that represent members’ job search intentions. With the added member activity features, we observe more positive transfer, and the output embeddings bring more lift in the recruiter search model. The member and company embeddings have been ramped to Recruiter Search production, bringing significant gains to key metrics such as InMail accepts and predicted confirmed hires.
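A task-weighted cross-entropy of this kind can be sketched as below; the task names, weights, and predictions are made-up illustrative values, not the production configuration.

```python
import numpy as np

# Combine per-task binary cross-entropy losses with per-task weights,
# as in the weighted multitask loss described above.
def binary_cross_entropy(y_true, y_pred, eps=1e-12):
    y_pred = np.clip(y_pred, eps, 1.0 - eps)
    return -np.mean(y_true * np.log(y_pred) + (1 - y_true) * np.log(1 - y_pred))

def weighted_multitask_loss(batches, task_weights):
    """Sum of per-task cross-entropy losses scaled by task weights."""
    return sum(task_weights[t] * binary_cross_entropy(y, p)
               for t, (y, p) in batches.items())

# Illustrative labels/predictions for the three tasks named in the text.
batches = {
    "inmail_accept": (np.array([1, 0, 1]), np.array([0.9, 0.2, 0.7])),
    "job_apply":     (np.array([0, 1]),    np.array([0.1, 0.8])),
    "job_view":      (np.array([1, 1, 0]), np.array([0.8, 0.6, 0.3])),
}
weights = {"inmail_accept": 1.0, "job_apply": 0.5, "job_view": 0.25}
loss = weighted_multitask_loss(batches, weights)  # scalar combined loss
```

The weights themselves are the hard-to-tune hyper-parameter discussed earlier; in joint training they can also be learned rather than fixed.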
In this post, we have introduced our approach to a multitask learning framework and how we applied it to heterogeneous tasks in various product domains. The application of multitask learning has shown to improve model performance with significant product impact. We find that the success of applying multitask learning relies on choosing the relevant tasks, which currently depends on domain knowledge and intuition. In the future, we’d like to explore how to scale multitask learning to a large number of modeling tasks, creating a way to automatically identify tasks that can be effectively learned together.
Sen Zhou, Anastasiya Karpovich from the AI Foundation Team contributed to develop Translearn for foundational support of different training paradigms and of iterative training. Ji Yan, Peide Zhong and Xu Dan from the Enterprise Standardization team contributed to add support for joint training. Dansong Zhang, Tong Zhou from the AI Foundation Team and Sang Wook Park from Hirer AI developed the AI models and conducted A/B experiments for cross domain member company affinity learning. Raochuan Fan and Ji Yan developed the AI model and conducted A/B experiments for cross domain skill understanding. Zhewei Shi and the ProML team provided support for Translearn integration with LinkedIn infrastructure. Thanks for the management support from Tie Wang, Lei Zhang, Jimmy Guo and technical discussion with Jaewon Yang.
Super Tables: The road to building reliable and discoverable data products
Figure 1: Evolving the current data ecosystem to leveraging Super Tables
The Super Tables initiative aims to mitigate these challenges, as illustrated in Figure 1. Our goal is to build only a small number of enterprise-grade Super Tables that will be leveraged by many downstream users; each domain may have only a few, highly leveraged Super Tables. By consolidating hundreds of SOTs into these few Super Tables, a large number of SOTs will eventually be deprecated and retired.
- Having one or two Super Tables for a business entity or event (i.e., in a domain) makes it easier to explore and consume data for data analytics. It minimizes data duplication and reduces time to find the right data.
Strengthens Reliability & Usability
- Super Tables are materialized with precomputed joins of related business entities or events to consolidate data that is commonly needed for downstream analytics. This obviates the need to perform such joins in many downstream use cases leading to simpler and more performant downstream flows and better resource utilization.
- Super Tables provide and publish SLA commitments on availability and supportability with proactive data quality checks (structural, semantics and variance) and a programmatic way of querying the current and historical data quality check results.
Improves Change Management
- Super Tables have well-defined governance policies for change management and communication and committed change velocity (e.g., monthly deployment of changes). Upstream data producers and downstream consumers (cross teams/domains) are both involved in the governance of changes to the STs.
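The precomputed-join idea behind these benefits can be sketched as follows; all table and column names are made up for illustration.

```python
# A fact table is joined once with a dimension table and the wide result is
# materialized (the "Super Table"), so downstream flows need not repeat the
# join. Table and column names here are purely illustrative.
jobs = [
    {"job_id": 1, "company_id": 10, "title": "Data Engineer"},
    {"job_id": 2, "company_id": 20, "title": "ML Engineer"},
]
companies = {10: {"company_name": "Acme"}, 20: {"company_name": "Globex"}}

# Materialize the join once, instead of in every consumer flow.
super_table = [{**job, **companies[job["company_id"]]} for job in jobs]

# Downstream flows now just scan the wide rows; no repeated joins.
titles_by_company = {row["company_name"]: row["title"] for row in super_table}
```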
Super Tables design principles
In this section, we highlight several important design principles for Super Tables. We want to emphasize that most of these principles are applicable to any dataset, not just Super Tables.
Building a data product requires a good understanding of the domain and the available data sources. It is important to establish the source of truth and document that information. Choosing the right sources promotes data consistency. For example, if a source is continuously refreshed via streaming, the same metric calculated at different times may yield inconsistent results; consumers may not realize this and can be surprised by the outcome. Once sources are identified, we need to look at upstream availability and business requirements so that the ST’s SLA can be established. One may argue that we should consolidate as many datasets into a single ST as possible. However, adding a data source to the ST increases the resources needed to materialize it and can jeopardize its SLA commitment. A good understanding of how the extra data source will be leveraged downstream (e.g., that it is needed to compute a critical metric) is warranted.
Field naming conventions and field groupings are established so that users can easily understand the meaning of the fields, and frozen fields (immutable values) are identified. For example, a job poster may switch companies, but the hiring company for the job posting shouldn’t change; it is important to include the hiring company information (immutable) instead of the job poster’s company. Future flow executions should not change these field values. By default, any schema changes (both compatible and incompatible) in data sources do not affect the ST. For example, if a new column is added to a source, the column will not appear in the ST by default. Similarly, if an existing source column is deleted, its value is nullified and the ST team is notified. Schema changes in a ST are decoupled from its sources so that no change can accidentally break the ST flow. Planned changes are documented and communicated to consumers through a distribution list.
The retention policy of the ST is well established and published so that downstream consumers are fully aware of the policy and any future changes. Likewise, the retention policies of its data sources are tracked and monitored.
Establishing Upstream SLA Commitment
For a ST to meet its own SLA commitment, the SLA commitments of all its data sources must be established and agreed on. Any changes to these commitments are communicated so that proper actions can be taken.
Data quality checks on all data sources, as well as on the ST itself, must be put in place. For example, the ST’s primary key column should not contain NULLs or duplicates. Data profiles on fields are computed to identify potential outliers. If a data source has severe data quality issues, the ST flow may be halted until the issue is resolved.
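A minimal sketch of the primary-key checks described above (the column names are illustrative, not the actual ST schema):

```python
# Count NULL and duplicate primary-key values in a batch of rows; a
# non-zero count would fail the quality gate and could halt the ST flow.
def check_primary_key(rows, key):
    seen, nulls, dups = set(), 0, 0
    for row in rows:
        value = row.get(key)
        if value is None:
            nulls += 1
        elif value in seen:
            dups += 1
        else:
            seen.add(value)
    return {"null_keys": nulls, "duplicate_keys": dups}

rows = [{"job_id": 1}, {"job_id": 2}, {"job_id": 2}, {"job_id": None}]
report = check_primary_key(rows, "job_id")
# report == {"null_keys": 1, "duplicate_keys": 1}
```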
It is very important to have both dataset and column-level documentation available to users to determine if the ST can be leveraged. Very often users want to understand the sources and how the ST and its fields are derived.
High Availability / Disaster Recovery
ST aims to reach 99+% availability. For a daily ST flow, it translates to approximately one SLA miss per quarter. To improve availability, STs can be materialized in multiple clusters. With an active-active configuration, the ST flows will be executed independently and consistency is guaranteed on two clusters (JOBS flows run on 2 production clusters). In case of SLA miss and disaster recovery, data can be copied from the mirror dataset across clusters.
ST flows must be monitored closely for any deviation from the norm.
- Flow performance: trends in the flow’s runtime may help prevent SLA misses
- Data quality: the sources must adhere to established quality standards and data quality metrics are shown on dashboards
- Cross cluster: the datasets in multiple clusters must be compared for deviation detection
A governance body (comprising teams from upstream and downstream) is established to ensure that the ST design principles are followed, that the dataflow operation meets SLA and data quality standards, and that changes are communicated to consumers. For example, if a downstream user wants to include a new field from a source, the user has to submit the request to the governance body for evaluation and recommendation. In the past, a very large field would have been automatically included, blowing up the final dataset size and jeopardizing the delivery SLA; the governance body now has to weigh all such tradeoffs in its final recommendation. A minimum monthly release cadence is established for accepting change requests so that the ST has the agility to serve business needs.
A brief introduction to two Super Tables
The first Super Table is JOBS. Before the JOBS ST was built, there were a dozen tables and views that had been built over the years, each joining one of the job posting datasets (there is more than one) with different dimensional tables for various analytics, such as the job poster information and standardized/normalized job attributes (location, titles, industries, skills, etc.). These tables/views may be created in different formats (such as Apache Avro or Apache ORC) or on different clusters. They may have different arrival frequencies and times (SLAs). Adding more complexity to the situation, there are many different kinds of jobs, some of which may even be deleted or archived, and various job listing and posting types. Choosing the right dataset for a particular use case requires the complicated work of understanding and analyzing various data sources, joining the right dimensional datasets, and in some cases repeating the same join redundantly. As such, the learning curve is steep.
The JOBS ST combines data elements from 57+ critical data sources owned by multiple LinkedIn teams across different organizations. Totaling 158 columns, the JOBS ST precomputes and combines the most needed information for job-related analyses and insights. Our intent was for the JOBS ST to be leveraged extensively for the most critical use cases. The JOBS ST has a daily morning SLA, and the daily flow is run on two different production clusters to provide high availability. Data quality of all data sources, as well as of JOBS itself, is enforced and monitored continuously.
Leveraging the JOBS ST is easier and more efficient than using its original source datasets. In fact, many downstream flows are simplified to just scanning the JOBS ST or joining it with one more dimensional table, making their logic more efficient. With the availability of JOBS, the existing dozen job-related tables/views will be deprecated and their consumers migrated to JOBS.
The second Super Table is Ad Events. Before the Ad Events ST was built, there were seven different advertisement (ads) related tables covering ad impressions, clicks, video views, and more. They have many fields in common, such as campaign and advertiser information. Downstream consumers frequently need to join multiple ads tables with the campaign and advertiser dimension tables to get insights on ads revenue, performance, etc. The duplicate fields and frequent downstream joins added unnecessary storage and computation.
After analyzing the commonly joined tables and downstream metrics, we created the Ad Events ST with 150+ columns to provide precomputed information ready for ads insights analysis and reporting.
From designing and implementing the Super Tables, we have learned many valuable lessons. First, we learned that understanding the use cases and usage patterns is crucial in determining the main benefits of using a Super Table. Before building a new Super Table, it’s important to look around and see what similar tables are already in place. Sometimes it’s better to strengthen an existing table than to build one from scratch. When weighing these two options, factors to consider include the quality, coverage, support, and usage of the existing tables.
Next, we learned that while building Super Tables, it’s important to identify the semantic logic and its owners, both to ensure that the logic is correct and to understand how it will evolve over time. Data transformation includes structural logic (e.g., table joins, deduplication, and data type conversion) and semantic logic (e.g., a method of predicting the likelihood of a future purchase based on browsing history). Semantic logic is usually owned by a specific team with deep domain knowledge. Without proper communication and collaboration, semantic logic can become poorly maintained and outdated. A better solution is to separate semantic logic into a different layer (such as Dali Views built on top of the ST) managed by the team with the domain knowledge.
With Super Table SLAs, the stricter the SLA, the less tolerant you can be of issues. This means implementing mechanisms like defensive data loading, safe data transformation, ironclad alerting and escalation, and runtime data quality checks. With softer SLAs, you can tolerate a failure, then triage and resolve the issue; with strict SLAs, sometimes you cannot tolerate a single failure.
Another learning was that, in an age with an emphasis on privacy, where data inaccuracy can have disastrous cascading effects, and lowering costs is at the forefront, reducing redundant datasets is the closest thing to a panacea there is. Often, it is better to align a Super Table to fulfill a distinct use case and ensure all requirements are met, instead of tolerating duplication for the sake of speed. Likewise, consolidating multiple similar datasets (which target different use cases) into a single Super Table is extremely critical.
Lastly, after the release of a ST, one of the critical tasks is to migrate users of the legacy SOTs to the new ST. The sooner the migration is complete, the more resources can be freed up for other tasks. We have learned that it is imperative to provide awareness and support to the downstream users who need to perform the migrations. To that end, we have created a migration guide that outlines all the impacted SOTs, including detailed column-to-column mappings and suggested validations to ensure data correctness and quality.

The release of the JOBS ST has significantly improved the lives of both the owners of jobs data sources and the downstream consumers. Before the ST, knowledge of jobs business logic was scattered across several data sources owned by different teams, none of which had the full picture, making it extremely difficult for downstream consumers to figure out the right data source for their use cases. Time-consuming communication among teams was usually required, and it could easily lead to misuse of data. Since the release of the ST, we have formed a governance body involving the various stakeholders to manage its evolution and maintenance. For instance, when ST consumers requested a new field, the governance body discussed the best design and implementation approach, which involved gathering raw data from the sources and implementing the transformation logic in the ST. The monthly release cadence allows development agility, and JOBS data are integrated with easier access and much less confusion.
Conclusions and future work
Democratization of data authoring brings agility to analytics throughout the LinkedIn community. However, it introduces lots of challenges such as discoverability, redundancy, and inconsistency. We have launched two Super Tables at LinkedIn that address these challenges, and are in the process of identifying and building more Super Tables. Both STs have simplified data discovery by providing the “go-to” tables. They have also simplified downstream logic and hence saved computation resources. The created value is also amplified due to the high-leverage nature of these tables.
The following table summarizes the benefits of building and leveraging Super Tables.
Open Sourcing Venice – LinkedIn’s Derived Data Platform
We are proud to announce the open sourcing of Venice, LinkedIn’s derived data platform that powers more than 1800 of our datasets and is leveraged by over 300 distinct applications. Venice is a high-throughput, low-latency, highly-available, horizontally-scalable, eventually-consistent storage system with first-class support for ingesting the output of batch and stream processing jobs.
Venice entered production at the end of 2016 and has been scaling continuously ever since, gradually replacing many other systems, including Voldemort and several proprietary ones. It is used by the majority of our AI use cases, including People You May Know, Jobs You May Be Interested In, and many more.
The very first commit in the Venice codebase happened a little more than eight years ago, but development picked up in earnest with a fully staffed team in the beginning of 2016, and continues to this day. In total, 42 people have contributed more than 3700 commits to Venice, which currently equates to 67 years of development work.
Throughout all this, the project traversed many phases, increasing in both scope and scale. The team learned many lessons along the way, which led to a plethora of minor fixes and optimizations, along with several rounds of major architectural enhancements. The last such round brought active-active replication to Venice and served as a catalyst for clearing many layers of tech debt. Against this backdrop, we think Venice is in the best position it’s ever been in to propel the industry forward, beyond the boundaries of LinkedIn.
This blog post provides an overview of Venice and some of its use cases, divided across three sections:
- Writing to Venice
- Reading from Venice
- Operating Venice at scale
Writing data seems like a natural place to start, since anyone trying out a new database needs to write data first, before reading it.
How we write into Venice also happens to be one of the significant architectural characteristics that makes it different from most traditional databases. The main thing to keep in mind with Venice is that, since it is a derived data store that is fed from offline and nearline sources, all writes are asynchronous. In other words, there is no support for performing strongly consistent online write requests, as one would get in MySQL or Apache HBase. Venice is not the only system designed this way—another example of a derived data store with the same constraints is Apache Pinot. This architecture enables Venice to achieve very high write throughputs.
In this section, we will go over the various mechanisms provided by Venice to swap an entire dataset, as well as to insert and update rows within a dataset. Along the way, we will also mention some use cases that leverage Venice, and reference previously published information about them.
The most popular way to write data into Venice is using the Push Job, which takes data from an Apache Hadoop grid and writes it into Venice. Currently, LinkedIn has more than 1400 push jobs running daily that are completely automated.
By default, the Venice Push Job will swap the data, meaning that the new dataset version gets loaded in the background without affecting online reads, and when a given data center is done loading, then read requests within that data center get swapped over to the new version of the dataset. The previous dataset version is kept as a backup, and we can quickly roll back to it if necessary. Older backups get purged automatically according to configurable policies. Since this job performs a full swap, it doesn’t make sense to run multiple Full Pushes against the same dataset at the same time, and Venice guards against this, treating it as a user error. Full Pushes are ideal in cases where all or most of the data needs to change, whereas other write methods described in the next sections are better suited if changing only small portions of the data.
By 2018, the roughly 500 Voldemort Read-Only use cases running in production had been fully migrated to Venice, leveraging its Full Push capability as a drop-in replacement.
A large portion of Full Push use cases are AI workflows that retrain daily or even several times a day. A prominent example of this is People You May Know. More generally, LinkedIn’s AI infrastructure has adopted a virtual feature store architecture where the underlying storage is pluggable, and Venice is the most popular choice of online storage to use with our feature store, Feathr, which was recently open sourced. Tight integration between Venice and Feathr is widely used within LinkedIn, and this integration will be available in a future release of Feathr.
Another way to leverage the Venice Push Job is in incremental mode, in which case the new data gets added to the existing dataset, rather than swapping it wholesale. Data from incremental pushes get written both to the current version and backup version (if any) of the dataset. Contrary to the default (Full Push) mode, Venice supports having multiple Incremental Push jobs run in parallel, which is useful in cases where many different offline sources need to be aggregated into a single dataset.
Besides writing from Hadoop, Venice also supports writing from a stream processor. At LinkedIn, we use Apache Samza for stream processing, and that is what Venice is currently integrated with. We are looking forward to hearing from the community about which other stream processors they would like to integrate with Venice. Because Venice has a sufficiently flexible architecture, integrating additional stream processors is possible.
Similar to incremental pushes, data written from stream processors gets applied both to the current and backup versions of a dataset, and multiple stream processing jobs can all write concurrently into the same dataset.
Since 2017, LinkedIn’s A/B testing experimentation platform has leveraged Venice with both full pushes and streaming writes for its member attribute stores. We will come back to this use case in the next section.
There is an alternative way to leverage a stream processor that is similar to a Full Push. If the stream processor is leveraging an input source such as Brooklin that supports bootstrapping the whole upstream dataset, then the owner of the stream processing job can choose to reprocess a snapshot of its whole input. The stream processor’s output is written to Venice as a new dataset version, loaded in the background, and, finally, the read traffic gets swapped over to the new version. This is conceptually similar to the Kappa Architecture.
We’ve described before on the blog our data standardization work, which helps us build taxonomies to power our Economic Graph. These standardization use cases started off using a different database, but have since fully migrated their more than 100 datasets to Venice. Built-in support for reprocessing allows standardization engineers to run a reprocessing job in hours instead of days, and to run more of them at the same time, both of which greatly increase their productivity.
For both incremental pushes and streaming writes, it is not only possible to write entire rows, but also to update specific columns of the affected rows. This is especially useful in cases where multiple sources need to be merged together into the same dataset, with each source contributing different columns.
Another supported flavor of partial updates is collection merging, which enables the user to declaratively add or remove certain elements from a set or a map.
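Conceptually, a collection-merge update only declares the deltas to apply. Here is a minimal Python sketch of how such an update might be applied server-side (the function and parameter names are illustrative, not Venice's actual API):

```python
def apply_collection_merge(current, set_union=(), set_diff=()):
    """Apply a declarative collection-merge update to a set-valued field.

    `set_union` lists elements to add, `set_diff` elements to remove.
    (Names are hypothetical; Venice's real update format differs.)
    """
    result = set(current)
    result |= set(set_union)
    result -= set(set_diff)
    return result

# The writer declares only the deltas; it never reads the current value.
skills = {"java", "python"}
skills = apply_collection_merge(skills, set_union={"go"}, set_diff={"java"})
```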
The goal of partial updates is for the user to mutate some of the data within a row without needing to know the rest of that row. This is important because all writes to Venice are asynchronous, and it is therefore not possible to perform “read-modify-write” workloads. Compared to systems which provide direct support for read-modify-write workloads (e.g., via optimistic locking), Venice’s approach of declaratively pushing down the update into the servers is more efficient and better at dealing with large numbers of concurrent writers. It is also the only practical way to make hybrid workloads work (see below).
Hybrid write workloads
Venice supports hybrid write workloads that include both swapping the entire dataset (via Full Push jobs or reprocessing jobs) and nearline writes (via incremental pushes or streaming). All of these data sources get merged together seamlessly by Venice. The way this works is that after having loaded a new dataset version in the background, but before swapping the reads over to it, there is a replay phase where recent nearline writes get written on top of the new dataset version. When the replay is caught up, then reads get swapped over. How far back the replay should start from is configurable.
This previous blog post on Venice Hybrid goes into more detail. Of note, the bookkeeping implementation details described in that post have since been redesigned.
As of today, including both incremental pushes and streaming writes, there are more than 200 hybrid datasets in production.
Putting it all together
In summary, we can say that Venice supports two data sources: Hadoop and Samza (or eventually any stream processor, if the open source community wants it), which are the columns in the following table. In addition, Venice supports three writing patterns, which are the rows of the following table. This 2×3 matrix results in 6 possible combinations, all of which are supported.
| | Hadoop | Samza |
| --- | --- | --- |
| Full dataset swap | Full Push job | Reprocessing job |
| Insertion of some rows into an existing dataset | Incremental push job | Streaming writes |
| Updates to some columns of some rows | Incremental push job doing partial updates | Streaming writes doing partial updates |
In addition to supporting these individual workloads, Venice also supports hybrid workloads, where any of the “full dataset swap” methods can be merged with any of the insertion and update methods, thanks to the built-in replay mechanism.
In terms of scale, if we add up all of the write methods together, Venice at LinkedIn continuously ingests an average of 14 GB per second, or 39M rows per second. But the average doesn’t tell the full story, because Full Pushes are by nature spiky. At peak throughput, Venice ingests 50 GB per second, or 113M rows per second. These figures add up to 1.2 PB daily and a bit more than three trillion rows daily.
On a per-server basis, we throttle the asynchronous ingestion of data by throughput of both bytes and rows. Currently, our servers with the most aggressive tunings are throttling at 200 MB per second, and 400K rows per second, all the while continuing to serve low latency online read requests without hiccups. Ingestion efficiency and performance is an area of the system that we continuously invest in, and we are committed to keep pushing the envelope on these limits.
Now that we know how to populate Venice from various sources, let’s explore how data can be read from it. We will go over the three main read APIs: Single Gets, Batch Gets, and Read Compute. We will then look at a special client mode called Da Vinci.
The simplest way to interact with Venice is as if it were a key-value store: for a given key, retrieve the associated value.
LinkedIn’s A/B testing platform, mentioned earlier, is one of the heavy users of this functionality. Including all use cases, Venice serves 6.25M Single Get queries per second at peak.
It is also possible to retrieve all values associated with a set of keys. This is a popular usage pattern across search and recommendation use cases, where we need to retrieve features associated with a large number of entities. We then score each entity by feeding its features into some ML model, and finally return the top K most relevant entities.
A prominent example of this is the LinkedIn Feed, which makes heavy use of batch gets. Including all Batch Get use cases, Venice serves 45M key lookups per second at peak.
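The retrieve-then-rank pattern described above can be sketched in a few lines of Python. Here the `store` dictionary stands in for a Venice Batch Get client, and the scoring function stands in for an ML model (all names are illustrative):

```python
def rank_top_k(store, candidate_ids, score, k):
    """Batch-get the features for many candidate entities, score each
    one, and return the K most relevant. `store` is a stand-in for a
    Venice Batch Get client (hypothetical)."""
    # One Batch Get retrieves the features for all candidates at once.
    features = {cid: store[cid] for cid in candidate_ids}
    # Score each entity by feeding its features into the model.
    ranked = sorted(candidate_ids, key=lambda cid: score(features[cid]), reverse=True)
    return ranked[:k]
```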
Another way to get data out of Venice is via its declarative read computation DSL, which supports operations such as projecting a subset of columns, as well as executing vector math functions (e.g., dot product, cosine similarity, etc.) on the embeddings (i.e., arrays of floats) stored in the datasets.
Since 2019, these functions have been used by People You May Know (PYMK) to achieve horizontally scalable online deep learning. While it is common to see large scale deep learning get executed offline (e.g., via TonY), online deep learning tends to be constrained to a smaller scale (i.e., single instance). It is thus fairly novel, in our view, to achieve horizontal scalability in the context of online inference. The data stored in the PYMK dataset includes many large embeddings, and retrieving all of this data into the application is prohibitively expensive, given its latency budget. To solve this performance bottleneck, PYMK declares in its query what operations to perform on the data, and Venice performs it on PYMK’s behalf, returning only the very small scalar results. A previous talk given at QCon AI provides more details on this use case.
At peak, read compute use cases perform 46M key lookups per second, and every single one of these lookups also performs an average of 10 distinct arithmetic operations on various embeddings stored inside the looked up value.
So far, we have been exploring use cases that perform various kinds of remote queries against the Venice backend. These are the original and oldest use cases supported by Venice, and collectively we refer to them as Classical Venice. In 2020, we built an alternative client library called Da Vinci, which supports all of the same APIs we already looked at, but provides a very different cost-performance tradeoff, by serving all calls from local state rather than remote queries.
Da Vinci is a portion of the Venice server code, packaged as a client library. Besides the APIs of the Classical Venice client, it also offers APIs for subscribing to all or some partitions of the dataset. Subscribed partitions are fully loaded in the host application’s local state, and also eagerly updated by listening to Venice’s internal changelog of updates. In this way, the state inside Da Vinci mirrors that of the Venice server, and the two are eventually consistent with one another. All of the write methods described earlier are fully compatible with both clients.
The same dataset can be used in both Classical and Da Vinci fashion at the same time. The application can choose which client to use, and switch between them easily, if cost-performance requirements change over time.
Da Vinci is what we call an “eager cache,” in opposition to the more traditional read-through cache, which needs to be configured with a TTL. The difference between these kinds of caches is summarized in the following table:

| | Read-through cache | Eager cache |
| --- | --- | --- |
| Size of local state | Configurable | Not configurable (besides choosing which partitions to subscribe to) |
| Hit rate | Depends on TTL tuning | 100% |
| Freshness | Depends on TTL tuning | Optimal (eagerly updated) |

As we can see, the main advantage of a read-through cache is that the size of the local state is configurable, but the drawback is the awkward tradeoff of tuning the TTL, which pits freshness against hit rate. An eager cache provides the opposite tradeoff: it does not allow configuring the size of the local state (besides the ability to control which partitions to subscribe to), but provides a 100% hit rate and optimal freshness.
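A minimal sketch of the eager-cache idea: instead of populating entries lazily on misses and expiring them with a TTL, the cache preloads its subscribed data and applies every changelog event as it arrives (illustrative Python; Da Vinci's real API differs):

```python
class EagerCache:
    """Toy eager cache: full load at subscribe time, then kept fresh by
    applying the changelog, so there are no misses and no TTL."""

    def __init__(self, snapshot):
        self._data = dict(snapshot)   # eager full load of subscribed partitions

    def on_changelog_event(self, key, value):
        self._data[key] = value       # eagerly applied; freshness bounded only
                                      # by replication lag, not by a TTL

    def get(self, key):
        return self._data.get(key)    # always a hit for loaded keys
```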
The easiest way to leverage Da Vinci is to eagerly load all partitions of the dataset of interest. This works well if the total dataset size is small—let’s say a few GB. We have some online applications in the ads space that do this.
Alternatively, Da Vinci also supports custom partitioning. This enables an application to control which partition gets ingested in each instance, and thus reduces the size of the local state in each instance. This is a flexible mechanism, which is usable in a variety of ways, but also more advanced. It is appropriate for use within distributed systems that already have a built-in notion of partitioning. In this way, Da Vinci becomes a building block for other data systems.
Galene, our proprietary search stack, uses Da Vinci to load AI feature data inside its searchers, where it performs ranking on many documents at high throughput. It leverages Da Vinci with its all-in-RAM configuration*, since the state is not that large (e.g., 10 GB per partition) and the latency requirement is very tight. These use cases usually have few partitions (e.g., 16) but can have a very high replication factor (e.g., 60), adding up to a thousand instances all subscribing to the same Venice dataset.
Samza stream processing jobs also leverage Da Vinci to load AI feature data. In this case, the state can be very large, in the tens of TB in total, so many more partitions are used (e.g., one to two thousand) to spread it into smaller chunks. For stream processing, only one replica is needed, or two if a warm standby is desired, so it is the inverse of the Galene use cases. These use cases leverage the SSD-friendly configuration* to be more cost-effective.
* N.B.: Both the all-in-RAM and SSD-friendly configurations are described in more detail in the storage section.
Venice was designed from the ground up for massive scale and operability, and a large portion of development efforts is continuously invested in these important areas. As such, the system has built-in support for, among other things:
- Multi-region support
- Multi-cluster support and multi-tenancy
- Operator-driven configuration
- Self-healing
- Elastic & linear scalability
In the subsequent sections, we’ll discuss each of these in turn.
It is a requirement to have our data systems deployed in all data centers. Not all data systems have the same level of integration between data centers, however. In the case of Venice, the system is globally integrated in such a way that keeping data in-sync across regions is easy, while also leveraging the isolation of each data center to better manage risks.
The Push Job writes to all data centers, and we have Hadoop grids in more than one region, all of which can serve as the source of a push. Nearline writes are also replicated to all regions in an active-active manner, with deterministic conflict resolution in case of concurrent writes.
These are important characteristics for our users, who want data to keep getting refreshed even if the data sources are having availability issues in some of the regions.
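Deterministic conflict resolution means every region converges on the same winner for concurrent writes, regardless of the order in which the writes arrive. A common scheme is last-writer-wins with a deterministic tie-breaker, sketched below (illustrative Python; Venice's actual resolution rules are internal and may differ):

```python
def resolve(write_a, write_b):
    """Pick a deterministic winner between two concurrent writes.

    Each write is a (timestamp, region_id, value) tuple: the highest
    timestamp wins, with region_id as a tie-breaker, so the comparison
    is commutative and all regions converge on the same value.
    """
    return max(write_a, write_b, key=lambda w: (w[0], w[1]))

# Both regions pick the same winner no matter the arrival order:
a, b = (10, "us-west", "v1"), (10, "us-east", "v2")
assert resolve(a, b) == resolve(b, a)
```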
Administrative operations, such as dataset creation or schema evolution, are performed asynchronously by each region, so that if a region has availability issues, it can catch up to what it missed when it becomes healthy again. This may seem like a trivial detail, but at LinkedIn we have datasets getting created and dropped every day, from users leveraging our self-service console and also from various systems doing these operations dynamically. As such, it is infeasible for Venice operators to supervise these procedures.
Operators also have the ability to repair the data in an unhealthy region by copying it from a healthy region. In rare cases of correlated hardware failures, this last resort mechanism can save the day.
Each region contains many Venice clusters, some of which are dedicated to very sensitive use cases, while others are heavily multi-tenant. Currently, there are 17 production Venice clusters per region, spanning a total of 4000 physical hosts.
Dataset to cluster assignment is fully operator-driven and invisible to users. Users writing to and reading from Venice need not know what cluster their dataset is hosted on. Moreover, operators can migrate datasets between clusters, also invisibly for users. Migration is used to optimize resource utilization, or to leverage special tunings that some clusters have.
Each Venice cluster may contain many datasets; our most densely populated cluster currently holds more than 500 datasets. In such an environment, it is critical to have mechanisms for preventing noisy neighbors from affecting each other. This is implemented in the form of quotas, of which there are three kinds, all configurable on a per-dataset basis.
There is a storage quota, which is the total amount of persistent storage a dataset is allowed to take. For Full Push jobs, this is verified prior to the push taking place, so that an oversized push doesn’t even begin to enter the system. For other write methods, there is an ongoing verification that can cause ingestion to stall if a dataset is going over its limit, in which case dataset owners are alerted and can request more quota.
There are two kinds of read quotas: one for the maximum number of keys queryable in a Batch Get query, and the other for the maximum rate of keys looked up per second. In other words, N Single Get queries cost the same amount of quota as one Batch Get query of N keys.
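Since quota is accounted in key lookups per second, one Batch Get of N keys costs the same as N Single Gets. A minimal sketch of that accounting (illustrative names and numbers, not Venice's implementation):

```python
class ReadQuota:
    """Toy per-dataset read quota: capacity is measured in key lookups
    per second, and the counter resets at each one-second window."""

    def __init__(self, keys_per_second):
        self.keys_per_second = keys_per_second
        self.used_this_second = 0

    def try_acquire(self, num_keys):
        """A Single Get calls this with 1; a Batch Get of N keys with N."""
        if self.used_this_second + num_keys > self.keys_per_second:
            return False              # reject: dataset over its quota
        self.used_this_second += num_keys
        return True

    def on_second_tick(self):
        self.used_this_second = 0     # window resets every second
```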
One of the core principles of LinkedIn’s data infrastructure teams is Invisible Infra, which means that the users should be empowered to achieve their business goals without needing to know how the infrastructure helps them fulfill those goals. An example of this is given in the previous multi-cluster section, in the form operator-driven cluster migration. There are many more such examples.
Venice supports end-to-end compression, which helps minimize storage and transmission costs. Various compression schemes are supported, and operators can choose to enable them on behalf of users. This configuration change takes effect on the next Full Push, which can be triggered by the user’s own schedule, or triggered by the operator. The latest compression scheme we integrated is Zstandard, which we use to build a shared dictionary as part of the Push Job. This dictionary is stored just once per dataset version, thus greatly reducing total storage space in cases where some bits of data are repeated across records. In the most extreme case, we’ve seen a dataset deflate by 40x!
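The win from a shared dictionary comes from capturing substrings that repeat across records just once per dataset version. Venice uses Zstandard for this; as a self-contained illustration of the same idea, Python's stdlib `zlib` also supports a preset dictionary (the sample record and dictionary below are made up):

```python
import zlib

# Records in a dataset often share long substrings; a dictionary shared
# across the whole dataset version captures them once. (Venice trains a
# Zstandard dictionary from sampled records during the Push Job; zlib's
# preset dictionary shown here is just an analogous stdlib mechanism.)
shared = b'{"country":"us","language":"en","member_id":'
record = b'{"country":"us","language":"en","member_id":12345}'

plain = zlib.compress(record)
comp = zlib.compressobj(zdict=shared)
with_dict = comp.compress(record) + comp.flush()
assert len(with_dict) < len(plain)  # the repeated prefix compresses away

decomp = zlib.decompressobj(zdict=shared)
assert decomp.decompress(with_dict) == record
```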
Venice also supports repartitioning datasets, which is another type of configuration change taking effect on the next Full Push. Partitions are the unit of scale in the system, and as datasets grow, it is sometimes useful to be able to change this parameter. For a given dataset size, having a greater number of smaller partitions enables faster push and rebalancing (more on this later).
Venice has a pluggable storage engine architecture. Previously, Venice leveraged BDB-JE as its storage engine, but since then, we have been able to fully swap this for RocksDB, without needing any user intervention. Currently, we use two different RocksDB formats: block-based (optimized for larger-than-RAM datasets, backed by SSD) and plain table (optimized for all-in-RAM datasets). As usual, changing between these formats is operator-driven. A previous blog post chronicles some of our recent RocksDB-related challenges and how we solved them.
We are looking forward to seeing what the open source community will do with this abstraction, and whether new kinds of use cases may be achievable within the scope of Venice by plugging in new storage engines.
Venice leverages Apache Helix for partition-replica placements in each cluster, which enables the system to react readily to hardware failures. If servers crash, Helix automatically rebalances the partitions they hosted onto healthy servers. In addition, we leverage Helix’s support for rack-awareness, which ensures that at most one replica of a given partition is in the same rack. This is useful so that network switch maintenance or failures don’t cause us to lose many replicas of a partition at once. Rack-awareness is also useful in that it allows us to simultaneously bounce all Venice servers in a rack at once, thus greatly reducing the time required to perform a rolling restart of a full cluster. In these scenarios, Helix is smart enough to avoid triggering a surge of self-healing rebalance operations. A previous post provides more details on Venice’s usage of Helix, although the state model has been redesigned since the post was published.
Elastic & linear scalability
Again thanks to Helix, it is easy to inject extra hardware into a cluster and to rebalance the data into it. The exact same code path that is leveraged more than 1400 times a day to rewrite entire datasets is also leveraged during rebalance operations, thus making it very well-tested and robust. This important design choice also means that whenever we further optimize the write path (which we are more or less continuously doing), it improves both push and rebalance performance, thus giving more elasticity to the system.
Beyond this, another important aspect of scalability is to ensure that the capacity to serve read traffic increases linearly, in proportion to the added hardware. For the more intense use cases described in this post—Feed and PYMK—the traffic is high enough that we benefit from over-replicating these clusters beyond the number of replicas needed for reliability purposes. Furthermore, we group replicas into various sets, so that a given Batch Get or Read Compute query can be guaranteed to hit all partitions within a bounded number of servers. This is important, because otherwise the query fanout size could increase to arbitrarily high amounts and introduce tail latency issues, which would mean the system’s capacity scales sublinearly as the hardware footprint increases. This and many other optimizations are described in more detail in a previous blog post dedicated to supporting large fanout use cases.
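The replica-grouping idea can be sketched as follows: replicas are organized into groups that each cover every partition, and a multi-key query is served entirely by one group, so fanout is bounded by the group size rather than growing with the cluster (illustrative Python, not Venice's actual router logic):

```python
def route(partitions_needed, groups):
    """Return the bounded set of servers to contact for a query.

    `groups` is a list of replica groups, each a {partition: server}
    mapping that covers the full partition space.
    """
    for group in groups:
        if partitions_needed <= group.keys():  # group covers all needed partitions
            return {group[p] for p in partitions_needed}
    raise RuntimeError("no replica group covers the requested partitions")
```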
It would be difficult to exhaustively cover all aspects of a system as large as Venice in a single blog post. Hopefully, this overview of functionalities and use cases gives a glimpse of the range of possibilities it enables. Below is a quick recap to put things in a broader context.
What is Venice for?
We have found Venice to be a useful component in almost every user-facing AI use case at LinkedIn, from the most basic to cutting edge ones like PYMK. The user-facing aspect is significant, since AI can also be leveraged in offline-only ways, in which case there may be no need for a scalable storage system optimized for serving ML features for online inference workloads.
Besides AI, there are other use cases where we have found it useful to have a derived data store with first-class support for ingesting lots of data in various ways. One example is our use of Venice as the stateful backbone of our A/B testing platform. There are other examples as well, such as caching a massaged view of source-of-truth storage systems, when cost and performance matter, but strong consistency does not.
At the end of the day, Venice is a specialized solution, and there is no intent for it to be anyone’s one and only data system, nor even their first one. Most likely, all organizations need to have some source of truth database, like a RDBMS, a key-value store, or a document store. It is also likely that any organization would already have some form of batch or stream processing infrastructure (or both) before they feel the need to store the output of these processing jobs in a derived data store. Once these other components are in place, Venice may be a worthwhile addition to complement an organization’s data ecosystem.
The road ahead
In this post, we have focused on the larger scale and most mature capabilities of Venice, but we also have a packed roadmap of exciting projects that we will share more details about soon.
We hope this upcoming chapter of the project’s life in the open source community will bring value to our peers and propel the project to the next level.
Real-time analytics on network flow data with Apache Pinot
The LinkedIn infrastructure has thousands of services serving millions of queries per second. At this scale, having tools that provide observability into the LinkedIn infrastructure is imperative to ensure that issues in our infrastructure are quickly detected, diagnosed, and remediated. This level of visibility helps prevent the occurrence of outages so we can deliver the best experience for our members. To provide observability, there are various data points that need to be collected, such as metrics, events, logs, and flows. Once collected, the data points can then be processed and made available, in real-time, for engineers to use for alerting, troubleshooting, capacity planning, and other operations.
At LinkedIn, we developed InFlow to provide observability into network flows. A network flow describes the movement of a packet through a network and is the metadata of a packet sampled at a network device that describes the packet in terms of the 5-tuple: source IP, source port, destination IP, destination port, and protocol. It may also contain source and destination autonomous system numbers (ASNs), the IP address of the network device that has captured this flow, input and output interface indices of the network device where the traffic was sampled, and the number of bytes transferred.
How LinkedIn leverages flow data
InFlow provides a rich set of time-series network data with over 50 dimensions, such as source and destination sites, security zones, ASNs, IP address type, and protocol. With this data, various types of analytical queries can be run to get meaningful insights into network health and characteristics.
Figure 1. A screenshot from InFlow UI’s Top Services tab which shows the 5 services consuming the most network bandwidth and the variation of this traffic over the last 2 hours
Most commonly, InFlow is used for operational troubleshooting to get complete visibility into the traffic. For example, if there is an outage due to a network link capacity exhaustion, InFlow can be used to find out the top talkers for that link based on hosts/services that are consuming the most bandwidth (Figure 1) and based on the nature of the service, further steps can be taken to remediate the issue.
Flow data also provides source and destination ASN information, which can be used for optimizing cost, based on bandwidth consumption of different kinds of peering with external networks. It can also be used for analyzing data based on several dimensions for network operations. For example, finding the distribution of traffic by IPv4 or IPv6 flows or the distribution of traffic based on Type of Service (ToS) bits.
InFlow architecture overview
Figure 2. InFlow architecture
Figure 2 shows the overall InFlow architecture. The platform is divided into 3 main components: flow collector, flow enricher, and InFlow API with Pinot as a storage system. Each component has been modeled as an independent microservice to provide the following benefits:
- It enforces the single responsibility principle and prevents the system from becoming a monolith.
- Each of the components have different requirements in terms of scaling. Separate microservices ensure that each can be scaled independently.
- This architecture creates loosely coupled pluggable services which can be reused for other scenarios.
InFlow receives 50k flows per second from over 100 different network devices on the LinkedIn backbone and edge. InFlow supports sFlow and IPFIX as protocols for collecting flows from network devices; the choice of protocol depends on the device vendor’s support and on keeping the impact of flow export on the device’s performance minimal. The InFlow collector receives and parses these incoming flows, aggregates the data into unique flows per minute, and pushes them to a Kafka topic for raw flows.
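The collector's aggregation step can be sketched as follows: flows sharing the same 5-tuple within the same minute are merged into one record with summed byte counts before being produced to the raw-flows Kafka topic (illustrative Python; the real collector differs):

```python
from collections import defaultdict

def aggregate_minute(flows):
    """Merge flows into unique (minute, 5-tuple) buckets.

    `flows` is an iterable of (timestamp_seconds, five_tuple, num_bytes)
    tuples; the result maps (minute, five_tuple) to total bytes.
    """
    buckets = defaultdict(int)
    for ts, five_tuple, num_bytes in flows:
        buckets[(ts // 60, five_tuple)] += num_bytes
    return dict(buckets)
```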
The data processing pipeline for InFlow leverages Apache Kafka and Apache Samza for stream processing of incoming flow events. Our streaming pipeline processes 50k messages per second, enriching the data with 40 additional fields (like service, source and destination sites, security zones, ASNs, and IP address type), which are fetched from various internal services at LinkedIn. For example, our data center infrastructure management system, InOps, provides information on the site, security zone, security domain of the source, and destination IPs for a flow. The incoming raw flow messages are consumed by a stream processing job on Samza and after adding the additional enriched fields, the result is pushed to an enriched Kafka topic.
InFlow requires storage of tens of TBs of data with a retention of 30 days. To support its real-time troubleshooting use case, the data must be queryable in real-time with sub-second latency so that engineers can query the data without any hassles during outages. For the storage layer, InFlow leverages Apache Pinot.
Figure 3. A screenshot from InFlow UI’s Explore tab which provides a self-service interface for users to visualize flow data by grouping and filtering on different dimensions
The InFlow UI is a dashboard pre-populated with some of the commonly used visualizations of flow data, providing a rich interface where the data can be filtered or grouped by any of the 40 different dimension fields. The UI also has an Explore section, which allows for the creation of ad-hoc queries. It is built on top of the InFlow API, a middleware responsible for translating user input into Pinot queries and issuing them to the Pinot cluster.
Pinot as a storage layer
In the first version of InFlow, data was ingested from the enriched Kafka topic to HDFS, and we leveraged Trino to facilitate user queries on the data present in HDFS. However, the ETL and aggregation pipeline added a 15-20 minute delay, reducing the freshness of the data, and query latencies to HDFS via Trino were on the order of 15-30 seconds. This was acceptable for historical data analytics; for real-time troubleshooting, however, the data needs to be available with a maximum delay of 1 minute.
Based on the query latency and data freshness requirements, we explored several storage solutions available at LinkedIn (like Espresso, Kusto, and Pinot) and decided to onboard our data to Apache Pinot. We needed a reliable system providing real-time ingestion and sub-second query latencies, and Pinot’s support for Lambda and Lambda-less architectures, real-time ingestion, and low latency at high throughput could help us achieve optimal results. Additionally, the Pinot team at LinkedIn is experimenting with a new use case called Real-time Operational Metrics Analysis (ROMA), which enables engineers to slice and dice metrics along different combinations of dimensions. ROMA aims to help monitor infrastructure in near real-time, analyze the last few weeks, months, or years of data to discover trends and patterns for capacity forecasting and planning, and find the root cause of outages quickly to reduce time to recovery. These objectives aligned well with our problem statement of processing large numbers of metrics in real-time.
The Pinot ingestion pipeline consumes directly from the enriched Kafka topic and creates the segments on the Pinot servers, which improves the freshness of the data in the system to less than a minute. User requests from InFlow UI are converted to Pinot SQL queries and sent to the Pinot broker for processing. Since Pinot servers keep data and indices in cache-friendly data structures, the query latencies are a huge improvement from the previous version where data was queried from disk (HDFS).
Several optimizations were made to reach these query latency and ingestion targets. Because the data volume of the input Kafka topic is high, we experimented with the ingestion parameters to determine the optimal number of partitions in the topic, allowing for parallel consumption into segments in Pinot. Most of our queries involved a regexp_like condition on the devicehostname column (the name of the network device that exported the flow), used to narrow down to a specific plane of the network. regexp_like is inefficient because it cannot leverage any index, so to resolve this, we set up an ingestion transformation in Pinot: a transformation function applied to the data before it is ingested. The transformation created a derived column, flowType, which classifies a flow into a specific plane of the network based on the name of the exporting device. For example, if the exporting device is at the edge of our network, the flow is classified as Internet-facing. The flowType column is indexed and used for equality comparisons instead of regexp_like, which improved query latency by 50%.
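The ingestion-time transformation amounts to deriving an indexed column once, so queries can use a cheap equality filter instead of a regex. A Python sketch of the idea (the hostname patterns and flowType values here are made up for illustration):

```python
import re

# Hypothetical hostname conventions: "edge-*" for edge devices,
# "bb-*" for backbone devices. The regex runs once at ingestion time,
# not on every query.
EDGE = re.compile(r"^edge-")
BACKBONE = re.compile(r"^bb-")

def derive_flow_type(devicehostname):
    """Classify a flow into a network plane from its exporting device."""
    if EDGE.match(devicehostname):
        return "INTERNET_FACING"
    if BACKBONE.match(devicehostname):
        return "BACKBONE"
    return "DATACENTER"

# Query side: WHERE flowType = 'INTERNET_FACING' (indexed equality)
# replaces WHERE regexp_like(devicehostname, '^edge-') (no index).
```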
Queries from InFlow always request for data from a specific range in time. To improve query performance, timestamp based pruning was enabled on Pinot. This improved query latencies since only relevant segments are filtered in for processing based on the filter conditions on the timestamp column in queries. Based on the Pinot team’s input, indexes on the different dimension columns were set up to aid query performance.
Figure 4. Latency metric for InFlow API query for top flows in the last 12 hours before and after onboarding to Pinot
Following the successful onboarding of flow data to a real-time table on Pinot, freshness of data improved from 15 minutes to 1 minute, and query latencies were reduced by as much as 95%. Some of the more expensive queries, which took as much as 6 minutes using Trino, now complete in 4 seconds using Pinot. This has made it easier for network engineers at LinkedIn to get the data they need for troubleshooting and to run real-time analytics on network flow data.
The current network flow data only provides us with sampled flows from the LinkedIn backbone and edge network. Skyfall is an eBPF-based agent, developed at LinkedIn, that collects flow data and network metrics from the host’s kernel with minimal overhead. The agent captures all flows for the host without sampling and will be deployed across all servers in the LinkedIn fleet. This will provide 100% coverage of flows across our data centers and enable us to support use cases that require unsampled flow data, such as security audit and validation. Because the agent collects more data from more devices, the scale of data collected by Skyfall is expected to be 100 times that of InFlow. We look forward to leveraging the InFlow architecture to support this scale and provide real-time analytics on top of the rich set of metrics exported by the Skyfall agent. Another upcoming feature that we are excited about is leveraging InFlow data for anomaly detection and further traffic analytics.
Onboarding our data to Pinot was a collaborative effort and we would like to express our gratitude to Subbu Subramaniam, Sajjad Moradi, Florence Zhang, and the Pinot team at LinkedIn for their patience and efforts in understanding our requirements and working on the optimizations required for getting us to the optimal performance.
Thanks to Prashanth Kumar for the continuous dialogue in helping us understand the network engineering perspective on flow data. Thanks to Varoun P and Vishwa Mohan for their leadership and continued support.