Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.CONSUMER_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.SHARE_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics.STREAMS_GROUP_REBALANCES_SENSOR_NAME;
import static org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember.hasAssignedPartitionsChanged;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.convertToStreamsGroupTopologyRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord;
import static org.apache.kafka.coordinator.group.streams.StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord;
Expand Down Expand Up @@ -2456,7 +2455,7 @@ private CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord>
// to detect a full request as those must be set in a full request.
// 2. The member's assignment has been updated.
boolean isFullRequest = rebalanceTimeoutMs != -1 && (subscribedTopicNames != null || subscribedTopicRegex != null) && ownedTopicPartitions != null;
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
if (memberEpoch == 0 || isFullRequest || ConsumerGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
response.setAssignment(ConsumerGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
}

Expand Down Expand Up @@ -2828,7 +2827,7 @@ private CoordinatorResult<Map.Entry<ShareGroupHeartbeatResponseData, Optional<In
// (subscribedTopicNames) to detect a full request as those must be set in a full request.
// 2. The member's assignment has been updated.
boolean isFullRequest = subscribedTopicNames != null;
if (memberEpoch == 0 || isFullRequest || hasAssignedPartitionsChanged(member, updatedMember)) {
if (memberEpoch == 0 || isFullRequest || ShareGroupMember.hasAssignedPartitionsChanged(member, updatedMember)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For symmetry, could we also qualify the hasAssignedPartitionsChanged call in consumerGroupHeartbeat?

response.setAssignment(ShareGroupHeartbeatResponse.createAssignment(updatedMember.assignedPartitions()));
}
return new CoordinatorResult<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@
*/
package org.apache.kafka.coordinator.group.modern;

import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.Set;

/**
Expand Down Expand Up @@ -71,11 +68,6 @@ public abstract class ModernGroupMember {
*/
protected Set<String> subscribedTopicNames;

/**
* The partitions assigned to this member.
*/
protected Map<Uuid, Set<Integer>> assignedPartitions;

protected ModernGroupMember(
String memberId,
int memberEpoch,
Expand All @@ -85,8 +77,7 @@ protected ModernGroupMember(
String clientId,
String clientHost,
Set<String> subscribedTopicNames,
MemberState state,
Map<Uuid, Set<Integer>> assignedPartitions
MemberState state
) {
this.memberId = memberId;
this.memberEpoch = memberEpoch;
Expand All @@ -97,7 +88,6 @@ protected ModernGroupMember(
this.clientId = clientId;
this.clientHost = clientHost;
this.subscribedTopicNames = subscribedTopicNames;
this.assignedPartitions = assignedPartitions;
}

/**
Expand Down Expand Up @@ -169,21 +159,4 @@ public MemberState state() {
public boolean isReconciledTo(int targetAssignmentEpoch) {
return state == MemberState.STABLE && memberEpoch == targetAssignmentEpoch;
}

/**
* @return The set of assigned partitions.
*/
public Map<Uuid, Set<Integer>> assignedPartitions() {
return assignedPartitions;
}

/**
* @return True of the two provided members have different assigned partitions.
*/
public static boolean hasAssignedPartitionsChanged(
ModernGroupMember member1,
ModernGroupMember member2
) {
return !member1.assignedPartitions().equals(member2.assignedPartitions());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ public ConsumerGroupMember build() {
*/
private final String serverAssignorName;

/**
* The partitions assigned to this member.
*/
private final Map<Uuid, Set<Integer>> assignedPartitions;

/**
* The partitions being revoked by this member.
*/
Expand Down Expand Up @@ -299,12 +304,12 @@ private ConsumerGroupMember(
clientId,
clientHost,
subscribedTopicNames,
state,
assignedPartitions
state
);
this.rebalanceTimeoutMs = rebalanceTimeoutMs;
this.subscribedTopicRegex = subscribedTopicRegex;
this.serverAssignorName = serverAssignorName;
this.assignedPartitions = assignedPartitions;
this.partitionsPendingRevocation = partitionsPendingRevocation;
this.classicMemberMetadata = classicMemberMetadata;
}
Expand All @@ -330,13 +335,30 @@ public Optional<String> serverAssignorName() {
return Optional.ofNullable(serverAssignorName);
}

/**
* @return The set of assigned partitions.
*/
public Map<Uuid, Set<Integer>> assignedPartitions() {
return assignedPartitions;
}

/**
* @return The set of partitions awaiting revocation from the member.
*/
public Map<Uuid, Set<Integer>> partitionsPendingRevocation() {
return partitionsPendingRevocation;
}

/**
* @return True if the two provided members have different assigned partitions.
*/
public static boolean hasAssignedPartitionsChanged(
ConsumerGroupMember member1,
ConsumerGroupMember member2
) {
return !member1.assignedPartitions().equals(member2.assignedPartitions());
}

/**
* @return The supported classic protocols converted to JoinGroupRequestProtocolCollection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,11 @@ public ShareGroupMember build() {
}
}

/**
* The partitions assigned to this member.
*/
private final Map<Uuid, Set<Integer>> assignedPartitions;

private ShareGroupMember(
String memberId,
int memberEpoch,
Expand All @@ -190,9 +195,26 @@ private ShareGroupMember(
clientId,
clientHost,
subscribedTopicNames,
state,
assignedPartitions
state
);
this.assignedPartitions = assignedPartitions;
}

/**
* @return The partitions assigned to this member.
*/
public Map<Uuid, Set<Integer>> assignedPartitions() {
return assignedPartitions;
}

/**
* @return True if the two provided members have different assigned partitions.
*/
public static boolean hasAssignedPartitionsChanged(
ShareGroupMember member1,
ShareGroupMember member2
) {
return !member1.assignedPartitions().equals(member2.assignedPartitions());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.kafka.coordinator.group.modern.Assignment;
import org.apache.kafka.coordinator.group.modern.GroupSpecImpl;
import org.apache.kafka.coordinator.group.modern.MemberSubscriptionAndAssignmentImpl;
import org.apache.kafka.coordinator.group.modern.ModernGroupMember;
import org.apache.kafka.coordinator.group.modern.TopicIds;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupMember;
Expand Down Expand Up @@ -115,24 +114,23 @@ public static CoordinatorMetadataImage createMetadataImage(
}

/**
* Creates a GroupSpec from the given ModernGroupMembers.
* Creates a GroupSpec from the given ConsumerGroupMembers.
*
* @param members The ModernGroupMembers.
* @param members The ConsumerGroupMembers.
* @param subscriptionType The group's subscription type.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createGroupSpec(
Map<String, ? extends ModernGroupMember> members,
public static GroupSpec createConsumerGroupSpec(
Map<String, ConsumerGroupMember> members,
SubscriptionType subscriptionType,
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();

// Prepare the member spec for all members.
for (Map.Entry<String, ? extends ModernGroupMember> memberEntry : members.entrySet()) {
for (Map.Entry<String, ConsumerGroupMember> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
ModernGroupMember member = memberEntry.getValue();
ConsumerGroupMember member = memberEntry.getValue();

memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Expand All @@ -149,6 +147,40 @@ public static GroupSpec createGroupSpec(
);
}

/**
* Creates a GroupSpec from the given ShareGroupMembers.
*
* @param members The ShareGroupMembers.
* @param subscriptionType The group's subscription type.
* @param topicResolver The TopicResolver to use.
* @return The new GroupSpec.
*/
public static GroupSpec createShareGroupSpec(
Map<String, ShareGroupMember> members,
SubscriptionType subscriptionType,
TopicIds.TopicResolver topicResolver
) {
Map<String, MemberSubscriptionAndAssignmentImpl> memberSpecs = new HashMap<>();

for (Map.Entry<String, ShareGroupMember> memberEntry : members.entrySet()) {
String memberId = memberEntry.getKey();
ShareGroupMember member = memberEntry.getValue();

memberSpecs.put(memberId, new MemberSubscriptionAndAssignmentImpl(
Optional.ofNullable(member.rackId()),
Optional.empty(),
new TopicIds(member.subscribedTopicNames(), topicResolver),
new Assignment(member.assignedPartitions())
));
}

return new GroupSpecImpl(
memberSpecs,
subscriptionType,
Map.of()
);
}

/**
* Creates a ConsumerGroupMembers map where all members have the same topic subscriptions.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public void setup() {
setupTopics();

Map<String, ConsumerGroupMember> members = createMembers();
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(members, subscriptionType, topicResolver);

if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void setup() {
setupTopics();

Map<String, ShareGroupMember> members = createMembers();
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(members, subscriptionType, topicResolver);
this.groupSpec = AssignorBenchmarkUtils.createShareGroupSpec(members, subscriptionType, topicResolver);

if (assignmentType == AssignmentType.INCREMENTAL) {
simulateIncrementalRebalance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ private void setupTopics() {
private Map<String, Assignment> generateMockInitialTargetAssignmentAndUpdateInvertedTargetAssignment(
Map<String, ConsumerGroupMember> members
) {
this.groupSpec = AssignorBenchmarkUtils.createGroupSpec(
this.groupSpec = AssignorBenchmarkUtils.createConsumerGroupSpec(
members,
subscriptionType,
topicResolver
Expand Down
Loading