diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
index 3167a0d0445..931d9918ccb 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt
@@ -4753,6 +4753,7 @@ namespace Akka.Streams.Implementation.Stages
}
namespace Akka.Streams.Serialization
{
+ [System.Runtime.CompilerServices.NullableAttribute(0)]
public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest
{
public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { }
diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
index ce092ba63d1..3205d29804b 100644
--- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
+++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt
@@ -4727,6 +4727,7 @@ namespace Akka.Streams.Implementation.Stages
}
namespace Akka.Streams.Serialization
{
+ [System.Runtime.CompilerServices.NullableAttribute(0)]
public sealed class StreamRefSerializer : Akka.Serialization.SerializerWithStringManifest
{
public StreamRefSerializer(Akka.Actor.ExtendedActorSystem system) { }
diff --git a/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs b/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs
new file mode 100644
index 00000000000..6beacf93f92
--- /dev/null
+++ b/src/core/Akka.Streams.Tests/Serialization/StreamRefSerializer.cs
@@ -0,0 +1,42 @@
+// -----------------------------------------------------------------------
+//
+// Copyright (C) 2009-2024 Lightbend Inc.
+// Copyright (C) 2013-2024 .NET Foundation
+//
+// -----------------------------------------------------------------------
+
+using System;
+using Akka.Serialization;
+using Akka.Streams.Implementation.StreamRef;
+using FluentAssertions;
+using Xunit;
+using Xunit.Abstractions;
+using static FluentAssertions.FluentActions;
+
+namespace Akka.Streams.Tests.Serialization;
+
+public class StreamRefSerializer: Akka.TestKit.Xunit2.TestKit
+{
+ public StreamRefSerializer(ITestOutputHelper output)
+ : base(ActorMaterializer.DefaultConfig(), nameof(StreamRefSerializer), output)
+ {
+ }
+
+ [Fact(DisplayName = "StreamRefSerializer should not throw NRE when configuration were set before ActorSystem started")]
+ public void StreamsConfigBugTest()
+ {
+ var message = new SequencedOnNext(10, "test");
+ var serializer = (SerializerWithStringManifest)Sys.Serialization.FindSerializerFor(message);
+ var manifest = serializer.Manifest(message);
+
+ byte[] bytes = null;
+ Invoking(() =>
+ {
+ bytes = serializer.ToBinary(message); // This throws an NRE in the bug
+ }).Should().NotThrow();
+
+ var deserialized = (SequencedOnNext) serializer.FromBinary(bytes, manifest);
+ deserialized.SeqNr.Should().Be(message.SeqNr);
+ deserialized.Payload.Should().Be(message.Payload);
+ }
+}
\ No newline at end of file
diff --git a/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs b/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs
index b86c1cc53a5..f9ac94f43a5 100644
--- a/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs
+++ b/src/core/Akka.Streams/Serialization/StreamRefSerializer.cs
@@ -10,7 +10,6 @@
using Akka.Actor;
using Akka.Serialization;
using Akka.Streams.Serialization.Proto.Msg;
-using Akka.Util;
using Google.Protobuf;
using Akka.Streams.Implementation.StreamRef;
using CumulativeDemand = Akka.Streams.Implementation.StreamRef.CumulativeDemand;
@@ -19,12 +18,12 @@
using RemoteStreamFailure = Akka.Streams.Implementation.StreamRef.RemoteStreamFailure;
using SequencedOnNext = Akka.Streams.Implementation.StreamRef.SequencedOnNext;
+#nullable enable
namespace Akka.Streams.Serialization
{
public sealed class StreamRefSerializer : SerializerWithStringManifest
{
private readonly ExtendedActorSystem _system;
- private readonly Akka.Serialization.Serialization _serialization;
private const string SequencedOnNextManifest = "A";
private const string CumulativeDemandManifest = "B";
@@ -37,52 +36,51 @@ public sealed class StreamRefSerializer : SerializerWithStringManifest
public StreamRefSerializer(ExtendedActorSystem system) : base(system)
{
_system = system;
- _serialization = system.Serialization;
}
public override string Manifest(object o)
{
- switch (o)
+ return o switch
{
- case SequencedOnNext _: return SequencedOnNextManifest;
- case CumulativeDemand _: return CumulativeDemandManifest;
- case OnSubscribeHandshake _: return OnSubscribeHandshakeManifest;
- case RemoteStreamFailure _: return RemoteSinkFailureManifest;
- case RemoteStreamCompleted _: return RemoteSinkCompletedManifest;
- case SourceRefImpl _: return SourceRefManifest;
- case SinkRefImpl _: return SinkRefManifest;
- default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o));
- }
+ SequencedOnNext => SequencedOnNextManifest,
+ CumulativeDemand => CumulativeDemandManifest,
+ OnSubscribeHandshake => OnSubscribeHandshakeManifest,
+ RemoteStreamFailure => RemoteSinkFailureManifest,
+ RemoteStreamCompleted => RemoteSinkCompletedManifest,
+ SourceRefImpl => SourceRefManifest,
+ SinkRefImpl => SinkRefManifest,
+ _ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o))
+ };
}
public override byte[] ToBinary(object o)
{
- switch (o)
+ return o switch
{
- case SequencedOnNext onNext: return SerializeSequencedOnNext(onNext).ToByteArray();
- case CumulativeDemand demand: return SerializeCumulativeDemand(demand).ToByteArray();
- case OnSubscribeHandshake handshake: return SerializeOnSubscribeHandshake(handshake).ToByteArray();
- case RemoteStreamFailure failure: return SerializeRemoteStreamFailure(failure).ToByteArray();
- case RemoteStreamCompleted completed: return SerializeRemoteStreamCompleted(completed).ToByteArray();
- case SourceRefImpl sourceRef: return SerializeSourceRef(sourceRef).ToByteArray();
- case SinkRefImpl sinkRef: return SerializeSinkRef(sinkRef).ToByteArray();
- default: throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o));
- }
+ SequencedOnNext onNext => SerializeSequencedOnNext(onNext).ToByteArray(),
+ CumulativeDemand demand => SerializeCumulativeDemand(demand).ToByteArray(),
+ OnSubscribeHandshake handshake => SerializeOnSubscribeHandshake(handshake).ToByteArray(),
+ RemoteStreamFailure failure => SerializeRemoteStreamFailure(failure).ToByteArray(),
+ RemoteStreamCompleted completed => SerializeRemoteStreamCompleted(completed).ToByteArray(),
+ SourceRefImpl sourceRef => SerializeSourceRef(sourceRef).ToByteArray(),
+ SinkRefImpl sinkRef => SerializeSinkRef(sinkRef).ToByteArray(),
+ _ => throw new ArgumentException($"Unsupported object of type {o.GetType()}", nameof(o))
+ };
}
public override object FromBinary(byte[] bytes, string manifest)
{
- switch (manifest)
+ return manifest switch
{
- case SequencedOnNextManifest: return DeserializeSequenceOnNext(bytes);
- case CumulativeDemandManifest: return DeserializeCumulativeDemand(bytes);
- case OnSubscribeHandshakeManifest: return DeserializeOnSubscribeHandshake(bytes);
- case RemoteSinkFailureManifest: return DeserializeRemoteSinkFailure(bytes);
- case RemoteSinkCompletedManifest: return DeserializeRemoteSinkCompleted(bytes);
- case SourceRefManifest: return DeserializeSourceRef(bytes);
- case SinkRefManifest: return DeserializeSinkRef(bytes);
- default: throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest));
- }
+ SequencedOnNextManifest => DeserializeSequenceOnNext(bytes),
+ CumulativeDemandManifest => DeserializeCumulativeDemand(bytes),
+ OnSubscribeHandshakeManifest => DeserializeOnSubscribeHandshake(bytes),
+ RemoteSinkFailureManifest => DeserializeRemoteSinkFailure(bytes),
+ RemoteSinkCompletedManifest => DeserializeRemoteSinkCompleted(bytes),
+ SourceRefManifest => DeserializeSourceRef(bytes),
+ SinkRefManifest => DeserializeSinkRef(bytes),
+ _ => throw new ArgumentException($"Unsupported manifest '{manifest}'", nameof(manifest))
+ };
}
private SinkRefImpl DeserializeSinkRef(byte[] bytes)
@@ -129,7 +127,7 @@ private SequencedOnNext DeserializeSequenceOnNext(byte[] bytes)
{
var onNext = Proto.Msg.SequencedOnNext.Parser.ParseFrom(bytes);
var p = onNext.Payload;
- var payload = _serialization.Deserialize(
+ var payload = system.Serialization.Deserialize(
p.EnclosedMessage.ToByteArray(),
p.SerializerId,
p.MessageManifest?.ToStringUtf8());
@@ -169,7 +167,7 @@ private ByteString SerializeCumulativeDemand(CumulativeDemand demand) =>
private ByteString SerializeSequencedOnNext(SequencedOnNext onNext)
{
var payload = onNext.Payload;
- var serializer = _serialization.FindSerializerFor(payload);
+ var serializer = system.Serialization.FindSerializerFor(payload);
var manifest = Akka.Serialization.Serialization.ManifestFor(serializer, payload);
var p = new Payload