In April 2022, Feathr was released under the Apache 2.0 license and we announced, in close conjunction with our Microsoft Azure partners, native integration and support for Feathr on Azure. Since being open sourced, Feathr has achieved substantial popularity among the machine learning operations (MLOps) community. It has been adopted by companies of various sizes across multiple industries and the community continues to grow rapidly. Most excitingly, more and more open-source enthusiasts are contributing code to Feathr.
It’s clear that many others experience the same pain points that Feathr aims to address. That’s why we are excited to share it with a broader audience and for Feathr to be adopted by a broader open-source community with help from LF AI & Data.
Donating Feathr to LF AI & Data will help ensure that it continues to grow and evolve across various dimensions, including visibility, user base, and contributor base. The Feathr development team will also have more opportunities to collaborate with other member companies and projects, for example by achieving richer online store support via integration with Milvus and JanusGraph, and by adopting the open data lineage standard from OpenLineage. As a result, we hope Feathr helps AI engineers build and scale feature pipelines and feature applications in ways that push MLOps tech stacks and the industry forward for years to come.
The Feathr feature store provides an abstraction layer between raw data and ML models. This abstraction layer standardizes and simplifies feature definition, transformation, serving, storage, and access from within ML workflows or applications. Feathr empowers AI engineers to focus on feature engineering while it takes care of data serialization format, connecting to various databases, performance optimization, and credential management. More specifically, Feathr helps:
- Define features once and use them in different scenarios, like model training and model serving
- Create training datasets with point-in-time correct semantics
- Connect to various offline data sources (data lakes and data warehouses), and then transform source data into features
- Deliver feature data from offline systems to an online store for faster online serving
- Discover features or share features among colleagues or teams with ease
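To make "point-in-time correct semantics" concrete, here is a minimal sketch in plain Python. It does not use the Feathr API; the function names and the sample data are hypothetical. The idea is that each training row only sees the feature value that was known at or before the label's timestamp, so no future information leaks into training.

```python
from bisect import bisect_right


def point_in_time_lookup(feature_history, as_of):
    """Return the latest feature value observed at or before `as_of`.

    `feature_history` is a list of (timestamp, value) pairs sorted by timestamp.
    """
    timestamps = [ts for ts, _ in feature_history]
    idx = bisect_right(timestamps, as_of)
    if idx == 0:
        return None  # no value was known yet at that time
    return feature_history[idx - 1][1]


def build_training_rows(labels, features_by_entity):
    """Join each (entity, label timestamp, label) with the feature as of that time."""
    rows = []
    for entity_id, label_ts, label in labels:
        history = features_by_entity.get(entity_id, [])
        rows.append((entity_id, point_in_time_lookup(history, label_ts), label))
    return rows


# Hypothetical data: one entity whose feature changed at times 10, 20 and 30.
features_by_entity = {"member_1": [(10, 0.2), (20, 0.5), (30, 0.9)]}
labels = [("member_1", 25, 1), ("member_1", 5, 0)]
training_rows = build_training_rows(labels, features_by_entity)
```

The label observed at time 25 is paired with the value written at time 20, not the later value written at time 30.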
To learn more, please visit Feathr’s GitHub page and our April 2022 blog post, Open sourcing Feathr – LinkedIn’s feature store for productive machine learning.
Acceptance into LF AI & Data is an important recognition from the Linux Foundation. We believe a large, diverse, healthy, and self-sustaining Feathr open-source community is important. We’re excited for this new chapter of Feathr and to welcome more people into the Feathr community.
Open Sourcing Venice – LinkedIn’s Derived Data Platform
We are proud to announce the open sourcing of Venice, LinkedIn’s derived data platform that powers more than 1800 of our datasets and is leveraged by over 300 distinct applications. Venice is a high-throughput, low-latency, highly-available, horizontally-scalable, eventually-consistent storage system with first-class support for ingesting the output of batch and stream processing jobs.
Venice entered production at the end of 2016 and has been scaling continuously ever since, gradually replacing many other systems, including Voldemort and several proprietary ones. It is used by the majority of our AI use cases, including People You May Know, Jobs You May Be Interested In, and many more.
The very first commit in the Venice codebase happened a little more than eight years ago, but development picked up in earnest with a fully staffed team in the beginning of 2016, and continues to this day. In total, 42 people have contributed more than 3700 commits to Venice, which currently equates to 67 years of development work.
Throughout all this, the project traversed many phases, increasing in both scope and scale. The team learned many lessons along the way, which led to a plethora of minor fixes and optimizations, along with several rounds of major architectural enhancements. The last such round brought active-active replication to Venice and served as a catalyst for clearing many layers of tech debt. Against this backdrop, we think Venice is in the best position it’s ever been in to propel the industry forward, beyond the boundaries of LinkedIn.
This blog post provides an overview of Venice and some of its use cases, divided across three sections:
- Writing to Venice
- Reading from Venice
- Operating Venice at scale
Writing data seems like a natural place to start, since anyone trying out a new database needs to write data first, before reading it.
How we write into Venice also happens to be one of the significant architectural characteristics that makes it different from most traditional databases. The main thing to keep in mind with Venice is that, since it is a derived data store that is fed from offline and nearline sources, all writes are asynchronous. In other words, there is no support for performing strongly consistent online write requests, as one would get in MySQL or Apache HBase. Venice is not the only system designed this way—another example of a derived data store with the same constraints is Apache Pinot. This architecture enables Venice to achieve very high write throughputs.
In this section, we will go over the various mechanisms provided by Venice to swap an entire dataset, as well as to insert and update rows within a dataset. Along the way, we will also mention some use cases that leverage Venice, and reference previously published information about them.
The most popular way to write data into Venice is using the Push Job, which takes data from an Apache Hadoop grid and writes it into Venice. Currently, LinkedIn has more than 1400 push jobs running daily that are completely automated.
By default, the Venice Push Job will swap the data, meaning that the new dataset version gets loaded in the background without affecting online reads, and when a given data center is done loading, then read requests within that data center get swapped over to the new version of the dataset. The previous dataset version is kept as a backup, and we can quickly roll back to it if necessary. Older backups get purged automatically according to configurable policies. Since this job performs a full swap, it doesn’t make sense to run multiple Full Pushes against the same dataset at the same time, and Venice guards against this, treating it as a user error. Full Pushes are ideal in cases where all or most of the data needs to change, whereas other write methods described in the next sections are better suited if changing only small portions of the data.
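The swap, backup, and rollback behavior described above can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions (a single data center, everything held in dictionaries); the class and method names are hypothetical and are not part of Venice.

```python
class VersionedDataset:
    """Toy model of a dataset that is replaced wholesale by each full push."""

    def __init__(self, max_backups=1):
        self.versions = {}        # version number -> rows (dict of key -> value)
        self.current = None       # version that currently serves reads
        self.backups = []         # older versions, oldest first
        self.max_backups = max_backups
        self.next_version = 1
        self.push_in_progress = False

    def start_full_push(self):
        # Two concurrent full swaps of the same dataset are treated as an error.
        if self.push_in_progress:
            raise RuntimeError("a full push is already running")
        self.push_in_progress = True

    def finish_full_push(self, rows):
        # The new version is fully loaded before reads are pointed at it.
        version = self.next_version
        self.next_version += 1
        self.versions[version] = dict(rows)
        if self.current is not None:
            self.backups.append(self.current)
        self.current = version
        # Purge backups beyond the configured retention.
        while len(self.backups) > self.max_backups:
            del self.versions[self.backups.pop(0)]
        self.push_in_progress = False
        return version

    def rollback(self):
        if not self.backups:
            raise RuntimeError("no backup version to roll back to")
        bad_version = self.current
        self.current = self.backups.pop()
        del self.versions[bad_version]

    def get(self, key):
        if self.current is None:
            return None
        return self.versions[self.current].get(key)
```

Reads always go through `current`, so a push in progress never affects them, and a rollback is just a pointer change back to the most recent backup.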
By 2018, the roughly 500 Voldemort Read-Only use cases running in production had been fully migrated to Venice, leveraging its Full Push capability as a drop-in replacement.
A large portion of Full Push use cases are AI workflows that retrain daily or even several times a day. A prominent example of this is People You May Know. More generally, LinkedIn’s AI infrastructure has adopted a virtual feature store architecture where the underlying storage is pluggable, and Venice is the most popular choice of online storage to use with our feature store, Feathr, which was recently open sourced. Tight integration between Venice and Feathr is widely used within LinkedIn, and this integration will be available in a future release of Feathr.
Another way to leverage the Venice Push Job is in incremental mode, in which case the new data gets added to the existing dataset, rather than swapping it wholesale. Data from incremental pushes gets written both to the current version and to the backup version (if any) of the dataset. Contrary to the default (Full Push) mode, Venice supports having multiple Incremental Push jobs run in parallel, which is useful in cases where many different offline sources need to be aggregated into a single dataset.
Besides writing from Hadoop, Venice also supports writing from a stream processor. At LinkedIn, we use Apache Samza for stream processing, and that is what Venice is currently integrated with. We are looking forward to hearing from the community about which other stream processors they would like to integrate with Venice. Because Venice has a sufficiently flexible architecture, integrating additional stream processors is possible.
Similar to incremental pushes, data written from stream processors gets applied both to the current and backup versions of a dataset, and multiple stream processing jobs can all write concurrently into the same dataset.
Since 2017, LinkedIn’s A/B testing experimentation platform has leveraged Venice with both full pushes and streaming writes for its member attribute stores. We will come back to this use case in the next section.
There is an alternative way to leverage a stream processor that is similar to a Full Push. If the stream processor is leveraging an input source such as Brooklin that supports bootstrapping the whole upstream dataset, then the owner of the stream processing job can choose to reprocess a snapshot of its whole input. The stream processor’s output is written to Venice as a new dataset version, loaded in the background, and, finally, the read traffic gets swapped over to the new version. This is conceptually similar to the Kappa Architecture.
We’ve described before on the blog our data standardization work, which helps us build taxonomies to power our Economic Graph. These standardization use cases started off using a different database, but since then have fully migrated their more than 100 datasets to Venice. Having built-in support for reprocessing allows standardization engineers to run a reprocessing job in hours instead of days, and to do more of them at the same time, both of which greatly increase their productivity.
For both incremental pushes and streaming writes, it is not only possible to write entire rows, but also to update specific columns of the affected rows. This is especially useful in cases where multiple sources need to be merged together into the same dataset, with each source contributing different columns.
Another supported flavor of partial updates is collection merging, which enables the user to declaratively add or remove certain elements from a set or a map.
The goal of partial updates is for the user to mutate some of the data within a row without needing to know the rest of that row. This is important because all writes to Venice are asynchronous, and it is therefore not possible to perform “read-modify-write” workloads. Compared to systems which provide direct support for read-modify-write workloads (e.g., via optimistic locking), Venice’s approach of declaratively pushing down the update into the servers is more efficient and better at dealing with large numbers of concurrent writers. It is also the only practical way to make hybrid workloads work (see below).
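As a rough sketch of what a declarative partial update looks like, the function below applies an update to a row without the writer ever having read that row. The update format (`set_fields`, `add_to_set`, `remove_from_set`) is hypothetical and chosen only for illustration; it is not Venice's actual update schema.

```python
def apply_partial_update(row, update):
    """Apply a declarative update on the server side and return the new row."""
    updated = dict(row)
    # Column-level updates: overwrite only the fields named by the writer.
    for field, value in update.get("set_fields", {}).items():
        updated[field] = value
    # Collection merging: add or remove elements without replacing the set.
    for field, elements in update.get("add_to_set", {}).items():
        updated[field] = set(updated.get(field, set())) | set(elements)
    for field, elements in update.get("remove_from_set", {}).items():
        updated[field] = set(updated.get(field, set())) - set(elements)
    return updated


# Two independent sources contribute different columns of the same row.
row = {"name": "a", "score": 1, "skills": {"java"}}
from_source_a = {"set_fields": {"score": 2}}
from_source_b = {
    "add_to_set": {"skills": ["python"]},
    "remove_from_set": {"skills": ["java"]},
}
merged = apply_partial_update(apply_partial_update(row, from_source_a), from_source_b)
```

Neither source needs to know the other's columns, which is what makes this pattern workable when all writes are asynchronous.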
Hybrid write workloads
Venice supports hybrid write workloads that include both swapping the entire dataset (via Full Push jobs or reprocessing jobs) and nearline writes (via incremental pushes or streaming). All of these data sources get merged together seamlessly by Venice. The way this works is that after having loaded a new dataset version in the background, but before swapping the reads over to it, there is a replay phase where recent nearline writes get written on top of the new dataset version. When the replay is caught up, then reads get swapped over. How far back the replay should start from is configurable.
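The replay phase can be pictured with the following sketch. It assumes, purely for illustration, that nearline writes are kept in a timestamped log and that the replay start is expressed as a number of seconds before the push started; all names are hypothetical.

```python
def build_hybrid_version(batch_rows, nearline_log, push_start_time, rewind_seconds):
    """Load the batch data, then replay recent nearline writes on top of it.

    `nearline_log` is a list of (timestamp, key, value) tuples in arrival order.
    """
    new_version = dict(batch_rows)
    replay_from = push_start_time - rewind_seconds
    for timestamp, key, value in nearline_log:
        if timestamp >= replay_from:
            new_version[key] = value  # nearline write wins over the batch value
    return new_version


batch_rows = {"a": 1, "b": 2}
nearline_log = [(50, "a", 10), (95, "b", 20), (105, "c", 30)]
hybrid = build_hybrid_version(batch_rows, nearline_log, push_start_time=100, rewind_seconds=10)
```

The write at time 50 falls before the replay window, so the batch value for "a" stands, while the two more recent writes are applied. Only once the replay has caught up would reads be swapped to the new version.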
This previous blog post on Venice Hybrid goes into more detail. Of note, the bookkeeping implementation details described in that post have since been redesigned.
As of today, including both incremental pushes and streaming writes, there are more than 200 hybrid datasets in production.
Putting it all together
In summary, we can say that Venice supports two data sources: Hadoop and Samza (or eventually any stream processor, if the open source community wants it), which are the columns in the following table. In addition, Venice supports three writing patterns, which are the rows of the following table. This 2×3 matrix results in 6 possible combinations, all of which are supported.
|Writing pattern||Hadoop||Samza|
|Full dataset swap||Full Push job||Reprocessing job|
|Insertion of some rows into an existing dataset||Incremental push job||Streaming writes|
|Updates to some columns of some rows||Incremental push job doing partial updates||Streaming writes doing partial updates|
In addition to supporting these individual workloads, Venice also supports hybrid workloads, where any of the “full dataset swap” methods can be merged with any of the insertion and update methods, thanks to the built-in replay mechanism.
In terms of scale, if we add up all of the write methods together, Venice at LinkedIn continuously ingests an average of 14 GB per second, or 39M rows per second. But the average doesn’t tell the full story, because Full Pushes are by nature spiky. At peak throughput, Venice ingests 50 GB per second, or 113M rows per second. These figures add up to 1.2 PB daily and a bit more than three trillion rows daily.
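The daily totals follow directly from the average rates. A quick check, assuming decimal units (1 GB = 10^9 bytes, 1 PB = 10^15 bytes):

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

average_bytes_per_second = 14e9   # 14 GB per second
average_rows_per_second = 39e6    # 39M rows per second

daily_petabytes = average_bytes_per_second * SECONDS_PER_DAY / 1e15
daily_trillion_rows = average_rows_per_second * SECONDS_PER_DAY / 1e12

print(f"{daily_petabytes:.2f} PB per day, {daily_trillion_rows:.2f} trillion rows per day")
```

This gives roughly 1.2 PB and a bit under 3.4 trillion rows per day, consistent with the figures quoted above.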
On a per-server basis, we throttle the asynchronous ingestion of data by throughput of both bytes and rows. Currently, our servers with the most aggressive tunings are throttling at 200 MB per second, and 400K rows per second, all the while continuing to serve low latency online read requests without hiccups. Ingestion efficiency and performance is an area of the system that we continuously invest in, and we are committed to keep pushing the envelope on these limits.
Now that we know how to populate Venice from various sources, let’s explore how data can be read from it. We will go over the three main read APIs: Single Gets, Batch Gets, and Read Compute. We will then look at a special client mode called Da Vinci.
The simplest way to interact with Venice is as if it were a key-value store: for a given key, retrieve the associated value.
LinkedIn’s A/B testing platform, mentioned earlier, is one of the heavy users of this functionality. Including all use cases, Venice serves 6.25M Single Get queries per second at peak.
It is also possible to retrieve all values associated with a set of keys. This is a popular usage pattern across search and recommendation use cases, where we need to retrieve features associated with a large number of entities. We then score each entity by feeding its features into some ML model, and finally return the top K most relevant entities.
A prominent example of this is the LinkedIn Feed, which makes heavy use of batch gets. Including all Batch Get use cases, Venice serves 45M key lookups per second at peak.
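The retrieve, score, and rank pattern can be sketched as follows. The store is modeled as a plain dictionary and the scoring function is a stand-in for a real ML model; none of these names come from the Venice client API.

```python
import heapq


def batch_get(store, keys):
    """Return the values for all keys that exist in the store."""
    return {key: store[key] for key in keys if key in store}


def top_k(store, candidate_ids, score_fn, k):
    """Fetch features for all candidates in one call, score them, keep the best k."""
    features = batch_get(store, candidate_ids)
    scored = ((score_fn(f), entity_id) for entity_id, f in features.items())
    return [entity_id for _, entity_id in heapq.nlargest(k, scored)]


# Hypothetical feature store and a trivial "model" that sums the features.
feature_store = {"a": [0.9, 0.3], "b": [0.5, 0.5], "c": [0.0, 0.1]}
best = top_k(feature_store, ["a", "b", "c", "missing"], score_fn=sum, k=2)
```

Fetching all candidates in a single batch call, rather than one call per entity, is what keeps this pattern viable when the candidate list is large.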
Another way to get data out of Venice is via its declarative read computation DSL, which supports operations such as projecting a subset of columns, as well as executing vector math functions (e.g., dot product, cosine similarity, etc.) on the embeddings (i.e., arrays of floats) stored in the datasets.
Since 2019, these functions have been used by People You May Know (PYMK) to achieve horizontally scalable online deep learning. While it is common to see large scale deep learning get executed offline (e.g., via TonY), online deep learning tends to be constrained to a smaller scale (i.e., single instance). It is thus fairly novel, in our view, to achieve horizontal scalability in the context of online inference. The data stored in the PYMK dataset includes many large embeddings, and retrieving all of this data into the application is prohibitively expensive, given its latency budget. To solve this performance bottleneck, PYMK declares in its query what operations to perform on the data, and Venice performs it on PYMK’s behalf, returning only the very small scalar results. A previous talk given at QCon AI provides more details on this use case.
At peak, read compute use cases perform 46M key lookups per second, and every single one of these lookups also performs an average of 10 distinct arithmetic operations on various embeddings stored inside the looked up value.
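The benefit of pushing the computation down to the storage layer is easiest to see in a sketch. Below, the "server side" function computes a dot product against a stored embedding and returns only a scalar per key, so the large row never crosses the network. The function name and row layout are hypothetical and do not reflect the actual read compute DSL.

```python
def read_compute_dot_product(store, keys, embedding_field, query_vector):
    """Run on the storage side: return one scalar per key instead of the full row."""
    results = {}
    for key in keys:
        row = store.get(key)
        if row is None:
            continue
        embedding = row[embedding_field]
        results[key] = sum(a * b for a, b in zip(embedding, query_vector))
    return results


# Rows contain a small embedding plus a large payload the caller does not need.
store = {
    "m1": {"emb": [1.0, 2.0, 3.0], "payload": "x" * 1000},
    "m2": {"emb": [0.0, 1.0, 0.0], "payload": "y" * 1000},
}
scores = read_compute_dot_product(store, ["m1", "m2", "m3"], "emb", [1.0, 0.0, 1.0])
```

The caller receives two floats rather than two rows of about a kilobyte each; with real embeddings of hundreds or thousands of dimensions, the savings grow accordingly.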
So far, we have been exploring use cases that perform various kinds of remote queries against the Venice backend. These are the original and oldest use cases supported by Venice, and collectively we refer to them as Classical Venice. In 2020, we built an alternative client library called Da Vinci, which supports all of the same APIs we already looked at, but provides a very different cost-performance tradeoff, by serving all calls from local state rather than remote queries.
Da Vinci is a portion of the Venice server code, packaged as a client library. Besides the APIs of the Classical Venice client, it also offers APIs for subscribing to all or some partitions of the dataset. Subscribed partitions are fully loaded in the host application’s local state, and also eagerly updated by listening to Venice’s internal changelog of updates. In this way, the state inside Da Vinci mirrors that of the Venice server, and the two are eventually consistent with one another. All of the write methods described earlier are fully compatible with both clients.
The same dataset can be used in both Classical and Da Vinci fashion at the same time. The application can choose which client to use, and switch between them easily, if cost-performance requirements change over time.
Da Vinci is what we call an “eager cache,” in opposition to the more traditional read-through cache, which needs to be configured with a TTL. The difference between these kinds of caches is summarized in the following table:
As we can see, the main advantage of a read-through cache is that the size of the local state is configurable, but the drawback is needing to deal with the awkward tradeoff of tuning TTL, which pits freshness against hit rate. An eager cache provides the opposite tradeoff, not giving the ability to configure the size of the local state (besides the ability to control which partitions to subscribe to), but providing 100% hit rate and optimal freshness.
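The contrast between the two cache styles can be illustrated with two toy classes. This is a conceptual sketch only: the class names are hypothetical, time is passed in explicitly to keep the example deterministic, and the eager cache is fed changes by hand rather than from a real changelog.

```python
class ReadThroughCache:
    """Fetches on miss and serves a cached value until its TTL expires."""

    def __init__(self, backend, ttl_seconds):
        self.backend = backend
        self.ttl_seconds = ttl_seconds
        self.entries = {}  # key -> (value, time it was fetched)
        self.hits = 0
        self.misses = 0

    def get(self, key, now):
        entry = self.entries.get(key)
        if entry is not None and now - entry[1] < self.ttl_seconds:
            self.hits += 1
            return entry[0]  # may be stale if the backend changed meanwhile
        self.misses += 1
        value = self.backend.get(key)
        self.entries[key] = (value, now)
        return value


class EagerCache:
    """Holds the full state locally and applies every change as it arrives."""

    def __init__(self, snapshot):
        self.state = dict(snapshot)

    def on_change(self, key, value):
        self.state[key] = value

    def get(self, key):
        return self.state.get(key)  # always served locally


backend = {"k": "v1"}
read_through = ReadThroughCache(backend, ttl_seconds=60)
eager = EagerCache(backend)

first = read_through.get("k", now=0)       # miss, fetched from the backend
backend["k"] = "v2"                        # the source data changes...
eager.on_change("k", "v2")                 # ...and the eager cache is notified
stale = read_through.get("k", now=30)      # still within TTL: stale hit
fresh_eager = eager.get("k")
refreshed = read_through.get("k", now=61)  # TTL expired: refetched
```

Shortening the TTL would reduce staleness in the read-through cache but also lower its hit rate, which is exactly the tradeoff the eager approach avoids at the cost of holding the whole subscribed state locally.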
The easiest way to leverage Da Vinci is to eagerly load all partitions of the dataset of interest. This works well if the total dataset size is small—let’s say a few GB. We have some online applications in the ads space that do this.
Alternatively, Da Vinci also supports custom partitioning. This enables an application to control which partition gets ingested in each instance, and thus reduces the size of the local state in each instance. This is a flexible mechanism, which is usable in a variety of ways, but also more advanced. It is appropriate for use within distributed systems that already have a built-in notion of partitioning. In this way, Da Vinci becomes a building block for other data systems.
Galene, our proprietary search stack, uses Da Vinci to load AI feature data inside its searchers, where it performs ranking on many documents at high throughput. It leverages Da Vinci with its all-in-RAM configuration*, since the state is not that large (e.g., 10 GB per partition) and the latency requirement is very tight. These use cases usually have few partitions (e.g., 16) but can have a very high replication factor (e.g., 60), adding up to a thousand instances all subscribing to the same Venice dataset.
Samza stream processing jobs also leverage Da Vinci to load AI feature data. In this case, the state can be very large, in the tens of TB in total, so many more partitions are used (e.g., one to two thousand) to spread it into smaller chunks. For stream processing, only one replica is needed, or two if a warm standby is desired, so it is the inverse of the Galene use cases. These use cases leverage the SSD-friendly configuration* to be more cost-effective.
* N.B.: Both the all-in-RAM and SSD-friendly configurations are described in more detail in the storage section.
Venice was designed from the ground up for massive scale and operability, and a large portion of development efforts is continuously invested in these important areas. As such, the system has built-in support for, among other things:
- Operator-driven configuration
- Elastic & linear scalability
In the subsequent sections, we’ll discuss each of these in turn.
It is a requirement to have our data systems deployed in all data centers. Not all data systems have the same level of integration between data centers, however. In the case of Venice, the system is globally integrated in such a way that keeping data in-sync across regions is easy, while also leveraging the isolation of each data center to better manage risks.
The Push Job writes to all data centers, and we have Hadoop grids in more than one region, all of which can serve as the source of a push. Nearline writes are also replicated to all regions in an active-active manner, with deterministic conflict resolution in case of concurrent writes.
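The text above does not spell out the conflict resolution rule, so the following is only one common way to obtain deterministic resolution, not necessarily the one Venice uses: the write with the highest timestamp wins, and ties are broken by a fixed ordering on the originating region. All names are hypothetical.

```python
from functools import reduce
from typing import NamedTuple


class Write(NamedTuple):
    timestamp: int
    region: str
    value: str


def resolve(a, b):
    # Higher timestamp wins; ties are broken by region name so that every
    # region picks the same winner regardless of the order writes arrive in.
    return max(a, b, key=lambda w: (w.timestamp, w.region))


writes = [
    Write(100, "region-a", "x"),
    Write(100, "region-b", "y"),
    Write(90, "region-c", "z"),
]
# Two regions receive the same writes in opposite orders.
winner_in_one_region = reduce(resolve, writes)
winner_in_another_region = reduce(resolve, reversed(writes))
```

Because the rule depends only on the writes themselves and not on arrival order, both regions converge on the same value.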
These are important characteristics for our users, who want data to keep getting refreshed even if the data sources are having availability issues in some of the regions.
Administrative operations, such as dataset creation or schema evolution, are performed asynchronously by each region, so that if a region has availability issues, it can catch up to what it missed when it becomes healthy again. This may seem like a trivial detail, but at LinkedIn we have datasets getting created and dropped every day, from users leveraging our self-service console and also from various systems doing these operations dynamically. As such, it is infeasible for Venice operators to supervise these procedures.
Operators also have the ability to repair the data in an unhealthy region by copying it from a healthy region. In rare cases of correlated hardware failures, this last resort mechanism can save the day.
Each region contains many Venice clusters, some of which are dedicated to very sensitive use cases, while others are heavily multi-tenant. Currently, there are 17 production Venice clusters per region, spanning a total of 4000 physical hosts.
Dataset to cluster assignment is fully operator-driven and invisible to users. Users writing to and reading from Venice need not know what cluster their dataset is hosted on. Moreover, operators can migrate datasets between clusters, also invisibly for users. Migration is used to optimize resource utilization, or to leverage special tunings that some clusters have.
Each Venice cluster may contain many datasets; our most densely populated cluster currently holds more than 500 datasets. In such an environment, it is critical to have mechanisms for preventing noisy neighbors from affecting each other. This is implemented in the form of quotas, of which there are three kinds, all configurable on a per-dataset basis.
There is a storage quota, which is the total amount of persistent storage a dataset is allowed to take. For Full Push jobs, this is verified prior to the push taking place, so that an oversized push doesn’t even begin to enter the system. For other write methods, there is an ongoing verification that can cause ingestion to stall if a dataset is going over its limit, in which case dataset owners are alerted and can request more quota.
There are two kinds of read quotas: one for the maximum number of keys queryable in a Batch Get query, and the other for the maximum rate of keys looked up per second. In other words, N Single Get queries cost the same amount of quota as one Batch Get query of N keys.
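The two read quotas can be sketched with a token bucket that counts keys rather than requests. The token bucket is our choice for illustration; the text does not say how Venice enforces the rate, and the class and parameter names are hypothetical. Time is passed in explicitly so the example is deterministic.

```python
class ReadQuota:
    """Per-dataset read quota: a cap on keys per query and on keys per second."""

    def __init__(self, max_keys_per_query, keys_per_second):
        self.max_keys_per_query = max_keys_per_query
        self.keys_per_second = keys_per_second
        self.tokens = float(keys_per_second)  # start with a full bucket
        self.last_refill = 0.0

    def try_acquire(self, num_keys, now):
        # First quota: reject queries that ask for too many keys at once.
        if num_keys > self.max_keys_per_query:
            return False
        # Second quota: refill in proportion to elapsed time, capped at one second's worth.
        elapsed = now - self.last_refill
        self.tokens = min(self.keys_per_second, self.tokens + elapsed * self.keys_per_second)
        self.last_refill = now
        if num_keys > self.tokens:
            return False
        self.tokens -= num_keys
        return True


batch_quota = ReadQuota(max_keys_per_query=5, keys_per_second=10)
single_quota = ReadQuota(max_keys_per_query=5, keys_per_second=10)

batch_quota.try_acquire(5, now=0.0)       # one Batch Get of 5 keys
for _ in range(5):
    single_quota.try_acquire(1, now=0.0)  # five Single Gets
```

Both callers end up with the same remaining budget, which mirrors the statement that N Single Gets cost the same as one Batch Get of N keys.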
One of the core principles of LinkedIn’s data infrastructure teams is Invisible Infra, which means that the users should be empowered to achieve their business goals without needing to know how the infrastructure helps them fulfill those goals. An example of this is given in the previous multi-cluster section, in the form of operator-driven cluster migration. There are many more such examples.
Venice supports end-to-end compression, which helps minimize storage and transmission costs. Various compression schemes are supported, and operators can choose to enable them on behalf of users. This configuration change takes effect on the next Full Push, which can be triggered by the user’s own schedule, or triggered by the operator. The latest compression scheme we integrated is Zstandard, which we use to build a shared dictionary as part of the Push Job. This dictionary is stored just once per dataset version, thus greatly reducing total storage space in cases where some bits of data are repeated across records. In the most extreme case, we’ve seen a dataset deflate by 40x!
Venice also supports repartitioning datasets, which is another type of configuration change taking effect on the next Full Push. Partitions are the unit of scale in the system, and as datasets grow, it is sometimes useful to be able to change this parameter. For a given dataset size, having a greater number of smaller partitions enables faster push and rebalancing (more on this later).
Venice has a pluggable storage engine architecture. Previously, Venice leveraged BDB-JE as its storage engine, but since then, we have been able to fully swap this for RocksDB, without needing any user intervention. Currently, we use two different RocksDB formats: block-based (optimized for larger-than-RAM datasets, backed by SSD) and plain table (optimized for all-in-RAM datasets). As usual, changing between these formats is operator-driven. A previous blog post chronicles some of our recent RocksDB-related challenges and how we solved them.
We are looking forward to seeing what the open source community will do with this abstraction, and whether new kinds of use cases may be achievable within the scope of Venice by plugging in new storage engines.
Venice leverages Apache Helix for partition-replica placements in each cluster, which enables the system to react readily to hardware failures. If servers crash, Helix automatically rebalances the partitions they hosted onto healthy servers. In addition, we leverage Helix’s support for rack-awareness, which ensures that at most one replica of a given partition is placed in any given rack. This is useful so that network switch maintenance or failures don’t cause us to lose many replicas of a partition at once. Rack-awareness is also useful in that it allows us to bounce all Venice servers in a rack at once, thus greatly reducing the time required to perform a rolling restart of a full cluster. In these scenarios, Helix is smart enough to avoid triggering a surge of self-healing rebalance operations. A previous post provides more details on Venice’s usage of Helix, although the state model has been redesigned since the post was published.
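The rack-awareness constraint can be illustrated with a simple placement function. This is not Helix's algorithm or API, just a minimal round-robin sketch with hypothetical names that guarantees each partition's replicas land in distinct racks.

```python
def place_replicas(num_partitions, replication_factor, servers_by_rack):
    """Assign each partition's replicas to servers in distinct racks."""
    racks = sorted(servers_by_rack)
    if replication_factor > len(racks):
        raise ValueError("need at least as many racks as replicas")
    placement = {}
    for partition in range(num_partitions):
        replicas = []
        for r in range(replication_factor):
            # Consecutive offsets modulo the rack count never repeat a rack.
            rack = racks[(partition + r) % len(racks)]
            servers = servers_by_rack[rack]
            replicas.append((rack, servers[partition % len(servers)]))
        placement[partition] = replicas
    return placement


servers_by_rack = {
    "rack-1": ["s1", "s2"],
    "rack-2": ["s3"],
    "rack-3": ["s4", "s5"],
}
placement = place_replicas(num_partitions=4, replication_factor=3, servers_by_rack=servers_by_rack)
```

With this invariant in place, taking down every server in one rack removes at most one replica of any partition, which is why a whole rack can be restarted at once.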
Elastic & linear scalability
Again thanks to Helix, it is easy to inject extra hardware into a cluster and to rebalance the data into it. The exact same code path that is leveraged more than 1400 times a day to rewrite entire datasets is also leveraged during rebalance operations, thus making it very well-tested and robust. This important design choice also means that whenever we further optimize the write path (which we are more or less continuously doing), it improves both push and rebalance performance, thus giving more elasticity to the system.
Beyond this, another important aspect of scalability is to ensure that the capacity to serve read traffic increases linearly, in proportion to the added hardware. For the more intense use cases described in this post—Feed and PYMK—the traffic is high enough that we benefit from over-replicating these clusters beyond the number of replicas needed for reliability purposes. Furthermore, we group replicas into various sets, so that a given Batch Get or Read Compute query can be guaranteed to hit all partitions within a bounded number of servers. This is important, because otherwise the query fanout size could increase to arbitrarily high amounts and introduce tail latency issues, which would mean the system’s capacity scales sublinearly as the hardware footprint increases. This and many other optimizations are described in more detail in a previous blog post dedicated to supporting large fanout use cases.
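The idea of bounding fanout through replica groups can be sketched as follows. We assume, for illustration only, that each group of servers collectively hosts every partition and that a query is routed entirely within one group; the hashing scheme and all names are hypothetical.

```python
import zlib


def partition_of(key, num_partitions):
    """Map a key to a partition with a stable hash."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def route_batch_get(keys, num_partitions, groups, request_id):
    """Pick one replica group and split the keys among its servers only."""
    group = groups[request_id % len(groups)]
    fanout = {}
    for key in keys:
        partition = partition_of(key, num_partitions)
        server = group[partition % len(group)]
        fanout.setdefault(server, []).append(key)
    return fanout


# Two groups of two servers; each group holds a full copy of all 16 partitions.
groups = [["g0-s0", "g0-s1"], ["g1-s0", "g1-s1"]]
keys = ["member:%d" % i for i in range(100)]
fanout = route_batch_get(keys, num_partitions=16, groups=groups, request_id=7)
```

However many keys the query contains, it touches at most as many servers as one group holds. Adding more groups adds read capacity without increasing the fanout of any individual query.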
It would be difficult to exhaustively cover all aspects of a system as large as Venice in a single blog post. Hopefully, this overview of functionalities and use cases gives a glimpse of the range of possibilities it enables. Below is a quick recap to put things in a broader context.
What is Venice for?
We have found Venice to be a useful component in almost every user-facing AI use case at LinkedIn, from the most basic to cutting edge ones like PYMK. The user-facing aspect is significant, since AI can also be leveraged in offline-only ways, in which case there may be no need for a scalable storage system optimized for serving ML features for online inference workloads.
Besides AI, there are other use cases where we have found it useful to have a derived data store with first-class support for ingesting lots of data in various ways. One example is our use of Venice as the stateful backbone of our A/B testing platform. There are other examples as well, such as caching a massaged view of source-of-truth storage systems, when cost and performance matter, but strong consistency does not.
At the end of the day, Venice is a specialized solution, and there is no intent for it to be anyone’s one and only data system, nor even their first one. Most likely, all organizations need to have some source of truth database, like an RDBMS, a key-value store, or a document store. It is also likely that any organization would already have some form of batch or stream processing infrastructure (or both) before they feel the need to store the output of these processing jobs in a derived data store. Once these other components are in place, Venice may be a worthwhile addition to complement an organization’s data ecosystem.
The road ahead
In this post, we have focused on the larger scale and most mature capabilities of Venice, but we also have a packed roadmap of exciting projects that we will share more details about soon.
We hope this upcoming chapter of the project’s life in the open source community will bring value to our peers and propel the project to the next level.
Real-time analytics on network flow data with Apache Pinot
The LinkedIn infrastructure has thousands of services serving millions of queries per second. At this scale, having tools that provide observability into the LinkedIn infrastructure is imperative to ensure that issues in our infrastructure are quickly detected, diagnosed, and remediated. This level of visibility helps prevent the occurrence of outages so we can deliver the best experience for our members. To provide observability, there are various data points that need to be collected, such as metrics, events, logs, and flows. Once collected, the data points can then be processed and made available, in real-time, for engineers to use for alerting, troubleshooting, capacity planning, and other operations.
At LinkedIn, we developed InFlow to provide observability into network flows. A network flow describes the movement of a packet through a network and is the metadata of a packet sampled at a network device that describes the packet in terms of the 5-tuple: source IP, source port, destination IP, destination port, and protocol. It may also contain source and destination autonomous system numbers (ASNs), the IP address of the network device that has captured this flow, input and output interface indices of the network device where the traffic was sampled, and the number of bytes transferred.
How LinkedIn leverages flow data
InFlow provides a rich set of time-series network data having over 50 dimensions such as source and destination sites, security zones, ASNs, IP address type, and protocol. With this data, various types of analytical queries can be run to get meaningful insights about network health and characteristics.
Figure 1. A screenshot from InFlow UI’s Top Services tab which shows the 5 services consuming the most network bandwidth and the variation of this traffic over the last 2 hours
Most commonly, InFlow is used for operational troubleshooting to get complete visibility into the traffic. For example, if an outage is caused by capacity exhaustion on a network link, InFlow can be used to find the top talkers on that link, that is, the hosts or services consuming the most bandwidth (Figure 1). Based on the nature of the service, further steps can then be taken to remediate the issue.
Flow data also provides source and destination ASN information, which can be used to optimize cost based on the bandwidth consumed over different kinds of peering with external networks. It can also be used to analyze data along several dimensions for network operations, for example, to find the distribution of traffic across IPv4 and IPv6 flows or by Type of Service (ToS) bits.
InFlow architecture overview
Figure 2. InFlow architecture
Figure 2 shows the overall InFlow architecture. The platform is divided into 3 main components: flow collector, flow enricher, and InFlow API with Pinot as a storage system. Each component has been modeled as an independent microservice to provide the following benefits:
- It enforces the single responsibility principle and prevents the system from becoming a monolith.
- Each component has different scaling requirements. Separate microservices ensure that each can be scaled independently.
- This architecture creates loosely coupled pluggable services which can be reused for other scenarios.
InFlow receives 50k flows per second from over 100 different network devices on the LinkedIn backbone and edge. InFlow supports sFlow and IPFIX as protocols for collecting flows from network devices. These protocols were chosen based on vendor support on the devices and the minimal impact of flow export on device performance. The InFlow collector receives and parses these incoming flows, aggregates the data into unique flows for each minute, and pushes them to a Kafka topic for raw flows.
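The per-minute aggregation step can be sketched roughly as follows. This is an illustration only, assuming each parsed flow arrives as a timestamp, a 5-tuple, and a byte count; the function name and tuple layout are hypothetical and not taken from the InFlow collector.

```python
from collections import defaultdict
from typing import Dict, Iterable, Tuple

# (src_ip, src_port, dst_ip, dst_port, protocol)
FiveTuple = Tuple[str, int, str, int, int]


def aggregate_per_minute(
    flows: Iterable[Tuple[int, FiveTuple, int]],
) -> Dict[Tuple[int, FiveTuple], int]:
    """Sum bytes for each unique 5-tuple within each one-minute window.

    Each input item is (epoch_seconds, five_tuple, bytes).
    """
    totals: Dict[Tuple[int, FiveTuple], int] = defaultdict(int)
    for ts, key, nbytes in flows:
        minute_start = ts - (ts % 60)  # floor to the start of the minute
        totals[(minute_start, key)] += nbytes
    return dict(totals)


web = ("10.0.0.1", 44321, "10.0.1.9", 443, 6)
dns = ("10.0.0.2", 5353, "10.0.1.7", 53, 17)
samples = [(120, web, 1500), (150, web, 500), (155, dns, 80), (185, web, 700)]
per_minute = aggregate_per_minute(samples)
```

In this toy input, the two `web` samples at 120 and 150 seconds collapse into a single record for the minute starting at 120, while the sample at 185 seconds starts a new record for the next minute.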
The data processing pipeline for InFlow leverages Apache Kafka and Apache Samza for stream processing of incoming flow events. Our streaming pipeline processes 50k messages per second, enriching the data with 40 additional fields (like service, source and destination sites, security zones, ASNs, and IP address type), which are fetched from various internal services at LinkedIn. For example, our data center infrastructure management system, InOps, provides the site, security zone, and security domain of the source and destination IPs for a flow. The incoming raw flow messages are consumed by a stream processing job on Samza, which adds the enriched fields and pushes the result to an enriched Kafka topic.
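Conceptually, enrichment is a lookup-and-merge over each raw flow message. The sketch below shows the idea with an in-memory subnet table standing in for an internal service such as InOps; the subnets, site names, and field names are made up for illustration, and the real job runs on Samza rather than as a plain Python function.

```python
import ipaddress
from typing import Dict

# Hypothetical lookup table: subnet -> site metadata.
SITE_BY_SUBNET = {
    ipaddress.ip_network("10.1.0.0/16"): {"site": "dc-a", "security_zone": "prod"},
    ipaddress.ip_network("10.2.0.0/16"): {"site": "dc-b", "security_zone": "corp"},
}
UNKNOWN = {"site": "unknown", "security_zone": "unknown"}


def lookup_site(ip: str) -> Dict[str, str]:
    """Return site metadata for the first subnet containing the address."""
    addr = ipaddress.ip_address(ip)
    for network, meta in SITE_BY_SUBNET.items():
        if addr in network:
            return meta
    return UNKNOWN


def enrich(raw: Dict[str, object]) -> Dict[str, object]:
    """Copy the raw flow and add site, security zone, and IP type fields."""
    enriched = dict(raw)
    for side in ("src", "dst"):
        meta = lookup_site(str(raw[f"{side}_ip"]))
        enriched[f"{side}_site"] = meta["site"]
        enriched[f"{side}_security_zone"] = meta["security_zone"]
    version = ipaddress.ip_address(str(raw["src_ip"])).version
    enriched["ip_type"] = f"IPv{version}"
    return enriched


enriched_flow = enrich({"src_ip": "10.1.2.3", "dst_ip": "10.2.0.5", "bytes": 1500})
```

The raw fields pass through unchanged, so downstream consumers of the enriched topic see a superset of the raw message.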
InFlow requires storage of tens of TBs of data with a retention of 30 days. To support its real-time troubleshooting use case, the data must be queryable in real-time with sub-second latency so that engineers can query the data without any hassles during outages. For the storage layer, InFlow leverages Apache Pinot.
Figure 3. A screenshot from InFlow UI’s Explore tab which provides a self-service interface for users to visualize flow data by grouping and filtering on different dimensions
The InFlow UI is a dashboard pre-populated with some of the most commonly used visualizations of flow data. It provides a rich interface where the data can be filtered or grouped by any of the 40 different dimension fields. The UI also has an Explore section, which allows users to create ad-hoc queries. The UI is built on top of the InFlow API, a middleware layer responsible for translating user input into Pinot queries and issuing them to the Pinot cluster.
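As a rough idea of what translating user input into a query can look like, the sketch below builds a "top N by bytes" SQL string from a grouping dimension, a time range, and optional equality filters. The table name `inflow`, the columns `flowTimestamp`, `bytesTransferred`, `service`, `srcSite`, `dstSite`, and `protocol`, and the function itself are assumptions for illustration, not the InFlow API.

```python
from typing import Dict, Optional

# Hypothetical whitelist of dimensions the UI may group or filter on.
ALLOWED_DIMENSIONS = {"service", "srcSite", "dstSite", "protocol", "flowType"}


def build_top_n_query(
    group_by: str,
    start_ms: int,
    end_ms: int,
    filters: Optional[Dict[str, str]] = None,
    limit: int = 5,
) -> str:
    """Build a SQL string for the top `limit` values of one dimension."""
    if group_by not in ALLOWED_DIMENSIONS:
        raise ValueError(f"unsupported dimension: {group_by}")
    clauses = [f"flowTimestamp BETWEEN {int(start_ms)} AND {int(end_ms)}"]
    for column, value in sorted((filters or {}).items()):
        if column not in ALLOWED_DIMENSIONS:
            raise ValueError(f"unsupported filter column: {column}")
        escaped = value.replace("'", "''")  # escape single quotes in literals
        clauses.append(f"{column} = '{escaped}'")
    return (
        f"SELECT {group_by}, SUM(bytesTransferred) AS totalBytes "
        f"FROM inflow WHERE {' AND '.join(clauses)} "
        f"GROUP BY {group_by} "
        f"ORDER BY SUM(bytesTransferred) DESC LIMIT {int(limit)}"
    )


query = build_top_n_query("service", 1000, 2000, {"srcSite": "dc-a"})
```

Restricting column names to a whitelist and escaping literal values keeps user input from altering the structure of the generated query.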
Pinot as a storage layer
In the first version of InFlow, data was ingested from the enriched Kafka topic to HDFS. We leveraged Presto to serve user queries on the data in HDFS. However, the ETL and aggregation pipeline added a 15-20 minute delay, reducing the freshness of the data. Additionally, query latencies to HDFS using Presto were on the order of 15-30 seconds. This latency and delay were acceptable for historical data analytics. For real-time troubleshooting, however, the data needs to be available with a maximum delay of 1 minute.
Based on the query latency and data freshness requirements, we explored several storage solutions available at LinkedIn (like Espresso, Kusto, and Pinot) and decided to onboard our data to Apache Pinot. We needed a reliable system providing real-time ingestion and sub-second query latencies. Pinot’s support for Lambda and Lambda-less architectures, real-time ingestion, and low latency at high throughput could help us achieve optimal results. Additionally, the Pinot team at LinkedIn is experimenting with supporting a new use case called Real-time Operational Metrics Analysis (ROMA). ROMA enables engineers to slice and dice metrics along different combinations of dimensions to monitor infrastructure in near real-time, analyze the last few weeks, months, or years of data to discover trends and patterns for forecasting and capacity planning, and find the root cause of outages quickly to reduce the time to recovery. These objectives aligned well with our problem statement of processing large numbers of metrics in real-time.
The Pinot ingestion pipeline consumes directly from the enriched Kafka topic and creates the segments on the Pinot servers, which improves the freshness of the data in the system to less than a minute. User requests from the InFlow UI are converted to Pinot SQL queries and sent to the Pinot broker for processing. Since Pinot servers keep data and indices in cache-friendly data structures, query latencies are a huge improvement over the previous version, where data was queried from disk (HDFS).
Several optimizations were made to reach this query latency and data freshness. Because the data volume of the input Kafka topic is high, we ran several experiments with the ingestion parameters to decide on the optimal number of partitions in the topic, which allows for parallel consumption into segments in Pinot. Most of our queries involved a regexp_like condition on the devicehostname column, which is the name of the network device that exported the flow. This condition is used to narrow down to a specific plane of the network. regexp_like is inefficient because it cannot leverage any index. To resolve this, we set up an ingestion transformation in Pinot. Ingestion transformations are functions that can be applied to data before it is ingested into Pinot. The transformation created a derived column, flowType, which classifies a flow into a specific plane of the network based on the name of the device that exported it. For example, if the exporting device is at the edge of our network, the flow can be classified as an Internet-facing flow. The flowType column is now an indexed column used for equality comparisons instead of regexp_like, which helped improve query latency by 50%.
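The effect of the derived column can be illustrated with a small toy model: the regular expression runs once per row at ingestion time, and queries then become an equality lookup against an index. The hostname prefixes, flowType values, and helper names below are hypothetical, and this is plain Python rather than Pinot’s transformation function syntax.

```python
import re
from typing import Dict, List

# Hypothetical naming convention: a hostname prefix identifies the network plane.
PLANE_PATTERNS = [
    (re.compile(r"^edge-"), "INTERNET_FACING"),
    (re.compile(r"^bb-"), "BACKBONE"),
]


def derive_flow_type(device_hostname: str) -> str:
    """Classify a flow by the hostname of the device that exported it."""
    for pattern, flow_type in PLANE_PATTERNS:
        if pattern.search(device_hostname):
            return flow_type
    return "OTHER"


def ingest(rows: List[Dict[str, str]]) -> List[Dict[str, str]]:
    """Apply the transformation once, at ingestion time."""
    return [{**row, "flowType": derive_flow_type(row["devicehostname"])} for row in rows]


def build_inverted_index(rows: List[Dict[str, str]]) -> Dict[str, List[int]]:
    """Map each flowType value to the positions of the rows that carry it."""
    index: Dict[str, List[int]] = {}
    for doc_id, row in enumerate(rows):
        index.setdefault(row["flowType"], []).append(doc_id)
    return index


table = ingest([
    {"devicehostname": "edge-sw01"},
    {"devicehostname": "bb-rtr07"},
    {"devicehostname": "edge-sw02"},
])
index = build_inverted_index(table)
internet_facing_rows = index.get("INTERNET_FACING", [])
```

At query time, a filter on flowType is a single dictionary lookup, whereas a regular expression filter on devicehostname would have to be evaluated against every row.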
Queries from InFlow always request data for a specific time range. To improve query performance, timestamp-based pruning was enabled on Pinot. This improved query latencies because only the segments relevant to the timestamp filter in a query are selected for processing. Based on the Pinot team’s input, indexes on the different dimension columns were also set up to aid query performance.
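The idea behind timestamp-based pruning is that each segment covers a known time range, so segments that cannot overlap the query range are skipped before any rows are scanned. The sketch below is a simplified model with hypothetical names, not Pinot’s implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Segment:
    """A segment together with the time range of the rows it holds."""
    name: str
    min_ts: int  # earliest timestamp in the segment
    max_ts: int  # latest timestamp in the segment


def prune_segments(segments: List[Segment], query_start: int, query_end: int) -> List[Segment]:
    """Keep only segments whose time range overlaps [query_start, query_end]."""
    return [
        seg for seg in segments
        if seg.max_ts >= query_start and seg.min_ts <= query_end
    ]


segments = [
    Segment("seg_0", 0, 999),
    Segment("seg_1", 1000, 1999),
    Segment("seg_2", 2000, 2999),
]
selected = prune_segments(segments, 1500, 2100)
```

For the query range 1500 to 2100, only `seg_1` and `seg_2` are processed; `seg_0` is eliminated using its metadata alone.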
Figure 4. Latency metric for InFlow API query for top flows in the last 12 hours before and after onboarding to Pinot
Following the successful onboarding of flow data to a real-time table on Pinot, freshness of data improved from 15 minutes to 1 minute and query latencies were reduced by as much as 95%. For some of the more expensive queries, which took as much as 6 minutes using Presto, the query latency dropped to 4 seconds using Pinot. This has made it easier for network engineers at LinkedIn to get the data they need for troubleshooting or running real-time analytics on network flow data.
The current network flow data only provides us with sampled flows from the LinkedIn backbone and edge network. Skyfall is an eBPF-based agent, developed at LinkedIn, that collects flow data and network metrics from the host’s kernel with minimal overhead. The agent captures all flows for the host without sampling and will be deployed across all servers in the LinkedIn fleet. This would give us 100% coverage of flows across our data centers and enable us to support more use cases that require unsampled flow data, such as security auditing and validation. Because the agent collects more data from more devices, the scale of data collected by Skyfall is expected to be 100 times that of InFlow. We are looking forward to leveraging the InFlow architecture to support this scale and provide real-time analytics on top of the rich set of metrics exported by the Skyfall agent. Another upcoming feature that we are excited about is leveraging InFlow data for anomaly detection and more traffic analytics.
Onboarding our data to Pinot was a collaborative effort and we would like to express our gratitude to Subbu Subramaniam, Sajjad Moradi, Florence Zhang, and the Pinot team at LinkedIn for their patience and efforts in understanding our requirements and working on the optimizations required for getting us to the optimal performance.
Thanks to Prashanth Kumar for the continuous dialogue in helping us understand the network engineering perspective on flow data. Thanks to Varoun P and Vishwa Mohan for their leadership and continued support.
Career stories: Rejoining LinkedIn to scale our media infrastructure
Originally from Argentina, systems & infrastructure engineering leader Federico was a founding member of the Media Infrastructure team in 2015. Now based in Bellevue, Wash., Federico shares how his supportive mentor, LinkedIn’s “sweet spot” scale, and the distinctive engineering challenges here ultimately brought him back to LinkedIn in 2019.
My love for engineering started in my home country of Argentina. After working as an engineer in a corporate setting for a few years, I decided to start my own company focused on custom software development. I loved the interesting problems I could solve every day for my clients, but I was searching for greater economic opportunities in the U.S., where most of my clients were based. After working as a contractor for YouTube, I found my passion for media and engineering of video systems.
Joining and rejoining LinkedIn
When LinkedIn reached out to me with an opportunity to build their video platform in 2015, I jumped at the chance. It was thrilling to join LinkedIn at a time when we were launching in-feed video. What originally started as a team of two grew to nine people, and that’s when LinkedIn began training me to step into my first management role for the Media Infrastructure team.
After growing in my management position for a few years, I left LinkedIn for an opportunity working on larger scale systems. But I quickly became burned out and missed my original role as an individual contributor at LinkedIn. My previous manager at LinkedIn was so supportive. I was offered a role as a technical architect (i.e., Senior Staff) for media infrastructure, which allowed me to return to LinkedIn with new technical knowledge, and the same passion for my work.
Making the move to a new LinkedIn home base
Once our team had grown to almost 40 people, we reached the point at which it made sense to look for additional engineering talent outside the San Francisco Bay and New York City areas. It is challenging to find engineers in the media domain since very few companies are doing what LinkedIn does at scale. That’s when we started considering the next office location as an opportunity to bring in more talent.
Ultimately, we decided on Bellevue, Washington. After eight years in the Bay Area, I was ready for a move, and Bellevue was the right fit for my wife and me for many reasons. For example, many of the media companies we partnered with had a strong engineering presence in Seattle. Our driving motivation was to spearhead the company culture and to build an identity for a new LinkedIn office. The Bellevue office just turned one year old and we have been able to build a thriving engineering community here that’s growing quickly.
Taking ownership and giving back
In my current role as a Principal Staff Software Engineer, I love that I can mix the technical side of engineering with driving the strategic and product roadmap for my organization.
As an infrastructure engineer, there’s a sweet spot here between the scale of your work and the size of your engineering team at LinkedIn. We have relatively small teams tackling very large problems in complex technical domains. This creates great opportunities for individual ownership over a significant engineering problem on a large scale. We have space to get involved and truly make a difference instead of simply being a cog in a wheel.
Throughout my time in Silicon Valley, so many mentors were instrumental in shaping my career. As I’ve grown, I’ve tried to prioritize paying it forward by mentoring my team and other engineers at LinkedIn. Relationships matter, especially at LinkedIn. Building your network is a really core value here, because we thrive on connections.
More About Federico
Based in Bellevue, Washington, Federico is a Principal Staff Systems & Infrastructure Engineer on LinkedIn’s Media Infrastructure team. Prior to his time at LinkedIn, Federico’s engineering career led him from launching his own software development company, ESTUDIO42, to software engineering roles at YouTube and Instagram. Federico holds a degree in Computer Engineering from the Universidad Nacional de Tucuman in Argentina. Outside of work, Federico enjoys traveling with his wife, cooking, visiting shuttle expeditions, and mixing music.