
How to scale TFRS? #201

Open
dgoldenberg-audiomack opened this issue Jan 6, 2021 · 11 comments

dgoldenberg-audiomack commented Jan 6, 2021

The documentation does not appear to offer much guidance on how to scale up a TFRS-based solution; see, for example: https://www.tensorflow.org/recommenders/examples/basic_retrieval.

To start, let's consider a (common, I'm sure) scenario of N million users and M million items (such as movies). We would also be envisioning millions, if not billions, of feature records, but for the moment we can set that aside and consider just the 'basic retrieval' use-case/example.

There are at least three scale-related considerations when designing an actual recommender:

  1. How to feed input data into TFRS. I'm starting with Parquet from our datalake. Due to the size of this data and the various filtering and augmentation steps, I'm looking at having to read the Parquet in a Spark job, transform the data, then persist it into an intermediate Parquet format ready to be consumed by TFRS. Issue: extra I/O. I don't see a way to do all of the transformations, especially joins, in TF/TFRS.
  2. The building of vocabularies. From the example:
unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

I can deduplicate my data in Spark when prepping the Parquet for TFRS. However, building a model in TFRS appears to be oriented toward numpy arrays:

user_model = tf.keras.Sequential([
  tf.keras.layers.experimental.preprocessing.StringLookup(
      vocabulary=unique_user_ids, mask_token=None),
  # We add an additional embedding to account for unknown tokens.
  tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
])

How are vocabularies expected to perform with millions of IDs? How will this affect memory, and if this approach is not expected to scale, what alternatives are there to the vocabulary/numpy-array approach?
3. Getting the recommendations out, at scale. In my use-case, I'm going to ask the model to generate more recommendations per user than are actually needed, so that I can perform some post-filtering based on a small set of rules. I need a scalable way to convert tensors en masse into Spark datasets that will write Parquet or CSV back into the datalake. This may mean extracting the recommendations as Parquet into intermediate files, then having separate code turn those into the final, post-processed results. Issue: extra I/O and processing.
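On point 2, one alternative to the in-memory numpy array that I'm considering is having Spark write the deduplicated IDs to a plain-text vocabulary file, one token per line, which a lookup layer could then consume by path. A minimal sketch of producing such a file (the IDs here are hypothetical stand-ins for the Spark output):

```python
import numpy as np

def write_vocab_file(path, ids):
    """Write the unique IDs one per line -- the plain-text layout
    that vocabulary-file-based lookup layers generally expect."""
    unique_ids = np.unique(np.asarray(ids, dtype=str))
    with open(path, "w") as f:
        for token in unique_ids:
            f.write(token + "\n")
    return len(unique_ids)

# Hypothetical IDs; in practice these would come from the Spark-prepped Parquet.
n = write_vocab_file("user_vocab.txt", ["u42", "u7", "u42", "u1"])
print(n)  # 3 unique IDs
```

This keeps the driver from having to hold the whole vocabulary as an in-memory array; whether a given lookup layer accepts a file path is something to verify per TF version.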

All in all, this feels like a potential need for tighter integration with Spark datasets, where it is easy and seamless to load TF datasets from Spark and just as easy to convert the resulting recommendations (e.g., from a tfrs.layers.factorized_top_k.BruteForce index) back into Spark datasets.

So far, I don't see an easy way to do this. Some considerations:

  • LinkedIn's Spark-TFRecord library and the Spark TensorFlow Connector both require intermediary TFRecord files.
  • I'm "fishing" for a more Spark oriented way to build a TFRS model and then extract results en masse for millions of input users. Perhaps Uber's Petastorm is an answer for the input data load side. But what about TF datasets to Spark conversion?
maciejkula (Collaborator) commented:

We don't offer specific guidance because everything that applies to scaling TensorFlow models applies here equally.

  1. Preprocessing data. Here, a Spark job looks sensible. You could also have a look at TensorFlow Transform which uses Apache Beam for parallelizing transformations.
  2. Building of vocabularies. The StringLookup layer can accept either a numpy array or a path to a vocabulary file; either way, millions of IDs will be fine.
  3. Getting the recommendations out. For fast prediction I'd suggest the ScaNN approach. It should be able to generate top-K predictions very efficiently, which you can then post-process using Spark.

I'm afraid I know very little about Spark, but the basic approach of generating training data using Spark, and post-processing results using Spark again makes sense to me. I imagine you'd also be able to load the trained model into memory in your Spark functions and call it, so (3) could in principle be done entirely in Spark.
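To make (3) concrete, the retrieval step is just a maximum-inner-product search over the item embeddings. A toy numpy sketch of what a BruteForce index computes exactly (and ScaNN approximates), with random embeddings standing in for trained tower outputs:

```python
import numpy as np

rng = np.random.default_rng(0)
num_users, num_items, dim, k = 4, 1000, 32, 10

user_embeddings = rng.standard_normal((num_users, dim))  # from the user tower
item_embeddings = rng.standard_normal((num_items, dim))  # from the item tower

# Score every (user, item) pair, then take the top-k items per user.
scores = user_embeddings @ item_embeddings.T             # (num_users, num_items)
top_k_unsorted = np.argpartition(-scores, k, axis=1)[:, :k]
rows = np.arange(num_users)[:, None]
order = np.argsort(-scores[rows, top_k_unsorted], axis=1)
top_k = top_k_unsorted[rows, order]                      # best item indices first
print(top_k.shape)  # (4, 10)
```

Each row of top_k could then be exported (e.g., as Parquet) for rule-based post-filtering in Spark; generating more candidates per user than strictly needed just means raising k.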

@maciejkula maciejkula self-assigned this Jan 6, 2021

dgoldenberg-audiomack commented Jan 6, 2021

Thanks, @maciejkula. This makes sense to me; I'll update the ticket as I experiment with larger datasets.

This may be naive intuition, but "wouldn't it be nice" if TensorFlow datasets could "ride" Spark datasets directly. It's probably wishful thinking, but if there were a way to integrate Spark datasets directly into TF, as in some TF dataset class being a subclass of a Spark dataset, the framework would avoid having to deal with potentially massive amounts of intermediary files, on both the input and the output sides.

In other words, a TensorFlow dataset could become enormously powerful if it WERE a Spark dataset, with all the rich functionality that offers, including joins, etc.

I've filed tensorflow/tensorflow#46236 for this.


MLnick commented Aug 26, 2021

The basics for Spark-TF interop here may be helpful?


dgoldenberg-audiomack commented Aug 26, 2021

Yes, @MLnick, Spark-TF may seem like a place to look; however, what it offers is conversion of TFRecords into Spark.

Interop is the wrong word here; what I'm really after is the ability to do all the data prep, all the TF training, and all the post-processing after training in one cluster, natively in Spark (or Flink, or Storm, or whatever one's clustering 'workhorse' happens to be).

The Spark-TF approach seems to imply that these three processes (data prep, training, post-processing) would run in separate clusters, or steps, which is both cumbersome and expensive.

There is also TensorFlowOnSpark, which claims that "by combining salient features from the TensorFlow deep learning framework with Apache Spark and Apache Hadoop, TensorFlowOnSpark enables distributed deep learning on a cluster of GPU and CPU servers." However, it is also obscure and hard to follow and integrate, IMO.

What I'm looking for is an easy way to do all 3 processes in one cluster and that would mean that TF would need to be natively Spark, i.e. a Tensor would be a Spark dataframe.

Additionally, I want to be able to distribute training easily on an AWS EMR cluster, and currently that's very cumbersome in TF. You have to specify a distribution strategy (and those are obscure), then jump through hoops in EMR to have it match that strategy. All the while, TF emits long stack traces when it doesn't like one thing or another, and the error messages are cryptic.
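For reference, the multi-worker strategies are driven by a TF_CONFIG environment variable that has to be set, consistently, on every node in the cluster, and getting EMR to produce this is part of the hoop-jumping. The general shape (hostnames and ports here are hypothetical) looks like:

```json
{
  "cluster": {
    "worker": ["node-1.example.internal:12345",
               "node-2.example.internal:12345"]
  },
  "task": {"type": "worker", "index": 0}
}
```

Every worker gets the same cluster block but its own task.index, which is exactly the kind of per-node wiring that is awkward to automate on EMR.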

What seems to be a lot more appropriate is BigDL, "a distributed deep learning library for Apache Spark; with BigDL, users can write their deep learning applications as standard Spark programs, which can directly run on top of existing Spark or Hadoop clusters."

However, I wanted to use TF-recommenders, for example, and that "rides" TF, not BigDL. So then one has to think about porting TF-recommenders to BigDL... an arduous prospect.

As a side note, TF-recommenders looks very attractive because it appears to support content-based filtering (CBF) rather than just collaborative filtering (CF). The latter is what most recommender frameworks provide, and it works fine, but eventually one is likely to want to integrate complex features, and CF doesn't allow that: it only works with user IDs, item IDs, and events (interactions).
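To illustrate the difference: in a two-tower setup, each tower is just a learned function of whatever inputs it is given, so side features slot in naturally next to the ID embedding, whereas classic matrix-factorization CF has nothing but a per-ID embedding. A toy numpy sketch (dimensions, weights, and features are all made up for illustration):

```python
import numpy as np

rng = np.random.default_rng(42)
emb_dim, feat_dim, out_dim = 8, 3, 16

# CF: the user/item representation is *only* an ID embedding.
user_id_emb = rng.standard_normal(emb_dim)
item_id_emb = rng.standard_normal(emb_dim)

# CBF / two-tower: concatenate the ID embedding with arbitrary side
# features (genres, audio features, demographics, ...) and project.
user_feats = np.array([0.3, 1.0, 0.0])
item_feats = np.array([0.9, 0.1, 0.5])
W_user = rng.standard_normal((emb_dim + feat_dim, out_dim))  # learned in practice
W_item = rng.standard_normal((emb_dim + feat_dim, out_dim))

user_vec = np.concatenate([user_id_emb, user_feats]) @ W_user
item_vec = np.concatenate([item_id_emb, item_feats]) @ W_item
score = float(user_vec @ item_vec)  # affinity score used for ranking
print(user_vec.shape)  # (16,)
```

With pure CF, the towers would collapse to just the ID embeddings, with no way to feed in the extra features.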


MLnick commented Aug 26, 2021

Indeed it's a problem that still has not been solved very well.

I think TF-on-Spark is arguably one of the better options (though it still leaves a lot to be desired), and, as you mention, BigDL is a decent option too (but it doesn't run on GPUs!).

Are you set on Spark? I think the PyData ecosystem projects, especially Dask, will have easier integration with TF, PyTorch, etc.



dgoldenberg-audiomack commented Aug 26, 2021

I'll look at Dask; however, many folks these days are on AWS EMR or some equivalent technology in their stacks. What most folks want, I'd venture to say, is something that runs seamlessly on one of those.

Using Dask immediately poses the question of how to plop it onto EMR or the equivalent.

Spark-TF-distributor is another one of those seemingly obvious things to look at, but I had trouble making heads or tails of it and never got a clear explanation of what it does and how. E.g., see my tickets:
tensorflow/ecosystem#184
tensorflow/ecosystem#178
tensorflow/ecosystem#177

IMO, considering that a "regular developer" just wants to focus on solving the business problem at hand, all this stitching together of iffy frameworks is far from a seamless experience. Once things are somewhat stitched together, you run into a number of hefty stack traces, and getting those resolved may take weeks or months.


MLnick commented Sep 2, 2021

@dgoldenberg-audiomack I agree, the experience is far from seamless currently.

The closest looking thing is actually: https://analytics-zoo.readthedocs.io/en/latest/doc/Orca/Overview/orca.html (from the BigDL team at Intel I think).

I plan to give it a go on YARN to see how it works!

dgoldenberg-audiomack (Author) commented:

That does look interesting, @MLnick. What sort of use-case are you playing with?


MLnick commented Sep 2, 2021 via email

dgoldenberg-audiomack (Author) commented:

Nice. I think it would be great to have a full example of how to scale TFRS, so that it's easy to take and plop into AWS EMR or the like with few tweaks.


3 participants