/// implements asynchronous safe WebSockets tunnelling // - this unit is a part of the freeware Synopse mORMot framework, // licensed under a MPL/GPL/LGPL tri-license; version 1.18 unit SynProtoRelay; { This file is part of Synopse mORMot framework. Synopse mORMot framework. Copyright (C) 2022 Arnaud Bouchez Synopse Informatique - https://synopse.info *** BEGIN LICENSE BLOCK ***** Version: MPL 1.1/GPL 2.0/LGPL 2.1 The contents of this file are subject to the Mozilla Public License Version 1.1 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.mozilla.org/MPL Software distributed under the License is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License for the specific language governing rights and limitations under the License. The Original Code is Synopse mORMot framework. The Initial Developer of the Original Code is Arnaud Bouchez. Portions created by the Initial Developer are Copyright (C) 2022 the Initial Developer. All Rights Reserved. Contributor(s): Alternatively, the contents of this file may be used under the terms of either the GNU General Public License Version 2 or later (the "GPL"), or the GNU Lesser General Public License Version 2.1 or later (the "LGPL"), in which case the provisions of the GPL or the LGPL are applicable instead of those above. If you wish to allow use of your version of this file only under the terms of either the GPL or the LGPL, and not to allow others to use your version of this file under the terms of the MPL, indicate your decision by deleting the provisions above and replace them with the notice and other provisions required by the GPL or the LGPL. If you do not delete the provisions above, a recipient may use your version of this file under the terms of any one of the MPL, the GPL or the LGPL. ***** END LICENSE BLOCK ***** Secured WebSockets tunneling ---------------------------- It will encapsulate any WebSockets duplex stream over a public server, allowing any remote client to connect to a local server behind a firewall, using a public server (e.g. a simple Linux box) as relay. A Private Relay client should connect to a Public Relay Server, probably behind a firewall. By definition, only a single Private Relay client instance could connect at the same time to the Public Relay server. ORM/SOA client nodes don't need to be modified. Their WebSockets connection will just point to the Public Relay Server, which will tunnel their frames to the Private Relay Server, which will maintain a set of local connections to the main business Server, behind the firewal. | | REMOTE | |CORPORATE USERS | |FIREWALL | | +------------+ | | | client 3 | | | | | +------------+ | | +------------+ | client 1 | | | | | |-\ | | +------------------------------------------------------------+ +------------+ -\ | PUBLIC RELAY | | PRIVATE RELAY | | --\| +--------------------+ | | +--------------------+ +-----------v-----------+ | |-\ | | | | | | | | | | -\| | | | | ------------>| | | | ->+---------------+ | | | | +-------------+ | | mORMot ORM/SOA | | | | | encapsulation |<-------------|-----|decapsulation| | | SERVER | | -------------------------| ->+---------------+ | | | | +-------------+ | | | | | -/| | | | | ------------>| with local DB | | |-/ | | | | | | | | | --| +--------------------+ | | +--------------------+ +-----------------------+ | -/ | | | in-process communication ^ | +------------+ -/ | | +------------------------------------------------------------+ | client 2 | | | | | | | | +------------+ +------------+ | internet | corporate local network | client 4 | | | | | | | +------------+ | | | | In the above diagram, client 1 and client 2 just connect to the Public Relay server, and the mORMot ORM/SOA server see some connection coming as if they were local - just like client 3 and client 4. Only a higher latency and lower bandwidth may be noticeable, but REST is not very demanding about latency, and our WebSockets protocols has built-in SynLZ compression. One benefit of this solution is that it doesn't require to mess with the firewall. There is a single Public Relay process (using very few resources) to be setup somewhere in the Internet, and the Private Relay client could run in-process with the main mORMot ORM/SOA server, so nothing is to be changed by the corporate IT. The regular TWebSocketProtocolBinary encryption could be used, so no certificate nor security need to be setup in the Public Relay server, which doesn't uncipher the frames, but just work as a dummy relayer. All WebSockets communication starts as HTTP/1.1 regular clients to the Internet, before switch to some duplex encrypted stream. The communication between the Public Relay and the Private Relay is checked by validating each transmitted frames, and can be over-encrypted using a secret password, for even additional security. If any node (client, Public Relay, Private Relay, mORMot SOA/ORM server) is down for whatever reason (e.g. network failure), both ends will be notified with no dandling sockets. See Sample "38 - WebSockets Relay" for a stand-alone Public Relay project, which could be reused as plain binary - no recompilation is needed. } {$I Synopse.inc} // define HASINLINE CPU32 CPU64 OWNNORMTOUPPER interface uses {$ifdef MSWINDOWS} Windows, SynWinSock, {$else} SynFPCSock, // shared with Kylix {$ifdef KYLIX3} LibC, {$endif} {$ifdef FPC} SynFPCLinux, {$endif} {$endif} SysUtils, SynCommons, SynTable, SynCrtSock, SynBidirSock, SynCrypto, SynLog; { ----------------- low-level shared definitions ------------ } const /// as set to TRelayFrame RELAYFRAME_VER = $aa00; /// if TRelayFrame.payload is in fact a TRelayFrameRestPayload focRestPayload = focReservedF; type /// TRelayFrame.payload for opcode = focRestPayload = focReservedF TRelayFrameRestPayload = packed record status: integer; url, method, headers, contenttype: RawUTF8; content: RawByteString; end; TRelayFrameRestPayloadDynArray = array of TRelayFrameRestPayload; /// internal binary serialized content for frames tunnelling TRelayFrame = packed record revision: word; opcode: TWebSocketFrameOpCode; content: TWebSocketFramePayloads; connection: THttpServerConnectionID; payload: RawByteString; end; PRelayFrame = ^TRelayFrame; { ----------------- low-level WebSockets protocols involved ------------ } type ERelayProtocol = class(ESynException); TPublicRelay = class; TPrivateRelay = class; /// regular mORMot client to Public Relay connection using // synopsejson/synopsebin/synopsebinary protocols // - any incoming frame will be encapsulated with the connection ID, then // relayed to the Private Relay node using TRelayServerProtocol TSynopseServerProtocol = class(TWebSocketProtocol) protected fOwner: TPublicRelay; procedure ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); override; // implements SynBidirSock's TWebSocketProtocolRest variants function GetSubprotocols: RawUTF8; override; function SetSubprotocol(const aProtocolName: RawUTF8): boolean; override; public /// initialize the protocol to be processed on a given TPublicRelay constructor Create(aOwner: TPublicRelay); reintroduce; /// used server-side for any new connection function Clone(const aClientURI: RawUTF8): TWebSocketProtocol; override; end; /// Public Relay to Private Relay connection, encapsulating connection ID // - see also TRelayClientProtocol as the reversed process // - any incoming frame will be decapsulate the associated connection ID, then // relayed to the proper client node using TSynopseServerProtocol TRelayServerProtocol = class(TWebSocketProtocol) protected fOwner: TPublicRelay; procedure ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); override; public /// initialize the protocol to be processed on a given TPublicRelay constructor Create(aOwner: TPublicRelay; const aServerKey: RawUTF8); reintroduce; /// used server-side for any new connection function Clone(const aClientURI: RawUTF8): TWebSocketProtocol; override; end; /// Private Relay to Public Relay connection, decapsulating connection ID // - see also TRelayServerProtocol as the reversed process TRelayClientProtocol = class(TWebSocketProtocol) protected fOwner: TPrivateRelay; procedure ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); override; public /// initialize the protocol to be processed on a given TPrivateRelay constructor Create(aOwner: TPrivateRelay; const aRelayKey: RawUTF8); reintroduce; /// used server-side for any new connection function Clone(const aClientURI: RawUTF8): TWebSocketProtocol; override; end; /// Private Relay to local Server connection // - forward raw frames from TSynopseServerProtocol TSynopseClientProtocol = class(TWebSocketProtocol) protected fOwner: TPrivateRelay; procedure ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); override; public /// initialize the protocol to be processed on a given TPrivateRelay // - the protocol is relayed from TRelayClientProtocol.ProcessIncomingFrame constructor Create(aOwner: TPrivateRelay; const aProtocolName: RawUTF8); reintroduce; /// used server-side for any new connection function Clone(const aClientURI: RawUTF8): TWebSocketProtocol; override; end; { ----------------- Public and Private relay process ------------ } TAbstractRelay = class(TSynPersistentLock) protected fLog: TSynLogClass; fStarted: RawUTF8; fSent, fReceived: QWord; fFrames, fValid, fInvalid, fRejected, fRestFrames: integer; fRestFrame: array of THttpServerRequest; fRestFrameCount: integer; fRestPending, fRestTimeoutMS: integer; function EncapsulateAndSend(Process: TWebSocketProcess; const IP: RawUTF8; const Frame: TWebSocketFrame; Connection: THttpServerConnectionID): boolean; function Decapsulate(Protocol: TWebSocketProtocol; var Frame: TWebSocketFrame): THttpServerConnectionID; public constructor Create(aLog: TSynLogClass); reintroduce; destructor Destroy; override; published property Started: RawUTF8 read fStarted; property Frames: integer read fFrames; property Sent: QWord read fSent; property Received: QWord read fReceived; property Valid: integer read fValid; property Invalid: integer read fInvalid; property Rejected: integer read fRejected; property RestFrames: integer read fRestFrames; property RestPending: integer read fRestPending; end; /// implements a Public Relay server, e.g. located on a small Linux/BSD box TPublicRelay = class(TAbstractRelay) protected fServerJWT: TJWTAbstract; fClients, fServer: TWebSocketServer; fServerConnected: TWebSocketProcess; fServerConnectedToLocalHost: boolean; fStatCache: RawJSON; fStatTix: integer; function OnServerBeforeBody(const aURL,aMethod,aInHeaders, aInContentType, aRemoteIP: SockString; aContentLength: integer; aUseSSL: boolean): cardinal; function OnServerRequest(Ctxt: THttpServerRequest): cardinal; function OnClientsRequest(Ctxt: THttpServerRequest): cardinal; function GetStats: RawJSON; public /// initialize the Public Relay // - WebSockets clients will connect to aClientsPort as usual // - all communication will be encapsulated and relayed to aServerPort, // using the optional aServerKey for TWebSocketProtocol.SetEncryptKey(), // and aServerJWT to authenticate the incoming connection (owned by this instance) constructor Create(aLog: TSynLogClass; const aClientsPort, aServerPort: SockString; const aServerKey: RawUTF8; aServerJWT: TJWTAbstract; aClientsThreadPoolCount: integer=2; aClientsKeepAliveTimeOut: integer=30000); reintroduce; /// finalize the Public Relay server destructor Destroy; override; /// access to the JWT authentication for TPrivateRelay communication property ServerJWT: TJWTAbstract read fServerJWT; published /// raw access to the Private Relay server instance property Server: TWebSocketServer read fServer; /// raw access to the ORM/SOA clients server instance property Clients: TWebSocketServer read fClients; end; /// implements a relayed link to the local ORM/SOA server instance // - add some internal fields for TPrivateRelay.fServers[] TServerClient = class(THttpClientWebSockets) protected Connection: THttpServerConnectionID; OriginIP: RawUTF8; end; TServerClients = array of TServerClient; /// implements a Private Relay client, located in a private network behind // a restricted firewall TPrivateRelay = class(TAbstractRelay) protected fRelayHost, fRelayPort, fRelayKey, fRelayCustomHeaders, fServerHost, fServerPort, fServerRemoteIPHeader: RawUTF8; fRelayClient: THttpClientWebSockets; // using TRelayClientProtocol fServers: TServerClients; // links to local ORM/SOA server fServersCount: integer; function FindServerClientByConnection(connection: THttpServerConnectionID; out index: integer): TServerClient; function FindServerClientByProcess(process: TWebSocketProcess; out index: integer): TServerClient; function NewServerClient(connection: THttpServerConnectionID; const ipprotocoluri: RawUTF8): TServerClient; public /// initialize the Private Relay // - communication will be encapsulated and relayed to aServerHost/aServerPort // via a new TServerClient connection, using aServerWebSocketsURI, // aServerWebSocketsEncryptionKey, aServerWebSocketsAJAX and // aServerWebSocketsCompression parameters as any regular client in the // local network - from the server point of view, those clients will // appear like local clients unless ServerRemoteIPHeader is set according // to the TSQLHttpServerDefinition.ServerRemoteIPHeader value (e.g. as // 'X-Real-IP') and the remote client IP will be used instead // - Connected/TryConnect should be called on a regular basis to connect to // the Public Relay using aRelayHost/aRelayPort/aRelayKey/aRelayBearer constructor Create(aLog: TSynLogClass; const aRelayHost, aRelayPort, aRelayKey, aRelayBearer, aServerHost, aServerPort, aServerRemoteIPHeader: RawUTF8); reintroduce; /// check if the Public Relay did connect to this Private Relay instance function Connected: boolean; /// true if this Private Relay uses encryption with the Public Relay function Encrypted: boolean; /// (re)connect to aRelayHost/aRelayPort Public Relay via a single link // - will first disconnect is needed function TryConnect: boolean; /// disconnect from aRelayHost/aRelayPort Public Relay procedure Disconnect; /// finalize the Private Relay server destructor Destroy; override; published /// Public Relay host address property RelayHost: RawUTF8 read fRelayHost; /// Public Relay host port property RelayPort: RawUTF8 read fRelayPort; /// true if the Private Relay is connected to the Public Relay property RelayConnected: boolean read Connected; /// true if the Private Relay to the Public Relay link is encrypted property RelayEncrypted: boolean read Encrypted; /// local processing server host address property ServerHost: RawUTF8 read fServerHost; /// local processing server host port property ServerPort: RawUTF8 read fServerPort; /// how many client connections are actually relayed via this instance property Connections: integer read fServersCount; end; implementation { TAbstractRelay } function TAbstractRelay.EncapsulateAndSend(Process: TWebSocketProcess; const IP: RawUTF8; const Frame: TWebSocketFrame; Connection: THttpServerConnectionID): boolean; var encapsulated: TRelayFrame; dest: TWebSocketFrame; trigger: integer; begin result := false; if (Process = nil) or (Connection = 0) then exit; encapsulated.revision := RELAYFRAME_VER; encapsulated.opcode := Frame.opcode; encapsulated.content := Frame.content; encapsulated.connection := Connection; encapsulated.payload := Frame.payload; encapsulated.payload := RecordSave(encapsulated, TypeInfo(TRelayFrame)); if (encapsulated.opcode in [focText,focRestPayload]) and not (fopAlreadyCompressed in Frame.content) then trigger := WebSocketsBinarySynLzThreshold else trigger := maxInt; // no compression, just crc32c to avoid most attacks encapsulated.payload := AlgoSynLZ.Compress(encapsulated.payload, trigger); dest.opcode := focBinary; dest.content := []; if (Process.Protocol <> nil) and Process.Protocol.Encrypted then Process.Protocol.Encryption.Encrypt(encapsulated.payload, dest.payload) else dest.payload := encapsulated.payload; result := Process.SendFrame(dest); if result then begin inc(fSent, length(Frame.payload)); inc(fFrames); end; fLog.Add.Log(LOG_TRACEWARNING[not result], 'EncapsulateAndSend % #% % %', [ToText(Frame.opcode)^, Connection, IP, KBNoSpace(length(dest.payload))], self); end; function TAbstractRelay.Decapsulate(Protocol: TWebSocketProtocol; var Frame: TWebSocketFrame): THttpServerConnectionID; var encapsulated: TRelayFrame; plain: RawByteString; begin if (Frame.opcode = focBinary) and (Protocol <> nil) and Protocol.Encrypted then Protocol.Encryption.Decrypt(Frame.payload, plain) else plain := Frame.payload; if AlgoSynLZ.TryDecompress(plain, Frame.payload) and (length(Frame.payload) > 8) and (PRelayFrame(Frame.payload)^.revision = RELAYFRAME_VER) and (PRelayFrame(Frame.payload)^.opcode in [focContinuation, focConnectionClose, focBinary, focText, focRestPayload]) and RecordLoad(encapsulated, Frame.payload, TypeInfo(TRelayFrame)) and (encapsulated.revision = RELAYFRAME_VER) then begin // paranoid Frame.opcode := encapsulated.opcode; Frame.content := encapsulated.content; Frame.payload := encapsulated.payload; inc(fReceived, length(Frame.payload)); inc(fValid); fLog.Add.Log(sllTrace, 'Decapsulate: #% % %', [encapsulated.connection, ToText(Frame.opcode)^, KBNoSpace(length(Frame.payload))], self); result := encapsulated.connection; end else begin inc(fInvalid); fLog.Add.Log(sllWarning, 'Decapsulate: incorrect % frame %', [ToText(Frame.opcode)^, EscapeToShort(Frame.payload)], self); result := 0; end; end; constructor TAbstractRelay.Create(aLog: TSynLogClass); begin inherited Create; fStarted := NowUTCToString; if aLog = nil then fLog := TSynLog else fLog := aLog; fRestTimeoutMS := 10000; // wait 10 seconds for the REST request to be relayed end; destructor TAbstractRelay.Destroy; var tix: Int64; begin fStarted := ''; // notify destroying if fRestPending <> 0 then begin fLog.Add.Log(sllDebug, 'Destroy: RestPending=%', [fRestPending], self); tix := GetTickCount64 + 500; while (fRestPending <> 0) and (GetTickCount64 < tix) do sleep(1); end; inherited Destroy; end; { TRelayServerProtocol } procedure TRelayServerProtocol.ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); var log: TSynLog; connection: THttpServerConnectionID; sent: boolean; rest: TRelayFrameRestPayload; client: TWebSocketServerResp; i: PtrInt; p: ^THttpServerRequest; begin log := fOwner.fLog.Add; log.Log(sllTrace, 'ProcessIncomingFrame % %', [Sender.RemoteIP, ToText(Frame.opcode)^], self); fOwner.Safe.Lock; try case Frame.opcode of focContinuation: if fOwner.fServerConnected <> nil then raise ERelayProtocol.Create('Only a single server instance is allowed') else begin fOwner.fServerConnected := Sender; fOwner.fServerConnectedToLocalHost := Sender.RemoteIP = ''; end; focConnectionClose: if fOwner.fServerConnected = Sender then fOwner.fServerConnected := nil else log.Log(sllWarning, 'ProcessIncomingFrame: fServerConnected?', self); focBinary: begin if fOwner.fServerConnected <> Sender then raise ERelayProtocol.CreateUTF8('Unexpected %.ProcessIncomingFrame Sender', [self]); connection := fOwner.Decapsulate(Sender.Protocol, Frame); if connection = 0 then exit; if Frame.opcode = focRestPayload then begin if RecordLoad(rest, Frame.payload, TypeInfo(TRelayFrameRestPayload)) then begin p := pointer(fOwner.fRestFrame); for i := 1 to fOwner.fRestFrameCount do if p^.RequestID = connection then begin log.Log(sllTrace, 'ProcessIncomingFrame received #%.% % [%]', [p^.ConnectionID, p^.RequestID, p^.Method, rest.Status], self); if rest.status = 0 then break; p^.OutContent := rest.content; if rest.contenttype = '' then p^.OutContentType := JSON_CONTENT_TYPE_VAR else p^.OutContentType := rest.contenttype; p^.OutCustomHeaders := rest.headers; p^.Status := rest.status; // should be the latest set exit; // will be intercepted by TPublicRelay.OnClientsRequest end else inc(p); end; raise ERelayProtocol.CreateUTF8( 'Unexpected #$.% focRestPayload in %.ProcessIncomingFrame',[connection, self]); end else begin client := fOwner.fClients.IsActiveWebSocket(connection); if client = nil then begin log.Log(sllWarning, 'ProcessIncomingFrame: unknown connection #%', [connection], self); if Frame.opcode <> focConnectionClose then begin Frame.opcode := focConnectionClose; Frame.payload := ''; fOwner.EncapsulateAndSend(Sender, 'removed', Frame, connection); end; end else begin // redirect the frame to the final client sent := client.WebSocketProcess.SendFrame(Frame); log.Log(LOG_DEBUGERROR[not sent], 'ProcessIncomingFrame % #% % %', [ToText(Frame.opcode)^, connection, client.WebSocketProcess.RemoteIP, KBNoSpace(length(Frame.payload))], self); end; end; end; else raise ERelayProtocol.CreateUTF8('Unexpected % in %.ProcessIncomingFrame', [ToText(Frame.opcode)^, self]); end; finally fOwner.Safe.UnLock; end; end; constructor TRelayServerProtocol.Create(aOwner: TPublicRelay; const aServerKey: RawUTF8); begin fOwner := aOwner; inherited Create('synopserelay', ''); if aServerKey <> '' then SetEncryptKey({aServer=}true, aServerKey); end; function TRelayServerProtocol.Clone(const aClientURI: RawUTF8): TWebSocketProtocol; begin result := TRelayServerProtocol.Create(fOwner, ''); if fEncryption<>nil then TRelayServerProtocol(result).fEncryption := fEncryption.Clone; end; { TSynopseServerProtocol } constructor TSynopseServerProtocol.Create(aOwner: TPublicRelay); begin fOwner := aOwner; inherited Create('synopserelay', ''); end; function TSynopseServerProtocol.Clone(const aClientURI: RawUTF8): TWebSocketProtocol; begin result := TSynopseServerProtocol.Create(fOwner); end; function TSynopseServerProtocol.GetSubprotocols: RawUTF8; begin // SynBidirSock's TWebSocketProtocolRest variants result := 'synopsejson, synopsebin, synopsebinary'; end; function TSynopseServerProtocol.SetSubprotocol(const aProtocolName: RawUTF8): boolean; begin result := FindPropName(['synopsejson', 'synopsebin', 'synopsebinary'], aProtocolName) >= 0; end; procedure TSynopseServerProtocol.ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); var ip: RawUTF8; begin fOwner.fLog.Add.Log(sllTrace, 'ProcessIncomingFrame % %', [Sender.RemoteIP, ToText(Frame.opcode)^], self); fOwner.Safe.Lock; try case Frame.opcode of focContinuation, focText, focBinary: if fOwner.fServerConnected = nil then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: No server to relay to', [self]); focConnectionClose: if fOwner.fServerConnected = nil then exit; else exit; // relay meaningfull frames end; ip := Sender.RemoteIP; if Frame.opcode = focContinuation then Frame.payload := ip + #13 + Name + #13 + UpgradeURI; // propagate to Private Relay if not fOwner.EncapsulateAndSend(fOwner.fServerConnected, ip, Frame, Sender.OwnerConnection) and (Frame.opcode <> focConnectionClose) then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: Error relaying % from #% % to server', [ToText(Frame.opcode)^, Sender.OwnerConnection, ip]); finally fOwner.Safe.UnLock; end; end; { TPublicRelay } function TPublicRelay.OnServerBeforeBody(const aURL, aMethod, aInHeaders, aInContentType, aRemoteIP: SockString; aContentLength: integer; aUseSSL: boolean): cardinal; var bearer: RawUTF8; res: TJWTResult; begin if IdemPChar(pointer(aURL), '/STAT') then begin result := STATUS_SUCCESS; exit; end; FindNameValue(aInHeaders, HEADER_BEARER_UPPER, bearer); res := fServerJWT.Verify(bearer); if res = jwtValid then if fServerConnected <> nil then begin fLog.Add.Log(sllWarning, 'OnBeforeBody % already connected to %', [aRemoteIP, fServerConnected.RemoteIP], self); result := STATUS_NOTACCEPTABLE; end else result := STATUS_SUCCESS else begin inc(fRejected); fLog.Add.Log(sllUserAuth, 'OnBeforeBody % %', [ToText(res)^, aRemoteIP], self); result := STATUS_FORBIDDEN; end; end; function TPublicRelay.OnServerRequest(Ctxt: THttpServerRequest): cardinal; begin if IdemPChar(pointer(Ctxt.URL), '/STAT') then begin Ctxt.OutContent := GetStats; Ctxt.OutContentType := JSON_CONTENT_TYPE_VAR; fLog.Add.Log(sllTrace, 'OnRequest stats=%', [Ctxt.OutContent], self); result := STATUS_SUCCESS; end else result := STATUS_NOTFOUND; end; procedure SetRestFrame(out frame: TWebSocketFrame; status: integer; const url, method, headers, content, contenttype: RawByteString); var rest: TRelayFrameRestPayload; begin rest.status := status; rest.url := url; rest.method := method; rest.headers := PurgeHeaders(pointer(headers)); if (contenttype <> '') and not IdemPropNameU(contenttype, JSON_CONTENT_TYPE_VAR) then rest.contenttype := contenttype; rest.content := content; FrameInit(focRestPayload, rest.content, rest.contenttype, frame); frame.payload := RecordSave(rest,TypeInfo(TRelayFrameRestPayload)); end; function TPublicRelay.OnClientsRequest(Ctxt: THttpServerRequest): cardinal; var frame: TWebSocketFrame; start, diff: Int64; log: ISynLog; begin result := 504; // HTTP_GATEWAYTIMEOUT log := fLog.Enter('OnClientsRequest #%.% % % %', [Ctxt.ConnectionID, Ctxt.RequestID, Ctxt.RemoteIP, Ctxt.Method, Ctxt.URL], self); if Ctxt.RequestID = 0 then raise ERelayProtocol.CreateUTF8('%.OnClientsRequest: RequestID=0', [self]); SetRestFrame(frame, 0, Ctxt.URL, Ctxt.Method, Ctxt.InHeaders, Ctxt.InContent, Ctxt.InContentType); Safe.Lock; try if fServerConnected = nil then raise ERelayProtocol.CreateUTF8('%.OnClientsRequest: No server to relay to', [self]); if not EncapsulateAndSend(fServerConnected, Ctxt.RemoteIP, frame, Ctxt.RequestID) then raise ERelayProtocol.CreateUTF8('%.OnClientsRequest: Error relaying from #% % to server', [Ctxt.ConnectionID, Ctxt.RemoteIP]); ObjArrayAddCount(fRestFrame, Ctxt, fRestFrameCount); inc(fRestFrames); finally Safe.UnLock; end; InterLockedIncrement(fRestPending); // now wait for the response to come try start := GetTickCount64; repeat if Ctxt.Status <> 0 then begin if log<>nil then log.Log(sllTrace, 'OnClientsRequest: answer [%] % %', [Ctxt.Status, KB(Ctxt.OutContent), Ctxt.OutContentType], self); result := Ctxt.Status; break; end; diff := GetTickCount64 - start; if (fStarted <> '') and (diff < fRestTimeoutMS) then begin if (diff < 10) and fServerConnectedToLocalHost then SleepHiRes(0) // faster on loopback (e.g. tests) else Sleep(1); continue; end; if log<>nil then log.Log(sllTrace, 'OnClientsRequest: timeout after %ms (start=% now=%)', [diff, start, GetTickCount64], self); break; until false; finally InterLockedDecrement(fRestPending); Safe.Lock; try if PtrArrayDelete(fRestFrame, Ctxt, @fRestFrameCount) < 0 then if log<>nil then log.Log(sllWarning, 'OnClientsRequest: no Ctxt in fRestFrame[]', self); finally Safe.UnLock; end; end; end; function TPublicRelay.GetStats: RawJSON; var tix: integer; ip: RawUTF8; begin tix := GetTickCount64 shr 9; // 512 ms cache to avoid DoS attacks if tix <> fStatTix then begin fStatTix := tix; if fServerConnected = nil then ip := 'not connected' else ip := fServerConnected.RemoteIP; fStatCache := JSONReformat(JsonEncode([ 'version',ExeVersion.Version.Detailed, 'started',Started, 'memory',TSynMonitorMemory.ToVariant, 'disk free',GetDiskPartitionsText, 'exceptions',GetLastExceptions, 'connections',fClients.ServerConnectionCount, 'rejected',Rejected, 'server',ip, 'sent','{','kb',KB(Sent), 'frames',Frames, '}', 'received','{','kb',KB(Received), 'valid',Valid, 'invalid',Invalid, '}', 'rest','{', 'frames',fRestFrameCount, 'pending',fRestPending, '}', 'connected',fClients.ServerConnectionActive, 'clients',fClients.WebSocketConnections])); end; result := fStatCache; end; constructor TPublicRelay.Create(aLog: TSynLogClass; const aClientsPort, aServerPort: SockString; const aServerKey: RawUTF8; aServerJWT: TJWTAbstract; aClientsThreadPoolCount, aClientsKeepAliveTimeOut: integer); var log: ISynLog; begin inherited Create(aLog); log := fLog.Enter('Create: bind clients on %, server on %, encrypted=% %', [aClientsPort, aServerPort, BOOL_STR[aServerKey<>''], aServerJWT], self); fServerJWT := aServerJWT; fServer := TWebSocketServer.Create(aServerPort, nil, nil, 'relayserver'); fServer.WaitStarted; if fServerJWT <> nil then fServer.OnBeforeBody := OnServerBeforeBody; fServer.OnRequest := OnServerRequest; fServer.WebSocketProtocols.Add(TRelayServerProtocol.Create(self, aServerKey)); fClients := TWebSocketServer.Create(aClientsPort, nil, nil, 'relayclients', aClientsThreadPoolCount, aClientsKeepAliveTimeOut); fClients.WaitStarted; fClients.WebSocketProtocols.Add(TSynopseServerProtocol.Create(self)); fClients.OnRequest := OnClientsRequest; if log<>nil then log.Log(sllDebug, 'Create: Server=% Clients=%', [fServer, fClients], self); end; destructor TPublicRelay.Destroy; var log: ISynLog; begin log := fLog.Enter(self, 'Destroy'); fStatTix := 0; // force GetStats recomputation log.Log(sllDebug, 'Destroying %', [self], self); fClients.Free; fServerConnected := nil; fServer.Free; inherited Destroy; fServerJWT.Free; end; { TRelayClientProtocol } constructor TRelayClientProtocol.Create(aOwner: TPrivateRelay; const aRelayKey: RawUTF8); begin fOwner := aOwner; inherited Create('synopserelay', ''); SetEncryptKey({aServer=}false, aRelayKey); end; function TRelayClientProtocol.Clone(const aClientURI: RawUTF8): TWebSocketProtocol; begin result := nil; // not used on this client-side protocol end; procedure TRelayClientProtocol.ProcessIncomingFrame(Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); var server, tobedeleted: TServerClient; connection: THttpServerConnectionID; serverindex: integer; rest: TRelayFrameRestPayload; http: THttpClientSocket; log: TSynLog; begin log := fOwner.fLog.Add; log.Log(sllTrace, 'ProcessIncomingFrame % %', [Sender.RemoteIP, ToText(Frame.opcode)^], self); case Frame.opcode of focConnectionClose: begin if fOwner.Connected then begin log.Log(sllTrace, 'ProcessIncomingFrame: Public Relay server shutdown', self); fOwner.Disconnect; end; log.Log(sllTrace, 'ProcessIncomingFrame: disconnected to Public Relay', self); exit; end; focBinary: if not fOwner.Connected then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: not connected', [self]); else exit; // relay meaningfull frames end; connection := fOwner.Decapsulate(Sender.Protocol, Frame); if connection = 0 then exit; if Frame.opcode = focRestPayload then begin if not RecordLoad(rest, Frame.payload, TypeInfo(TRelayFrameRestPayload)) then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: focRestPayload payload', [self]); log.Log(sllTrace, 'ProcessIncomingFrame: relay #$.% %', [connection, rest.method], self); try http := THttpClientSocket.Open(fOwner.fServerHost, fOwner.fServerPort); try // use a quick thread-pooled HTTP/1.0 request to the ORM/SOA server if rest.contenttype = '' then rest.contenttype := JSON_CONTENT_TYPE_VAR; rest.status := http.Request(rest.url, rest.method, {keepalive=}0, rest.headers, rest.content, rest.contenttype, {retry=}false); SetRestFrame(Frame, rest.status, '', '', http.HeaderGetText, http.Content, http.ContentType); log.Log(sllTrace, 'ProcessIncomingFrame: answered [%] %', [rest.status, KB(http.content)], self); finally http.Free; end; except on E: Exception do SetRestFrame(Frame, 502 { = bad gateway }, '', '', '', FormatUTF8('% failure: % %', [self, E, E.Message]), TEXT_CONTENT_TYPE); end; fOwner.Safe.Lock; try fOwner.EncapsulateAndSend(Sender, Sender.RemoteIP, Frame, connection); finally fOwner.Safe.UnLock; end; exit; end; tobedeleted := nil; fOwner.Safe.Lock; try server := fOwner.FindServerClientByConnection(connection, serverindex); if server = nil then case Frame.opcode of focConnectionClose: exit; // during closure handcheck focContinuation: begin server := fOwner.NewServerClient(connection, Frame.payload); if server = nil then begin log.Log(sllWarning, 'ProcessIncomingFrame: failed to create new link', self); Frame.opcode := focConnectionClose; Frame.content := []; Frame.payload := ''; fOwner.EncapsulateAndSend(Sender, 'noserver', Frame, connection); exit; end; end else raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame #% %?', [self, connection, ToText(Frame.opcode)^]); end; case Frame.opcode of focBinary, focText: if not server.WebSockets.SendFrame(Frame) then log.Log(sllWarning, 'ProcessIncomingFrame: SendFrame failed', self); focConnectionClose: begin log.Log(sllTrace, 'ProcessIncomingFrame: delete %', [server], self); tobedeleted := server; PtrArrayDelete(fOwner.fServers, serverindex, @fOwner.fServersCount); end; end; finally fOwner.Safe.UnLock; end; tobedeleted.Free; // outside fOwner.Safe.Lock to prevent deadlocks end; { TSynopseClientProtocol } constructor TSynopseClientProtocol.Create(aOwner: TPrivateRelay; const aProtocolName: RawUTF8); begin fOwner := aOwner; inherited Create(aProtocolName, ''); end; function TSynopseClientProtocol.Clone(const aClientURI: RawUTF8): TWebSocketProtocol; begin result := nil; // not used on this client-side only protocol end; procedure TSynopseClientProtocol.ProcessIncomingFrame( Sender: TWebSocketProcess; var Frame: TWebSocketFrame; const info: RawUTF8); var server, tobedeleted: TServerClient; serverindex: integer; begin fOwner.fLog.Add.Log(sllTrace, 'ProcessIncomingFrame % %', [Sender.RemoteIP, ToText(Frame.opcode)^], self); if not (Frame.opcode in [focConnectionClose, focText, focBinary]) then exit; tobedeleted := nil; fOwner.fSafe.Lock; try if (fOwner.fRelayClient = nil) and (Frame.opcode <> focConnectionClose) then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: Public Relay down at %:%', [self, fOwner.fRelayHost, fOwner.fRelayPort]); server := fOwner.FindServerClientByProcess(Sender, serverindex); if (server = nil) or (server.Connection = 0) or (server.WebSockets <> Sender) then if Frame.opcode = focConnectionClose then exit else raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: Unexpected %', [self, ToText(Frame.opcode)^]); if Frame.opcode = focConnectionClose then tobedeleted := server; if not fOwner.EncapsulateAndSend(fOwner.fRelayClient.WebSockets, server.OriginIP, Frame, server.Connection) and (tobedeleted = nil) then raise ERelayProtocol.CreateUTF8('%.ProcessIncomingFrame: Error sending to Public Relay %:%', [self, fOwner.fRelayHost, fOwner.fRelayPort]); if tobedeleted <> nil then PtrArrayDelete(fOwner.fServers, serverindex, @fOwner.fServersCount); finally fOwner.fSafe.UnLock; if tobedeleted <> nil then tobedeleted.Free; // outside fOwner.Safe.Lock to prevent deadlocks end; end; { TPrivateRelay } constructor TPrivateRelay.Create(aLog: TSynLogClass; const aRelayHost, aRelayPort, aRelayKey, aRelayBearer, aServerHost, aServerPort, aServerRemoteIPHeader: RawUTF8); begin inherited Create(aLog);; fRelayHost := aRelayHost; fRelayPort := aRelayPort; fRelayKey := aRelayKey; fRelayCustomHeaders := AuthorizationBearer(aRelayBearer); fServerHost := aServerHost; fServerPort := aServerPort; if aServerRemoteIPHeader <> '' then fServerRemoteIPHeader := aServerRemoteIPHeader + ': '; fLog.Add.Log(sllDebug, 'Create: %', [self], self); end; function TPrivateRelay.FindServerClientByConnection(connection: THttpServerConnectionID; out index: integer): TServerClient; var client: ^TServerClient; // compiles to efficient asm on FPC i: integer; begin // caller made fSafe.Lock result := nil; if connection = 0 then exit; client := pointer(fServers); for i := 1 to fServersCount do if client^.Connection = connection then begin index := i - 1; result := client^; exit; end else inc(client); end; function TPrivateRelay.FindServerClientByProcess(process: TWebSocketProcess; out index: integer): TServerClient; var client: ^TServerClient; i: integer; begin // caller made fSafe.Lock result := nil; if process = nil then exit; client := pointer(fServers); for i := 1 to fServersCount do if client^.fProcess = process then begin index := i - 1; result := client^; exit; end else inc(client); end; function TPrivateRelay.NewServerClient(connection: THttpServerConnectionID; const ipprotocoluri: RawUTF8): TServerClient; var ip, protocol, url, header: RawUTF8; log: ISynLog; begin // caller made fSafe.Lock split(ipprotocoluri, #13, ip, protocol); split(protocol, #13, protocol, url); log := fLog.Enter('NewServerClient(%:%) for #% %/% %', [fServerHost, fServerPort, connection, ip, url, protocol], self); if fServerRemoteIPHeader <> '' then header := fServerRemoteIPHeader + ip; result := TServerClient(TServerClient.WebSocketsConnect(fServerHost, fServerPort, TSynopseClientProtocol.Create(self, protocol), fLog, 'NewServerClient', url, header)); if result <> nil then begin result.Connection := connection; result.OriginIP := ip; ObjArrayAddCount(fServers, result, fServersCount); end; if log<>nil then log.Log(sllTrace, 'NewServerClient = %', [result], self); end; procedure TPrivateRelay.Disconnect; var threadsafe: TServerClients; log: ISynLog; i: PtrInt; begin if not Connected then exit; log := fLog.Enter('Disconnect %:% count=%', [fRelayHost, fRelayPort, fServersCount], self); fSafe.Lock; // avoid deadlock with focConnectionClose notification try threadsafe := fServers; // shutdown all current per-client links SetLength(threadsafe, fServersCount + 1); threadsafe[fServersCount] := pointer(fRelayClient); // + PublicRelay link fServers := nil; fServersCount := 0; fRelayClient := nil; finally fSafe.UnLock; end; for i := 0 to high(threadsafe) do begin if log<>nil then log.Log(sllDebug, 'Disconnect %', [threadsafe[i]], self); threadsafe[i].Free; end; end; function TPrivateRelay.Connected: boolean; begin result := fRelayClient <> nil; end; function TPrivateRelay.Encrypted: boolean; begin result := fRelayKey <> ''; end; function TPrivateRelay.TryConnect: boolean; var log: ISynLog; begin log := fLog.Enter('TryConnect %:%', [fRelayHost, fRelayPort], self); if Connected then Disconnect; // will do proper Safe.Lock/UnLock fSafe.Lock; try fRelayClient := THttpClientWebSockets.WebSocketsConnect( fRelayHost, fRelayPort, TRelayClientProtocol.Create(self, fRelayKey), fLog, 'TPrivateRelay.TryConnect', '', fRelayCustomHeaders); result := fRelayClient <> nil; finally fSafe.UnLock; end; if log<>nil then log.Log(sllDebug, 'TryConnect=%', [BOOL_STR[result]], self); end; destructor TPrivateRelay.Destroy; var log: ISynLog; begin log := fLog.Enter(self, 'Destroy'); try if log<>nil then log.Log(sllDebug, 'Destroying %', [self], self); if Connected then begin Disconnect; if log<>nil then log.Log(sllDebug, 'Destroy: disconnected as %', [self], self); end; except end; inherited Destroy; end; end.