fetch.max.wait.ms与poll()方法的参数

在我问我的问题之前,我想指出一个类似的问题已经提出过here,但尚未得到回答,所以我再次询问。请不要将此标记为重复,因为前面提到的问题没有任何答案。

我对fetch.max.wait.msconsumer.poll(<value>)存有疑问。 这是我在研究上述配置时发现的

  

poll()方法采用一个超时参数。这指定有或没有数据的轮询返回需要多长时间

     

如果将fetch.max.wait.ms设置为100 ms,将fetch.min.bytes设置为1 MB,则Kafka将接收来自使用者的提取请求,并在有1 MB数据要返回时以数据响应或100毫秒后,以先到者为准。

所以我的问题是,当fetch.max.wait.ms=500consumer.poll(200)fetch.min.bytes= 500但经纪人没有足够的数据来返回fetch.min.bytes时会发生什么?

openmylwx 回答:fetch.max.wait.ms与poll()方法的参数

获取最小字节数

  

此属性允许使用者指定最小数据量   在获取记录时希望从代理接收的信息。如果一个   经纪人收到消费者的记录要求,但新   记录的字节数少于fetch.min.bytes,代理将   等到有更多消息可用后再发送回记录   给消费者。

fetch.max.wait.ms

  

它将通知经纪人等到有足够的数据要发送之前   回应消费者。

     

示例:如果将fetch.max.wait.ms设置为100毫秒,将fetch.min.bytes设置为1 MB,   Kafka将收到消费者的提取请求,并将做出回应   当它有1 MB数据要返回时或在100 ms之后,   以先发生的为准。

在向消费者响应消息时,在两个参数控制代理上。

投票(超时)

  

控制如果数据不可用时poll()将阻塞多长时间。   使用者缓冲区。

消费者方面要求

轮询,以获取Broker响应的记录。它会调用fetchRecords(),如果代理中已经存在满足上述参数fetch.min.bytes和fetch.max.wait.ms的记录,它将立即响应,否则将等待直到给定的超时返回空值,以防代理中没有可用的记录。>

下面在KafkaConsumer类中解释pollForfetches方法

private Map<TopicPartition,List<ConsumerRecord<K,V>>> pollForFetches(final long timeoutMs) {
        final long startMs = time.milliseconds();
        long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs),timeoutMs);

        // if data is available already,return it immediately
        final Map<TopicPartition,V>>> records = fetcher.fetchedRecords();
        if (!records.isEmpty()) {
            return records;
        }

        // send any new fetches (won't resend pending fetches)
        fetcher.sendFetches();

        // We do not want to be stuck blocking in poll if we are missing some positions
        // since the offset lookup may be backing off after a failure

        // NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
        // updateAssignmentMetadataIfNeeded before this method.
        if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
            pollTimeout = retryBackoffMs;
        }

        client.poll(pollTimeout,startMs,() -> {
            // since a fetch might be completed by the background thread,we need this poll condition
            // to ensure that we do not block unnecessarily in poll()
            return !fetcher.hasCompletedFetches();
        });

        // after the long poll,we should check whether the group needs to rebalance
        // prior to returning data so that the group can stabilize faster
        if (coordinator.rejoinNeededOrPending()) {
            return Collections.emptyMap();
        }

        return fetcher.fetchedRecords();
    }

如果fetch.min.bytes = 500和fetch.max.wait.ms = 500,则意味着代理将在有500字节数据要返回时或在500 ms之后(以先发生的为准)响应使用者。 消费者端轮询将每200ms调用一次fetchedRecords,以接收代理提供的任何消息。

,

documents开始,如果fetch.min.bytes没有足够的数据,则服务器将被阻塞。因此,在这种情况下,如果没有足够的数据,服务器将等待500毫秒

根据poll文档,KafkaConsumer public ConsumerRecords poll(long timeout)即将到来,因为没有足够的数据消费者轮询将在200毫秒内为空,直到代理拥有足够的数据为止

  

timeout-如果缓冲区中没有数据,则等待轮询的时间(以毫秒为单位)。如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。不能为负。

fetch.max.wait.ms

  

如果没有足够的数据可以立即满足fetch.min.bytes给出的要求,服务器将在回答提取请求之前阻塞的最长时间。

获取最小字节数

  

服务器应为获取请求返回的最小数据量。如果没有足够的数据,则请求将等待该数据积累,然后再回答请求。默认设置为1字节,这意味着只要有一个字节的数据可用,或者提取请求超时等待数据到达,就将对提取请求进行应答。将此值设置为大于1的值将导致服务器等待大量数据累积,从而可以以一些额外的延迟为代价提高服务器吞吐量。

,

您提出的带有 fetch.max.wait.ms > pollDuration 的配置在我看来并不是一个健康的配置,因为它会导致奇怪的行为。我会试着用一个例子来解释。

首先,让我们假设一个健康的配置,其中 fetch.max.wait.ms 小于 pollDuration
如果您进行民意调查,但根本没有数据通过该主题,您将等到 pollDuration 通过,然后 poll() 将返回。如果主题中有大量数据,您将获取它,poll() 将立即返回。

有趣的用例是,如果您进行轮询并且主题中有少量数据,少于 fetch.min.bytes,或者主题中根本没有数据但它在您进行轮询时到达。
此用例如下:

  • 时间 T0:您开始投票
  • 时间 T1:您在主题中收到了一些字节,但还不够。

我认为T1 < T0 + pollDuration是理所当然的。现在,了解接下来会发生什么的关键是了解 fetch.max.wait.ms 强加的“截止日期”在时间 T0 开始。 因此,我们有两个选择:

  1. T1 < T0 + fetch.max.wait.ms - 您将继续等待一段时间,在时间 T0 + fetch.max.wait.ms 轮询方法将返回少量数据。请注意,根据我之前的假设,这次是在 T0 + pollDuration 之前。
  2. T1 > T0 + fetch.max.wait.ms - 在这种情况下,Kafka 认为不需要再等待,poll() 方法将在时间 T1 以少量数据返回。

现在,回到您的用例。由于 fetch.max.wait.ms 大于 pollDuration,您将看到一些奇怪的行为。这是一种可能的用例:

  • 时间 T0:您开始轮询,主题有少量数据。
  • 时间 T1 = T0 + pollDuration:您完成轮询而不获取数据。
  • 时间 T2:您再次开始投票。
  • 时间 T3 = T0 + fetch.max.wait.ms:poll() 返回,您消耗少量数据。

请注意,如果 fetch.max.wait.mspollDuration 足够大,那么您可能会看到多次轮询迭代,但没有获取任何数据,直到最终获取它为止。在您的示例中,您将轮询两次,总共 400 毫秒,然后空手返回,然后在第三次轮询尝试中,您将在 100 毫秒后获取数据。

本文链接:https://www.f2er.com/3165110.html

大家都在问