MultiStreamProjection: AppendEvent during RaiseSideEffects fails #3624

Open
@Threepud

Description

Hi Marten Team

I have an example where calling AppendEvent inside the RaiseSideEffects method works for SingleStreamProjection<> but not for MultiStreamProjection<>.

I see that RaiseSideEffects for SingleStreamProjection is already covered by the test cases, so I will only provide the MultiStreamProjection example, which doesn't work for me.

Excuse the dummy event names; I have been banging my head against a wall trying to get this running.
When the exact same projection is converted to a single-stream projection, EventA is appended to the event stream as expected.

MultiStreamProjection Example

Setup

o.Connection(connectionString);
o.DatabaseSchemaName = "BEX";
o.UseSystemTextJsonForSerialization();

o.AutoCreateSchemaObjects = string.Equals("Production", environment,
    StringComparison.InvariantCultureIgnoreCase)
    ? AutoCreate.CreateOrUpdate
    : AutoCreate.All;

o.Events.TenancyStyle = TenancyStyle.Conjoined;
o.Events.UseMandatoryStreamTypeDeclaration = true;
o.Events.StreamIdentity = StreamIdentity.AsString;
o.Events.UseArchivedStreamPartitioning = true;
o.Events.UseIdentityMapForAggregates = true;

// Other Projections...

o.Projections.Add<BlaEventProjection>(ProjectionLifecycle.Async);
o.Schema.For<BlaEvent>().MultiTenanted();

BlaEventProjection

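public sealed class BlaEventProjection : MultiStreamProjection<BlaEvent, string>
{
    public BlaEventProjection()
    {
        // Group both event types into one BlaEvent per CustomerPublicId.
        Identity<CustomerCreatedEvent>(x => x.CustomerPublicId);
        Identity<LeadCustomerCreatedEvent>(x => x.CustomerPublicId);
    }

    // Start a new BlaEvent document when a customer is created.
    public BlaEvent Create(CustomerCreatedEvent e)
    {
        return new BlaEvent
        {
            Id = e.CustomerPublicId,
            Count = 0
        };
    }

    // Count each LeadCustomerCreatedEvent seen for the same CustomerPublicId.
    public void Apply(LeadCustomerCreatedEvent e, BlaEvent bla)
    {
        bla.Count++;
    }

    public override ValueTask RaiseSideEffects(IDocumentOperations operations, IEventSlice<BlaEvent> slice)
    {
        // Nothing to do when there is no aggregate for this slice.
        if (slice.Aggregate is null)
            return ValueTask.CompletedTask;

        // This append works in a SingleStreamProjection but fails here (see the errors below).
        slice.AppendEvent(new EventA());

        return ValueTask.CompletedTask;
    }
}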

public record EventA();
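
The supporting types are not shown above. For anyone trying to reproduce this, here is a minimal sketch of what they could look like. These shapes are assumptions inferred from how the projection uses them and from the document JSON in the logs below ({"Id":"...","Count":1}); the real definitions may carry more members, and the type of Count is a guess.

```csharp
// Hypothetical supporting types, inferred from the projection above.
// The real definitions are not part of this issue.

// Projected document. Id is a string because the projection is declared as
// MultiStreamProjection<BlaEvent, string>; Count is assumed to be an int.
public sealed class BlaEvent
{
    public string Id { get; set; } = string.Empty;
    public int Count { get; set; }
}

// Both events expose CustomerPublicId, which the projection uses
// as the identity of the BlaEvent document.
public record CustomerCreatedEvent(string CustomerPublicId);
public record LeadCustomerCreatedEvent(string CustomerPublicId);
```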

Errors Encountered

[17:01:33 ERR] Marten encountered an exception executing 
insert into bex.mt_event_progression (name, last_seq_id) values ($1, $2);
  : BlaEvent:All
  : 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing 
select bex.mt_quick_append_events($1,$2,$3,$4,$5,$6,$7);
  : zfdpwzvqmoeo
  : 
  : EYOzBhdIaW5n
  : System.Guid[]
  : System.String[]
  : System.String[]
  : System.String[]
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing 
select bex.mt_upsert_blaevent($1,$2,$3,$4,$5);
  : {"Id":"zfdpwzvqmoeo","Count":1}
  : Business.Experience.Domain.Projections.CRM_API.Customers.BlaEvent
  : zfdpwzvqmoeo
  : 0
  : EYOzBhdIaW5n
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing 
select bex.mt_quick_append_events($1,$2,$3,$4,$5,$6,$7);
  : necxsavxlfpi
  : 
  : EYOzBhdIaW5n
  : System.Guid[]
  : System.String[]
  : System.String[]
  : System.String[]
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 ERR] Marten encountered an exception executing 
select bex.mt_upsert_blaevent($1,$2,$3,$4,$5);
  : {"Id":"necxsavxlfpi","Count":0}
  : Business.Experience.Domain.Projections.CRM_API.Customers.BlaEvent
  : necxsavxlfpi
  : 0
  : EYOzBhdIaW5n
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 DBG] Rolling back transaction
[17:01:33 DBG] Rolled back transaction
[17:01:33 ERR] Marten encountered an exception executing 


System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
[17:01:33 DBG] Closed connection to 127.0.0.1:59577/postgres
[17:01:33 ERR] Failure in shard 'BlaEvent:All' trying to execute an update batch for Event range of 'Identity: BlaEvent:All', 0 to 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Polly.ResiliencePipeline.<>c__2`1.<<ExecuteAsync>b__2_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Polly.Outcome`1.GetResultOrRethrow()
   at Polly.ResiliencePipeline.ExecuteAsync[TState](Func`3 callback, TState state, CancellationToken cancellationToken)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
[17:01:33 ERR] Error trying to build and apply changes to event subscription BlaEvent:All from 0 to 6
System.InvalidCastException: Reading as 'System.Int64[]' is not supported for fields having DataTypeName 'integer'
   at Npgsql.Internal.AdoSerializerHelpers.<GetTypeInfoForReading>g__ThrowReadingNotSupported|0_0(Type type, PgSerializerOptions options, PgTypeId pgTypeId, Exception inner)
   at Npgsql.Internal.AdoSerializerHelpers.GetTypeInfoForReading(Type type, PgTypeId pgTypeId, PgSerializerOptions options)
   at Npgsql.BackendMessages.FieldDescription.<GetInfo>g__GetInfoSlow|50_0(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.BackendMessages.FieldDescription.GetInfo(Type type, ColumnInfo& lastColumnInfo)
   at Npgsql.NpgsqlDataReader.<GetInfo>g__Slow|133_0(ColumnInfo& info, PgConverter& converter, Size& bufferRequirement, Boolean& asObject, <>c__DisplayClass133_0&)
   at Npgsql.NpgsqlDataReader.GetFieldValueCore[T](Int32 ordinal)
   at Npgsql.NpgsqlDataReader.GetFieldValueAsync[T](Int32 ordinal, CancellationToken cancellationToken)
   at Marten.Events.Operations.QuickAppendEventsOperationBase.PostprocessAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.OperationPage.ApplyCallbacksAsync(DbDataReader reader, IList`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Marten.Internal.Sessions.AutoClosingLifetime.ExecuteBatchPagesAsync(IReadOnlyList`1 pages, List`1 exceptions, CancellationToken token)
   at Polly.ResiliencePipeline.<>c__2`1.<<ExecuteAsync>b__2_0>d.MoveNext()
--- End of stack trace from previous location ---
   at Polly.Outcome`1.GetResultOrRethrow()
   at Polly.ResiliencePipeline.ExecuteAsync[TState](Func`3 callback, TState state, CancellationToken cancellationToken)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at JasperFx.Core.Exceptions.ExceptionTransformExtensions.TransformAndThrow(IEnumerable`1 transforms, Exception ex)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at Marten.Internal.Sessions.DocumentSessionBase.ExecuteBatchAsync(IUpdateBatch batch, CancellationToken token)
   at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
   at Marten.Events.Daemon.Internals.GroupedProjectionExecution.applyBatchOperationsToDatabaseAsync(EventRangeGroup group, DocumentSessionBase session, ProjectionUpdateBatch batch)
   at Marten.Events.Daemon.Internals.GroupedProjectionExecution.processRange(EventRangeGroup group)
   at Marten.Events.Daemon.Internals.GroupedProjectionExecution.processRange(EventRangeGroup group)

The interesting thing about this error is that the subscription I was testing with still behaved as though the stream had progressed, even though persisting the events had failed.

Please let me know if you need any other info. :)
