Skip to content

Comments

KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N]#21557

Open
lucliu1108 wants to merge 7 commits intoapache:trunkfrom
lucliu1108:KAFKA-20066-1
Open

KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N]#21557
lucliu1108 wants to merge 7 commits intoapache:trunkfrom
lucliu1108:KAFKA-20066-1

Conversation

@lucliu1108
Copy link
Contributor

@lucliu1108 lucliu1108 commented Feb 23, 2026

Summary

This PR moves assignedPartitions out of ModernConsumerMember
interface, add it as independent properties for ShareGroupMember and
ConsumerGroupMember.

Reason for the change

In an upcoming PR, the structure of
ConsumerGroupMember#assignedPartitions and
ConsumerGroupMember#partitionsPendingRevocation will be changed to
include epoch information as

Map<Uuid, Map<Integer, Integer>>

This differs from the ShareGroupMember#assignedPartitions structure,
which remains Map<Uuid, Set<Integer>>. Therefore, it is no longer
appropriate to have this as a shared field in the base class.

Reviewers: Sean Quah squah@confluent.io, Lucas Brutschy
lbrutschy@confluent.io

@lucliu1108 lucliu1108 changed the title KAKFA-1251: Implement KIP-1251: Assignment epochs for consumer groups [1/N] KAKFA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] Feb 23, 2026
@github-actions github-actions bot added triage PRs from the community group-coordinator small Small PRs labels Feb 23, 2026
Copy link
Contributor

@squah-confluent squah-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for filing the PR!

// 2. The member's assignment has been updated.
boolean isFullRequest = subscribedTopicNames != null;
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
if (memberEpoch == 0 || isFullRequest || ShareGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry, could we also qualify the hasAssignedPartitionsChanged call in consumerGroupHeartbeat?

Comment on lines 197 to 201

/**
* The partitions assigned to this member.
*/
private final Map<Uuid, Set<Integer>> assignedPartitions;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this declaration belongs in between the builder and constructor, like in ConsumerGroupMember.

Copy link
Contributor

@squah-confluent squah-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, lgtm!

@github-actions github-actions bot removed the triage PRs from the community label Feb 24, 2026
@lucasbru lucasbru self-assigned this Feb 24, 2026
@lucasbru lucasbru self-requested a review February 24, 2026 13:06
@lucasbru lucasbru changed the title KAKFA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] KAFKA-20066: Implement KIP-1251: Assignment epochs for consumer groups [1/N] Feb 24, 2026
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks!

@lucasbru
Copy link
Member

nit: @lucliu1108 if you want to prefill the Reviewers header, use the committer-tools/reviewers.py tool to generate it

@lucliu1108
Copy link
Contributor Author

lucliu1108 commented Feb 24, 2026

Thanks for the review!
@squah-confluent @lucasbru I notice there's also change need in AssignorBenchmarkUtils#createGroupSpec method. In this PR since we are still keeping the same data structure for assignedPartitions for both SharedGroupMember and ConsumerGroupMember, instanceof conversion could work. However, later on when the ConsumerGroupMember#assignedPartition is modified to include epochs, the current fix won't work.

Considering that createGroupSpec is only used for empty member now, does it make sense to simply pass in an empty assignment in this case?

memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
                Optional.ofNullable(member.rackId()),
                Optional.ofNullable(member.instanceId()),
                new TopicIds(member.subscribedTopicNames(), topicResolver),
                new Assignment(Map.of())
            ));

@squah-confluent
Copy link
Contributor

@lucliu1108 That's unfortunate. I would probably duplicate createGroupSpec and have one version for consumer group members and another for share group members.

@github-actions github-actions bot removed the small Small PRs label Feb 24, 2026
@lucliu1108
Copy link
Contributor Author

Thanks @squah-confluent
Right now i spitted them into 2 different methods and rename separately, since they have the same set of parameters and will be marked as name clash.
In the next PR, since the consumerGroupMember assignedPartitions is restructured, I'll rename them back.

Copy link
Contributor

@squah-confluent squah-confluent left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants