Skip to content

Comments

KAFKA-19943: Implement state store clean up on start#21566

Open
UladzislauBlok wants to merge 4 commits intoapache:trunkfrom
UladzislauBlok:bloku/kafka-19943
Open

KAFKA-19943: Implement state store clean up on start#21566
UladzislauBlok wants to merge 4 commits intoapache:trunkfrom
UladzislauBlok:bloku/kafka-19943

Conversation

@UladzislauBlok
Copy link
Contributor

@UladzislauBlok UladzislauBlok commented Feb 24, 2026

Implementation for KIP-1259

  • Added new property to set max directory age, and remove directories if current time - max age > last modify time
  • Unit tests

@github-actions github-actions bot added triage PRs from the community streams labels Feb 24, 2026
@github-actions github-actions bot added the small Small PRs label Feb 24, 2026
@github-actions github-actions bot removed the small Small PRs label Feb 24, 2026
@UladzislauBlok UladzislauBlok changed the title KAFKA-19943: Implement state store clean up on start [WIP] KAFKA-19943: Implement state store clean up on start Feb 24, 2026
@UladzislauBlok UladzislauBlok changed the title [WIP] KAFKA-19943: Implement state store clean up on start KAFKA-19943: Implement state store clean up on start Feb 24, 2026

private void cleanStateAndTaskDirectoriesOnStartup(final long dirMaxAgeMs) throws Exception {
if (!lockedTasksToOwner.isEmpty()) {
log.warn("Found some still-locked task directories when cleaning outdated directories");
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That shouldn't be the case, so I wasn't sure log warn / error or throw an exception

public synchronized void start() throws IllegalStateException, StreamsException {
if (setState(State.REBALANCING)) {
final Long dirMaxAgeMs = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_CONFIG);
if (dirMaxAgeMs != StreamsConfig.STATE_CLEANUP_DIR_MAX_AGE_MS_DISABLED) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't add test for this one, because it's super simple. I can add one if anyone think that will be usefull

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant