Anyone able to load tfrecords into TFRS generated with the spark-to-tf-records connector? #197

Open
dgoldenberg-audiomack opened this issue Dec 28, 2020 · 15 comments


@dgoldenberg-audiomack

Has anyone seen this kind of error when trying to load TFRecords generated from Spark by the spark-tensorflow-connector or LinkedIn's spark-tfrecord library?

Error: Error when deserializing TFRecords in TF 2.x: Only integers, slices (:), ellipsis (...), tf.newaxis (None) and scalar tf.int32/tf.int64 tensors are valid indices

I've filed tickets with details on both projects' trackers.

I'm really just doing a simple thing, using the small MovieLens dataset:

    # Code for the connector
    movies_df.write.format("tfrecords").mode("overwrite").save(tf_movies_dir)
    ratings_df.write.format("tfrecords").mode("overwrite").save(tf_ratings_dir)

    # Alternatively, code for the spark to tfrecord
    movies_df.write.format("tfrecord").mode("overwrite").option("recordType", "Example").save(tf_movies_dir)
    ratings_df.write.format("tfrecord").mode("overwrite").option("recordType", "Example").save(tf_ratings_dir)

    s3 = boto3.resource("s3", verify=False)
    bucket = s3.Bucket("mybucket")

    filenames = []
    for object_summary in bucket.objects.filter(
            Prefix="emr/spark_apps/myapp/movielens-100k-conversion/movies-0001/part"
    ):
        filenames.append(os.path.join("s3://audiomack-master-airflow/", object_summary.key))
    movies_dataset = tf.data.TFRecordDataset(filenames)

    filenames = []
    for object_summary in bucket.objects.filter(
            Prefix="emr/spark_apps/myapp/movielens-100k-conversion/ratings-0001/part"
    ):
        filenames.append(os.path.join("s3://audiomack-master-airflow/", object_summary.key))
    ratings_dataset = tf.data.TFRecordDataset(filenames)
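
For reference, here's a quick diagnostic (a sketch, not part of the job itself; movies_dataset is from the snippet above) confirming that the dataset yields raw serialized records rather than dicts:

    import tensorflow as tf

    # Each element of a TFRecordDataset is a scalar tf.string tensor holding
    # one serialized record, not a dictionary of features.
    for raw_record in movies_dataset.take(1):
        print(type(raw_record))         # EagerTensor with dtype tf.string
        print(raw_record.numpy()[:80])  # first bytes of the serialized record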
@dgoldenberg-audiomack
Author

That said, there are the CSV-consuming APIs (https://www.tensorflow.org/guide/data#consuming_csv_data); I wonder if that's just a better, more direct approach anyway. I have Parquet data to consume; in theory I should be able to load it as described in tensorflow/io#1121.
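
For example, a minimal sketch of the CSV route using tf.data.experimental.make_csv_dataset, which yields batched dicts keyed by column name (the path and column names here are placeholders, not the actual job's values):

    import tensorflow as tf

    # Load ratings straight from CSV files written by Spark.
    ratings_dataset = tf.data.experimental.make_csv_dataset(
        "ratings/*.csv",
        batch_size=256,
        select_columns=["user_id", "movie_title", "rating"],
        num_epochs=1,
    )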

@maciejkula
Collaborator

It might help folks answer this if you post the entire stack trace of your error. It's hard to say what's going on based on your summary of the error message.

@dgoldenberg-audiomack
Author

Hi @maciejkula, there are two slightly different stack traces in the tickets I linked.

However here's one of them:

    Traceback (most recent call last):
      File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 304, in <module>
        main(sys.argv)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
        return func(*args, **kwargs)
      File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 50, in main
        movies, test, train, unique_movie_titles, unique_user_ids = prepare_data(movies, ratings)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
        return func(*args, **kwargs)
      File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 155, in prepare_data
        ratings = ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 1695, in map
        return MapDataset(self, map_func, preserve_cardinality=True)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 4045, in __init__
        use_legacy_function=use_legacy_function)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3371, in __init__
        self._function = wrapper_fn.get_concrete_function()
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 2939, in get_concrete_function
        *args, **kwargs)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 2906, in _get_concrete_function_garbage_collected
        graph_function, args, kwargs = self._maybe_define_function(args, kwargs)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 3213, in _maybe_define_function
        graph_function = self._create_graph_function(args, kwargs)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/eager/function.py", line 3075, in _create_graph_function
        capture_by_value=self._capture_by_value),
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/framework/func_graph.py", line 986, in func_graph_from_py_func
        func_outputs = python_func(*func_args, **func_kwargs)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3364, in wrapper_fn
        ret = _wrapper_helper(*args)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/data/ops/dataset_ops.py", line 3299, in _wrapper_helper
        ret = autograph.tf_convert(func, ag_ctx)(*nested_args)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/autograph/impl/api.py", line 302, in wrapper
        return func(*args, **kwargs)
      File "/mnt/tmp/spark-9eb04118-c94f-4ed6-bb6b-75927f4d389f/recsys_tfrs_proto.py", line 155, in <lambda>
        ratings = ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/util/dispatch.py", line 201, in wrapper
        return target(*args, **kwargs)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/ops/array_ops.py", line 986, in _slice_helper
        _check_index(s)
      File "/usr/local/lib64/python3.7/site-packages/tensorflow/python/ops/array_ops.py", line 865, in _check_index
        raise TypeError(_SLICE_TYPE_ERROR + ", got {!r}".format(idx))
    TypeError: Only integers, slices (:), ellipsis (...), tf.newaxis (None) and scalar tf.int32/tf.int64 tensors are valid indices, got 'movie_title'

@maciejkula
Collaborator

maciejkula commented Dec 29, 2020

It looks like the error is in the map function:

ratings.map(lambda x: {"movie_title": x["movie_title"], "user_id": x["user_id"]})

Whatever the elements of ratings are, they are not dictionaries. What are they?

@dgoldenberg-audiomack
Author

dgoldenberg-audiomack commented Dec 29, 2020

That's true. Unlike with tfds.load, when using the connector and then feeding the files into a tf.data.TFRecordDataset, the elements are not dictionaries; they're raw bytes.

The question is, how does one decode these bytes?

    b'\ns\n\x11\n\x08movie_id\x12\x05\x1a\x03\n\x01\x01\n#\n\x0bmovie_title\x12\x14\n\x12\n\x10Toy Story (1995)\n9\n\x06genres\x12/\n-\n+Adventure|Animation|Children|Comedy|Fantasy'

Clearly this contains movie_id, movie_title, and genres, but .decode("utf-8") doesn't work, and it doesn't cleanly un-hexify either.

Similarly, for ratings:

    b'\n|\n\x10\n\x07user_id\x12\x05\x1a\x03\n\x01\x01\n\x11\n\x08movie_id\x12\x05\x1a\x03\n\x01\x1f\n)\n\x0bmovie_title\x12\x1a\n\x18\n\x16Dangerous Minds (1995)\n\x12\n\x06rating\x12\x08\x12\x06\n\x04\x00\x00 @\n\x16\n\ttimestamp\x12\t\x1a\x07\n\x05\xe8\xd0\x96\xd9\x04'

I wonder if there's a utility method in TF somewhere...
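
(One way to at least inspect a record: tf.train.Example is a protobuf message, so the standard proto API can decode the bytes. A sketch, with raw_bytes standing in for one of the blobs above:)

    import tensorflow as tf

    # Parse one serialized record as a tf.train.Example proto and print its
    # features map (movie_id, movie_title, genres).
    example = tf.train.Example.FromString(raw_bytes)
    print(example)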

@maciejkula
Collaborator

I'm pretty sure the problem is on the writing side. What do you expect Spark to be writing to these files? Is it writing what you think it's writing?

@dgoldenberg-audiomack
Author

dgoldenberg-audiomack commented Dec 29, 2020

It's writing the right keys (column names) and the right values, but I don't know enough about the internal TFRecord format to judge whether this binary blob arrangement and its encoding are correct. It seems there's some kind of disconnect between what's being written and how it's being read into the dataset.

Maybe it's a TF 1.x vs. TF 2.x issue. Or maybe when reading into a TF dataset, tfrecords need to pass through another conversion layer.

All in all, this is not a critical issue for me, because now I want to load Parquet into TF datasets directly; I'm experimenting with that now. I had thought I'd have to write Spark dataframes into intermediary TFRecord files first, then load those into TF datasets, but there seems to be a more direct way to go Parquet -> dataset.

That said, I think the load of tfrecord files into a TF DS "should work".

@maciejkula
Collaborator

Have you looked at the docs for reading TFRecord files containing tf.train.Examples?

It looks like you're skipping the deserialization step (converting the serialized tf.train.Example protos to dictionaries of tensors).
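
A minimal sketch of that step, assuming the feature names and dtypes the Spark job wrote (adjust the description to the actual schema):

    import tensorflow as tf

    # Describe the features each serialized tf.train.Example contains.
    feature_description = {
        "movie_title": tf.io.FixedLenFeature([], tf.string),
        "user_id": tf.io.FixedLenFeature([], tf.int64),
    }

    def _parse_function(example_proto):
        # Convert one serialized Example into a dict of tensors.
        return tf.io.parse_single_example(example_proto, feature_description)

    ratings = ratings_dataset.map(_parse_function)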

@dgoldenberg-audiomack
Author

Oh, I see. You mean this?

    parsed_dataset = raw_dataset.map(_parse_function)

It seems odd that one has to load the tfrecords into a dataset, then convert it while instructing it what's in the files :)

I would think that the dataset should be able to do the parsing itself, encapsulating the knowledge of how to parse these files...

I'll try this out a little later.

@dgoldenberg-audiomack
Author

We can probably close the ticket. It may be worth mentioning in the docs that the 'parse' type of transformation is required; it's not obvious, especially to a noob :)

@Data-Jack

@dgoldenberg-audiomack
Did you ever find a solution for this? I think I'm experiencing a similar issue when writing array-of-array column types.

I get the error "InvalidArgumentError: Feature: cold (data type: float) is required but could not be found. [Op:ParseSingleExample]" even though "cold" is clearly in the message byte string.

from pyspark.sql.types import *

data = [("A", 1, [1.1, 2.0, 3.0], [[0.0, 1.0], [1.0, 0.0]]),
        ("B", 2, [11.0, 12.3, 13.0], [[0.0, 1.0], [1.0, 0.0]]),
        ("C", 3, [21.0, 22.0, 23.5], [[1.0, 0.0], [1.0, 0.0]]),
       ]

schema = StructType([
    StructField("colA", StringType(), True),
    StructField("colb", IntegerType(), True),
    StructField("colc", ArrayType(FloatType()), True),
    StructField("cold", ArrayType(ArrayType(FloatType())), True),
])

# Assumes an active SparkSession named `spark`
df = spark.createDataFrame(data=data, schema=schema)

# Write
write_path = "/home/pathtowrite"

df.write.format("tfrecords").option("recordType", "SequenceExample").mode("overwrite").save(write_path)

### Read in with TensorFlow

import tensorflow as tf
import os

files = [f"{write_path}/{x}" for x in os.listdir(write_path) if x.startswith("part")]
dataset = tf.data.TFRecordDataset(files)

# Create a dictionary describing the features.
feature_description = {
    'colA': tf.io.FixedLenFeature([], tf.string),
    'colb': tf.io.FixedLenFeature([], tf.int64),
    'colc': tf.io.FixedLenFeature([3], tf.float32),
    'cold': tf.io.FixedLenFeature([2, 2], tf.float32),
}

for i in dataset.take(1):
    print(repr(i))
    example = tf.io.parse_single_example(i, feature_description)
    print(example)
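
(A hedged guess, not confirmed: with recordType "SequenceExample", the connector presumably writes the nested array into the proto's feature_lists rather than its Example-style context features, and tf.io.parse_single_example only sees the latter, which would explain "cold" coming up missing. Parsing the records as SequenceExamples might work; a sketch reusing the schema above:)

    # Scalars and the flat array live in the context; the array-of-arrays
    # becomes a feature list with one row per inner array.
    context_description = {
        'colA': tf.io.FixedLenFeature([], tf.string),
        'colb': tf.io.FixedLenFeature([], tf.int64),
        'colc': tf.io.FixedLenFeature([3], tf.float32),
    }
    sequence_description = {
        'cold': tf.io.FixedLenSequenceFeature([2], tf.float32),
    }

    for i in dataset.take(1):
        context, sequences = tf.io.parse_single_sequence_example(
            i,
            context_features=context_description,
            sequence_features=sequence_description,
        )
        print(context, sequences)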

@dgoldenberg-audiomack dgoldenberg-audiomack changed the title Anyone able to load tfrecords into TFRS generated with the spark to tf records connector? Anyone able to load tfrecords into TFRS generated with the spark-to-tf-records connector? Apr 26, 2021
@dgoldenberg-audiomack
Author

@Data-Jack In my particular case, the solution was to make sure I use the map:

    parsed_dataset = raw_dataset.map(_parse_function)

However, I've generally moved away from using TFRecords; my current thinking is, why bother? :) TF has ways of loading CSV and Parquet data into its datasets. What I do is wrangle all the input data to CSV/Parquet using Spark, then load it into TF datasets.
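
For the Parquet leg, a minimal sketch using tensorflow-io (per tensorflow/io#1121); the file name and column list are placeholders, and a reasonably recent tensorflow-io is assumed:

    import tensorflow_io as tfio

    # Stream a Parquet file directly into a tf.data-compatible dataset,
    # selecting only the columns the model needs.
    ratings_dataset = tfio.IODataset.from_parquet(
        "ratings.parquet",
        columns=["user_id", "movie_title", "rating"],
    )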

I have filed this issue in TF for better interoperability with Spark.

However, there's also the BigDL project, which presumably allows one to distribute TF training using Spark.

@Data-Jack

Data-Jack commented Apr 26, 2021

@dgoldenberg-audiomack Unfortunately, my data contains array-of-array fields, and map still doesn't work. Chucking the parsing function below into my example still produces the same error for me.

def _parse_function(example_proto):
  # Parse the input tf.train.Example proto using the dictionary above.
  return tf.io.parse_single_example(example_proto, feature_description)

parsed_dataset = dataset.map(_parse_function)
for i in parsed_dataset.take(1):
    print(i)

Thanks for the tip, I will look into a pipeline that reads from Parquet. I was really hoping to just be able to write all my nested array fields to TFRecord and read them in. I'm going to settle for flattening the arrays into one array for now while I research other options.

@dgoldenberg-audiomack
Author

@Data-Jack Have you considered filing an issue with tensorflow-io, or core TF? They might suggest something.

@Data-Jack

@dgoldenberg-audiomack Yeah, I will. My first guess was that it's how the data is being written.
