xtool/contrib/fundamentals/TCP/flcTCPConnection.pas

2958 lines
82 KiB
ObjectPascal

{******************************************************************************}
{ }
{ Library: Fundamentals 5.00 }
{ File name: flcTCPConnection.pas }
{ File version: 5.34 }
{ Description: TCP connection. }
{ }
{ Copyright: Copyright (c) 2007-2020, David J Butler }
{ All rights reserved. }
{ This file is licensed under the BSD License. }
{ See http://www.opensource.org/licenses/bsd-license.php }
{ Redistribution and use in source and binary forms, with }
{ or without modification, are permitted provided that }
{ the following conditions are met: }
{ Redistributions of source code must retain the above }
{ copyright notice, this list of conditions and the }
{ following disclaimer. }
{ THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND }
{ CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED }
{ WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED }
{ WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A }
{ PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL }
{ THE REGENTS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, }
{ INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR }
{ CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, }
{ PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF }
{ USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) }
{ HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER }
{ IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING }
{ NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE }
{ USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE }
{ POSSIBILITY OF SUCH DAMAGE. }
{ }
{ Github: https://github.com/fundamentalslib }
{ E-mail: fundamentals.library at gmail.com }
{ }
{ Revision history: }
{ }
{ 2008/12/23 0.01 Initial development. }
{ 2010/11/07 0.02 Revision. }
{ 2010/11/12 0.03 Refactor for asynchronous operation. }
{ 2010/12/03 0.04 Connection proxies. }
{ 2010/12/17 0.05 Throttling. }
{ 2010/12/19 0.06 Multiple connection proxies. }
{ 2010/12/29 0.07 Report read/write rates. }
{ 2011/01/02 0.08 Bug fix in PollSocket routine. }
{ 2011/06/16 0.09 Fix TriggerRead with proxies. }
{ 2011/06/18 0.10 Fix Read error in PollSocket when closed by proxies. }
{ 2011/06/25 0.11 Improve Write notifications. }
{ 2011/06/26 0.12 Implement defered shutdown if write buffer not empty. }
{ 2011/07/03 0.13 WriteBufferEmpty event. }
{ 2011/07/04 0.14 Trigger read events outside lock. }
{ 2011/07/24 0.15 Discard method. }
{ 2011/07/31 0.16 Defer close from proxy until after read. }
{ 2011/09/03 4.17 Revise for Fundamentals 4. }
{ 2011/09/10 4.18 Improve locking granularity. }
{ 2011/09/15 4.19 Improve polling efficiency. }
{ 2011/10/06 4.20 Fix TCPTick frequency. }
{ 2015/03/14 4.21 RawByteString changes. }
{ 2015/04/09 4.22 TBytes functions. }
{ 2015/04/26 4.23 Blocking interface and worker thread. }
{ 2016/01/09 5.24 Revised for Fundamentals 5. }
{ 2018/08/30 5.25 PollSocket returns Terminated on exception. }
{ 2018/09/08 5.26 PollSocket remove use of Select. }
{ 2018/09/10 5.27 PollSocket change to ProcessSocket. }
{ 2018/12/31 5.28 CreationTime and LastActivityTime properties. }
{ 2019/04/10 5.29 Locking changes. }
{ 2019/04/11 5.30 Shutdown send handling. }
{ 2019/04/13 5.31 Shutdown receive handling. }
{ 2019/12/30 5.32 Initialise buffer sizes on creation. }
{ 2020/03/28 5.33 Update LastReadActivityTime when read is from socket. }
{ 2020/05/02 5.34 Log exceptions raised in event handlers. }
{ }
{******************************************************************************}
{$INCLUDE ../flcInclude.inc}
{$INCLUDE flcTCP.inc}
unit flcTCPConnection;
interface
uses
{ System }
SysUtils,
{$IFDEF WindowsPlatform}
Windows,
{$ENDIF}
SyncObjs,
Classes,
{ Utils }
flcStdTypes,
{ Socket }
flcSocket,
{ TCP }
flcTCPBuffer;
{ }
{ TCP Connection }
{ }
type
ETCPConnection = class(Exception);
TTCPLogType = (
tlDebug,
tlParameter,
tlInfo,
tlWarning,
tlError
);
//tlCritical
TTCPConnectionProxyState = (
prsInit, // proxy is initialising
prsNegotiating, // proxy is negotiating, connection data is not yet being transferred
prsFiltering, // proxy has successfully completed negotiation and is actively filtering connection data
prsFinished, // proxy has successfully completed operation and can be bypassed
prsError, // proxy has failed and connection is invalid
prsClosed // proxy has closed the connection
);
TTCPConnectionProxy = class;
TTCPConnectionProxyEvent = procedure (const AProxy: TTCPConnectionProxy) of object;
TTCPConnectionProxyLogEvent = procedure (
const AProxy: TTCPConnectionProxy;
const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer) of object;
TTCPConnectionProxyStateEvent = procedure (
const AProxy: TTCPConnectionProxy;
const AState: TTCPConnectionProxyState) of object;
TTCPConnectionProxyDataEvent = procedure (
const AProxy: TTCPConnectionProxy; const Buf; const BufSize: Integer) of object;
TTCPConnectionProxy = class
private
FOnLog : TTCPConnectionProxyLogEvent;
FOnStateChange : TTCPConnectionProxyStateEvent;
FOnConnectionClose : TTCPConnectionProxyEvent;
FOnConnectionPutReadData : TTCPConnectionProxyDataEvent;
FOnConnectionPutWriteData : TTCPConnectionProxyDataEvent;
FNextProxy : TTCPConnectionProxy;
FState : TTCPConnectionProxyState;
protected
FErrorMessage : String;
procedure Log(const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer = 0); overload;
procedure Log(const LogType: TTCPLogType; const LogMsg: String; const LogArgs: array of const; const LogLevel: Integer = 0); overload;
function GetStateStr: String;
procedure SetState(const AState: TTCPConnectionProxyState);
procedure SetStateError(const AErrorMessage: String);
procedure ConnectionClose;
procedure ConnectionPutReadData(const Buf; const BufSize: Integer);
procedure ConnectionPutWriteData(const Buf; const BufSize: Integer);
procedure ProxyStart; virtual; abstract;
public
class function ProxyName: String; virtual;
constructor Create;
procedure Finalise;
property OnLog: TTCPConnectionProxyLogEvent read FOnLog write FOnLog;
property OnStateChange: TTCPConnectionProxyStateEvent read FOnStateChange write FOnStateChange;
property OnConnectionClose: TTCPConnectionProxyEvent read FOnConnectionClose write FOnConnectionClose;
property OnConnectionPutReadData: TTCPConnectionProxyDataEvent read FOnConnectionPutReadData write FOnConnectionPutReadData;
property OnConnectionPutWriteData: TTCPConnectionProxyDataEvent read FOnConnectionPutWriteData write FOnConnectionPutWriteData;
property State: TTCPConnectionProxyState read FState;
property StateStr: String read GetStateStr;
property ErrorMessage: String read FErrorMessage;
procedure Start;
procedure ProcessReadData(const Buf; const BufSize: Integer); virtual; abstract;
procedure ProcessWriteData(const Buf; const BufSize: Integer); virtual; abstract;
end;
TTCPConnectionProxyList = class
private
FList : array of TTCPConnectionProxy;
function GetCount: Integer;
function GetItem(const AIdx: Integer): TTCPConnectionProxy;
function GetLastItem: TTCPConnectionProxy;
public
destructor Destroy; override;
procedure Finalise;
property Count: Integer read GetCount;
property Item[const AIdx: Integer]: TTCPConnectionProxy read GetItem; default;
procedure Add(const AProxy: TTCPConnectionProxy);
property LastItem: TTCPConnectionProxy read GetLastItem;
end;
TTCPConnectionState = (
cnsInit, // Initialising/Connecting
cnsProxyNegotiation, // Proxy busy with negotiation
cnsConnected, // Connected
cnsClosed // Closed
);
TTCPConnectionStates = set of TTCPConnectionState;
TTCPConnectionTransferState = record
LastUpdate : Word64;
TransferBytes : Int64;
TransferRate : Word64;
end;
TRawByteCharSet = set of AnsiChar;
TTCPBlockingConnection = class;
TTCPConnection = class;
TTCPConnectionNotifyEvent = procedure (AConnection: TTCPConnection) of object;
TTCPConnectionStateChangeEvent = procedure (AConnection: TTCPConnection; AState: TTCPConnectionState) of object;
TTCPConnectionLogEvent = procedure (AConnection: TTCPConnection; LogType: TTCPLogType; LogMsg: String; LogLevel: Integer) of object;
TTCPConnectionWorkerExecuteEvent = procedure (AConnection: TTCPConnection;
ABlockingConnection: TTCPBlockingConnection; var CloseOnExit: Boolean) of object;
TTCPConnection = class
protected
// parameters
FSocket : TSysSocket;
FReadThrottle : Boolean;
FReadThrottleRate : Integer;
FWriteThrottle : Boolean;
FWriteThrottleRate : Integer;
FTrackLastActivityTime : Boolean;
FUseWorkerThread : Boolean;
FUserTag : NativeInt;
FUserObject : TObject;
// events
FOnLog : TTCPConnectionLogEvent;
FOnStateChange : TTCPConnectionStateChangeEvent;
FOnReady : TTCPConnectionNotifyEvent;
FOnClose : TTCPConnectionNotifyEvent;
FOnReadShutdown : TTCPConnectionNotifyEvent;
FOnShutdown : TTCPConnectionNotifyEvent;
FOnRead : TTCPConnectionNotifyEvent;
FOnWrite : TTCPConnectionNotifyEvent;
FOnReadActivity : TTCPConnectionNotifyEvent;
FOnWriteActivity : TTCPConnectionNotifyEvent;
FOnReadBufferFull : TTCPConnectionNotifyEvent;
FOnWriteBufferEmpty : TTCPConnectionNotifyEvent;
FOnWait : TTCPConnectionNotifyEvent;
FOnWorkerExecute : TTCPConnectionWorkerExecuteEvent;
FOnWorkerFinished : TTCPConnectionNotifyEvent;
// state
FLock : TCriticalSection;
FState : TTCPConnectionState;
FErrorMessage : String;
FCreationTime : TDateTime; //// InitialiseTime
FReadBuffer : TTCPBuffer;
FWriteBuffer : TTCPBuffer;
FReadTransferState : TTCPConnectionTransferState;
FWriteTransferState : TTCPConnectionTransferState;
FProxyList : TTCPConnectionProxyList;
FProxyConnection : Boolean;
FReadyNotified : Boolean;
FReadEventPending : Boolean;
FReadBufferFull : Boolean;
FReadProcessPending : Boolean;
FReadActivityPending : Boolean;
FWriteEventPending : Boolean;
FShutdownSendPending : Boolean;
FShutdownSent : Boolean;
FShutdownRecv : Boolean;
FShutdownComplete : Boolean;
FClosePending : Boolean;
FLastReadActivityTime : TDateTime;
FLastWriteActivityTime : TDateTime;
FBlockingConnection : TTCPBlockingConnection;
FWorkerThread : TThread;
FWorkerErrorMsg : String;
FWorkerErrorClass : String;
procedure Init; virtual;
procedure InitBuffers(
const AReadBufferMinSize: Int32;
const AReadBufferMaxSize: Int32;
const AWriteBufferMinSize: Int32;
const AWriteBufferMaxSize: Int32);
procedure Lock;
procedure Unlock;
procedure Log(const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer = 0); overload;
procedure Log(const LogType: TTCPLogType; const LogMsg: String; const LogArgs: array of const; const LogLevel: Integer = 0); overload;
function GetState: TTCPConnectionState;
function GetStateStr: String;
procedure SetStateProxyNegotiation;
procedure SetStateFailed;
procedure SetStateConnected;
function SetStateClosed: Boolean;
function GetReadBufferMinSize: Int32;
function GetReadBufferMaxSize: Int32;
function GetWriteBufferMinSize: Int32;
function GetWriteBufferMaxSize: Int32;
procedure SetReadBufferMinSize(const AReadBufferMinSize: Int32);
procedure SetReadBufferMaxSize(const AReadBufferMaxSize: Int32);
procedure SetWriteBufferMinSize(const AWriteBufferMinSize: Int32);
procedure SetWriteBufferMaxSize(const AWriteBufferMaxSize: Int32);
function GetSocketReadBufferSize: Integer;
function GetSocketWriteBufferSize: Integer;
procedure SetSocketReadBufferSize(const Size: Integer);
procedure SetSocketWriteBufferSize(const Size: Integer);
function GetReadBufferUsed: Integer;
function GetWriteBufferUsed: Integer;
function GetReadBufferAvailable: Integer;
function GetWriteBufferAvailable: Integer;
procedure SetReadThrottle(const AReadThrottle: Boolean);
procedure SetWriteThrottle(const AWriteThrottle: Boolean);
function GetReadRate: Integer;
function GetWriteRate: Integer;
procedure TriggerStateChange;
procedure TriggerReady;
procedure TriggerReadShutdown;
procedure TriggerShutdown;
procedure TriggerClose;
procedure TriggerRead;
procedure TriggerWrite;
procedure TriggerReadActivity;
procedure TriggerWriteActivity;
procedure TriggerReadBufferFull;
procedure TriggerWriteBufferEmpty;
procedure TriggerWait;
procedure TriggerWorkerFinished;
function GetFirstActiveProxy: TTCPConnectionProxy;
procedure ProxyProcessReadData(const Buf; const BufSize: Integer; out ReadEventPending: Boolean);
procedure ProxyProcessWriteData(const Buf; const BufSize: Integer);
procedure ProxyLog(const AProxy: TTCPConnectionProxy; const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer);
procedure ProxyConnectionClose(const AProxy: TTCPConnectionProxy);
procedure ProxyStateChange(const AProxy: TTCPConnectionProxy; const AState: TTCPConnectionProxyState);
procedure ProxyConnectionPutReadData(const AProxy: TTCPConnectionProxy; const Buf; const BufSize: Integer);
procedure ProxyConnectionPutWriteData(const AProxy: TTCPConnectionProxy; const Buf; const BufSize: Integer);
procedure StartProxies(out AProxiesFinished: Boolean);
function FillBufferFromSocket(out RecvShutdown, RecvClosed, ReadEventPending, ReadBufFullEventPending: Boolean): Integer;
function WriteBufferToSocket(out BufferEmptyBefore, BufferEmptied: Boolean): Integer;
function GetLastReadActivityTime: TDateTime;
function GetLastWriteActivityTime: TDateTime;
function GetLastActivityTime: TDateTime;
function LocateByteCharInBuffer(const ADelimiter: ByteCharSet; const AMaxSize: Integer): Integer;
function LocateByteStrInBuffer(const ADelimiter: RawByteString; const AMaxSize: Integer): Integer;
function WriteToTransport(const Buf; const BufSize: Integer): Integer;
procedure DoShutdown(out AShutdownComplete: Boolean);
procedure Wait;
function GetBlockingConnection: TTCPBlockingConnection;
procedure StartWorkerThread;
procedure WorkerThreadExecute(const AThread: TThread);
public
constructor Create(
const ASocket: TSysSocket;
const AReadBufferMinSize: Int32 = TCP_BUFFER_DEFAULTMINSIZE;
const AReadBufferMaxSize: Int32 = TCP_BUFFER_DEFAULTMAXSIZE;
const AWriteBufferMinSize: Int32 = TCP_BUFFER_DEFAULTMINSIZE;
const AWriteBufferMaxSize: Int32 = TCP_BUFFER_DEFAULTMAXSIZE
);
destructor Destroy; override;
procedure Finalise;
// Events
property OnLog: TTCPConnectionLogEvent read FOnLog write FOnLog;
property OnStateChange: TTCPConnectionStateChangeEvent read FOnStateChange write FOnStateChange;
property OnReady: TTCPConnectionNotifyEvent read FOnReady write FOnReady;
property OnReadShutdown: TTCPConnectionNotifyEvent read FOnReadShutdown write FOnReadShutdown;
property OnShutdown: TTCPConnectionNotifyEvent read FOnShutdown write FOnShutdown;
property OnClose: TTCPConnectionNotifyEvent read FOnClose write FOnClose;
property OnRead: TTCPConnectionNotifyEvent read FOnRead write FOnRead;
property OnWrite: TTCPConnectionNotifyEvent read FOnWrite write FOnWrite;
property OnReadBufferFull: TTCPConnectionNotifyEvent read FOnReadBufferFull write FOnReadBufferFull;
property OnWriteBufferEmpty: TTCPConnectionNotifyEvent read FOnWriteBufferEmpty write FOnWriteBufferEmpty;
property OnReadActivity: TTCPConnectionNotifyEvent read FOnReadActivity write FOnReadActivity; //// OnSocketReadActivity
property OnWriteActivity: TTCPConnectionNotifyEvent read FOnWriteActivity write FOnWriteActivity; ////
// Parameters
property Socket: TSysSocket read FSocket;
// Buffers sizes
property ReadBufferMinSize: Int32 read GetReadBufferMinSize write SetReadBufferMinSize;
property ReadBufferMaxSize: Int32 read GetReadBufferMaxSize write SetReadBufferMaxSize;
property WriteBufferMinSize: Int32 read GetWriteBufferMinSize write SetWriteBufferMinSize;
property WriteBufferMaxSize: Int32 read GetWriteBufferMaxSize write SetWriteBufferMaxSize;
// Socket buffer sizes
property SocketReadBufferSize: Integer read GetSocketReadBufferSize write SetSocketReadBufferSize;
property SocketWriteBufferSize: Integer read GetSocketWriteBufferSize write SetSocketWriteBufferSize;
// Proxies
procedure AddProxy(const AProxy: TTCPConnectionProxy);
// State
property State: TTCPConnectionState read GetState;
property StateStr: String read GetStateStr;
property ErrorMessage: String read FErrorMessage;
property CreationTime: TDateTime read FCreationTime;
procedure Start;
// Throttling
property ReadThrottle: Boolean read FReadThrottle write SetReadThrottle;
property ReadThrottleRate: Integer read FReadThrottleRate write FReadThrottleRate;
property WriteThrottle: Boolean read FWriteThrottle write SetWriteThrottle;
property WriteThrottleRate: Integer read FWriteThrottleRate write FWriteThrottleRate;
property ReadRate: Integer read GetReadRate;
property WriteRate: Integer read GetWriteRate;
// Poll routines
procedure GetEventsToPoll(out WritePoll: Boolean);
procedure ProcessSocket(
const ProcessRead, ProcessWrite: Boolean;
const ActivityTime: TDateTime;
out Idle, Terminated: Boolean);
// Last activity times
property TrackLastActivityTime: Boolean read FTrackLastActivityTime write FTrackLastActivityTime;
property LastReadActivityTime: TDateTime read GetLastReadActivityTime;
property LastWriteActivityTime: TDateTime read GetLastWriteActivityTime;
property LastActivityTime: TDateTime read GetLastActivityTime;
// Buffer status
property ReadBufferUsed: Integer read GetReadBufferUsed;
property WriteBufferUsed: Integer read GetWriteBufferUsed;
property ReadBufferAvailable: Integer read GetReadBufferAvailable;
property WriteBufferAvailable: Integer read GetWriteBufferAvailable;
// Read
function Read(var Buf; const BufSize: Integer): Integer;
function ReadByteString(const AStrLen: Integer): RawByteString;
function ReadBytes(const ASize: Integer): TBytes;
// Discard
function Discard(const ASize: Integer): Integer;
// Peek
function Peek(var Buf; const BufSize: Int32): Integer;
function PeekByte(out B: Byte): Boolean;
function PeekByteString(const AStrLen: Integer): RawByteString;
function PeekBytes(const ASize: Integer): TBytes;
function PeekDelimited(var Buf; const BufSize: Integer;
const ADelimiter: TRawByteCharSet;
const AMaxSize: Integer = -1): Integer; overload;
function PeekDelimited(var Buf; const BufSize: Integer;
const ADelimiter: RawByteString;
const AMaxSize: Integer = -1): Integer; overload;
// ReadLine
function ReadLine(var Line: RawByteString;
const ADelimiter: RawByteString;
const AMaxLineLength: Integer = -1): Boolean;
// Write
function Write(const Buf; const BufSize: Integer): Integer;
function WriteByteString(const AStr: RawByteString): Integer;
function WriteBytes(const B: TBytes): Integer;
// Shutdown/Close
// Shutdown initiates a graceful shutdown.
// Close closes connection immediately.
procedure Shutdown;
function IsShutdownComplete: Boolean;
procedure Close;
// Worker thread
// Set UseWorkerThread to True and set OnWorkerExecute to execute
// OnWorkerExecute in a separate thread.
// The thread is launched when the connection state is Connected.
// WorkerErrorClass and WorkerErrorMsg is set to the exception details if an
// unhandled exception is raised in OnWorkerExecute.
property UseWorkerThread: Boolean read FUseWorkerThread write FUseWorkerThread;
property OnWorkerExecute: TTCPConnectionWorkerExecuteEvent read FOnWorkerExecute write FOnWorkerExecute;
property WorkerErrorClass: String read FWorkerErrorClass;
property WorkerErrorMsg: String read FWorkerErrorMsg;
procedure TerminateWorkerThread;
procedure WaitForWorkerThread;
// Blocking
// The blocking interface is available from the worker thread.
// OnWait is called when blocking occurs.
property BlockingConnection: TTCPBlockingConnection read GetBlockingConnection;
property OnWait: TTCPConnectionNotifyEvent read FOnWait write FOnWait;
// User defined values
property UserTag: NativeInt read FUserTag write FUserTag;
property UserObject: TObject read FUserObject write FUserObject;
end;
TTCPConnectionClass = class of TTCPConnection;
// Blocking connection
// These methods will block until a result is available or timeout expires.
// If TimeOut is set to -1 then method may wait indefinetely for a result.
// Note: These functions should not be called from this object's event handlers
// or from the main thread.
// These functions can only be called from the worker thread or any other user thread.
TTCPBlockingConnection = class
private
FConnection : TTCPConnection;
procedure Wait;
public
constructor Create(const AConnection : TTCPConnection);
destructor Destroy; override;
procedure Finalise;
property Connection: TTCPConnection read FConnection;
function WaitForState(const AStates: TTCPConnectionStates; const ATimeOutMs: Integer): TTCPConnectionState;
function WaitForReceiveData(const ABufferSize: Integer; const ATimeOutMs: Integer): Boolean;
function WaitForTransmitFin(const ATimeOutMs: Integer): Boolean;
function WaitForClose(const ATimeOutMs: Integer): Boolean;
function Read(var Buf; const BufferSize: Integer; const ATimeOutMs: Integer = -1): Integer;
function Write(const Buf; const BufferSize: Integer; const ATimeOutMs: Integer = -1): Integer;
procedure Shutdown(
const SettleTimeMs: Integer = 2000;
const TransmitTimeOutMs: Integer = 15000;
const CloseTimeOutMs: Integer = 45000);
procedure Close(const ATimeOutMs: Integer = 10000);
end;
implementation
uses
{ System }
{$IFDEF POSIX}
{$IFDEF DELPHI}
Posix.Time,
{$ENDIF}
{$ENDIF}
{$IFDEF POSIX}
{$IFDEF FREEPASCAL}
BaseUnix,
Unix,
{$ENDIF}
{$ENDIF}
{ Sockets }
flcSocketLib,
{ TCP }
flcTCPUtils;
{ }
{ TCP Connection Transfer Statistics }
{ }
// Reset transfer statistics
procedure TCPConnectionTransferReset(var State: TTCPConnectionTransferState);
begin
State.LastUpdate := TCPGetTick;
State.TransferBytes := 0;
State.TransferRate := 0;
end;
// Update the transfer's internal state for elapsed time
procedure TCPConnectionTransferUpdate(
var State: TTCPConnectionTransferState;
const CurrentTick: Word64;
out Elapsed: Int64);
begin
Elapsed := TCPTickDelta(State.LastUpdate, CurrentTick);
Assert(Elapsed >= 0);
// wait at least 1000ms between updates
if Elapsed < 1000 then
exit;
// update transfer rate
State.TransferRate := (State.TransferBytes * 1000) div Elapsed; // bytes per second
// scale down
while Elapsed > 60 do
begin
Elapsed := Elapsed div 2;
State.TransferBytes := State.TransferBytes div 2;
end;
State.LastUpdate := TCPTickDeltaU(Word64(Elapsed), CurrentTick);
end;
// Returns the number of bytes that can be transferred with this throttle in place
function TCPConnectionTransferThrottledSize(
var State: TTCPConnectionTransferState;
const CurrentTick: Word32;
const MaxTransferRate: Integer;
const BufferSize: Integer): Integer;
var
Elapsed : Int64;
Quota : Int64;
QuotaFree : Int64;
begin
Assert(MaxTransferRate > 0);
TCPConnectionTransferUpdate(State, CurrentTick, Elapsed);
Quota := ((Elapsed + 30) * MaxTransferRate) div 1000; // quota allowed over Elapsed period
QuotaFree := Quota - State.TransferBytes; // quota remaining
if QuotaFree >= BufferSize then
Result := BufferSize else
if QuotaFree <= 0 then
Result := 0
else
Result := QuotaFree;
end;
// Updates transfer statistics for a number of bytes transferred
procedure TCPConnectionTransferredBytes(
var State: TTCPConnectionTransferState;
const ByteCount: Integer); {$IFDEF UseInline}inline;{$ENDIF}
begin
Inc(State.TransferBytes, ByteCount);
end;
{ }
{ Error and debug strings }
{ }
const
SError_InvalidParameter = 'Invalid parameter';
SError_TimedOut = 'Timed out';
SError_ConnectionClosed = 'Connection closed';
SConnectionProxyState : array[TTCPConnectionProxyState] of String = (
'Init',
'Negotiating',
'Filtering',
'Finished',
'Error',
'Closed'
);
SConnectionState : array[TTCPConnectionState] of String = (
'Init',
'ProxyNegotiation',
////'Failed',
'Connected',
'Closed'
);
{ }
{ TCP Connection Proxy }
{ }
class function TTCPConnectionProxy.ProxyName: String;
begin
Result := ClassName;
end;
constructor TTCPConnectionProxy.Create;
begin
inherited Create;
FState := prsInit;
end;
procedure TTCPConnectionProxy.Finalise;
begin
end;
procedure TTCPConnectionProxy.Log(const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer);
begin
if Assigned(FOnLog) then
FOnLog(Self, LogType, LogMsg, LogLevel);
end;
procedure TTCPConnectionProxy.Log(const LogType: TTCPLogType; const LogMsg: String; const LogArgs: array of const; const LogLevel: Integer);
begin
Log(LogType, Format(LogMsg, LogArgs), LogLevel);
end;
function TTCPConnectionProxy.GetStateStr: String;
begin
Result := SConnectionProxyState[FState];
end;
procedure TTCPConnectionProxy.SetState(const AState: TTCPConnectionProxyState);
begin
if AState = FState then
exit;
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'State:%s', [SConnectionProxyState[AState]]);
{$ENDIF}
FState := AState;
if Assigned(FOnStateChange) then
FOnStateChange(Self, AState);
end;
procedure TTCPConnectionProxy.SetStateError(const AErrorMessage: String);
begin
if FState = prsError then
exit;
FErrorMessage := AErrorMessage;
SetState(prsError);
end;
procedure TTCPConnectionProxy.ConnectionClose;
begin
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'Close');
{$ENDIF}
if Assigned(FOnConnectionClose) then
FOnConnectionClose(Self);
end;
procedure TTCPConnectionProxy.ConnectionPutReadData(const Buf; const BufSize: Integer);
begin
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'PutRead:%db', [BufSize]);
{$ENDIF}
if Assigned(FOnConnectionPutReadData) then
FOnConnectionPutReadData(Self, Buf, BufSize);
end;
procedure TTCPConnectionProxy.ConnectionPutWriteData(const Buf; const BufSize: Integer);
begin
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'PutWrite:%db', [BufSize]);
{$ENDIF}
if Assigned(FOnConnectionPutWriteData) then
FOnConnectionPutWriteData(Self, Buf, BufSize);
end;
procedure TTCPConnectionProxy.Start;
begin
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'Start');
{$ENDIF}
ProxyStart;
end;
{ }
{ TCP Connection Proxy List }
{ }
destructor TTCPConnectionProxyList.Destroy;
var
I : Integer;
begin
for I := Length(FList) - 1 downto 0 do
FreeAndNil(FList[I]);
FList := nil;
inherited Destroy;
end;
procedure TTCPConnectionProxyList.Finalise;
var
I : Integer;
begin
for I := Length(FList) - 1 downto 0 do
FList[I].Finalise;
end;
function TTCPConnectionProxyList.GetCount: Integer;
begin
Result := Length(FList);
end;
function TTCPConnectionProxyList.GetItem(const AIdx: Integer): TTCPConnectionProxy;
begin
Assert((AIdx >= 0) and (AIdx < Length(FList)));
Result := FList[AIdx];
end;
function TTCPConnectionProxyList.GetLastItem: TTCPConnectionProxy;
var
L : Integer;
begin
L := Length(FList);
if L = 0 then
Result := nil
else
Result := FList[L - 1];
end;
procedure TTCPConnectionProxyList.Add(const AProxy: TTCPConnectionProxy);
var
L : Integer;
begin
Assert(Assigned(AProxy));
L := Length(FList);
SetLength(FList, L + 1);
FList[L] := AProxy;
end;
{ }
{ TCP Connection Worker Thread }
{ }
type
TTCPConnectionWorkerThread = class(TThread)
private
FConnection: TTCPConnection;
protected
procedure Execute; override;
public
constructor Create(const Connection: TTCPConnection);
property Terminated;
end;
constructor TTCPConnectionWorkerThread.Create(const Connection: TTCPConnection);
begin
Assert(Assigned(Connection));
FConnection := Connection;
FreeOnTerminate := False;
inherited Create(False);
end;
procedure TTCPConnectionWorkerThread.Execute;
begin
FConnection.WorkerThreadExecute(self);
end;
{ }
{ TCP Connection }
{ }
constructor TTCPConnection.Create(
const ASocket: TSysSocket;
const AReadBufferMinSize: Int32;
const AReadBufferMaxSize: Int32;
const AWriteBufferMinSize: Int32;
const AWriteBufferMaxSize: Int32);
begin
Assert(Assigned(ASocket));
inherited Create;
FSocket := ASocket;
Init;
InitBuffers(
AReadBufferMinSize,
AReadBufferMaxSize,
AWriteBufferMinSize,
AWriteBufferMaxSize);
end;
destructor TTCPConnection.Destroy;
begin
if Assigned(FWorkerThread) then
try
FWorkerThread.Terminate;
FWorkerThread.WaitFor;
FreeAndNil(FWorkerThread);
except
end;
FreeAndNil(FBlockingConnection);
TCPBufferFinalise(FWriteBuffer);
TCPBufferFinalise(FReadBuffer);
FreeAndNil(FProxyList);
FreeAndNil(FLock);
inherited Destroy;
end;
procedure TTCPConnection.Finalise;
begin
if Assigned(FBlockingConnection) then
FBlockingConnection.Finalise;
if Assigned(FProxyList) then
FProxyList.Finalise;
FUserObject := nil;
end;
procedure TTCPConnection.Init;
var
N : TDateTime;
begin
FState := cnsInit;
FReadThrottle := False;
FWriteThrottle := False;
FLock := TCriticalSection.Create;
FProxyList := TTCPConnectionProxyList.Create;
FProxyConnection := False;
N := Now;
FCreationTime := N;
FTrackLastActivityTime := False;
FLastReadActivityTime := 0.0;
FLastWriteActivityTime := 0.0;
end;
procedure TTCPConnection.InitBuffers(
const AReadBufferMinSize: Int32;
const AReadBufferMaxSize: Int32;
const AWriteBufferMinSize: Int32;
const AWriteBufferMaxSize: Int32);
begin
TCPBufferInitialise(FReadBuffer, AReadBufferMaxSize, AReadBufferMinSize);
TCPBufferInitialise(FWriteBuffer, AWriteBufferMaxSize, AWriteBufferMinSize);
end;
procedure TTCPConnection.Lock;
begin
if Assigned(FLock) then
FLock.Acquire;
end;
procedure TTCPConnection.Unlock;
begin
if Assigned(FLock) then
FLock.Release;
end;
procedure TTCPConnection.Log(const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer);
begin
if Assigned(FOnLog) then
FOnLog(self, LogType, LogMsg, LogLevel);
end;
procedure TTCPConnection.Log(const LogType: TTCPLogType; const LogMsg: String; const LogArgs: array of const; const LogLevel: Integer);
begin
Log(LogType, Format(LogMsg, LogArgs), LogLevel);
end;
function TTCPConnection.GetReadBufferMinSize: Int32;
begin
Lock;
try
Result := TCPBufferGetMinSize(FReadBuffer);
finally
Unlock;
end;
end;
function TTCPConnection.GetReadBufferMaxSize: Int32;
begin
Lock;
try
Result := TCPBufferGetMaxSize(FReadBuffer);
finally
Unlock;
end;
end;
function TTCPConnection.GetWriteBufferMinSize: Int32;
begin
Lock;
try
Result := TCPBufferGetMinSize(FWriteBuffer);
finally
Unlock;
end;
end;
function TTCPConnection.GetWriteBufferMaxSize: Int32;
begin
Lock;
try
Result := TCPBufferGetMaxSize(FWriteBuffer);
finally
Unlock;
end;
end;
procedure TTCPConnection.SetReadBufferMinSize(const AReadBufferMinSize: Int32);
begin
Lock;
try
TCPBufferSetMinSize(FReadBuffer, AReadBufferMinSize);
finally
Unlock;
end;
end;
procedure TTCPConnection.SetReadBufferMaxSize(const AReadBufferMaxSize: Int32);
begin
Lock;
try
TCPBufferSetMaxSize(FReadBuffer, AReadBufferMaxSize);
finally
Unlock;
end;
end;
procedure TTCPConnection.SetWriteBufferMinSize(const AWriteBufferMinSize: Int32);
begin
Lock;
try
TCPBufferSetMinSize(FWriteBuffer, AWriteBufferMinSize);
finally
Unlock;
end;
end;
procedure TTCPConnection.SetWriteBufferMaxSize(const AWriteBufferMaxSize: Int32);
begin
Lock;
try
TCPBufferSetMaxSize(FWriteBuffer, AWriteBufferMaxSize);
finally
Unlock;
end;
end;
function TTCPConnection.GetSocketReadBufferSize: Integer;
begin
Lock;
try
Result := FSocket.ReceiveBufferSize;
finally
Unlock;
end;
end;
function TTCPConnection.GetSocketWriteBufferSize: Integer;
begin
Lock;
try
Result := FSocket.SendBufferSize;
finally
Unlock;
end;
end;
procedure TTCPConnection.SetSocketReadBufferSize(const Size: Integer);
begin
Lock;
try
FSocket.ReceiveBufferSize := Size;
finally
Unlock;
end;
end;
procedure TTCPConnection.SetSocketWriteBufferSize(const Size: Integer);
begin
Lock;
try
FSocket.SendBufferSize := Size;
finally
Unlock;
end;
end;
function TTCPConnection.GetReadBufferUsed: Integer;
begin
Lock;
try
Result := FReadBuffer.Used;
finally
Unlock;
end;
end;
function TTCPConnection.GetWriteBufferUsed: Integer;
begin
Lock;
try
Result := FWriteBuffer.Used;
finally
Unlock;
end;
end;
function TTCPConnection.GetReadBufferAvailable: Integer;
begin
Lock;
try
Result := TCPBufferAvailable(FReadBuffer);
finally
Unlock;
end;
end;
function TTCPConnection.GetWriteBufferAvailable: Integer;
begin
Lock;
try
Result := TCPBufferAvailable(FWriteBuffer);
finally
Unlock;
end;
end;
function TTCPConnection.GetState: TTCPConnectionState;
begin
Lock;
try
Result := FState;
finally
Unlock;
end;
end;
function TTCPConnection.GetStateStr: String;
begin
Result := SConnectionState[GetState];
end;
procedure TTCPConnection.SetStateProxyNegotiation;
begin
Lock;
try
FState := cnsProxyNegotiation;
finally
Unlock;
end;
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'State:ProxyNegotiation');
{$ENDIF}
TriggerStateChange;
end;
procedure TTCPConnection.SetStateFailed;
begin
Lock;
try
if not (FState in [cnsInit, cnsProxyNegotiation]) then
exit;
////FState := cnsFailed;
finally
Unlock;
end;
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'State:Failed');
{$ENDIF}
TriggerStateChange;
end;
procedure TTCPConnection.SetStateConnected;
var
NotifyReady : Boolean;
begin
Lock;
try
Assert(FState in [cnsInit, cnsProxyNegotiation, cnsClosed]);
FState := cnsConnected;
// Connection state can change from 'connected' back to 'proxy negotiation'
// and then back to 'connected' again.
// Trigger the ready event on the first change to 'connected'.
if not FReadyNotified then
begin
FReadyNotified := True;
NotifyReady := True;
end
else
NotifyReady := False;
finally
Unlock;
end;
TriggerStateChange;
if NotifyReady then
begin
Lock;
try
if FUseWorkerThread and Assigned(FOnWorkerExecute) then
StartWorkerThread;
finally
Unlock;
end;
TriggerReady;
end;
end;
function TTCPConnection.SetStateClosed: Boolean;
begin
Lock;
try
if FState = cnsClosed then
begin
Result := False;
exit;
end;
FState := cnsClosed;
finally
Unlock;
end;
TriggerStateChange;
TriggerClose;
Result := True;
end;
procedure TTCPConnection.AddProxy(const AProxy: TTCPConnectionProxy);
var
P : TTCPConnectionProxy;
DoNeg : Boolean;
begin
if not Assigned(AProxy) then
raise ETCPConnection.Create(SError_InvalidParameter);
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'AddProxy:%s', [AProxy.ProxyName]);
{$ENDIF}
Lock;
try
// add to list
P := FProxyList.LastItem;
FProxyList.Add(AProxy);
if Assigned(P) then
P.FNextProxy := AProxy;
AProxy.FNextProxy := nil;
FProxyConnection := True;
// restart negotiation if connected
DoNeg := (FState = cnsConnected);
AProxy.OnLog := ProxyLog;
AProxy.OnStateChange := ProxyStateChange;
AProxy.OnConnectionClose := ProxyConnectionClose;
AProxy.OnConnectionPutReadData := ProxyConnectionPutReadData;
AProxy.OnConnectionPutWriteData := ProxyConnectionPutWriteData;
finally
Unlock;
end;
if DoNeg then
begin
SetStateProxyNegotiation;
AProxy.Start;
end;
end;
procedure TTCPConnection.SetReadThrottle(const AReadThrottle: Boolean);
begin
Lock;
try
FReadThrottle := AReadThrottle;
finally
Unlock;
end;
end;
procedure TTCPConnection.SetWriteThrottle(const AWriteThrottle: Boolean);
begin
Lock;
try
FWriteThrottle := AWriteThrottle;
finally
Unlock;
end;
end;
function TTCPConnection.GetReadRate: Integer;
var
E : Int64;
begin
Lock;
try
if not FReadThrottle then
TCPConnectionTransferUpdate(FReadTransferState, TCPGetTick, E);
Result := FReadTransferState.TransferRate;
finally
Unlock;
end;
end;
function TTCPConnection.GetWriteRate: Integer;
var
E : Int64;
begin
Lock;
try
if not FWriteThrottle then
TCPConnectionTransferUpdate(FWriteTransferState, TCPGetTick, E);
Result := FWriteTransferState.TransferRate;
finally
Unlock;
end;
end;
procedure TTCPConnection.TriggerStateChange;
begin
if Assigned(FOnStateChange) then
try
FOnStateChange(self, FState);
except
on E : Exception do
Log(tlError, 'TriggerStateChange.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerReady;
begin
if Assigned(FOnReady) then
try
FOnReady(self);
except
on E : Exception do
Log(tlError, 'TriggerReady.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerReadShutdown;
begin
if Assigned(FOnReadShutdown) then
try
FOnReadShutdown(self);
except
on E : Exception do
Log(tlError, 'TriggerReadShutdown.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerShutdown;
begin
if Assigned(FOnShutdown) then
try
FOnShutdown(self);
except
on E : Exception do
Log(tlError, 'TriggerShutdown.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerClose;
begin
if Assigned(FOnClose) then
try
FOnClose(self);
except
on E : Exception do
Log(tlError, 'TriggerClose.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerRead;
begin
if Assigned(FOnRead) then
try
FOnRead(self);
except
on E : Exception do
Log(tlError, 'TriggerRead.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerWrite;
begin
if Assigned(FOnWrite) then
try
FOnWrite(self);
except
on E : Exception do
Log(tlError, 'TriggerWrite.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerReadActivity;
begin
if Assigned(FOnReadActivity) then
try
FOnReadActivity(self);
except
on E : Exception do
Log(tlError, 'TriggerReadActivity.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerWriteActivity;
begin
if Assigned(FOnWriteActivity) then
try
FOnWriteActivity(self);
except
on E : Exception do
Log(tlError, 'TriggerWriteActivity.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerReadBufferFull;
begin
if Assigned(FOnReadBufferFull) then
try
FOnReadBufferFull(self);
except
on E : Exception do
Log(tlError, 'TriggerReadBufferFull.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerWriteBufferEmpty;
begin
if Assigned(FOnWriteBufferEmpty) then
try
FOnWriteBufferEmpty(self);
except
on E : Exception do
Log(tlError, 'TriggerWriteBufferEmpty.Error:Error=%s[%s]', [E.ClassName, E.Message]);
end;
end;
procedure TTCPConnection.TriggerWait;
begin
if Assigned(FOnWait) then
FOnWait(self);
end;
procedure TTCPConnection.TriggerWorkerFinished;
begin
if Assigned(FOnWorkerFinished) then
FOnWorkerFinished(self);
end;
function TTCPConnection.GetFirstActiveProxy: TTCPConnectionProxy;
var P : TTCPConnectionProxy;
begin
Lock;
try
if FProxyList.Count = 0 then
P := nil
else
P := FProxyList[0];
while Assigned(P) and not (P.State in [prsNegotiating, prsFiltering]) do
P := P.FNextProxy;
finally
Unlock;
end;
Result := P;
end;
procedure TTCPConnection.ProxyProcessReadData(const Buf; const BufSize: Integer; out ReadEventPending: Boolean);
var P : TTCPConnectionProxy;
begin
Assert(FProxyConnection);
Assert(FProxyList.Count > 0);
ReadEventPending := False;
P := GetFirstActiveProxy;
if Assigned(P) then
// pass to first active proxy
P.ProcessReadData(Buf, BufSize)
else
begin
// no active proxies, add data to read buffer
Lock;
try
FProxyConnection := False;
TCPBufferAddBuf(FReadBuffer, Buf, BufSize);
finally
Unlock;
end;
// allow user to read buffered data; flag pending event
ReadEventPending := True;
end;
end;
procedure TTCPConnection.ProxyProcessWriteData(const Buf; const BufSize: Integer);
var P : TTCPConnectionProxy;
begin
Assert(BufSize > 0);
Assert(FProxyConnection);
Assert(FProxyList.Count > 0);
P := GetFirstActiveProxy;
if Assigned(P) then
// pass to first active proxy
P.ProcessWriteData(Buf, BufSize)
else
begin
// no active proxies, send data
Lock;
try
FProxyConnection := False;
WriteToTransport(Buf, BufSize);
finally
Unlock;
end;
end;
end;
procedure TTCPConnection.ProxyLog(const AProxy: TTCPConnectionProxy; const LogType: TTCPLogType; const LogMsg: String; const LogLevel: Integer);
begin
Assert(Assigned(AProxy));
{$IFDEF TCP_DEBUG_PROXY}
Log(LogType, 'Proxy[%s]:%s', [AProxy.ProxyName, LogMsg], LogLevel + 1);
{$ENDIF}
end;
function GetNextFilteringProxy(const Proxy: TTCPConnectionProxy): TTCPConnectionProxy;
var P : TTCPConnectionProxy;
begin
Assert(Assigned(Proxy));
P := Proxy.FNextProxy;
while Assigned(P) and not (P.State = prsFiltering) do
P := P.FNextProxy;
Result := P;
end;
procedure TTCPConnection.ProxyConnectionPutReadData(const AProxy: TTCPConnectionProxy; const Buf; const BufSize: Integer);
var P : TTCPConnectionProxy;
begin
P := GetNextFilteringProxy(AProxy);
if Assigned(P) then
// pass along proxy chain
P.ProcessReadData(Buf, BufSize)
else
// last proxy, add to read buffer, regardless of available size
// reading from socket is throttled in FillBufferFromSocket when read buffer fills up
begin
Lock;
try
TCPBufferAddBuf(FReadBuffer, Buf, BufSize);
// allow user to read buffered data; flag pending event
FReadEventPending := True;
finally
Unlock;
end;
end;
end;
procedure TTCPConnection.ProxyConnectionPutWriteData(const AProxy: TTCPConnectionProxy; const Buf; const BufSize: Integer);
var P : TTCPConnectionProxy;
begin
P := GetNextFilteringProxy(AProxy);
if Assigned(P) then
// pass along proxy chain
P.ProcessWriteData(Buf, BufSize)
else
// last proxy, add to write buffer, regardless of available size
begin
Lock;
try
WriteToTransport(Buf, BufSize);
finally
Unlock;
end;
end;
end;
procedure TTCPConnection.ProxyConnectionClose(const AProxy: TTCPConnectionProxy);
begin
Assert(FProxyConnection);
// flag close pending; handled outside lock, after read pending
Lock;
try
FClosePending := True;
finally
Unlock;
end;
end;
// called when a proxy changes state
procedure TTCPConnection.ProxyStateChange(const AProxy: TTCPConnectionProxy; const AState: TTCPConnectionProxyState);
var P : TTCPConnectionProxy;
begin
case AState of
prsFiltering,
prsFinished :
begin
Assert(FState = cnsProxyNegotiation);
Lock;
try
P := AProxy.FNextProxy;
finally
Unlock;
end;
if Assigned(P) then
P.Start
else
SetStateConnected;
end;
prsNegotiating : ;
prsError :
begin
////
FErrorMessage := AProxy.ErrorMessage;
end;
prsClosed : ;
end;
end;
procedure TTCPConnection.StartProxies(out AProxiesFinished: Boolean);
var L : Integer;
P : TTCPConnectionProxy;
begin
Lock;
try
L := FProxyList.Count;
if L = 0 then
begin
// no proxies
AProxiesFinished := True;
exit;
end;
P := FProxyList.Item[0];
finally
Unlock;
end;
// start proxy negotiation
SetStateProxyNegotiation;
P.Start;
AProxiesFinished := False;
end;
procedure TTCPConnection.Start;
var ProxiesFin : Boolean;
begin
Assert(FState in [cnsInit, cnsClosed]);
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'Start');
{$ENDIF}
Lock;
try
TCPBufferClear(FReadBuffer);
TCPBufferClear(FWriteBuffer);
TCPConnectionTransferReset(FReadTransferState);
TCPConnectionTransferReset(FWriteTransferState);
FReadyNotified := False;
FReadEventPending := False;
FReadBufferFull := False;
FReadProcessPending := False;
FReadActivityPending := False;
FWriteEventPending := False;
FShutdownSendPending := False;
FShutdownSent := False;
FShutdownRecv := False;
FShutdownComplete := False;
FClosePending := False;
finally
Unlock;
end;
StartProxies(ProxiesFin);
if ProxiesFin then
SetStateConnected;
end;
// Pre: Socket is non-blocking
function TTCPConnection.FillBufferFromSocket(
out RecvShutdown, RecvClosed, ReadEventPending, ReadBufFullEventPending: Boolean): Integer;
const
BufferSize = TCP_BUFFER_DEFAULTMAXSIZE;
var
Buffer : array[0..BufferSize - 1] of Byte;
Avail, Size : Integer;
IsHandleInvalid : Boolean;
IsProxyConn : Boolean;
{$IFDEF TCP_DEBUG}
Unlocked : Boolean;
{$ENDIF}
begin
Result := 0;
RecvShutdown := False;
RecvClosed := False;
ReadEventPending := False;
ReadBufFullEventPending := False;
{$IFDEF TCP_DEBUG}
Unlocked := False;
{$ENDIF}
Lock;
try
// check space available in read buffer
Avail := TCPBufferAvailable(FReadBuffer);
if Avail <= 0 then
// no space in buffer, don't read any more from socket
// this will eventually throttle the actual TCP connection when the system's TCP receive buffer fills up
// Set FReadBufferFull flag, since read event won't trigger again, this function must be called manually
// next time there's space in the receive buffer
begin
if not FReadBufferFull then
begin
ReadBufFullEventPending := True;
FReadBufferFull := True;
end;
exit;
end;
IsHandleInvalid := FSocket.IsSocketHandleInvalid;
// receive from socket into local buffer
if IsHandleInvalid then // socket may have been closed by a proxy
begin
{$IFDEF TCP_DEBUG}
Unlock;
Unlocked := True;
Log(tlDebug, 'FillBufferFromSocket:SocketHandleInvalid');
{$ENDIF}
RecvClosed := True;
exit;
end;
if Avail > BufferSize then
Avail := BufferSize;
Size := FSocket.Recv(Buffer, Avail);
if Size = 0 then
begin
// socket shutdown
if not FShutdownRecv then
begin
FShutdownRecv := True;
RecvShutdown := True;
end;
////RecvClosed := True;
{$IFDEF TCP_DEBUG}
Unlock;
Unlocked := True;
Log(tlDebug, 'FillBufferFromSocket:GracefulClose');
{$ENDIF}
exit;
end;
if Size < 0 then
// nothing more to receive from socket
exit;
IsProxyConn := FProxyConnection;
if not IsProxyConn then
// move from local buffer to read buffer
begin
TCPBufferAddBuf(FReadBuffer, Buffer, Size);
// allow user to read buffered data; flag pending event
ReadEventPending := True;
end;
finally
{$IFDEF TCP_DEBUG}
if not Unlocked then
{$ENDIF}
Unlock;
end;
if IsProxyConn then
// pass local buffer to proxies to process
ProxyProcessReadData(Buffer, Size, ReadEventPending);
// data received
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'FillBufferFromSocket:Received:%db', [Size]);
{$ENDIF}
Result := Size;
end;
// Returns number of bytes written to socket
// Pre: Socket is non-blocking
function TTCPConnection.WriteBufferToSocket(out BufferEmptyBefore, BufferEmptied: Boolean): Integer;
var P : Pointer;
SizeBuf, SizeWrite, SizeWritten : Integer;
E : Boolean;
begin
BufferEmptied := False;
Lock;
try
SizeBuf := TCPBufferUsed(FWriteBuffer);
// get write size
E := SizeBuf <= 0;
BufferEmptyBefore := E;
if E then
SizeWrite := 0
else
if FWriteThrottle and (FWriteThrottleRate > 0) then // throttled writing
SizeWrite := TCPConnectionTransferThrottledSize(FWriteTransferState, TCPGetTick, FWriteThrottleRate, SizeBuf)
else
SizeWrite := SizeBuf;
// handle nothing to send
if SizeWrite = 0 then
begin
Result := 0;
exit;
end;
// get buffer pointer
P := TCPBufferPtr(FWriteBuffer);
// send to socket
Assert(Assigned(P));
SizeWritten := FSocket.Send(P^, SizeWrite);
// handle nothing sent
if SizeWritten = 0 then
begin
Result := 0;
exit;
end;
Assert(SizeWritten >= 0);
// update transfer statistics
TCPConnectionTransferredBytes(FWriteTransferState, SizeWritten);
// discard successfully sent bytes from connection buffer
TCPBufferDiscard(FWriteBuffer, SizeWritten);
finally
Unlock;
end;
// check if buffer emptied
if SizeWritten = SizeBuf then
BufferEmptied := True;
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'WriteBufferToSocket:Fin:%d:%db:%db:%db',
[Ord(BufferEmptied), SizeBuf, SizeWrite, SizeWritten]);
{$ENDIF}
Result := SizeWritten;
end;
procedure TTCPConnection.GetEventsToPoll(out WritePoll: Boolean); // out ReadPoll; out ProcessSocket)
begin
Lock;
try
WritePoll :=
FWriteEventPending or
FShutdownSendPending or
not TCPBufferEmpty(FWriteBuffer);
finally
Unlock;
end;
end;
// Processes socket by reading/writing
// Pre: Socket is non-blocking
procedure TTCPConnection.ProcessSocket(
const ProcessRead, ProcessWrite: Boolean;
const ActivityTime: TDateTime;
out Idle, Terminated: Boolean);
var
Len : Integer;
Error, Fin : Boolean;
ShutdownReadWaiting : Boolean;
ReadProcessPending : Boolean;
ReadActivityPending : Boolean;
ReadEventPending : Boolean;
ReadDoProcess : Boolean;
RecvShutdown : Boolean;
RecvClosed : Boolean;
RecvReadEvent : Boolean;
RecvReadBufFullEvent : Boolean;
RecvCloseNow : Boolean;
RecvData : Boolean;
RecvActivity : Boolean;
RecvActivityNotified : Boolean;
ReadDirect : Boolean;
WriteBufEmptyBefore : Boolean;
WriteBufEmptied : Boolean;
WriteEventPending : Boolean;
WriteDoProcess : Boolean;
WriteEvent : Boolean;
WriteBufEmptyEvent : Boolean;
WriteShutdownNow : Boolean;
WriteData : Boolean;
WriteActivity : Boolean;
ShutdownSendPending : Boolean;
ShutdownComplete : Boolean;
ClosePending : Boolean;
TrackLastActivityTime : Boolean;
begin
try
Idle := True;
Lock;
try
// handle closed socket
if FSocket.IsSocketHandleInvalid then
begin
Terminated := True;
exit;
end;
// check if read/write should process
Assert(FState <> cnsInit);
//read
ReadProcessPending := FReadProcessPending;
FReadProcessPending := False;
ReadActivityPending := FReadActivityPending;
FReadActivityPending := False;
ReadEventPending := FReadEventPending;
FReadEventPending := False;
ClosePending := FClosePending;
FClosePending := False;
ShutdownReadWaiting := FShutdownSent and not FShutdownComplete;
ReadDoProcess :=
ProcessRead or
ReadProcessPending or
ReadActivityPending or
ReadEventPending or
ClosePending or
ShutdownReadWaiting;
ReadDirect := TCPBufferEmpty(FReadBuffer) and not FProxyConnection;
// write
WriteEventPending := FWriteEventPending;
FWriteEventPending := False;
ShutdownSendPending := FShutdownSendPending;
FShutdownSendPending := False;
WriteDoProcess :=
WriteEventPending or
ShutdownSendPending or
(ProcessWrite and not TCPBufferEmpty(FWriteBuffer));
// last activity times
TrackLastActivityTime := FTrackLastActivityTime;
finally
Unlock;
end;
Terminated := False;
// process read
RecvActivity := False;
if ReadDoProcess then
begin
RecvActivityNotified := False;
if ReadActivityPending then
RecvActivity := True;
//// 2019/12/30 - allow event handler to read directly from socket
//// 2020/05/20 - only trigger when connection buf empty and not proxy
if ReadDirect then
TriggerRead;
Fin := False;
repeat
// receive data from socket into buffer
try
Len := FillBufferFromSocket(RecvShutdown, RecvClosed, RecvReadEvent, RecvReadBufFullEvent);
except
Len := 0;
RecvShutdown := False;
RecvClosed := True;
RecvReadEvent := False;
RecvReadBufFullEvent := False;
end;
// check pending flags
if ReadEventPending then
begin
RecvReadEvent := True;
ReadEventPending := False;
end;
RecvCloseNow := ClosePending;
if RecvCloseNow then
ClosePending := False;
// check received data
RecvData := Len > 0;
if RecvData then
begin
RecvReadEvent := True; //// 2020/05/20 added to trigger TLS proxy recvd data
RecvActivity := True;
Idle := False;
end
else
Fin := True;
// perform pending actions
if RecvActivity then
if not RecvActivityNotified then
begin
RecvActivityNotified := True;
TriggerReadActivity;
end;
if RecvReadBufFullEvent then
TriggerReadBufferFull;
if RecvReadEvent then
TriggerRead;
if RecvShutdown then
begin
TriggerReadShutdown;
Lock;
try
if not FShutdownSendPending and not FShutdownSent then
begin
// send shutdown pending
ShutdownSendPending := True;
WriteDoProcess := True;
end;
finally
Unlock;
end;
end;
if ShutdownReadWaiting and RecvShutdown then
begin
Lock;
try
FShutdownComplete := True;
finally
Unlock;
end;
TriggerShutdown;
Close;
Terminated := True;
Fin := True;
end
else
if RecvCloseNow then
begin
Close;
Fin := True;
end
else
if RecvClosed then
begin
// socket closed
SetStateClosed;
Terminated := True;
Fin := True;
end;
until Fin;
end;
// process write
WriteActivity := False;
if WriteDoProcess then
begin
WriteEvent := False;
WriteBufEmptyEvent := False;
WriteShutdownNow := False;
Error := False;
try
Len := WriteBufferToSocket(WriteBufEmptyBefore, WriteBufEmptied);
except
Len := 0;
Error := True;
end;
// check write activity
WriteData := Len > 0;
if WriteEventPending then
begin
WriteActivity := True;
WriteEvent := True;
WriteBufEmptyEvent := True;
end;
// check write state
if WriteBufEmptied then
WriteBufEmptyEvent := True;
if WriteBufEmptyBefore and ShutdownSendPending then
WriteShutdownNow := True;
if WriteData then
begin
// data sent
Idle := False;
WriteEvent := True;
WriteActivity := True;
end
else
begin
if Error then
// socket send error
Terminated := True;
// nothing sent
end;
// trigger write
if WriteActivity then
TriggerWriteActivity;
if WriteEvent then
TriggerWrite;
// triger write buffer empty
if WriteBufEmptyEvent then
TriggerWriteBufferEmpty;
// pending shutdown
if WriteShutdownNow then
begin
DoShutdown(ShutdownComplete);
if ShutdownComplete then
begin
TriggerShutdown;
Close;
Terminated := True;
end;
end;
end;
// set last activity time
if RecvActivity or WriteActivity then
if TrackLastActivityTime then
begin
Lock;
try
if RecvActivity then
FLastReadActivityTime := ActivityTime;
if WriteActivity then
FLastWriteActivityTime := ActivityTime;
finally
Unlock;
end;
end;
except
on E : Exception do
begin
Idle := False;
Terminated := True;
{.IFDEF TCP_DEBUG}
Log(tlError, 'ProcessSocket:Terminated:%s', [E.Message]);
{.ENDIF}
end;
end;
end;
function TTCPConnection.GetLastReadActivityTime: TDateTime;
begin
Lock;
try
Result := FLastReadActivityTime;
finally
Unlock;
end;
end;
function TTCPConnection.GetLastWriteActivityTime: TDateTime;
begin
Lock;
try
Result := FLastWriteActivityTime;
finally
Unlock;
end;
end;
function TTCPConnection.GetLastActivityTime: TDateTime;
begin
Lock;
try
if FLastReadActivityTime > FLastWriteActivityTime then
Result := FLastReadActivityTime
else
if FLastWriteActivityTime > FCreationTime then
Result := FLastWriteActivityTime
else
Result := FCreationTime;
finally
Unlock;
end;
end;
// LocateByteCharInBuffer
// Returns position of Delimiter in buffer
// Returns >= 0 if found in buffer
// Returns -1 if not found in buffer
// MaxSize specifies maximum bytes before delimiter, of -1 for no limit
function TTCPConnection.LocateByteCharInBuffer(const ADelimiter: ByteCharSet; const AMaxSize: Integer): Integer;
begin
Result := TCPBufferLocateByteChar(FReadBuffer, ADelimiter, AMaxSize);
end;
// LocateByteStrInBuffer
// Returns position of Delimiter in buffer
// Returns >= 0 if found in buffer
// Returns -1 if not found in buffer
// MaxSize specifies maximum bytes before delimiter, of -1 for no limit
function TTCPConnection.LocateByteStrInBuffer(const ADelimiter: RawByteString; const AMaxSize: Integer): Integer;
var DelLen : Integer;
BufSize : Integer;
LocLen : Integer;
BufPtr : PByteChar;
DelPtr : PByteChar;
I : Integer;
begin
if AMaxSize = 0 then
begin
Result := -1;
exit;
end;
DelLen := Length(ADelimiter);
if DelLen = 0 then
begin
Result := -1;
exit;
end;
BufSize := FReadBuffer.Used;
if BufSize < DelLen then
begin
Result := -1;
exit;
end;
if AMaxSize < 0 then
LocLen := BufSize
else
if BufSize < AMaxSize then
LocLen := BufSize
else
LocLen := AMaxSize;
BufPtr := TCPBufferPtr(FReadBuffer);
DelPtr := PByteChar(ADelimiter);
for I := 0 to LocLen - DelLen do
if TCPCompareMem(BufPtr^, DelPtr^, DelLen) then
begin
Result := I;
exit;
end
else
Inc(BufPtr);
Result := -1;
end;
// PeekDelimited
// Returns number of bytes transferred to buffer, including delimiter
// Returns -1 if not found in buffer
// Returns >= 0 if found.
// MaxSize specifies maximum bytes before delimiter, of -1 for no limit
function TTCPConnection.PeekDelimited(var Buf; const BufSize: Integer;
const ADelimiter: TRawByteCharSet; const AMaxSize: Integer): Integer;
var DelPos : Integer;
BufPtr : PByteChar;
BufLen : Integer;
begin
Lock;
try
DelPos := LocateByteCharInBuffer(ADelimiter, AMaxSize);
if DelPos >= 0 then
begin
// found
BufPtr := TCPBufferPtr(FReadBuffer);
BufLen := DelPos + 1;
if BufLen > BufSize then
BufLen := BufSize;
Move(BufPtr^, Buf, BufLen);
Result := BufLen;
end
else
Result := -1;
finally
Unlock;
end;
end;
// PeekDelimited
// Returns number of bytes transferred to buffer, including delimiter
// Returns -1 if not found in buffer
// Returns >= 0 if found.
// MaxSize specifies maximum bytes before delimiter, of -1 for no limit
function TTCPConnection.PeekDelimited(var Buf; const BufSize: Integer;
const ADelimiter: RawByteString; const AMaxSize: Integer): Integer;
var DelPos : Integer;
BufPtr : PByteChar;
BufLen : Integer;
begin
Assert(ADelimiter <> '');
Lock;
try
DelPos := LocateByteStrInBuffer(ADelimiter, AMaxSize);
if DelPos >= 0 then
begin
// found
BufPtr := TCPBufferPtr(FReadBuffer);
BufLen := DelPos + Length(ADelimiter);
if BufLen > BufSize then
BufLen := BufSize;
Move(BufPtr^, Buf, BufLen);
Result := BufLen;
end
else
Result := -1;
finally
Unlock;
end;
end;
// Read a number of bytes from read buffer and transport.
// Return the number of bytes actually read.
// Throttles reading.
function TTCPConnection.Read(var Buf; const BufSize: Integer): Integer;
var
BufPtr : PByteChar;
SizeRead, SizeReadBuf, SizeReadSocket, SizeRecv, SizeRemain, SizeTotal : Integer;
begin
if BufSize <= 0 then
begin
Result := 0;
exit;
end;
Lock;
try
// get read size
if FReadThrottle then // throttled reading
begin
SizeRead := TCPConnectionTransferThrottledSize(FReadTransferState, TCPGetTick, FReadThrottleRate, BufSize);
// handle nothing to read
if SizeRead <= 0 then
begin
Result := 0;
exit;
end;
end
else
SizeRead := BufSize;
// read from buffer
SizeReadBuf := TCPBufferRemove(FReadBuffer, Buf, SizeRead);
if SizeReadBuf > 0 then
if FReadBufferFull then
begin
FReadBufferFull := False;
FReadProcessPending := True;
end;
if SizeReadBuf = SizeRead then
// required number of bytes was in buffer
SizeReadSocket := 0
else
if FProxyConnection then
// don't read directly from socket if this connection has proxies
SizeReadSocket := 0
else
if FSocket.IsSocketHandleInvalid then
SizeReadSocket := 0
else
begin
// calculate remaining bytes to read
SizeRemain := SizeRead - SizeReadBuf;
// read from socket
BufPtr := @Buf;
Inc(BufPtr, SizeReadBuf);
try
SizeRecv := FSocket.Recv(BufPtr^, SizeRemain);
except
SizeRecv := -1;
end;
if SizeRecv = 0 then
begin
// Graceful shutdown
////SetStateClosed;
FReadProcessPending := True;
SizeReadSocket := 0;
end
else
if SizeRecv < 0 then
SizeReadSocket := 0
else
SizeReadSocket := SizeRecv;
if SizeReadSocket > 0 then
begin
FReadActivityPending := True;
//// 2020/03/28 - Direct reading from socket in ProcessSocket needs updating last read activity
if FTrackLastActivityTime then
FLastReadActivityTime := Now;
end;
end;
SizeTotal := SizeReadBuf + SizeReadSocket;
// update transfer statistics
TCPConnectionTransferredBytes(FReadTransferState, SizeTotal);
finally
Unlock;
end;
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'Read:Fin:%db:%db:%db:%db',
[BufSize, SizeRead, SizeReadBuf, SizeReadSocket]);
{$ENDIF}
// return number of bytes read
Result := SizeTotal;
end;
function TTCPConnection.ReadByteString(const AStrLen: Integer): RawByteString;
var ReadLen : Integer;
begin
if AStrLen <= 0 then
begin
Result := '';
exit;
end;
SetLength(Result, AStrLen);
ReadLen := Read(Pointer(Result)^, AStrLen);
if ReadLen < AStrLen then
SetLength(Result, ReadLen);
end;
function TTCPConnection.ReadBytes(const ASize: Integer): TBytes;
var ReadLen : Integer;
begin
if ASize <= 0 then
begin
Result := nil;
exit;
end;
SetLength(Result, ASize);
ReadLen := Read(Pointer(Result)^, ASize);
if ReadLen < ASize then
SetLength(Result, ReadLen);
end;
// Discard a number of bytes from the read buffer.
// Returns the number of bytes actually discarded.
// No throttling and no reading from transport.
function TTCPConnection.Discard(const ASize: Integer): Integer;
var SizeDiscarded : Integer;
begin
// handle nothing to discard
if ASize <= 0 then
begin
Result := 0;
exit;
end;
Lock;
try
// discard from buffer
SizeDiscarded := TCPBufferDiscard(FReadBuffer, ASize);
if SizeDiscarded > 0 then
if FReadBufferFull then
begin
FReadBufferFull := False;
FReadProcessPending := True;
end;
// update transfer statistics
TCPConnectionTransferredBytes(FReadTransferState, SizeDiscarded);
finally
Unlock;
end;
{$IFDEF TCP_DEBUG_DATA}
Log(tlDebug, 'DiscardedFromBuffer:%db:%db', [Size, SizeDiscarded]);
{$ENDIF}
// return number of bytes discarded
Result := SizeDiscarded;
end;
function TTCPConnection.WriteToTransport(const Buf; const BufSize: Integer): Integer;
var
BufP : PByteChar;
UseBuf : Boolean;
SizeToBuf : Integer;
SizeToSocket : Integer;
begin
Assert(BufSize > 0);
// if there is already data in the write buffer then add to the write buffer; or
// if write is being throttled then add to the write buffer
UseBuf := (TCPBufferUsed(FWriteBuffer) > 0) or FWriteThrottle;
if UseBuf then
begin
TCPBufferAddBuf(FWriteBuffer, Buf, BufSize);
SizeToBuf := BufSize;
SizeToSocket := 0;
end
else
begin
// send the data directly to the socket
SizeToSocket := FSocket.Send(Buf, BufSize);
// update transfer statistics
TCPConnectionTransferredBytes(FWriteTransferState, SizeToSocket);
if SizeToSocket < BufSize then
begin
// add the data not sent to the socket to the write buffer
BufP := @Buf;
Inc(BufP, SizeToSocket);
SizeToBuf := BufSize - SizeToSocket;
TCPBufferAddBuf(FWriteBuffer, BufP^, SizeToBuf);
end
else
begin
FWriteEventPending := True; // all data sent directly to socket
SizeToBuf := 0;
end;
end;
{$IFDEF TCP_DEBUG_DATA}
////Log(tlDebug, 'WriteToTransport:BufSize=%db:ToSocket=%db:ToBuf=%db', [BufSize, SizeToSocket, SizeToBuf]);
{$ENDIF}
Result := SizeToSocket + SizeToBuf;
end;
// Write a number of bytes to transport
// No throtling
function TTCPConnection.Write(const Buf; const BufSize: Integer): Integer;
var IsProxy : Boolean;
begin
Result := 0;
if BufSize <= 0 then
exit;
Lock;
try
if FState = cnsClosed then
raise ETCPConnection.Create(SError_ConnectionClosed);
// if this connection has proxies then pass the data to the proxies
IsProxy := FProxyConnection;
if not IsProxy then
// send data to buffer/socket
Result := WriteToTransport(Buf, BufSize);
finally
Unlock;
end;
if IsProxy then
begin
ProxyProcessWriteData(Buf, BufSize);
Result := BufSize;
end;
Assert(Result = BufSize);
end;
function TTCPConnection.WriteByteString(const AStr: RawByteString): Integer;
var Len : Integer;
begin
Len := Length(AStr);
if Len <= 0 then
begin
Result := 0;
exit;
end;
Result := Write(Pointer(AStr)^, Len);
end;
function TTCPConnection.WriteBytes(const B: TBytes): Integer;
var Len : Integer;
begin
Len := Length(B);
if Len <= 0 then
begin
Result := 0;
exit;
end;
Result := Write(Pointer(B)^, Len);
end;
// Peek a number of bytes from buffer.
// No throttling; no reading from transport
function TTCPConnection.Peek(var Buf; const BufSize: Int32): Integer;
begin
Lock;
try
Result := TCPBufferPeek(FReadBuffer, Buf, BufSize);
finally
Unlock;
end;
end;
function TTCPConnection.PeekByte(out B: Byte): Boolean;
begin
Lock;
try
Result := TCPBufferPeekByte(FReadBuffer, B);
finally
Unlock;
end;
end;
function TTCPConnection.PeekByteString(const AStrLen: Integer): RawByteString;
var PeekLen : Integer;
begin
if AStrLen <= 0 then
begin
Result := '';
exit;
end;
SetLength(Result, AStrLen);
PeekLen := Peek(Pointer(Result)^, AStrLen);
if PeekLen < AStrLen then
SetLength(Result, PeekLen);
end;
function TTCPConnection.PeekBytes(const ASize: Integer): TBytes;
var PeekLen : Integer;
begin
if ASize <= 0 then
begin
Result := nil;
exit;
end;
SetLength(Result, ASize);
PeekLen := Peek(Pointer(Result)^, ASize);
if PeekLen < ASize then
SetLength(Result, PeekLen);
end;
// Reads a line delimited by specified Delimiter
// MaxLineLength is maximum line length excluding the delimiter
// Returns False if line not available
// Returns True if line read
function TTCPConnection.ReadLine(var Line: RawByteString; const ADelimiter: RawByteString; const AMaxLineLength: Integer): Boolean;
var
DelPos : Integer;
DelLen : Integer;
begin
Assert(ADelimiter <> '');
Lock;
try
DelPos := LocateByteStrInBuffer(ADelimiter, AMaxLineLength);
Result := DelPos >= 0;
if not Result then
exit;
SetLength(Line, DelPos);
if DelPos > 0 then
Read(PByteChar(Line)^, DelPos);
DelLen := Length(ADelimiter);
Discard(DelLen);
finally
Unlock;
end;
end;
procedure TTCPConnection.DoShutdown(out AShutdownComplete: Boolean);
begin
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'DoShutDown:%db:%db', [GetReadBufferUsed, GetWriteBufferUsed]);
{$ENDIF}
AShutdownComplete := False;
// Sends TCP FIN message to close connection and
// disallow any further sending on the socket
Lock;
try
if FShutdownSent then
exit;
FShutdownSent := True;
FSocket.Shutdown(ssSend);
AShutdownComplete := FShutdownSent;
if AShutdownComplete then
FShutdownComplete := True;
finally
Unlock;
end;
end;
procedure TTCPConnection.Shutdown;
var
ShutdownNow : Boolean;
ShutdownComplete : Boolean;
begin
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'ShutDown:%db:%db', [GetReadBufferUsed, GetWriteBufferUsed]);
{$ENDIF}
Lock;
try
if FState = cnsClosed then
exit;
ShutdownNow := False;
if TCPBufferUsed(FWriteBuffer) > 0 then
// defer shutdown until write buffer is emptied to socket
FShutdownSendPending := True
else
ShutdownNow := True;
finally
Unlock;
end;
if ShutdownNow then
begin
DoShutDown(ShutdownComplete);
if ShutdownComplete then
begin
TriggerShutdown;
Close;
end;
end;
end;
function TTCPConnection.IsShutdownComplete: Boolean;
begin
Lock;
try
Result := (FState = cnsClosed) or FShutdownComplete;
finally
Unlock;
end;
end;
procedure TTCPConnection.Close;
begin
{$IFDEF TCP_DEBUG}
Log(tlDebug, 'Close:%db:%db', [GetReadBufferUsed, GetWriteBufferUsed]);
{$ENDIF}
if not SetStateClosed then
exit;
Lock;
try
FSocket.Close;
finally
Unlock;
end;
end;
procedure TTCPConnection.TerminateWorkerThread;
begin
Lock;
try
if Assigned(FWorkerThread) then
FWorkerThread.Terminate;
finally
Unlock;
end;
end;
procedure TTCPConnection.WaitForWorkerThread;
begin
Lock;
try
if Assigned(FWorkerThread) then
try
FWorkerThread.WaitFor;
except
end;
finally
Unlock;
end;
end;
procedure TTCPConnection.Wait;
begin
TriggerWait;
Sleep(5);
end;
function TTCPConnection.GetBlockingConnection: TTCPBlockingConnection;
begin
Lock;
try
if not Assigned(FBlockingConnection) then
FBlockingConnection := TTCPBlockingConnection.Create(self);
Result := FBlockingConnection;
finally
Unlock;
end;
end;
procedure TTCPConnection.StartWorkerThread;
begin
Assert(not Assigned(FWorkerThread));
FWorkerThread := TTCPConnectionWorkerThread.Create(self);
end;
procedure TTCPConnection.WorkerThreadExecute(const AThread: TThread);
var
WorkerThread : TTCPConnectionWorkerThread;
Event : TTCPConnectionWorkerExecuteEvent;
CloseOnExit : Boolean;
function IsTerminated: Boolean;
begin
Result := WorkerThread.Terminated;
end;
begin
try
WorkerThread := AThread as TTCPConnectionWorkerThread;
CloseOnExit := False;
try
try
Event := FOnWorkerExecute;
if Assigned(Event) then
Event(self, GetBlockingConnection, CloseOnExit);
finally
TriggerWorkerFinished;
end;
finally
if not IsTerminated then
if CloseOnExit then
Close;
end;
except
on E : Exception do
if not IsTerminated then
begin
Lock;
try
FWorkerErrorMsg := E.Message;
FWorkerErrorClass := E.ClassName;
finally
Unlock;
end;
end;
end;
end;
{ }
{ TCP Blocking Connection }
{ }
constructor TTCPBlockingConnection.Create(const AConnection: TTCPConnection);
begin
Assert(Assigned(AConnection));
inherited Create;
FConnection := AConnection;
end;
destructor TTCPBlockingConnection.Destroy;
begin
inherited Destroy;
end;
procedure TTCPBlockingConnection.Finalise;
begin
FConnection := nil;
end;
procedure TTCPBlockingConnection.Wait;
begin
FConnection.Wait;
end;
// Wait until one of the States or time out.
function TTCPBlockingConnection.WaitForState(const AStates: TTCPConnectionStates; const ATimeOutMs: Integer): TTCPConnectionState;
var T : Word32;
S : TTCPConnectionState;
C : TTCPConnection;
begin
T := TCPGetTick;
C := FConnection;
repeat
S := C.GetState;
if S in AStates then
break;
if ATimeOutMs >= 0 then
if TCPTickDelta(T, TCPGetTick) >= ATimeOutMs then
break;
Wait;
until False;
Result := S;
end;
// Wait until amount of data received, closed or time out.
function TTCPBlockingConnection.WaitForReceiveData(const ABufferSize: Integer; const ATimeOutMs: Integer): Boolean;
var T : Word32;
L : Integer;
C : TTCPConnection;
begin
Assert(Assigned(FConnection));
T := TCPGetTick;
C := FConnection;
repeat
L := C.ReadBufferUsed;
if L >= ABufferSize then
break;
if ATimeOutMs >= 0 then
if TCPTickDelta(T, TCPGetTick) >= ATimeOutMs then
break;
if C.GetState = cnsClosed then
break;
Wait;
until False;
Result := L >= ABufferSize;
end;
// Wait until send buffer is cleared to socket, closed or time out.
function TTCPBlockingConnection.WaitForTransmitFin(const ATimeOutMs: Integer): Boolean;
var T : Word32;
L : Integer;
C : TTCPConnection;
begin
Assert(Assigned(FConnection));
T := TCPGetTick;
C := FConnection;
repeat
L := C.WriteBufferUsed;
if L = 0 then
break;
if ATimeOutMs >= 0 then
if TCPTickDelta(T, TCPGetTick) >= ATimeOutMs then
break;
if C.GetState = cnsClosed then
break;
Wait;
until False;
Result := L = 0;
end;
// Wait until socket is closed or timeout.
function TTCPBlockingConnection.WaitForClose(const ATimeOutMs: Integer): Boolean;
begin
Result := WaitForState([cnsClosed], ATimeOutMs) = cnsClosed;
end;
// Wait for read data until required size is available or timeout.
function TTCPBlockingConnection.Read(var Buf; const BufferSize: Integer; const ATimeOutMs: Integer): Integer;
begin
if not WaitForReceiveData(BufferSize, ATimeOutMs) then
raise ETCPConnection.Create(SError_TimedOut);
Result := FConnection.Read(Buf, BufferSize);
end;
// Write and wait for write buffers to empty or timeout.
function TTCPBlockingConnection.Write(const Buf; const BufferSize: Integer; const ATimeOutMs: Integer): Integer;
begin
Assert(Assigned(FConnection));
Result := FConnection.Write(Buf, BufferSize);
if Result > 0 then
WaitForTransmitFin(ATimeOutMs);
end;
// Does a graceful shutdown and waits for connection to close or timeout.
// Data received during shutdown is available after connection close.
// SettleTimeMs is the maximum time to wait for output buffer to clear before initiating Shutdown.
// TransmitTimeOutMs is the maximum time to wait for output buffers to clear after Shutdown.
// CloseTimeOutMs is the maximum time to wait for the graceful close after output buffers cleared.
procedure TTCPBlockingConnection.Shutdown(
const SettleTimeMs: Integer;
const TransmitTimeOutMs: Integer;
const CloseTimeOutMs: Integer);
begin
Assert(Assigned(FConnection));
if SettleTimeMs > 0 then
WaitForTransmitFin(SettleTimeMs);
FConnection.Shutdown;
if not WaitForTransmitFin(TransmitTimeOutMs) then
raise ETCPConnection.Create(SError_TimedOut);
if not WaitForClose(CloseTimeOutMs) then
raise ETCPConnection.Create(SError_TimedOut);
end;
// Closes immediately and waits for connection to close or timeout.
procedure TTCPBlockingConnection.Close(const ATimeOutMs: Integer);
begin
Assert(Assigned(FConnection));
FConnection.Close;
if not WaitForClose(ATimeOutMs) then
raise ETCPConnection.Create(SError_TimedOut);
end;
end.