How do I use/create an Apache Beam runner for this code?

The code below is the unedited "wiki_preproc_pipeline.py" file from the Google language repository, in the conpono subfolder here: https://github.com/google-research/language/tree/master/language/conpono

At the end of the code there is an empty function, def main(_):, followed by this instruction:

"If using Apache BEAM, execute runner here."

I have worked through the quickstart example (shown here: https://beam.apache.org/get-started/quickstart-py/), and I tried filling in def main(_) with wiki_pipeline() in place of the comment, but I get no output even though the code runs. As far as I can tell, the script is meant to convert the .raw files into TFRecord files. The .raw files are available via the "Download WikiText-103 raw character level data" hyperlink at this link: https://www.salesforce.com/products/einstein/ai-research/the-wikitext-dependency-language-modeling-dataset/

The absolute file paths I pass in are as follows:

--input_file is set to the path of a folder containing the three .raw files referenced in the code below. (I have also tried pointing it directly at a single file, e.g. ".../wiki.test.raw", instead of the folder containing it.)

--output_file is just my desktop (which worked for the Beam quickstart example).

--vocab_file is the direct path to the vocab.txt file that BERT was trained with.

How do I end up with the TFRecord files? At this point I am completely lost. Even if they are available for download somewhere else, I would rather understand this process than just download them, so that I can replicate it on a subset of Wikipedia. Thanks in advance to anyone who weighs in.
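Based on the Beam quickstart, my best guess is that main() needs something like the sketch below: build a local pipeline on the DirectRunner and apply the transform graph returned by wiki_pipeline() to its root. This is only my assumption, not anything taken from the repository, so I would appreciate confirmation that it is even the right shape. The full unedited file follows.

```python
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def main(_):
  # My assumption (not from the repo): execute the transform graph that
  # wiki_pipeline() returns on the local DirectRunner. The "with" block
  # runs the pipeline when it exits, which is when the sharded TFRecord
  # files would actually be written.
  pipeline_fn = wiki_pipeline()
  options = PipelineOptions(runner="DirectRunner")
  with beam.Pipeline(options=options) as root:
    pipeline_fn(root)
```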

```python
# Copyright 2018 The Google AI Language Team Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Beam pipeline to convert WikiText103 to sharded TFRecords."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import hashlib
import random
from absl import app
from absl import flags
import apache_beam as beam
from bert import tokenization
from language.conpono.create_pretrain_data.preprocessing_utils import convert_instance_to_tf_example
from language.conpono.create_pretrain_data.preprocessing_utils import create_instances_from_document
from language.conpono.create_pretrain_data.preprocessing_utils import create_paragraph_order_from_document
import tensorflow.compat.v1 as tf

FORMAT_BINARY = "binary"
FORMAT_PARAGRAPH = "paragraph"

flags.DEFINE_string(
    "input_file", None, "Path to raw input files."
    "Assumes the filenames wiki.{train|valid|test}.raw")
flags.DEFINE_string("output_file", None, "Output TF example file.")
flags.DEFINE_string("vocab_file", None,
                    "The vocabulary file that the BERT model was trained on.")
flags.DEFINE_integer("max_seq_length", 512, "Maximum sequence length.")
flags.DEFINE_integer("random_seed", 12345, "A random seed")
flags.DEFINE_bool(
    "do_lower_case", True,
    "Whether to lower case the input text. Should be True for uncased "
    "models and False for cased models.")
flags.DEFINE_enum(
    "format", FORMAT_PARAGRAPH, [FORMAT_BINARY, FORMAT_PARAGRAPH],
    "Build a dataset of either binary order or paragraph reconstruction")

FLAGS = flags.FLAGS


def read_file(filename):
  """Read the contents of filename (str) and split into documents by chapter."""

  all_documents = []
  # Input file format:
  # See wiki.*.tokens for an example
  # Documents are contiguously stored. Headers to sections are denoted by
  # "=" on each side. The number of "=" indicates the depth. So the title for
  # United States is "= United States =". "= = = Population = = =" indicates
  # a section 3 levels deep (United States -> Demographics -> Population).
  # There are blank lines between sections.
  # Each line is a paragraph. Periods that are sentence delimiters have a space
  # on each side. Periods in abbreviations do not have spaces.

  # For parallel processing, we first split the single file into documents.
  # Each document is a list of lines.
  with tf.gfile.GFile(filename, "r") as reader:
    for line in reader:
      line = line.strip()
      if not line:
        continue

      # Headers like (" = John Doe = ") are document delimiters
      if line.startswith("= ") and line[2] != "=":
        all_documents.append([])
      # Skip lines that are headers
      if line[0] == "=":
        continue
      all_documents[-1].append(line)

  # Remove empty documents
  all_documents = [x for x in all_documents if x]
  return all_documents


def split_line_by_sentences(line):
  # Put trailing period back but not on the last element
  # because that usually leads to double periods.
  sentences = [l + "." for l in line.split(" . ")]
  sentences[-1] = sentences[-1][:-1]
  return sentences


def preproc_doc(document):
  """Convert document to list of TF Examples for binary order classification.

  Args:
      document: a wikipedia article as a list of lines

  Returns:
      A list of tfexamples of binary orderings of pairs of sentences in the
      document. The tfexamples are serialized to string to be written directly
      to TFRecord.
  """

  # Each document is a list of lines
  tokenizer = tokenization.FullTokenizer(
      vocab_file=FLAGS.vocab_file, do_lower_case=FLAGS.do_lower_case)

  # set a random seed for reproducibility
  hash_object = hashlib.md5(document[0])
  rng = random.Random(int(hash_object.hexdigest(), 16) % (10**8))

  # Each document is composed of a list of text lines. Each text line is a
  # paragraph. We split the line into sentences but keep the paragraph grouping.
  # The utility functions below expect the document to be split by paragraphs.
  list_of_paragraphs = []
  for line in document:
    line = tokenization.convert_to_unicode(line)
    line = line.replace(u"\u2018", "'").replace(u"\u2019", "'")
    sents = split_line_by_sentences(line)
    sent_tokens = [tokenizer.tokenize(sent) for sent in sents if sent]
    list_of_paragraphs.append(sent_tokens)

  # In case of any empty paragraphs, remove them.
  list_of_paragraphs = [x for x in list_of_paragraphs if x]

  # Convert the list of paragraphs into TrainingInstance object
  # See preprocessing_utils.py for definition
  if FLAGS.format == FORMAT_BINARY:
    instances = create_instances_from_document(
        list_of_paragraphs, FLAGS.max_seq_length, rng)
  elif FLAGS.format == FORMAT_PARAGRAPH:
    instances = create_paragraph_order_from_document(list_of_paragraphs, rng)

  # Convert token lists into ids and add any needed tokens and padding for BERT
  tf_examples = [
      convert_instance_to_tf_example(tokenizer, instance, FLAGS.max_seq_length)[0]
      for instance in instances
  ]

  # Serialize TFExample for writing to file.
  tf_examples = [example.SerializeToString() for example in tf_examples]

  return tf_examples


def wiki_pipeline():
  """Read WikiText103 filenames and create Beam pipeline."""

  train_files = FLAGS.input_file + "/wiki.train.raw"
  dev_files = FLAGS.input_file + "/wiki.valid.raw"
  test_files = FLAGS.input_file + "/wiki.test.raw"

  def pipeline(root):
    """Beam pipeline for converting WikiText103 files to TF Examples."""
    _ = (
        root | "Create test files" >> beam.Create([test_files])
        | "Read test files" >> beam.FlatMap(read_file)
        | "test Shuffle" >> beam.Reshuffle()
        | "Preproc test docs" >> beam.FlatMap(preproc_doc)
        | "record test Shuffle" >> beam.Reshuffle()
        | "Write to test tfrecord" >> beam.io.WriteToTFRecord(
            FLAGS.output_file + "." + FLAGS.format + ".test.tfrecord",
            num_shards=10))
    _ = (
        root | "Create dev files" >> beam.Create([dev_files])
        | "Read dev files" >> beam.FlatMap(read_file)
        | "dev Shuffle" >> beam.Reshuffle()
        | "Preproc dev docs" >> beam.FlatMap(preproc_doc)
        | "record dev Shuffle" >> beam.Reshuffle()
        | "Write to dev tfrecord" >> beam.io.WriteToTFRecord(
            FLAGS.output_file + "." + FLAGS.format + ".dev.tfrecord",
            num_shards=10))
    _ = (
        root | "Create train files" >> beam.Create([train_files])
        | "Read train files" >> beam.FlatMap(read_file)
        | "train Shuffle" >> beam.Reshuffle()
        | "Preproc train docs" >> beam.FlatMap(preproc_doc)
        | "record train Shuffle" >> beam.Reshuffle()
        | "Write to train tfrecord" >> beam.io.WriteToTFRecord(
            FLAGS.output_file + "." + FLAGS.format + ".train.tfrecord",
            num_shards=100))
    return

  return pipeline


def main(_):
  # If using Apache BEAM, execute runner here.


if __name__ == "__main__":
  app.run(main)
```
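Once something does get written, this is how I was planning to sanity-check the output: count the serialized tf.Examples across the shards that WriteToTFRecord produces. The output prefix below is just a placeholder for whatever --output_file and --format resolve to on my machine.

```python
import tensorflow.compat.v1 as tf

# Placeholder pattern: WriteToTFRecord appends a "-SSSSS-of-NNNNN" shard
# suffix to the file name built from --output_file and --format.
pattern = "/path/to/output.paragraph.test.tfrecord*"

count = 0
for path in tf.gfile.Glob(pattern):
  for record in tf.python_io.tf_record_iterator(path):
    tf.train.Example.FromString(record)  # parses one serialized tf.Example
    count += 1
print("total examples:", count)
```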