Skip to content

Commit

Permalink
Internal change
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 697988535
  • Loading branch information
tomvdw authored and SeqIO committed Nov 19, 2024
1 parent 358d1c3 commit de1fb1a
Showing 1 changed file with 14 additions and 26 deletions.
40 changes: 14 additions & 26 deletions seqio/beam_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,16 +196,12 @@ def __init__(self, output_path: str, num_shards: Optional[int] = None):
self._num_shards = num_shards

def expand(self, pcoll):
return (
pcoll
| beam.Map(seqio.dict_to_tfexample)
| beam.Reshuffle()
| beam.io.tfrecordio.WriteToTFRecord(
self._output_path,
num_shards=self._num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example),
)
sink = beam.io.tfrecordio.WriteToTFRecord(
self._output_path,
num_shards=self._num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example),
)
return pcoll | beam.Map(seqio.dict_to_tfexample) | beam.Reshuffle() | sink



Expand Down Expand Up @@ -302,17 +298,13 @@ def __init__(
self._preserve_random_access = preserve_random_access

def expand(self, pcoll):
return (
pcoll
| beam.Map(seqio.dict_to_tfexample)
| beam.Reshuffle()
| WriteToArrayRecord(
self._output_path,
num_shards=self._num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example),
preserve_random_access=self._preserve_random_access,
)
sink = WriteToArrayRecord(
self._output_path,
num_shards=self._num_shards,
coder=beam.coders.ProtoCoder(tf.train.Example),
preserve_random_access=self._preserve_random_access,
)
return pcoll | beam.Map(seqio.dict_to_tfexample) | beam.Reshuffle() | sink


class WriteJson(beam.PTransform):
Expand All @@ -337,14 +329,10 @@ def _jsonify(self, el):
return json.dumps(el)

def expand(self, pcoll):
return (
pcoll
| beam.Map(self._jsonify)
| "write_info"
>> beam.io.WriteToText(
self._output_path, num_shards=1, shard_name_template=""
)
sink = beam.io.WriteToText(
self._output_path, num_shards=1, shard_name_template=""
)
return pcoll | beam.Map(self._jsonify) | "write_info" >> sink


class GetInfo(beam.PTransform):
Expand Down

0 comments on commit de1fb1a

Please sign in to comment.