获取最小字节数
此属性允许使用者指定最小数据量
在获取记录时希望从代理接收的信息。如果一个
经纪人收到消费者的记录要求,但新
记录的字节数少于fetch.min.bytes,代理将
等到有更多消息可用后再发送回记录
给消费者。
fetch.max.wait.ms
它将通知经纪人等到有足够的数据要发送之前
回应消费者。
示例:如果将fetch.max.wait.ms设置为100毫秒,将fetch.min.bytes设置为1 MB,
Kafka将收到消费者的提取请求,并将做出回应
当它有1 MB数据要返回时或在100 ms之后,
以先发生的为准。
在向消费者响应消息时,在两个参数控制代理上。
投票(超时)
控制如果数据不可用时poll()将阻塞多长时间。
使用者缓冲区。
消费者方面要求
轮询,以获取Broker响应的记录。它会调用fetchRecords(),如果代理中已经存在满足上述参数fetch.min.bytes和fetch.max.wait.ms的记录,它将立即响应,否则将等待直到给定的超时返回空值,以防代理中没有可用的记录。>
下面在KafkaConsumer类中解释pollForfetches方法
private Map<TopicPartition,List<ConsumerRecord<K,V>>> pollForFetches(final long timeoutMs) {
final long startMs = time.milliseconds();
long pollTimeout = Math.min(coordinator.timeToNextPoll(startMs),timeoutMs);
// if data is available already,return it immediately
final Map<TopicPartition,V>>> records = fetcher.fetchedRecords();
if (!records.isEmpty()) {
return records;
}
// send any new fetches (won't resend pending fetches)
fetcher.sendFetches();
// We do not want to be stuck blocking in poll if we are missing some positions
// since the offset lookup may be backing off after a failure
// NOTE: the use of cachedSubscriptionHashAllFetchPositions means we MUST call
// updateAssignmentMetadataIfNeeded before this method.
if (!cachedSubscriptionHashAllFetchPositions && pollTimeout > retryBackoffMs) {
pollTimeout = retryBackoffMs;
}
client.poll(pollTimeout,startMs,() -> {
// since a fetch might be completed by the background thread,we need this poll condition
// to ensure that we do not block unnecessarily in poll()
return !fetcher.hasCompletedFetches();
});
// after the long poll,we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.rejoinNeededOrPending()) {
return Collections.emptyMap();
}
return fetcher.fetchedRecords();
}
如果fetch.min.bytes = 500和fetch.max.wait.ms = 500,则意味着代理将在有500字节数据要返回时或在500 ms之后(以先发生的为准)响应使用者。
消费者端轮询将每200ms调用一次fetchedRecords,以接收代理提供的任何消息。
,
从documents开始,如果fetch.min.bytes
没有足够的数据,则服务器将被阻塞。因此,在这种情况下,如果没有足够的数据,服务器将等待500
毫秒
根据poll
文档,KafkaConsumer
public ConsumerRecords poll(long timeout)即将到来,因为没有足够的数据消费者轮询将在200
毫秒内为空,直到代理拥有足够的数据为止
timeout-如果缓冲区中没有数据,则等待轮询的时间(以毫秒为单位)。如果为0,则立即返回缓冲区中当前可用的任何记录,否则返回空。不能为负。
fetch.max.wait.ms
如果没有足够的数据可以立即满足fetch.min.bytes给出的要求,服务器将在回答提取请求之前阻塞的最长时间。
获取最小字节数
服务器应为获取请求返回的最小数据量。如果没有足够的数据,则请求将等待该数据积累,然后再回答请求。默认设置为1字节,这意味着只要有一个字节的数据可用,或者提取请求超时等待数据到达,就将对提取请求进行应答。将此值设置为大于1的值将导致服务器等待大量数据累积,从而可以以一些额外的延迟为代价提高服务器吞吐量。
,
您提出的带有 fetch.max.wait.ms > pollDuration
的配置在我看来并不是一个健康的配置,因为它会导致奇怪的行为。我会试着用一个例子来解释。
首先,让我们假设一个健康的配置,其中 fetch.max.wait.ms
小于 pollDuration
。
如果您进行民意调查,但根本没有数据通过该主题,您将等到 pollDuration
通过,然后 poll()
将返回。如果主题中有大量数据,您将获取它,poll()
将立即返回。
有趣的用例是,如果您进行轮询并且主题中有少量数据,少于 fetch.min.bytes
,或者主题中根本没有数据但它在您进行轮询时到达。
此用例如下:
- 时间 T0:您开始投票
- 时间 T1:您在主题中收到了一些字节,但还不够。
我认为T1 < T0 + pollDuration
是理所当然的。现在,了解接下来会发生什么的关键是了解 fetch.max.wait.ms
强加的“截止日期”在时间 T0 开始。
因此,我们有两个选择:
-
T1 < T0 + fetch.max.wait.ms
- 您将继续等待一段时间,在时间 T0 + fetch.max.wait.ms
轮询方法将返回少量数据。请注意,根据我之前的假设,这次是在 T0 + pollDuration
之前。
-
T1 > T0 + fetch.max.wait.ms
- 在这种情况下,Kafka 认为不需要再等待,poll()
方法将在时间 T1 以少量数据返回。
现在,回到您的用例。由于 fetch.max.wait.ms
大于 pollDuration
,您将看到一些奇怪的行为。这是一种可能的用例:
- 时间 T0:您开始轮询,主题有少量数据。
- 时间 T1 = T0 + pollDuration:您完成轮询而不获取数据。
- 时间 T2:您再次开始投票。
- 时间 T3 = T0 + fetch.max.wait.ms:
poll()
返回,您消耗少量数据。
请注意,如果 fetch.max.wait.ms
比 pollDuration
足够大,那么您可能会看到多次轮询迭代,但没有获取任何数据,直到最终获取它为止。在您的示例中,您将轮询两次,总共 400 毫秒,然后空手返回,然后在第三次轮询尝试中,您将在 100 毫秒后获取数据。
本文链接:https://www.f2er.com/3165110.html