Suppliers are an integral part of Wayfair, and providing them with flexible access to real-time, high-quality data is important to our mission. Today I'm going to highlight two specific areas: how Wayfair is leveraging real-time data to provide suppliers with up-to-date sales information, and how our team has tackled some of the problems that emerged along the way.
The entire process begins with a customer action. This could be the purchase of an item, a product return, an exchange, and more. It’s our team that captures the information as an order event. These events include multiple properties such as:
- Time the order was placed
- Amount of the sale
- Category of the item sold (e.g., outdoor lighting, small appliances, etc.)
- Discount type
- Supplier product number
The order event details are added in real-time to our data access layer, making them available to suppliers. We do this using APIs, which allow us to provide aggregated views across different properties and fine-grained views via filter options. This may seem easy, but there are significant challenges that come with processing large quantities of data and addressing flexible requirements around aggregation granularities, all in real-time. These challenges include:
- Scalability
- Enriching data that cannot be calculated in real-time
- Storing events that provide API flexibility
In this article, we will introduce our API, what it is, and how we addressed the challenges outlined above.
API
The current API supports the following:
- Single Level Aggregation: These are done on a single dimension. Examples include date, category ID, and supplier product number.
- Multi-Level Aggregation: These are done on multiple levels, which provides a more detailed view. Examples include date and category ID, or date and supplier product number.
- Filter Options: On top of the two levels of aggregation mentioned above, suppliers can add filters. These filters let them drill into specific products, categories, etc. Currently, suppliers can filter on nine different parameters, and since the solution is flexible, we can include more with minimal changes.
Architecture
Our architecture provides near real-time data, which includes information from suppliers. It can scale to our future use cases and offers queries at different granularities with any combination of filter options.
One of the first problems we faced when building the application was how to enrich our data with values that cannot be calculated in real-time. The answer was to piggyback on one of the known data architectures, the Lambda architecture.
Before going ahead, let’s take a detour to discuss the two famous data processing architectures, Lambda and Kappa.
Lambda Architecture
Lambda architecture is a mix of real-time data processing and batch layer processing. It consists primarily of three parts:
- Batch Layer: A sequential batch layer that keeps updating/fixing/enriching existing data
- Speed Layer (Stream Layer): An event streaming layer that processes data in real-time
- Serving Layer: A layer where users can access both the batch and real-time data.
Kappa Architecture
Kappa architecture is event-based and can handle data at scale in real-time. The Kappa architecture consists of only two layers:
- Realtime layer: An event streaming layer that processes data in real-time
- Serving layer: Layer where the user can interact with the data.
Kappa architecture has many advantages over Lambda architecture:
- Single architecture for streaming and batch
- One codebase
- One infrastructure
- Single source of truth
Unfortunately, for our use case, the adjustments in revenue come weeks or even months after the order is placed. These adjustments are also not available in Kafka, since they are processed on the backend with finance teams. This is why we are using Lambda architecture for this scenario. In the future, our hope is to leverage this data in real-time, which will allow us to move entirely to Kappa architecture.
Realtime Layer
Currently, we use Kafka to capture our real-time data and Kafka partitions to scale our application across many different workers. For real-time data ingestion, we use Flink streaming. Flink provides a scalable data processing framework and a rich set of high-level APIs.
Here's how these elements work together. Kafka messages are fed into the Flink job for transformation and enrichment. For the enrichment step, we use an external database. Flink provides a rich set of APIs that allow us to access this external data. We use the Async I/O API's RichAsyncFunction to query different tables to enrich the data.
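To make the enrichment step concrete, here is a minimal sketch of what such a function can look like. The OrderEvent and EnrichedOrderEvent types, the AsyncLookupClient, and the lookup call are hypothetical placeholders rather than our actual classes.

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;

// Sketch of an async enrichment function: for each order event, look up extra
// attributes from an external database with a non-blocking client.
public class OrderEnrichmentFunction extends RichAsyncFunction<OrderEvent, EnrichedOrderEvent> {

    private transient AsyncLookupClient client; // hypothetical non-blocking DB client

    @Override
    public void open(Configuration parameters) {
        // One client instance per parallel subtask.
        client = AsyncLookupClient.create();
    }

    @Override
    public void close() {
        client.shutdown();
    }

    @Override
    public void asyncInvoke(OrderEvent event, ResultFuture<EnrichedOrderEvent> resultFuture) {
        // Fire the query without blocking; complete the result future when it returns.
        client.lookupProductDetails(event.getSupplierProductNumber())
              .thenAccept(details -> resultFuture.complete(
                      Collections.singleton(EnrichedOrderEvent.of(event, details))));
    }

    @Override
    public void timeout(OrderEvent event, ResultFuture<EnrichedOrderEvent> resultFuture) {
        // Surface timeouts as failures so the job can restart (see the timeout handling below).
        resultFuture.completeExceptionally(new RuntimeException("Enrichment lookup timed out"));
    }
}
```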
When we interact with external systems (the database), there are significant communication delays. Our preference is to parallelize the DB calls while still maintaining backpressure, which helps ensure that the database does not get overloaded. Flink's Async I/O API lets us integrate asynchronous request clients with data streams. This API provides:
- Integration with data streams
- Handling of result order
- Event time
- Fault tolerance
When conducting asynchronous operations, we use the following parameters:
- Timeout: How long before the request is labeled as failed
- Capacity: How many asynchronous requests can be in progress at the same time
With asynchronous calls, we can overlap the sending of multiple requests, which gives us higher streaming throughput. The Async I/O API also lets us control the order of the results (a wiring sketch follows this list):
- Unordered: Result records are emitted as soon as the asynchronous requests are finished.
- Ordered: The stream order is preserved. Result records are emitted in the same order as the asynchronous requests were triggered.
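Below is a sketch of how an enrichment function like the one above might be wired in; the 500 ms timeout and the capacity of 100 are illustrative values, not our production settings.

```java
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;

import java.util.concurrent.TimeUnit;

public class EnrichmentWiring {

    // Unordered: enriched records are emitted as soon as each async request completes.
    public static DataStream<EnrichedOrderEvent> enrichUnordered(DataStream<OrderEvent> orderEvents) {
        return AsyncDataStream.unorderedWait(
                orderEvents,
                new OrderEnrichmentFunction(),
                500, TimeUnit.MILLISECONDS, // timeout: how long before a request is treated as failed
                100);                       // capacity: max async requests in flight at the same time
    }

    // Ordered: preserves the input stream order at the cost of some extra latency.
    public static DataStream<EnrichedOrderEvent> enrichOrdered(DataStream<OrderEvent> orderEvents) {
        return AsyncDataStream.orderedWait(
                orderEvents,
                new OrderEnrichmentFunction(),
                500, TimeUnit.MILLISECONDS,
                100);
    }
}
```

When the capacity is exhausted, the operator applies backpressure upstream, which is what keeps the database from being overloaded.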
Backpressure when working with the async API
Asynchronous APIs also ensure that we do not place too much pressure on the database. When async I/O requests time out, we throw an exception and restart our streaming job.
Checkpoints
To make our streaming application more fault-tolerant, we use checkpoints. These allow Flink to recover state and position in the streams. This gives the application the same semantics as a failure-free execution. We also execute a commit to Kafka when the checkpoint is done. This ensures we don’t reprocess any messages.
For storing the checkpoints, we use Google Cloud Storage.
Checkpoint configuration:
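A minimal sketch of what such a configuration can look like; the interval, timeout, and bucket path below are illustrative placeholders rather than our production values.

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public final class CheckpointSetup {

    public static void configure(StreamExecutionEnvironment env) {
        // Take a checkpoint every 60 seconds with exactly-once semantics.
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Fail a checkpoint that takes longer than two minutes.
        env.getCheckpointConfig().setCheckpointTimeout(120_000);

        // Leave some breathing room between consecutive checkpoints.
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30_000);

        // Persist checkpoints to Google Cloud Storage (illustrative bucket name).
        // The Flink Kafka source commits offsets back to Kafka when a checkpoint completes.
        env.getCheckpointConfig().setCheckpointStorage("gs://example-bucket/flink-checkpoints");
    }
}
```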
Batch
A supplier's final sales value has many adjustments that cannot be made in real-time. These adjustments can come months after the actual order, when invoices are generated and advertising and fixed costs are calculated.
Our batch layer is primarily powered by our data warehouse, BigQuery. BigQuery is a serverless data warehouse offered by Google Cloud that is scalable and highly available.
For the batch processing pipeline, we look at the worst-case scenario of three years of data, find the delta information, and pass it on to our data access layer to make the adjustments.
We then leverage BigQuery to produce the final delta, which is pulled by a batch Flink job and pushed to our data layer.
Data access layer
The second problem we faced was how to represent our data so that we could provide millisecond response times across different aggregation granularities and unknown filter combinations. There were two approaches we could explore:
- Pre-computed data
- Event level storage
On one hand, pre-computed fields give us a speed improvement. On the other hand, we lose flexibility on the filtering side.
Here are some highlights of the problems we saw with pre-computed data:
- We would need upfront knowledge of all the filter combinations that could be used and would have to build our pre-computed data around them.
- Adding a new filter would require re-computing all of the pre-computed data.
- A rigid API limited to certain upfront filtering options.
- It's harder to distinguish between events such as purchases vs. cancellations.
- Handling late-arriving and duplicate events is difficult.
For the above-mentioned reasons, we looked into storing our data at the lowest possible granularity level, using technology that could provide aggregation and filtering over events with reasonable speed.
We chose Elasticsearch as our data access layer because it provides strong data filtering and aggregation features. We used Elasticsearch's internal capabilities such as data sharding, caching, and fast aggregation via eager global ordinals on properties. Using Elasticsearch gave us millisecond-level response times along with flexibility in filtering and aggregation.
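As a sketch of how event-level storage supports both aggregation levels and arbitrary filters, here is what a multi-level aggregation with filters might look like using the Elasticsearch high-level Java client; the index name and field names are hypothetical.

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public final class SalesAggregationQuery {

    // Daily sales per category for one supplier, filtered to a single product.
    public static SearchRequest build(long supplierId, String supplierProductNumber) {
        SearchSourceBuilder source = new SearchSourceBuilder()
                .size(0) // only the aggregations are needed, not the raw events
                .query(QueryBuilders.boolQuery()
                        .filter(QueryBuilders.termQuery("supplier_id", supplierId))
                        .filter(QueryBuilders.termQuery("supplier_product_number", supplierProductNumber)))
                .aggregation(AggregationBuilders.dateHistogram("by_date")
                        .field("order_date")
                        .calendarInterval(DateHistogramInterval.DAY)
                        .subAggregation(AggregationBuilders.terms("by_category")
                                .field("category_id")
                                .subAggregation(AggregationBuilders.sum("sales").field("sale_amount"))));

        return new SearchRequest("order-events").source(source);
    }
}
```

In this model, adding a new filter option means adding another clause to the query rather than rebuilding any pre-computed data.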
Late-arriving and duplicate events can also be easily handled by setting a unique key per event. For our use case, we used a combination of the order-product ID, transaction type, and transaction ID.
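A sketch of how that key can double as the Elasticsearch document ID, so that a duplicate or replayed event simply overwrites the existing document instead of creating a new one (index name and field handling are illustrative):

```java
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.xcontent.XContentType;

public final class OrderEventIndexer {

    // Deterministic document ID built from the event's unique key, so indexing
    // the same event twice results in an upsert rather than a duplicate document.
    public static IndexRequest toIndexRequest(String orderProductId,
                                              String transactionType,
                                              String transactionId,
                                              String eventJson) {
        String docId = orderProductId + ":" + transactionType + ":" + transactionId;
        return new IndexRequest("order-events")
                .id(docId)
                .source(eventJson, XContentType.JSON);
    }
}
```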
Above we show the Elasticsearch indexing time and query latency. As we can see, both indexing and response times are on the order of tens of milliseconds, which gives us satisfactory performance on both the query and indexing side.
Below we highlight some lessons learned from our Elasticsearch setup.
Sharding
A shard is the unit at which Elasticsearch distributes data around the cluster, and it is what allows an index to scale.
We started off with five shards, which soon caused a query performance hit. We then increased the shard count to ten, which improved our query performance.
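Since the primary shard count is fixed when an index is created, moving from five to ten shards means creating a new index with the higher count and reindexing into it. A sketch of the index creation using the low-level REST client (the index name is illustrative):

```java
import org.elasticsearch.client.Request;

public final class IndexCreation {

    // Create the new index with ten primary shards; documents are then reindexed into it.
    public static Request createOrderEventsIndex() {
        Request createIndex = new Request("PUT", "/order-events-v2");
        createIndex.setJsonEntity("{ \"settings\": { \"number_of_shards\": 10 } }");
        return createIndex; // executed via restClient.performRequest(createIndex)
    }
}
```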
Eager global ordinals
As we aggregate on many different parameters, we saw an improvement in response times after enabling eager global ordinals.
Elasticsearch uses a data structure called doc values which support aggregations and other operations that require looking up field values on a per-document basis. Term-based field types such as keyword store their doc values using an ordinal mapping for a more compact representation. This ordinal mapping is segment based. When used during aggregations, ordinals can greatly improve performance.
Each index segment defines its own ordinal mapping, but aggregations collect data across an entire shard.
For our team to have the ability to use ordinals for shard-level operations like aggregations, Elasticsearch creates a unified mapping called global ordinals. The global ordinal mapping is built on top of segment ordinals and works by maintaining a map from the global ordinal to the local ordinal for each segment.
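Eager global ordinals are enabled per field in the index mapping, which shifts the cost of building global ordinals from query time to refresh time. A sketch with an illustrative field name:

```java
import org.elasticsearch.client.Request;

public final class EagerOrdinalsMapping {

    // Build global ordinals eagerly at refresh time for a keyword field we aggregate on heavily.
    public static Request enableEagerGlobalOrdinals() {
        Request updateMapping = new Request("PUT", "/order-events/_mapping");
        updateMapping.setJsonEntity(
                "{ \"properties\": { \"category_id\": { \"type\": \"keyword\", \"eager_global_ordinals\": true } } }");
        return updateMapping; // executed via restClient.performRequest(updateMapping)
    }
}
```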
Moving to more data nodes
Data nodes hold data and perform data-related operations such as CRUD, search, and aggregations. They handle all of the I/O-, memory-, and CPU-intensive work. By adding more of them, we were able to distribute our traffic better. Moving from three to four data nodes also helped us parallelize our queries, thus reducing response times.
Summary
Making order information available to suppliers in real time, rather than with a 24-hour delay, allows them to take many useful actions on our platform. On our journey to building this service, we tackled many constraints, such as data size, missing information, late-arriving information, API flexibility, and scaling our data access layer.
Today we are leveraging scalable solutions such as Kafka, Flink, and Elasticsearch to handle our data size. We also use the Lambda architecture to backfill the missing data that cannot be calculated in real-time; in the future, we hope to use only the Kappa architecture. In addition, we use Flink Async I/O to fill in data that is not available via Kafka.
To satisfy our API requirements and data access patterns, we stored event-level data in Elasticsearch and tuned it with appropriate features such as eager global ordinals, sharding, and additional data nodes.
Our current architecture also enables future use cases, such as trends for orders vs. cancellations vs. returns for suppliers, adding aggregates and filters without code changes, and adding additional data sources.
In the future, we hope to decouple our architecture from the database and move from Lambda to Kappa architecture.