Description
Hi Marten Team
I have an example where calling AppendEvent inside the RaiseSideEffects method works for SingleStreamProjection<>, but not for MultiStreamProjection<>.
RaiseSideEffects for SingleStreamProjection is already covered by the test cases, so I will only provide the MultiStreamProjection example, which doesn't work for me.
Excuse the dummy event names. I have been banging my head against a wall trying to get this running.
When the exact same projection is converted to a single-stream projection, EventA is appended to the event stream as expected.
MultiStreamProjection Example
Setup
o.Connection(connectionString);
o.DatabaseSchemaName = "BEX";
o.UseSystemTextJsonForSerialization();
o.AutoCreateSchemaObjects = string.Equals("Production", environment, StringComparison.InvariantCultureIgnoreCase)
    ? AutoCreate.CreateOrUpdate
    : AutoCreate.All;
o.Events.TenancyStyle = TenancyStyle.Conjoined;
o.Events.UseMandatoryStreamTypeDeclaration = true;
o.Events.StreamIdentity = StreamIdentity.AsString;
o.Events.UseArchivedStreamPartitioning = true;
o.Events.UseIdentityMapForAggregates = true;
// Other Projections...
o.Projections.Add<BlaEventProjection>(ProjectionLifecycle.Async);
o.Schema.For<BlaEvent>().MultiTenanted();
BlaEventProjection
public sealed class BlaEventProjection : MultiStreamProjection<BlaEvent, string>
{
    public BlaEventProjection()
    {
        Identity<CustomerCreatedEvent>(x => x.CustomerPublicId);
        Identity<LeadCustomerCreatedEvent>(x => x.CustomerPublicId);
    }

    public BlaEvent Create(CustomerCreatedEvent e)
    {
        return new BlaEvent
        {
            Id = e.CustomerPublicId,
            Count = 0
        };
    }

    public void Apply(LeadCustomerCreatedEvent e, BlaEvent bla)
    {
        bla.Count++;
    }

    public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<BlaEvent> slice)
    {
        if (slice.Aggregate is null)
            return ValueTask.CompletedTask;

        slice.AppendEvent(new EventA());
        return ValueTask.CompletedTask;
    }
}

public record EventA();
Errors Encountered
[17:01:33 ERR] Marten encountered an exception executing
insert into bex.mt_event_progression (name, last_seq_id) values ($1, $2);
: BlaEvent:All
: 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing
select bex.mt_quick_append_events($1,$2,$3,$4,$5,$6,$7);
: zfdpwzvqmoeo
:
: EYOzBhdIaW5n
: System.Guid[]
: System.String[]
: System.String[]
: System.String[]
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing
select bex.mt_upsert_blaevent($1,$2,$3,$4,$5);
: {"Id":"zfdpwzvqmoeo","Count":1}
: Business.Experience.Domain.Projections.CRM_API.Customers.BlaEvent
: zfdpwzvqmoeo
: 0
: EYOzBhdIaW5n
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing
select bex.mt_quick_append_events($1,$2,$3,$4,$5,$6,$7);
: necxsavxlfpi
:
: EYOzBhdIaW5n
: System.Guid[]
: System.String[]
: System.String[]
: System.String[]
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing
select bex.mt_upsert_blaevent($1,$2,$3,$4,$5);
: {"Id":"necxsavxlfpi","Count":0}
: Business.Experience.Domain.Projections.CRM_API.Customers.BlaEvent
: necxsavxlfpi
: 0
: EYOzBhdIaW5n
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 DBG] Rolling back transaction
[17:01:33 DBG] Rolled back transaction
[17:01:33 ERR] Marten encountered an exception executing
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 DBG] Closed connection to 127.0.0.1:59577/postgres
[17:01:33 ERR] Failure in shard 'BlaEvent:All' trying to execute an update batch for Event range of 'Identity: BlaEvent:All', 0 to 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Polly.ResiliencePipeline.<>c__2`1.<<ExecuteAsync>b__2_0>d.MoveNext()
--- End of stack trace from previous location ---
at Polly.Outcome`1.GetResultOrRethrow()
at Polly.ResiliencePipeline.ExecuteAsync[TState](Func`3 callback, TState state, CancellationToken cancellationToken)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
[17:01:33 ERR] Error trying to build and apply changes to event subscription BlaEvent:All from 0 to 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
at Polly.ResiliencePipeline.<>c__2`1.<<ExecuteAsync>b__2_0>d.MoveNext()
--- End of stack trace from previous location ---
at Polly.Outcome`1.GetResultOrRethrow()
at Polly.ResiliencePipeline.ExecuteAsync[TState](Func`3 callback, TState state, CancellationToken cancellationToken)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
at Marten.Events.Daemon.Internals.GroupedProjectionExecution.processRange(EventRangeGroup group)
at Marten.Events.Daemon.Internals.GroupedProjectionExecution.processRange(EventRangeGroup group)
The interesting thing about this error is that the subscription I was testing with still reported the stream as having progressed, even though persisting the events had failed.
Please let me know if you need any other info. :)