Implement replay_event_log_async#1266
Merged
Merged
Conversation
As we now have AsyncSessionPersister to enable async persistence, we need to be able to replay the event log asynchronously too.
Collaborator
Pull Request Test Coverage Report for Build 21076949429Details
💛 - Coveralls |
arminsabouri
approved these changes
Jan 16, 2026
Contributor
arminsabouri
left a comment
There was a problem hiding this comment.
cACK be7d040
Just had one question. Otherwise code looks good.
|
|
||
| for event in logs { | ||
| session_events.push(event.clone()); | ||
| receiver = receiver.process_event(event)?; |
Contributor
There was a problem hiding this comment.
In the previous code if process_event failed we close the session. Is this a regression?
Contributor
There was a problem hiding this comment.
It looks the close was move to the replay_events call side. Is that right?
Collaborator
Author
There was a problem hiding this comment.
replay_events does no IO so that it can be reused in the async version of replay_event_log. So it just returns an error here and that's handled with a session close in replay_event_log/_async:
let (receiver, session_events) = match replay_events(logs.map(|e| e.into())) {
Ok(r) => r,
Err(e) => {
persister.close().await.map_err(|ce| {
InternalReplayError::PersistenceFailure(ImplementationError::new(ce))
})?;
return Err(e);
}
};
Collaborator
Author
There was a problem hiding this comment.
I added a second commit that adds test coverage for this scenario to confirm no regression
When `process_event` fails we close the session. This adds test coverage for that scenario.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
As we now have
AsyncSessionPersisterto enable a sync persistence (#1235), we need to be able to replay the event log asynchronously too.There's still considerable duplication between the
replay_eventsfunctions on the send and receive sides, but I'm not convinced whether we should refactor further by introducing aSessionEventtrait that both the receive and senderSessionEventtypes could both implement (and the same on Receive/SendSession).Pull Request Checklist
Please confirm the following before requesting review:
AI
in the body of this PR.