Skip to content

Commit d8d3ad9

Browse files
committed
Implement multi stream append in system client
1 parent 85f9595 commit d8d3ad9

5 files changed

Lines changed: 334 additions & 23 deletions

File tree

src/Connectors/KurrentDB.Connectors/Infrastructure/Connect/Components/Connectors/SystemConnectorsFactory.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ IProcessor ConfigureSinkProcessor(ConnectorId connectorId, LinkedList<Intercepto
180180
}
181181

182182
IProcessor ConfigureSourceProcessor(ConnectorId connectorId, LinkedList<InterceptorModule> interceptors, SourceOptions sourceOptions, SourceProxy sourceProxy) {
183-
var client = Services.GetRequiredService<ISystemClient>();
184-
var loggerFactory = Services.GetRequiredService<ILoggerFactory>();
183+
var client = Services.GetRequiredService<ISystemClient>();
184+
var loggerFactory = Services.GetRequiredService<ILoggerFactory>();
185185
var schemaRegistry = Services.GetRequiredService<SchemaRegistry>();
186186

187187
var loggingOptions = new Kurrent.Surge.Configuration.LoggingOptions {

src/KurrentDB.Core/Bus/Extensions/PublisherWriteExtensions.cs

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// Kurrent, Inc licenses this file to you under the Kurrent License v1 (see LICENSE.md).
33

44
// ReSharper disable CheckNamespace
5+
// ReSharper disable ArrangeTypeModifiers
6+
// ReSharper disable ArrangeTypeMemberModifiers
57

68
#nullable enable
79

@@ -19,7 +21,9 @@
1921

2022
namespace KurrentDB.Core.ClientPublisher;
2123

24+
// TODO: We can remove usages of WriteEvents later
2225
using WriteEventsResult = (Position Position, StreamRevision StreamRevision);
26+
using MultiStreamWriteResult = (Position Position, StreamRevision[] StreamRevisions);
2327

2428
[PublicAPI]
2529
public static class PublisherWriteExtensions {
@@ -53,6 +57,40 @@ public static async Task<WriteEventsResult> WriteEvents(
5357

5458
return await operation.WaitForReply;
5559
}
60+
61+
public static async Task<MultiStreamWriteResult> WriteEventsToMultipleStreams(
62+
this IPublisher publisher,
63+
string[] eventStreamIds,
64+
long[] expectedVersions,
65+
Event[] events,
66+
int[] eventStreamIndexes,
67+
CancellationToken cancellationToken = default
68+
) {
69+
var cid = Guid.NewGuid();
70+
71+
var operation = new MultiStreamWriteEventsOperation(eventStreamIds, expectedVersions);
72+
73+
try {
74+
var command = new ClientMessage.WriteEvents(
75+
internalCorrId: cid,
76+
correlationId: cid,
77+
envelope: operation,
78+
requireLeader: false,
79+
eventStreamIds: eventStreamIds,
80+
expectedVersions: expectedVersions,
81+
events: events,
82+
eventStreamIndexes: eventStreamIndexes,
83+
user: SystemAccounts.System,
84+
cancellationToken: cancellationToken
85+
);
86+
87+
publisher.Publish(command);
88+
} catch (Exception ex) {
89+
throw new($"{nameof(WriteEventsToMultipleStreams)}: Unable to execute request!", ex);
90+
}
91+
92+
return await operation.WaitForReply;
93+
}
5694
}
5795

5896
class WriteEventsOperation(string stream, long expectedRevision) : IEnvelope {
@@ -94,3 +132,57 @@ static ReadResponseException MapToError(Message message, string stream, long exp
94132

95133
public Task<WriteEventsResult> WaitForReply => Operation.Task;
96134
}
135+
136+
class MultiStreamWriteEventsOperation(string[] streams, long[] expectedVersions) : IEnvelope {
137+
TaskCompletionSource<MultiStreamWriteResult> Operation { get; } = new(TaskCreationOptions.RunContinuationsAsynchronously);
138+
139+
public void ReplyWith<T>(T message) where T : Message {
140+
if (message is ClientMessage.WriteEventsCompleted { Result: OperationResult.Success } success)
141+
Operation.TrySetResult(MapToResult(success));
142+
else
143+
Operation.TrySetException(MapToError(message, streams, expectedVersions));
144+
145+
return;
146+
147+
static MultiStreamWriteResult MapToResult(ClientMessage.WriteEventsCompleted completed) {
148+
Debug.Assert(completed.CommitPosition >= 0);
149+
Debug.Assert(completed.PreparePosition >= 0);
150+
var position = Position.FromInt64(completed.CommitPosition, completed.PreparePosition);
151+
var streamRevisions = new StreamRevision[completed.LastEventNumbers.Length];
152+
for (int i = 0; i < completed.LastEventNumbers.Length; i++) {
153+
streamRevisions[i] = StreamRevision.FromInt64(completed.LastEventNumbers.Span[i]);
154+
}
155+
return new MultiStreamWriteResult(position, streamRevisions);
156+
}
157+
158+
static ReadResponseException MapToError(Message message, string[] streams, long[] expectedVersions) {
159+
return message switch {
160+
ClientMessage.WriteEventsCompleted completed => completed.Result switch {
161+
OperationResult.PrepareTimeout => new ReadResponseException.Timeout($"{completed.Result}"),
162+
OperationResult.CommitTimeout => new ReadResponseException.Timeout($"{completed.Result}"),
163+
OperationResult.ForwardTimeout => new ReadResponseException.Timeout($"{completed.Result}"),
164+
OperationResult.AccessDenied => new ReadResponseException.AccessDenied(),
165+
OperationResult.StreamDeleted => MapStreamDeletedError(completed, streams),
166+
OperationResult.WrongExpectedVersion => MapWrongExpectedVersionError(completed, streams, expectedVersions),
167+
_ => ReadResponseException.UnknownError.Create(completed.Result)
168+
},
169+
ClientMessage.NotHandled notHandled => notHandled.MapToException(),
170+
not null => new ReadResponseException.UnknownMessage(message.GetType(), typeof(ClientMessage.WriteEventsCompleted)),
171+
_ => throw new ArgumentOutOfRangeException(nameof(message), message, null)
172+
};
173+
174+
static ReadResponseException MapStreamDeletedError(ClientMessage.WriteEventsCompleted completed, string[] streams) {
175+
var streamIndex = completed.FailureStreamIndexes.Span[0];
176+
return new ReadResponseException.StreamDeleted(streams[streamIndex]);
177+
}
178+
179+
static ReadResponseException MapWrongExpectedVersionError(ClientMessage.WriteEventsCompleted completed, string[] streams, long[] expectedVersions) {
180+
var streamIndex = completed.FailureStreamIndexes.Span[0];
181+
var currentVersion = completed.FailureCurrentVersions.Span[0];
182+
return new ReadResponseException.WrongExpectedRevision(streams[streamIndex], expectedVersions[streamIndex], currentVersion);
183+
}
184+
}
185+
}
186+
187+
public Task<MultiStreamWriteResult> WaitForReply => Operation.Task;
188+
}

src/KurrentDB.Core/Bus/Extensions/SystemClient.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ namespace KurrentDB.Core;
2525
using StreamInfo = (string Stream, StreamRevision Revision);
2626
using StreamMetadataResult = (StreamMetadata Metadata, long Revision);
2727
using WriteEventsResult = (Position Position, StreamRevision StreamRevision);
28+
using MultiStreamWriteResult = (Position Position, StreamRevision[] StreamRevisions);
2829

2930
public interface ISystemClient {
3031
IManagementOperations Management { get; }
@@ -68,6 +69,7 @@ public interface IReadOperations {
6869

6970
public interface IWriteOperations {
7071
Task<WriteEventsResult> WriteEvents(string stream, Event[] events, long expectedRevision = -2, CancellationToken cancellationToken = default);
72+
Task<MultiStreamWriteResult> WriteEventsToMultipleStreams(string[] eventStreamIds, long[] expectedVersions, Event[] events, int[] eventStreamIndexes, CancellationToken cancellationToken = default);
7173
}
7274

7375
public interface ISubscriptionsOperations {
@@ -140,6 +142,9 @@ public Task<StreamRevision> GetStreamRevision(Position position, CancellationTok
140142
public record WriteOperations(IPublisher Publisher, ILogger Logger) : IWriteOperations {
141143
public Task<WriteEventsResult> WriteEvents(string stream, Event[] events, long expectedRevision = -2, CancellationToken cancellationToken = default) =>
142144
Publisher.WriteEvents(stream, events, expectedRevision, cancellationToken);
145+
146+
public Task<MultiStreamWriteResult> WriteEventsToMultipleStreams(string[] eventStreamIds, long[] expectedVersions, Event[] events, int[] eventStreamIndexes, CancellationToken cancellationToken = default) =>
147+
Publisher.WriteEventsToMultipleStreams(eventStreamIds, expectedVersions, events, eventStreamIndexes, cancellationToken);
143148
}
144149

145150
#endregion . Write .

src/KurrentDB.Surge.Tests/Components/Producers/SystemProducerTests.cs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,65 @@ public async Task sends_messages_in_parallel(int numberOfStreams, int numberOfRe
8080
var streams = Enumerable.Range(1, numberOfStreams).Select(_ => Fixture.NewStreamId()).ToList();
8181
await Parallel.ForEachAsync(streams, async (streamId, _) => await sends_messages(numberOfRequests, batchSize, streamId));
8282
}
83+
84+
[Theory]
85+
[InlineData(2, 1)]
86+
[InlineData(3, 1)]
87+
[InlineData(2, 5)]
88+
[InlineData(5, 3)]
89+
public async Task produces_to_multiple_streams(int numberOfStreams, int batchSize) {
90+
// Arrange
91+
var streamIds = Enumerable.Range(1, numberOfStreams).Select(_ => Fixture.NewStreamId()).ToList();
92+
93+
var requests = streamIds
94+
.Select(streamId => Fixture.GenerateTestProduceRequest(streamId, batchSize))
95+
.ToArray();
96+
97+
await using var producer = Fixture.NewProducer()
98+
.ProducerId($"pdr-multiple-{numberOfStreams}-{batchSize}")
99+
.Create();
100+
101+
ProduceResult? result = null;
102+
103+
// Act
104+
await producer.ProduceMultiple(requests, r => {
105+
result = r;
106+
return Task.CompletedTask;
107+
});
108+
109+
// Assert
110+
result.Should().NotBeNull();
111+
112+
for (var streamIndex = 0; streamIndex < streamIds.Count; streamIndex++) {
113+
var streamId = streamIds[streamIndex];
114+
var request = requests[streamIndex];
115+
116+
var actualEvents = await Fixture.Client.Reading.ReadFullStream(streamId).ToListAsync();
117+
118+
actualEvents.Should().HaveCount(batchSize);
119+
120+
for (var i = 0; i < actualEvents.Count; i++) {
121+
var actualRecord = await actualEvents[i].ToRecord((data, headers) => Fixture.SchemaSerializer.Deserialize(data, new(headers)), i + 1);
122+
var actualMessage = MapRecordToMessage(actualRecord);
123+
var sentMessage = request.Messages[i];
124+
125+
actualMessage.Should().BeEquivalentTo(sentMessage,
126+
options => options.WithTracing()
127+
.Excluding(x => x.Schema.SchemaName)
128+
.Excluding(x => x.Schema.SchemaNameMissing)
129+
.ComparingByValue(typeof(PartitionKey)));
130+
}
131+
}
132+
133+
return;
134+
135+
static Message MapRecordToMessage(SurgeRecord record) =>
136+
new() {
137+
Value = record.Value,
138+
Key = record.Key,
139+
Headers = record.Headers,
140+
RecordId = record.Id,
141+
Schema = record.SchemaInfo
142+
};
143+
}
83144
}

0 commit comments

Comments
 (0)