Kafka流中的changelog主题-设置或更改分区

我们有一个流处理器应用程序,该应用程序使用具有n个分区(n> 1)的主题中的数据。

从头开始(没有changelog主题),开发人员环境始终创建具有n个分区的changelog主题。

在相同的情况下,在生产环境中分区数始终等于1,然后我们已手动更改为n以匹配该主题的分区数。

我检查了所有文档,尝试设置变更日志的分区数,但是找不到任何方法。我的最后一个选择是检查changelog主题是否不存在,然后用n个分区创建它。

由于框架是自动创建该主题的,因此有什么方法可以设置变更日志的分区数,而无需手动或在代码中创建该主题?

PS:我们正在使用Kafka客户端2.3.1版本。

谢谢

奥斯丁

iCMS 回答:Kafka流中的changelog主题-设置或更改分区

我刚刚查看了源代码以了解此功能的详细信息,并且在撰写本文时,事实证明设置change-logs主题 的分区是

说明

change-logs主题被归类为内部主题,并且在以下两个类别(InternalTopicConfigInternalTopicManager)中对此有证据:

  1. InternalTopicConfig的源代码包含以下方法,该方法还说明强制执行此类内部主题上的分区数量:

    public void setNumberOfPartitions(final int numberOfPartitions) {
    if (hasEnforcedNumberOfPartitions()) {
        throw new UnsupportedOperationException("number of partitions are enforced on topic " +
                                                "" + name() + " and can't be altered.");
    ...
    
  2. InternalTopicManager的源代码中的嵌入式文档清楚地说明了makeReady()方法的这一点。

    /**
    * Prepares a set of given internal topics.
    *
    * If a topic does not exist creates a new topic.
    * If a topic with the correct number of partitions exists ignores it.
    * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
    * @return the set of topics which had to be newly created
    */
    public Set<String> makeReady(final Map<String,InternalTopicConfig> topics) 
    ...
    

正如您在评论中看到的那样,如果存在这样的主题,且其分区数正确,则它将被忽略;如果分区数不正确,则会看到错误,建议您使用application reset tool

希望这会有所帮助!

,

当前,我们正在连接启用SSL的MSK主题,因此,我们没有写访问权限,无法通过该应用创建内部主题。因此,作为一项变通办法,我们要求MSK管理员手动创建具有所需名称的changelog主题,以便应用程序可以读取它。

此外,当前,我们所有的用户主题均分为3个分区,创建的变更日志主题也具有3个分区,并具有以下更新的设置。这些设置将很方便,以防万一您尝试手动创建更改日志主题(启用压缩以节省空间):

Configs for Changelog

此外,changelog主题名称如下所示: (您的应用程序ID)-(materializedAs下的userDefined属性)-更改日志

本文链接:https://www.f2er.com/1958649.html

大家都在问