From 064c13613cbe031b0245bc675df9fb3052767d10 Mon Sep 17 00:00:00 2001 From: Simon Lemay Date: Fri, 23 Jul 2021 15:58:57 -0400 Subject: [PATCH 1/3] Correctly handle empty messages in Send The UTPTransport.Send method would throw a NullReferenceException exception when passed default(ArraySegment) as the message data. Now it simply sends an empty message to its remote host (not a terribly useful feature, but better than an exception). --- .../Runtime/UTPTransport.cs | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs index c14fbaf1af..0913df84ab 100644 --- a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs +++ b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs @@ -462,11 +462,14 @@ public override void Send(ulong clientId, ArraySegment data, NetworkChanne writer.WriteByte((byte)networkChannel); writer.WriteInt(data.Count); - unsafe + if (data.Array != null) { - fixed(byte* dataPtr = &data.Array[data.Offset]) + unsafe { - writer.WriteBytes(dataPtr, data.Count); + fixed(byte* dataPtr = &data.Array[data.Offset]) + { + writer.WriteBytes(dataPtr, data.Count); + } } } From d49506a53c1b6f6882837df840850f45f99cac9b Mon Sep 17 00:00:00 2001 From: Simon Lemay Date: Mon, 26 Jul 2021 17:13:58 -0400 Subject: [PATCH 2/3] Make it possible to receive messages >1024 bytes The fragmentation pipeline was set up to handle messages up to 6KB in length, but the receive buffer was only 1KB long. Messages longer than that were just ignored (although an error message was logged). --- com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs index 0913df84ab..f8312ebadc 100644 --- a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs +++ b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs @@ -37,12 +37,11 @@ private enum State } [SerializeField] private ProtocolType m_ProtocolType; - [SerializeField] private int m_MessageBufferSize = 1024; + [SerializeField] private int m_MessageBufferSize = 6 * 1024; [SerializeField] private string m_ServerAddress = "127.0.0.1"; [SerializeField] private ushort m_ServerPort = 7777; [SerializeField] private int m_RelayMaxPlayers = 10; [SerializeField] private string m_RelayServer = "https://relay-allocations.cloud.unity3d.com"; - [SerializeField] private int m_MaxFragmentationCapacity = 6 * 1024; private State m_State = State.Disconnected; private NetworkDriver m_Driver; @@ -428,7 +427,7 @@ public override void Init() m_NetworkParameters = new List(); - m_NetworkParameters.Add(new FragmentationUtility.Parameters(){PayloadCapacity = m_MaxFragmentationCapacity}); + m_NetworkParameters.Add(new FragmentationUtility.Parameters(){PayloadCapacity = m_MessageBufferSize}); m_MessageBuffer = new byte[m_MessageBufferSize]; #if ENABLE_RELAY_SERVICE From c69183b38a99fb1f92cba533758c0a9887326b93 Mon Sep 17 00:00:00 2001 From: Simon Lemay Date: Tue, 27 Jul 2021 08:58:26 -0400 Subject: [PATCH 3/3] Add proper support for channels in UTPTransport The transport now selects an appropriate pipeline according to the delivery level of the channel, rather than send everything on the default unreliable pipeline. --- .../Runtime/UTPTransport.cs | 59 ++++- .../Tests/Runtime/ChannelsTests.cs | 222 ++++++++++++++++++ .../Tests/Runtime/ChannelsTests.cs.meta | 11 + .../Tests/Runtime/ConnectionTests.cs | 2 +- .../Tests/Runtime/Helpers/DriverClient.cs | 92 ++++++++ .../Runtime/Helpers/DriverClient.cs.meta | 11 + 6 files changed, 385 insertions(+), 12 deletions(-) create mode 100644 com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs create mode 100644 com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs.meta create mode 100644 com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs create mode 100644 com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs.meta diff --git a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs index f8312ebadc..37358f16a1 100644 --- a/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs +++ b/com.unity.multiplayer.transport.utp/Runtime/UTPTransport.cs @@ -36,8 +36,10 @@ private enum State Connected, } + public const int MaximumMessageLength = 6 * 1024; + [SerializeField] private ProtocolType m_ProtocolType; - [SerializeField] private int m_MessageBufferSize = 6 * 1024; + [SerializeField] private int m_MessageBufferSize = MaximumMessageLength; [SerializeField] private string m_ServerAddress = "127.0.0.1"; [SerializeField] private ushort m_ServerPort = 7777; [SerializeField] private int m_RelayMaxPlayers = 10; @@ -50,14 +52,16 @@ private enum State private string m_RelayJoinCode; private ulong m_ServerClientId; + private NetworkPipeline m_UnreliableSequencedPipeline; + private NetworkPipeline m_ReliableSequencedPipeline; + private NetworkPipeline m_ReliableSequencedFragmentedPipeline; + public override ulong ServerClientId => m_ServerClientId; public string RelayJoinCode => m_RelayJoinCode; public ProtocolType Protocol => m_ProtocolType; - NetworkPipeline unreliableFragmentedPipeline; - private void InitDriver() { if (m_NetworkParameters.Count > 0) @@ -65,8 +69,11 @@ private void InitDriver() else m_Driver = NetworkDriver.Create(); - unreliableFragmentedPipeline = m_Driver.CreatePipeline( - typeof(FragmentationPipelineStage)); + m_UnreliableSequencedPipeline = m_Driver.CreatePipeline(typeof(UnreliableSequencedPipelineStage)); + m_ReliableSequencedPipeline = m_Driver.CreatePipeline(typeof(ReliableSequencedPipelineStage)); + m_ReliableSequencedFragmentedPipeline = m_Driver.CreatePipeline( + typeof(FragmentationPipelineStage), typeof(ReliableSequencedPipelineStage) + ); } private void DisposeDriver() @@ -108,6 +115,35 @@ private static RelayConnectionData ConvertConnectionData(byte[] connectionData) } } + private NetworkPipeline SelectSendPipeline(NetworkChannel channel, int size) + { + TransportChannel transportChannel = Array.Find(MLAPI_CHANNELS, tc => tc.Channel == channel); + + switch (transportChannel.Delivery) + { + case NetworkDelivery.Unreliable: + return NetworkPipeline.Null; + + case NetworkDelivery.UnreliableSequenced: + return m_UnreliableSequencedPipeline; + + case NetworkDelivery.Reliable: + case NetworkDelivery.ReliableSequenced: + return m_ReliableSequencedPipeline; + + case NetworkDelivery.ReliableFragmentedSequenced: + // No need to send on the fragmented pipeline if data is smaller than MTU. + if (size < NetworkParameterConstants.MTU) + return m_ReliableSequencedPipeline; + + return m_ReliableSequencedFragmentedPipeline; + + default: + Debug.LogError($"Unknown NetworkDelivery value: {transportChannel.Delivery}"); + return NetworkPipeline.Null; + } + } + private IEnumerator ClientBindAndConnect(SocketTask task) { var serverEndpoint = default(NetworkEndPoint); @@ -427,7 +463,11 @@ public override void Init() m_NetworkParameters = new List(); - m_NetworkParameters.Add(new FragmentationUtility.Parameters(){PayloadCapacity = m_MessageBufferSize}); + // If we want to be able to actually handle messages MaximumMessageLength bytes in + // size, we need to allow a bit more than that in FragmentationUtility since this needs + // to account for headers and such. 128 bytes is plenty enough for such overhead. + var maxFragmentationCapacity = MaximumMessageLength + 128; + m_NetworkParameters.Add(new FragmentationUtility.Parameters(){PayloadCapacity = maxFragmentationCapacity}); m_MessageBuffer = new byte[m_MessageBufferSize]; #if ENABLE_RELAY_SERVICE @@ -451,12 +491,9 @@ public override void Send(ulong clientId, ArraySegment data, NetworkChanne { var size = data.Count + 5; - // Debug.Log($"Sending: {String.Join(", ", data.Skip(data.Offset).Take(data.Count).Select(x => string.Format("{0:x}", x)))}"); - var defaultPipeline = NetworkPipeline.Null; - if (data.Count >= NetworkParameterConstants.MTU) - defaultPipeline = unreliableFragmentedPipeline; + var pipeline = SelectSendPipeline(networkChannel, size); - if (m_Driver.BeginSend(defaultPipeline, ParseClientId(clientId), out var writer, size) == 0) + if (m_Driver.BeginSend(pipeline, ParseClientId(clientId), out var writer, size) == 0) { writer.WriteByte((byte)networkChannel); writer.WriteInt(data.Count); diff --git a/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs b/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs new file mode 100644 index 0000000000..8d29ca3c9f --- /dev/null +++ b/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs @@ -0,0 +1,222 @@ +using NUnit.Framework; +using System; +using System.Collections; +using System.Collections.Generic; +using System.Linq; + +using MLAPI.Transports; +using Unity.Networking.Transport; +using UnityEngine; +using UnityEngine.TestTools; + +using NetworkEvent = MLAPI.Transports.NetworkEvent; +using UTPNetworkEvent = Unity.Networking.Transport.NetworkEvent; + +namespace MLAPI.UTP.RuntimeTests +{ + using static RuntimeTestsHelpers; + + public class ChannelsTests + { + // Check that we receive the correct channel (one message after the other). + [UnityTest] + public IEnumerator ReceiveCorrectChannelSequenced() + { + UTPTransport server, client; + List serverEvents, clientEvents; + + InitializeTransport(out server, out serverEvents); + InitializeTransport(out client, out clientEvents); + + server.StartServer(); + client.StartClient(); + + yield return WaitForNetworkEvent(NetworkEvent.Connect, serverEvents); + + int eventIndex = 1; + foreach (var transportChannel in server.MLAPI_CHANNELS) + { + server.Send(serverEvents[0].ClientID, default(ArraySegment), transportChannel.Channel); + + yield return WaitForNetworkEvent(NetworkEvent.Data, clientEvents); + + Assert.AreEqual(transportChannel.Channel, clientEvents[eventIndex].Channel); + + eventIndex++; + } + + server.Shutdown(); + client.Shutdown(); + + yield return null; + } + + // Check that we receive the correct channel (all messages received at once). + [UnityTest] + public IEnumerator ReceiveCorrectChannelSameFrame() + { + UTPTransport server, client; + List serverEvents, clientEvents; + + InitializeTransport(out server, out serverEvents); + InitializeTransport(out client, out clientEvents); + + server.StartServer(); + client.StartClient(); + + yield return WaitForNetworkEvent(NetworkEvent.Connect, serverEvents); + + foreach (var transportChannel in server.MLAPI_CHANNELS) + client.Send(client.ServerClientId, default(ArraySegment), transportChannel.Channel); + + yield return WaitForNetworkEvent(NetworkEvent.Data, serverEvents); + + Assert.AreEqual(server.MLAPI_CHANNELS.Length + 1, serverEvents.Count); + + int eventIndex = 1; + foreach (var transportChannel in server.MLAPI_CHANNELS) + { + Assert.AreEqual(transportChannel.Channel, serverEvents[eventIndex].Channel); + eventIndex++; + } + + server.Shutdown(); + client.Shutdown(); + + yield return null; + } + + // Check pipeline mapping for every default channel (except fragmented). + [UnityTest] + public IEnumerator ChannelPipelineMapping() + { + UTPTransport server; + List serverEvents; + DriverClient client = new GameObject().AddComponent(); + + InitializeTransport(out server, out serverEvents); + + server.StartServer(); + client.Connect(); + + yield return client.WaitForNetworkEvent(UTPNetworkEvent.Type.Connect); + + foreach (var transportChannel in server.MLAPI_CHANNELS) + { + // Skip over fragmented channels (covered by different test). + if (transportChannel.Delivery == NetworkDelivery.ReliableFragmentedSequenced) + continue; + + server.Send(serverEvents[0].ClientID, default(ArraySegment), transportChannel.Channel); + + yield return client.WaitForNetworkEvent(UTPNetworkEvent.Type.Data); + + // Check that data's pipeline is what's expected for the delivery. + switch (transportChannel.Delivery) + { + case NetworkDelivery.Unreliable: + Assert.AreEqual(NetworkPipeline.Null, client.LastEventPipeline); + break; + + case NetworkDelivery.UnreliableSequenced: + Assert.AreEqual(client.UnreliableSequencedPipeline, client.LastEventPipeline); + break; + + case NetworkDelivery.Reliable: + case NetworkDelivery.ReliableSequenced: + Assert.AreEqual(client.ReliableSequencedPipeline, client.LastEventPipeline); + break; + } + } + + server.Shutdown(); + + yield return null; + } + + // Check pipeline mapping for every default channel that has fragmentation. + [UnityTest] + public IEnumerator ChannelPipelineMappingFragmented() + { + UTPTransport server; + List serverEvents; + DriverClient client = new GameObject().AddComponent(); + + InitializeTransport(out server, out serverEvents); + + server.StartServer(); + client.Connect(); + + yield return client.WaitForNetworkEvent(UTPNetworkEvent.Type.Connect); + + foreach (var transportChannel in server.MLAPI_CHANNELS) + { + // Skip over non-fragmented channels (covered by different test). + if (transportChannel.Delivery != NetworkDelivery.ReliableFragmentedSequenced) + continue; + + // Check that data smaller than MTU doesn't trigger fragmented pipeline. + + server.Send(serverEvents[0].ClientID, default(ArraySegment), transportChannel.Channel); + + yield return client.WaitForNetworkEvent(UTPNetworkEvent.Type.Data); + + Assert.AreEqual(client.ReliableSequencedPipeline, client.LastEventPipeline); + + // Check that data larger than MTU does trigger fragmented pipeline. + + var data = new ArraySegment(new byte[UTPTransport.MaximumMessageLength]); + server.Send(serverEvents[0].ClientID, data, transportChannel.Channel); + + yield return client.WaitForNetworkEvent(UTPNetworkEvent.Type.Data); + + Assert.AreEqual(client.ReliableSequencedFragmentedPipeline, client.LastEventPipeline); + } + + server.Shutdown(); + + yield return null; + } + + // Check fragmentation on channels where it is expected. + [UnityTest] + public IEnumerator FragmentedDelivery() + { + UTPTransport server, client; + List serverEvents, clientEvents; + + InitializeTransport(out server, out serverEvents); + InitializeTransport(out client, out clientEvents); + + server.StartServer(); + client.StartClient(); + + yield return WaitForNetworkEvent(NetworkEvent.Connect, serverEvents); + + int eventIndex = 1; + foreach (var transportChannel in server.MLAPI_CHANNELS) + { + // Only want to test fragmentation-enabled channels. + if (transportChannel.Delivery != NetworkDelivery.ReliableFragmentedSequenced) + continue; + + var data = new byte[UTPTransport.MaximumMessageLength]; + for (int i = 0; i < data.Length; i++) + data[i] = (byte) i; + + client.Send(client.ServerClientId, new ArraySegment(data), transportChannel.Channel); + + yield return WaitForNetworkEvent(NetworkEvent.Data, serverEvents); + + Assert.True(serverEvents[eventIndex].Data.SequenceEqual(data)); + + eventIndex++; + } + + server.Shutdown(); + client.Shutdown(); + + yield return null; + } + } +} diff --git a/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs.meta b/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs.meta new file mode 100644 index 0000000000..fbba9ef7f2 --- /dev/null +++ b/com.unity.multiplayer.transport.utp/Tests/Runtime/ChannelsTests.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: fa2a36c2e74a6504fb086b2b7d25161c +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: diff --git a/com.unity.multiplayer.transport.utp/Tests/Runtime/ConnectionTests.cs b/com.unity.multiplayer.transport.utp/Tests/Runtime/ConnectionTests.cs index b88f5d92ce..3e96e76de7 100644 --- a/com.unity.multiplayer.transport.utp/Tests/Runtime/ConnectionTests.cs +++ b/com.unity.multiplayer.transport.utp/Tests/Runtime/ConnectionTests.cs @@ -133,7 +133,7 @@ public IEnumerator ServerDisconnectMultipleClients() for (int i = 1; i < NumClients; i++) server.DisconnectRemoteClient(serverEvents[i].ClientID); - // Need to manually wait since we don't know which client will got the Disconnect. + // Need to manually wait since we don't know which client got the Disconnect. yield return new WaitForSeconds(MaxNetworkEventWaitTime); // Check that all clients got a Disconnect event. diff --git a/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs b/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs new file mode 100644 index 0000000000..8b2d3cc113 --- /dev/null +++ b/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs @@ -0,0 +1,92 @@ +using NUnit.Framework; +using System.Collections; + +using MLAPI.Transports; +using Unity.Networking.Transport; +using Unity.Networking.Transport.Utilities; +using UnityEngine; + +using UTPNetworkEvent = Unity.Networking.Transport.NetworkEvent; + +namespace MLAPI.UTP.RuntimeTests +{ + using static RuntimeTestsHelpers; + + // Thin wrapper around a UTP NetworkDriver that can act as a client to a UTPTransport server. + // In particular that means the pipelines are set up the same way as in UTPTransport. + // + // The only reason it's defined as a MonoBehaviour is that OnDestroy is the only reliable way + // to get the driver's Dispose method called from a UnityTest. Making it disposable would be + // the preferred solution, but that doesn't always mesh well with coroutines. + public class DriverClient : MonoBehaviour + { + private NetworkDriver m_Driver; + public NetworkDriver Driver => m_Driver; + + private NetworkConnection m_Connection; + + private NetworkPipeline m_UnreliableSequencedPipeline; + private NetworkPipeline m_ReliableSequencedPipeline; + private NetworkPipeline m_ReliableSequencedFragmentedPipeline; + + public NetworkPipeline UnreliableSequencedPipeline => m_UnreliableSequencedPipeline; + public NetworkPipeline ReliableSequencedPipeline => m_ReliableSequencedPipeline; + public NetworkPipeline ReliableSequencedFragmentedPipeline => m_ReliableSequencedFragmentedPipeline; + + private NetworkPipeline m_LastEventPipeline; + public NetworkPipeline LastEventPipeline => m_LastEventPipeline; + + private void Awake() + { + var maxCap = UTPTransport.MaximumMessageLength + 128; + var fragParams = new FragmentationUtility.Parameters(){ PayloadCapacity = maxCap }; + + m_Driver = NetworkDriver.Create(fragParams); + + m_UnreliableSequencedPipeline = m_Driver.CreatePipeline(typeof(UnreliableSequencedPipelineStage)); + m_ReliableSequencedPipeline = m_Driver.CreatePipeline(typeof(ReliableSequencedPipelineStage)); + m_ReliableSequencedFragmentedPipeline = m_Driver.CreatePipeline( + typeof(FragmentationPipelineStage), typeof(ReliableSequencedPipelineStage) + ); + } + + private void Update() + { + m_Driver.ScheduleUpdate().Complete(); + } + + private void OnDestroy() + { + if (m_Driver.IsCreated) + m_Driver.Dispose(); + } + + public void Connect() + { + var endpoint = NetworkEndPoint.LoopbackIpv4; + endpoint.Port = 7777; + + m_Connection = m_Driver.Connect(endpoint); + } + + // Wait for the given event to be generated by the client's driver. + public IEnumerator WaitForNetworkEvent(UTPNetworkEvent.Type type) + { + float startTime = Time.realtimeSinceStartup; + + while (Time.realtimeSinceStartup - startTime < MaxNetworkEventWaitTime) + { + UTPNetworkEvent.Type eventType = m_Driver.PopEvent(out _, out _, out m_LastEventPipeline); + if (eventType != UTPNetworkEvent.Type.Empty) + { + Assert.AreEqual(type, eventType); + yield break; + } + + yield return null; + } + + Assert.Fail("Timed out while waiting for network event."); + } + } +} diff --git a/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs.meta b/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs.meta new file mode 100644 index 0000000000..806020f6cf --- /dev/null +++ b/com.unity.multiplayer.transport.utp/Tests/Runtime/Helpers/DriverClient.cs.meta @@ -0,0 +1,11 @@ +fileFormatVersion: 2 +guid: ab8e989afa5cf444bac47c920b5d4748 +MonoImporter: + externalObjects: {} + serializedVersion: 2 + defaultReferences: [] + executionOrder: 0 + icon: {instanceID: 0} + userData: + assetBundleName: + assetBundleVariant: