#if (!UNITY_WEBGL || UNITY_EDITOR) && !BESTHTTP_DISABLE_ALTERNATE_SSL
#define ENABLE_LOGGING

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

using Best.HTTP.Shared.Extensions;
using Best.HTTP.Shared.PlatformSupport.Memory;
using Best.HTTP.Shared.PlatformSupport.Threading;
using Best.HTTP.Shared;
using Best.HTTP.Shared.Logger;
using Best.HTTP.Shared.PlatformSupport.Network.Tcp;
using Best.HTTP.Shared.Streams;

namespace Best.HTTP.Hosts.Connections.HTTP2
{
    /// <summary>
    /// Factory signature used by <see cref="HTTP2ContentConsumer.RunHandler"/> when a request's <c>Tag</c>
    /// is a delegate of this type: it is called instead of the default <see cref="HTTP2Stream"/> constructor.
    /// </summary>
    public delegate HTTP2Stream CustomHTTP2StreamFactory(HTTPRequest request, uint streamId, HTTP2ContentConsumer parentHandler, HTTP2SettingsManager registry, HPACKEncoder hpackEncoder);

    /// <summary>
    /// HTTP/2 connection handler. Requests are queued through <see cref="Process"/>, incoming frames are
    /// queued by <see cref="OnContent"/>, and <see cref="RunHandler"/> is the loop that turns requests into
    /// streams, dispatches received frames and writes all outgoing frames to the connection's stream.
    /// </summary>
    public sealed class HTTP2ContentConsumer : IHTTPRequestHandler, IContentConsumer, IThreadSignaler
    {
        /// <summary>Always <c>null</c> for this handler.</summary>
        public KeepAliveHeader KeepAlive { get { return null; } }

        /// <summary>
        /// True while no GOAWAY has been sent, the handler is running and more than one request can be assigned.
        /// </summary>
        public bool CanProcessMultiple { get { return !this.SentGoAwayFrame && this.isRunning && this._maxAssignedRequests > 1; } }

        /// <summary>Number of requests currently assigned (incremented in <see cref="Process"/>, decremented when a stream is removed).</summary>
        public int AssignedRequests => this._assignedRequest;
        private int _assignedRequest;

        /// <summary>Upper limit of concurrently processed streams; updated when a SETTINGS frame is processed.</summary>
        public int MaxAssignedRequests => this._maxAssignedRequests;
        private int _maxAssignedRequests = 1;

        /// <summary>Largest value representable on 31 bits (2^31 - 1).</summary>
        public const UInt32 MaxValueFor31Bits = 0xFFFFFFFF >> 1;

        /// <summary>Average of the round-trip times stored in the RTT buffer, in milliseconds.</summary>
        public double Latency { get; private set; }

        public HTTP2SettingsManager settings;
        public HPACKEncoder HPACKEncoder;

        public LoggingContext Context { get; private set; }

        /// <summary>Stream the incoming frames are read from; set through <see cref="SetBinding"/>.</summary>
        public PeekableContentProviderStream ContentProvider { get; private set; }

        public HTTP2ConnectionSettings ConnectionSettings { get; private set; }

        private DateTime lastPingSent = DateTime.MinValue;

        // 1 while a PING was sent and its ACK isn't received yet, 0 otherwise. Modified with Interlocked operations.
        private int waitingForPingAck = 0;

        private DateTime lastDataFrameReceived = DateTime.MinValue;

        /// <summary>Capacity of the buffer that stores the measured round-trip times.</summary>
        public static int RTTBufferCapacity = 5;

        private CircularBuffer<double> rtts = new CircularBuffer<double>(RTTBufferCapacity);

        private volatile bool isRunning;

        // Signaled when a new request or frame arrives, so the loop in RunHandler can wake up.
        private AutoResetEvent newFrameSignal = new AutoResetEvent(false);

        // Requests waiting to be assigned to a stream.
        private ConcurrentQueue<HTTPRequest> requestQueue = new ConcurrentQueue<HTTPRequest>();

        private List<HTTP2Stream> clientInitiatedStreams = new List<HTTP2Stream>();

        // Frames read in OnContent, waiting to be processed in RunHandler.
        private ConcurrentQueue<HTTP2FrameHeaderAndPayload> newFrames = new ConcurrentQueue<HTTP2FrameHeaderAndPayload>();

        // Frames collected during one iteration of RunHandler's loop, waiting to be written.
        private List<HTTP2FrameHeaderAndPayload> outgoingFrames = new List<HTTP2FrameHeaderAndPayload>();

        private UInt32 remoteWindow;
        private DateTime lastInteraction;

        // DateTime.MaxValue means that no GOAWAY frame has been sent.
        private DateTime goAwaySentAt = DateTime.MaxValue;
        private bool SentGoAwayFrame { get => this.goAwaySentAt != DateTime.MaxValue; }

        private HTTPOverTCPConnection conn;

        // After a GOAWAY is sent, wait 2.5 times the measured latency, but at least 1500 ms.
        private TimeSpan MaxGoAwayWaitTime { get { return !this.SentGoAwayFrame ? TimeSpan.MaxValue : TimeSpan.FromMilliseconds(Math.Max(this.Latency * 2.5, 1500)); } }

        // https://httpwg.org/specs/rfc7540.html#StreamIdentifiers
        // Streams initiated by a client MUST use odd-numbered stream identifiers.
        // With an initial value of -1, the first client-initiated stream's id is going to be 1.
        private long LastStreamId = -1;

        /// <summary>
        /// Creates the handler for the given connection and immediately queues the connection's current request.
        /// </summary>
        /// <param name="conn">The connection whose <c>TopStream</c> the frames are written to.</param>
        public HTTP2ContentConsumer(HTTPOverTCPConnection conn)
        {
            this.Context = new LoggingContext(this);
            this.Context.Add("Parent", conn.Context);

            this.conn = conn;
            this.isRunning = true;

            this.ConnectionSettings = HTTPManager.PerHostSettings.Get(conn.HostKey).HTTP2ConnectionSettings;
            this.settings = new HTTP2SettingsManager(this.Context, this.ConnectionSettings);

            Process(this.conn.CurrentRequest);
        }

        /// <summary>
        /// Queues a request for processing and wakes up the loop running in <see cref="RunHandler"/>.
        /// </summary>
        /// <param name="request">The request to send over this connection.</param>
        public void Process(HTTPRequest request)
        {
#if ENABLE_LOGGING
            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Process request called", this.Context);
#endif

            request.TimeoutSettings.SetProcessing(this.lastInteraction = DateTime.UtcNow);

            Interlocked.Increment(ref this._assignedRequest);

            this.requestQueue.Enqueue(request);
            SignalThread(SignalHandlerTypes.Signal);
        }

        /// <summary>Wakes up the loop in <see cref="RunHandler"/>. The signal type is not used.</summary>
        public void SignalThread(SignalHandlerTypes signalType) => this.newFrameSignal?.Set();

        /// <summary>
        /// The main loop of the connection. Sends the connection preface and initial settings, then, until
        /// <c>isRunning</c> turns false: sleeps when there's nothing to send, sends PINGs, processes received
        /// frames, creates streams for queued requests, lets streams produce frames and writes them out.
        /// On exit all remaining streams are aborted and a Closed connection event is enqueued.
        /// </summary>
        public void RunHandler()
        {
#if ENABLE_LOGGING
            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Processing thread up and running!", this.Context);
#endif

            ThreadedRunner.SetThreadName("Best.HTTP2 Process");

            string abortWithMessage = string.Empty;

            try
            {
                bool atLeastOneStreamHasAFrameToSend = true;

                this.HPACKEncoder = new HPACKEncoder(this.Context, this.settings);

                // https://httpwg.org/specs/rfc7540.html#InitialWindowSize
                // The connection flow-control window is also 65,535 octets.
                this.remoteWindow = this.settings.RemoteSettings[HTTP2Settings.INITIAL_WINDOW_SIZE];

                // We want to pack as much data as we can in one tcp segment, but setting the buffer's size too high
                // we might keep data too long and send them in bursts instead of in a steady stream.
                // Keeping it too low might result in a full tcp segment and one with very low payload.
                // Is it possible that one full tcp segment sized buffer would be the best, or a multiple of it.
                // It would keep the network busy without any fragments. The ethernet layer has a maximum of 1500 bytes,
                // but there are two layers of 20 byte headers each, so as a theoretical maximum it's 1500-20-20 bytes.
                // On the other hand, if the buffer is small (1-2), that means that for larger data, we have to do a lot
                // of system calls, in that case a larger buffer might be better. Still, if we are not cpu bound,
                // a well saturated network might serve us better.
                using (WriteOnlyBufferedStream bufferedStream = new WriteOnlyBufferedStream(this.conn.TopStream, 1024 * 1024 /*1500 - 20 - 20*/, this.Context))
                {
                    // The client connection preface starts with a sequence of 24 octets
                    // Connection preface starts with the string PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n).
                    ReadOnlySpan<byte> MAGIC = stackalloc byte[24] { 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a };
                    bufferedStream.Write(MAGIC);

                    // This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
                    // The client sends the client connection preface immediately upon receipt of a
                    // 101 (Switching Protocols) response (indicating a successful upgrade)
                    // or as the first application data octets of a TLS connection
                    this.settings.InitiatedMySettings[HTTP2Settings.INITIAL_WINDOW_SIZE] = this.ConnectionSettings.InitialStreamWindowSize;
                    this.settings.InitiatedMySettings[HTTP2Settings.MAX_CONCURRENT_STREAMS] = this.ConnectionSettings.MaxConcurrentStreams;
                    this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] = (uint)(this.ConnectionSettings.EnableConnectProtocol ? 1 : 0);
                    this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_PUSH] = 0;
                    this.settings.SendChanges(this.outgoingFrames);
                    this.settings.RemoteSettings.OnSettingChangedEvent += OnRemoteSettingChanged;

                    // The default window size for the whole connection is 65535 bytes,
                    // but we want to set it to the maximum possible value.
                    Int64 initialConnectionWindowSize = this.ConnectionSettings.InitialConnectionWindowSize;

                    // yandex.ru returns with an FLOW_CONTROL_ERROR (3) error when the plugin tries to set the connection window to 2^31 - 1
                    // and works only with a maximum value of 2^31 - 10Mib (10 * 1024 * 1024).
                    if (initialConnectionWindowSize == HTTP2ContentConsumer.MaxValueFor31Bits)
                        initialConnectionWindowSize -= 10 * 1024 * 1024;

                    if (initialConnectionWindowSize > 65535)
                    {
                        Int64 initialConnectionWindowSizeDiff = initialConnectionWindowSize - 65535;
                        if (initialConnectionWindowSizeDiff > 0)
                            this.outgoingFrames.Add(HTTP2FrameHelper.CreateWindowUpdateFrame(0, (UInt32)initialConnectionWindowSizeDiff, this.Context));
                    }

                    initialConnectionWindowSize -= 65535;

                    // local, per-connection window
                    long localConnectionWindow = initialConnectionWindowSize;
                    UInt32 updateConnectionWindowAt = (UInt32)(localConnectionWindow / 2);

                    while (this.isRunning)
                    {
                        DateTime now = DateTime.UtcNow;

                        if (!atLeastOneStreamHasAFrameToSend)
                        {
                            // buffered stream will call flush automatically if its internal buffer is full.
                            // But we have to make sure that we flush remaining data before we go to sleep.
                            bufferedStream.Flush();

                            // Wait until we have to send the next ping, OR a new frame is received on the read thread.
                            //   lastPingSent       Now      lastPingSent+frequency   lastPingSent+Ping timeout
                            //----|------------------|---------------|----------------------|------------|
                            //   lastInteraction                                  lastInteraction + MaxIdleTime

                            var sendPingAt = this.lastPingSent + this.ConnectionSettings.PingFrequency;
                            var timeoutAt = this.waitingForPingAck != 0 ? this.lastPingSent + this.ConnectionSettings.Timeout : DateTime.MaxValue;

                            // sendPingAt can be in the past if Timeout is larger than PingFrequency
                            var nextPingInteraction = sendPingAt < timeoutAt && sendPingAt >= now ? sendPingAt : timeoutAt;

                            var disconnectByIdleAt = this.lastInteraction + this.ConnectionSettings.MaxIdleTime;

                            var nextDueClientInteractionAt = nextPingInteraction < disconnectByIdleAt ? nextPingInteraction : disconnectByIdleAt;

                            // Keep the value as a double until it's clamped: casting an out-of-range double
                            // (for example with a very large MaxIdleTime) to int has an unspecified result.
                            double waitMs = (nextDueClientInteractionAt - now).TotalMilliseconds;
                            waitMs = Math.Min(waitMs, this.MaxGoAwayWaitTime.TotalMilliseconds);

                            TimeSpan nextStreamInteraction = TimeSpan.MaxValue;
                            for (int i = 0; i < this.clientInitiatedStreams.Count; i++)
                            {
                                var streamInteraction = this.clientInitiatedStreams[i].NextInteraction;
                                if (streamInteraction < nextStreamInteraction)
                                    nextStreamInteraction = streamInteraction;
                            }

                            waitMs = Math.Min(waitMs, nextStreamInteraction.TotalMilliseconds);

                            // Never sleep more than a second, and never pass a negative timeout.
                            int wait = (int)Math.Max(0, Math.Min(waitMs, 1000));

                            if (wait >= 1)
                            {
                                //if (HTTPManager.Logger.Level <= Logger.Loglevels.All)
                                //    HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), string.Format("Sleeping for {0:N0}ms", wait), this.Context);
                                this.newFrameSignal.WaitOne(wait);

                                now = DateTime.UtcNow;
                            }
                        }

                        // Don't send a new ping until a pong is received for the last one
                        if (now - this.lastPingSent >= this.ConnectionSettings.PingFrequency && Interlocked.CompareExchange(ref this.waitingForPingAck, 1, 0) == 0)
                        {
                            this.lastPingSent = now;

                            var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.None, this.Context);

                            // The payload is the send time, it's read back when the ACK arrives to calculate the round-trip time.
                            BufferHelper.SetLong(frame.Payload.Data, 0, now.Ticks);

#if ENABLE_LOGGING
                            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"PING frame created with payload: {frame.Payload.Slice(0, 8)}", this.Context);
#endif

                            this.outgoingFrames.Add(frame);
                        }

                        // Process received frames
                        HTTP2FrameHeaderAndPayload header;
                        while (this.newFrames.TryDequeue(out header))
                        {
                            if (header.StreamId > 0)
                            {
                                switch (header.Type)
                                {
                                    case HTTP2FrameTypes.DATA:
                                        localConnectionWindow -= header.Payload.Count;
                                        lastDataFrameReceived = now;
                                        break;
                                }

                                HTTP2Stream http2Stream = FindStreamById(header.StreamId);

                                // Add frame to the stream, so it can process it when its Process function is called
                                if (http2Stream != null)
                                {
                                    http2Stream.AddFrame(header, this.outgoingFrames);
                                }
                                else
                                {
                                    // Error? It's possible that we closed and removed the stream while the server was in the middle of sending frames
#if ENABLE_LOGGING
                                    if (HTTPManager.Logger.Level == Loglevels.All)
                                        HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), $"Can't deliver frame: {header}, because no stream could be found for its Id!", this.Context);
#endif

                                    BufferPool.Release(header.Payload);
                                }
                            }
                            else
                            {
                                switch (header.Type)
                                {
                                    case HTTP2FrameTypes.SETTINGS:
                                        this.settings.Process(header, this.outgoingFrames);

                                        Interlocked.Exchange(ref this._maxAssignedRequests,
                                            (int)Math.Min(this.ConnectionSettings.MaxConcurrentStreams, this.settings.RemoteSettings[HTTP2Settings.MAX_CONCURRENT_STREAMS]));

                                        /*
                                        PluginEventHelper.EnqueuePluginEvent(
                                            new PluginEventInfo(PluginEvents.HTTP2ConnectProtocol,
                                                new HTTP2ConnectProtocolInfo(this.conn.HostKey,
                                                    this.settings.MySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1 && this.settings.RemoteSettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1)));
                                        */
                                        break;

                                    case HTTP2FrameTypes.PING:
                                        var pingFrame = HTTP2FrameHelper.ReadPingFrame(header);

                                        if ((pingFrame.Flags & HTTP2PingFlags.ACK) != 0)
                                        {
                                            // If waitingForPingAck was 0 we aren't expecting a ping ack, so the frame is ignored.
                                            // Its buffer is still released below.
                                            if (Interlocked.CompareExchange(ref this.waitingForPingAck, 0, 1) != 0)
                                            {
                                                // it was an ack, payload must contain what we sent
                                                var ticks = BufferHelper.ReadLong(pingFrame.OpaqueData, 0);

                                                // the difference between the current time and the time when the ping message is sent
                                                TimeSpan diff = TimeSpan.FromTicks(now.Ticks - ticks);

#if ENABLE_LOGGING
                                                if (diff.TotalSeconds > 10 || diff.TotalSeconds < 0)
                                                    HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), $"Pong received with weird diff: {diff}! Payload: {pingFrame.OpaqueData}", this.Context);
#endif

                                                // add it to the buffer
                                                this.rtts.Add(diff.TotalMilliseconds);

                                                // and calculate the new latency
                                                this.Latency = CalculateLatency();

#if ENABLE_LOGGING
                                                HTTPManager.Logger.Verbose(nameof(HTTP2ContentConsumer), string.Format("Latency: {0:F2}ms, RTT buffer: {1}", this.Latency, this.rtts.ToString()), this.Context);
#endif
                                            }
                                        }
                                        else
                                        {
                                            // https://httpwg.org/specs/rfc7540.html#PING
                                            // if it wasn't an ack for our ping, we have to send one
                                            var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.ACK, this.Context);
                                            Array.Copy(pingFrame.OpaqueData.Data, 0, frame.Payload.Data, 0, pingFrame.OpaqueData.Count);

                                            // Inserted at the front, so the ACK is written before any other collected frame.
                                            this.outgoingFrames.Insert(0, frame);
                                        }

                                        BufferPool.Release(pingFrame.OpaqueData);
                                        break;

                                    case HTTP2FrameTypes.WINDOW_UPDATE:
                                        var windowUpdateFrame = HTTP2FrameHelper.ReadWindowUpdateFrame(header);
                                        this.remoteWindow += windowUpdateFrame.WindowSizeIncrement;
                                        break;

                                    case HTTP2FrameTypes.GOAWAY:
                                        // parse the frame, so we can print out detailed information
                                        HTTP2GoAwayFrame goAwayFrame = HTTP2FrameHelper.ReadGoAwayFrame(header);

#if ENABLE_LOGGING
                                        HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Received GOAWAY frame: " + goAwayFrame.ToString(), this.Context);
#endif

                                        abortWithMessage = string.Format("Server closing the connection! Error code: {0} ({1}) Additonal Debug Data: {2}", goAwayFrame.Error, goAwayFrame.ErrorCode, goAwayFrame.AdditionalDebugData);

                                        for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                                            this.clientInitiatedStreams[i].Abort(abortWithMessage);
                                        this.clientInitiatedStreams.Clear();

                                        // set the running flag to false, so the thread can exit
                                        this.isRunning = false;

                                        BufferPool.Release(goAwayFrame.AdditionalDebugData);

                                        //this.conn.State = HTTPConnectionStates.Closed;
                                        break;

                                    case HTTP2FrameTypes.ALT_SVC:
                                        //HTTP2AltSVCFrame altSvcFrame = HTTP2FrameHelper.ReadAltSvcFrame(header);

                                        // Implement
                                        //HTTPManager.EnqueuePluginEvent(new PluginEventInfo(PluginEvents.AltSvcHeader, new AltSvcEventInfo(altSvcFrame.Origin, ))
                                        break;
                                }

                                if (header.Payload != null)
                                    BufferPool.Release(header.Payload);
                            }
                        }

                        // If no pong received in a (configurable) reasonable time, treat the connection broken
                        if (this.waitingForPingAck != 0)
                        {
                            // Even if we haven't received a ping ack, check for data frames.
                            // If data is flowing in, we can say that the connection is still healthy.
                            var lastPingDiff = now - this.lastPingSent;
                            var lastDataDiff = now - this.lastDataFrameReceived;

                            if (lastPingDiff >= this.ConnectionSettings.Timeout && lastDataDiff >= this.ConnectionSettings.Timeout)
                                throw new TimeoutException("Ping ACK isn't received in time!");
                        }

                        // pre-test stream count to lock only when truly needed.
                        if (this.clientInitiatedStreams.Count < _maxAssignedRequests && this.isRunning)
                        {
                            // grab requests from queue
                            HTTPRequest request;
                            while (this.clientInitiatedStreams.Count < _maxAssignedRequests && this.requestQueue.TryDequeue(out request))
                            {
                                HTTP2Stream newStream = null;

                                if (request.Tag is CustomHTTP2StreamFactory factory)
                                {
                                    newStream = factory(request, (UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                                }
                                else
                                {
                                    newStream = new HTTP2Stream((UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                                }

                                newStream.Assign(request);
                                this.clientInitiatedStreams.Add(newStream);
                            }
                        }

                        // send any settings changes
                        this.settings.SendChanges(this.outgoingFrames);

                        atLeastOneStreamHasAFrameToSend = false;

                        // process other streams
                        for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                        {
                            var stream = this.clientInitiatedStreams[i];
                            stream.Process(this.outgoingFrames);

                            // remove closed, empty streams (not enough to check the closed flag, a closed stream still can contain frames to send)
                            if (stream.State == HTTP2StreamStates.Closed && !stream.HasFrameToSend)
                            {
                                this.clientInitiatedStreams.RemoveAt(i--);
                                stream.Removed();

                                Interlocked.Decrement(ref this._assignedRequest);
                            }

                            atLeastOneStreamHasAFrameToSend |= stream.HasFrameToSend;

                            // While there's at least one stream, the connection isn't considered idle.
                            this.lastInteraction = now;
                        }

                        // If we encounter a data frame that's too large for the current remote window, we have to stop
                        // sending all data frames as we could send smaller data frames before the large ones.
                        // Room for improvement: An improvement would be here to stop data frame sending per-stream.
                        bool haltDataSending = false;

                        if (this.ShutdownType == ShutdownTypes.Running && !this.SentGoAwayFrame && now - this.lastInteraction >= this.ConnectionSettings.MaxIdleTime)
                        {
                            this.lastInteraction = now;

#if ENABLE_LOGGING
                            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Reached idle time, sending GoAway frame!", this.Context);
#endif

                            this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR, this.Context));
                            this.goAwaySentAt = now;
                        }

                        // https://httpwg.org/specs/rfc7540.html#GOAWAY
                        // Endpoints SHOULD always send a GOAWAY frame before closing a connection so that the remote peer can know whether a stream has been partially processed or not.
                        if (this.ShutdownType == ShutdownTypes.Gentle && !this.SentGoAwayFrame)
                        {
#if ENABLE_LOGGING
                            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Connection abort requested, sending GoAway frame!", this.Context);
#endif

                            // NOTE(review): frames dropped here don't get their payloads released back to the BufferPool - confirm that's intended.
                            this.outgoingFrames.Clear();
                            this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR, this.Context));

                            this.goAwaySentAt = now;
                        }

                        if (this.isRunning && this.SentGoAwayFrame && now - goAwaySentAt >= this.MaxGoAwayWaitTime)
                        {
#if ENABLE_LOGGING
                            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "No GoAway frame received back. Really quitting now!", this.Context);
#endif

                            this.isRunning = false;
                            continue;
                        }

                        // Restore the local connection window when it drops below the half of its initial value.
                        if (localConnectionWindow < updateConnectionWindowAt)
                        {
                            UInt32 diff = (UInt32)(initialConnectionWindowSize - localConnectionWindow);

#if ENABLE_LOGGING
                            if (HTTPManager.Logger.IsDiagnostic)
                                HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"Updating local connection window by {diff:N0} ({initialConnectionWindowSize:N0} - {localConnectionWindow:N0})", this.Context);
#endif

                            this.outgoingFrames.Add(HTTP2FrameHelper.CreateWindowUpdateFrame(0, diff, this.Context));
                            localConnectionWindow = initialConnectionWindowSize;
                        }

                        // Go through all the collected frames and send them.
                        for (int i = 0; i < this.outgoingFrames.Count; ++i)
                        {
                            var frame = this.outgoingFrames[i];

#if ENABLE_LOGGING
                            if (HTTPManager.Logger.IsDiagnostic && frame.Type != HTTP2FrameTypes.DATA /*&& frame.Type != HTTP2FrameTypes.PING*/)
                                HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Sending frame: " + frame.ToString(), this.Context);
#endif

                            // post process frames
                            switch (frame.Type)
                            {
                                case HTTP2FrameTypes.DATA:
                                    // Skipped data frames remain in the list for a later iteration.
                                    if (haltDataSending)
                                        continue;

                                    // if the tracked remoteWindow is smaller than the frame's payload, we stop sending
                                    // data frames until we receive window-update frames
                                    if (frame.Payload.Count > this.remoteWindow)
                                    {
                                        haltDataSending = true;

#if ENABLE_LOGGING
                                        HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), string.Format("Data sending halted for this round. Remote Window: {0:N0}, frame: {1}", this.remoteWindow, frame.ToString()), this.Context);
#endif

                                        continue;
                                    }

                                    break;
                            }

                            this.outgoingFrames.RemoveAt(i--);

                            SendHeader(bufferedStream, frame);

                            if (frame.Payload.Count > 0)
                            {
                                bufferedStream.Write(frame.Payload.Data, frame.Payload.Offset, frame.Payload.Count);

                                if (!frame.DontUseMemPool)
                                    BufferPool.Release(frame.Payload);
                            }

                            if (frame.Type == HTTP2FrameTypes.DATA)
                                this.remoteWindow -= (uint)frame.Payload.Count;
                        }

                        bufferedStream.Flush();
                    } // while (this.isRunning)

                    bufferedStream.Flush();
                }
            }
            catch (Exception ex)
            {
                abortWithMessage = ex.ToString();

                // Log out the exception if it's a non-expected one.
                if (this.ShutdownType == ShutdownTypes.Running && this.isRunning && !this.SentGoAwayFrame && !HTTPManager.IsQuitting)
                    HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), "Sender thread", ex, this.Context);
            }
            finally
            {
                this.isRunning = false;

#if ENABLE_LOGGING
                HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"Sender thread closing - cleaning up remaining requests({this.clientInitiatedStreams.Count})...", this.Context);
#endif

                if (string.IsNullOrEmpty(abortWithMessage))
                    abortWithMessage = "Connection closed unexpectedly";

                for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                    this.clientInitiatedStreams[i].Abort(abortWithMessage);
                this.clientInitiatedStreams.Clear();

#if ENABLE_LOGGING
                HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Sender thread closing", this.Context);
#endif

                ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this.conn, HTTPConnectionStates.Closed));
            }
        }

        /// <summary>Writes the 9 byte binary frame header of the given frame to the stream.</summary>
        private static void SendHeader(WriteOnlyBufferedStream stream, HTTP2FrameHeaderAndPayload headerAndPayload)
        {
            Span<byte> frameData = stackalloc byte[9];

            HTTP2FrameHelper.HeaderAsBinary(headerAndPayload, frameData);

            stream.Write(frameData);
        }

        /// <summary>
        /// Subscribed to the remote settings' change event in <see cref="RunHandler"/>. When the remote
        /// INITIAL_WINDOW_SIZE changes, shifts <c>remoteWindow</c> by the difference of the new and old values.
        /// </summary>
        private void OnRemoteSettingChanged(HTTP2SettingsRegistry registry, HTTP2Settings setting, uint oldValue, uint newValue)
        {
            switch (setting)
            {
                case HTTP2Settings.INITIAL_WINDOW_SIZE:
                    // NOTE(review): RFC 7540 6.9.2 describes SETTINGS_INITIAL_WINDOW_SIZE as adjusting stream windows;
                    // confirm that adjusting the connection-level window here is intended.
                    this.remoteWindow = newValue - (oldValue - this.remoteWindow);
                    break;
            }
        }

        /// <summary>Sets the stream the incoming frames are going to be read from.</summary>
        public void SetBinding(PeekableContentProviderStream contentProvider) => this.ContentProvider = contentProvider;

        /// <summary>Clears the stream set by <see cref="SetBinding"/>.</summary>
        public void UnsetBinding() => this.ContentProvider = null;

        /// <summary>
        /// Reads every complete frame available in <see cref="ContentProvider"/>, queues them for
        /// <see cref="RunHandler"/> and signals it. Exceptions are logged, not rethrown.
        /// </summary>
        public void OnContent()
        {
            try
            {
                while (this.isRunning && HTTP2FrameHelper.CanReadFullFrame(this.ContentProvider))
                {
                    HTTP2FrameHeaderAndPayload header = HTTP2FrameHelper.ReadHeader(this.ContentProvider, this.Context);

#if ENABLE_LOGGING
                    if (HTTPManager.Logger.IsDiagnostic /*&& header.Type != HTTP2FrameTypes.DATA /*&& header.Type != HTTP2FrameTypes.PING*/)
                        HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "New frame received: " + header.ToString(), this.Context);
#endif

                    // Add the new frame to the queue. Processing it on the write thread gives us the advantage that
                    // we don't have to deal with too much locking.
                    this.newFrames.Enqueue(header);
                }
            }
            catch (Exception ex)
            {
                HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), "", ex, this.Context);
            }
            finally
            {
                // ping write thread to process the new frame
                this.newFrameSignal?.Set();
            }
        }

        /// <summary>Stops the loop in <see cref="RunHandler"/> and wakes it up so it can exit.</summary>
        public void OnConnectionClosed()
        {
#if ENABLE_LOGGING
            HTTPManager.Logger.Verbose(nameof(HTTP2ContentConsumer), $"{nameof(OnConnectionClosed)}({this.isRunning})", this.Context);
#endif

            this.isRunning = false;
            this.newFrameSignal?.Set();
        }

        /// <summary>Logs the error, then stops the loop in <see cref="RunHandler"/> and wakes it up so it can exit.</summary>
        public void OnError(Exception ex)
        {
#if ENABLE_LOGGING
            HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), $"{nameof(OnError)}({this.isRunning}, {ex})", ex, this.Context);
#endif

            this.isRunning = false;
            this.newFrameSignal?.Set();
        }

        /// <summary>Returns the arithmetic mean of the stored round-trip times, or 0 when there are none.</summary>
        private double CalculateLatency()
        {
            if (this.rtts.Count == 0)
                return 0;

            double sumLatency = 0;
            for (int i = 0; i < this.rtts.Count; ++i)
                sumLatency += this.rtts[i];

            return sumLatency / this.rtts.Count;
        }

        /// <summary>Linear search among the active streams; returns <c>null</c> when no stream has the given id.</summary>
        HTTP2Stream FindStreamById(UInt32 streamId)
        {
            for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
            {
                var stream = this.clientInitiatedStreams[i];
                if (stream.Id == streamId)
                    return stream;
            }

            return null;
        }

        /// <summary>The shutdown type last requested through <see cref="Shutdown"/>.</summary>
        public ShutdownTypes ShutdownType { get; private set; }

        /// <summary>
        /// Requests a shutdown. Gentle wakes up <see cref="RunHandler"/>, which then sends a GOAWAY frame;
        /// Immediate disposes the connection's top stream.
        /// </summary>
        public void Shutdown(ShutdownTypes type)
        {
            this.ShutdownType = type;

            switch (this.ShutdownType)
            {
                case ShutdownTypes.Gentle:
                    // Null-conditional, like every other use: Dispose sets the signal to null.
                    this.newFrameSignal?.Set();
                    break;

                case ShutdownTypes.Immediate:
                    this.conn?.TopStream?.Dispose();
                    break;
            }
        }

        /// <summary>
        /// Releases the payloads of the unprocessed frames, enqueues an Aborted or Resend event for every
        /// request still waiting in the queue and closes the wake-up signal.
        /// </summary>
        public void Dispose()
        {
#if ENABLE_LOGGING
            HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Dispose", this.Context);
#endif

            while (this.newFrames.TryDequeue(out var frame))
                BufferPool.Release(frame.Payload);

            foreach (var frame in this.outgoingFrames)
            {
                // Same rule as in the send path: payloads flagged with DontUseMemPool must not be released to the pool.
                if (!frame.DontUseMemPool)
                    BufferPool.Release(frame.Payload);
            }
            this.outgoingFrames.Clear();

            HTTPRequest request = null;
            while (this.requestQueue.TryDequeue(out request))
            {
#if ENABLE_LOGGING
                HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), string.Format("Dispose - Request '{0}' IsCancellationRequested: {1}", request.CurrentUri.ToString(), request.IsCancellationRequested.ToString()), this.Context);
#endif

                RequestEventHelper.EnqueueRequestEvent(request.IsCancellationRequested
                    ? new RequestEventInfo(request, HTTPRequestStates.Aborted, null)
                    : new RequestEventInfo(request, RequestEvents.Resend));
            }

            this.newFrameSignal?.Close();
            this.newFrameSignal = null;
        }
    }
}

#endif