{{#soa}} {{#unitasynch}} /// asynch version of {{#services}}{{interfaceName}} {{/services}} {{/unitasynch}} {{#unitsynch}} /// implements {{#services}}{{interfaceName}} {{/services}}over *Asynch {{/unitsynch}} unit {{filename}}; { WARNING: This unit has been generated by {{exeName}}. Any manual modification of this file may be lost after regeneration. {{#unitasynch}} Defines asynchronous (non-blocking) types for the following services: {{#services}} - {{interfaceName}} as non-blocking {{interfaceName}}Asynch, associated with blocking T{{uri}}Synch / {{interfaceName}}Synch, {{interfaceName}}AsynchAck and T{{uri}}Delays. {{/services}} {{/unitasynch}} {{#unitsynch}} Defines synchronous (blocking) implementation for the following services: {{#services}} - {{interfaceName}} as blocking T{{uri}}Abstract, calling {{interfaceName}}Synch / {{interfaceName}}Asynch {{/services}} {{/unitsynch}} Corresponding to {{projectname}} version {{exeVersion}}. Generated by {{User}} at {{time}}. } {{asynchparam}}{{#args}}{{#dirInput}}; {{dirName}} {{argName}}: {{typeSource}}{{/dirInput}}{{/args}}; {{>callparam}}){{#args}}{{#dirResult}}: {{typeSource}}{{/dirResult}}{{/args}};{{/methodasynch}} {{callparam}}; {{#args}}{{^dirResult}}{{#dirOutput}} const {{argName}}: {{typeSource}};{{/dirOutput}}{{/dirResult}}{{/args}}{{#args}}{{#dirResult}} const res: {{typeSource}}{{/dirResult}}{{/args}});{{/methodack}} {{asynchparam}}{{#args}}{{^dirResult}}; {{dirName}} {{argName}}: {{typeSource}}{{/dirResult}}{{/args}}){{#args}}{{#dirResult}}: {{typeSource}}{{/dirResult}}{{/args}};{{/methodsynch}} {{#services}}{{#unitasynch}} { -------- asynchronous version of {{interfaceName}} } type {{methoddelay}} ms property {{methodName}}: integer read f{{methodName}} write f{{methodName}}; {{/isInherited}} {{/methods}} end; /// the {{interfaceName}}Asynch progress callback definition // - a single callback, after subscription via Subscribe{{uri}}(), // would receive the acknowledgements of all 
//   {{interfaceName}}Asynch methods
  // - some commands may take a lot of time, so this asynchronous mechanism
  //   would increase the system reactivity
  // - naming is following the {{interfaceName}} method names
  // - call: {{calltype}} is the opaque value supplied at command invoke
  {{interfaceName}}AsynchAck = interface(IInvokable)
    ['{{newguid .}}']
  {{#methods}}
  {{^isInherited}}
    procedure {{>methodack}}
  {{/isInherited}}
  {{/methods}}
  end;

  /// identify any {{interfaceName}}Asynch method
  // - see also ToText(), ToMethodName() and To{{uri}}Ack() functions
  // - the first item, ack{{uri}}Undefined, is the "no match" value returned
  //   by To{{uri}}Ack()
  T{{uri}}Ack = (
    ack{{uri}}Undefined{{#methods}}{{^isInherited}}, ack{{methodName}}{{/isInherited}}{{/methods}});

  /// high-level asynchronous (non blocking) definition of {{interfaceName}}
  // - all the methods match the latest inheritance level of synchronous
  //   (blocking) {{interfaceName}} - it won't define the parents methods,
  //   since it would allow to work on a dual phase Select/Command with no
  //   prior Select (multiple inheritance of interfaces may have helped a lot,
  //   but they are not allowed yet){{#asynchkey}} using {{.}}: {{asynchkeytype}} to redirect
  //   the {{interfaceName}}Asynch call to the corresponding {{interfaceName}}{{/asynchkey}}
  // - call: {{calltype}} is an opaque value, which would identify the command
  //   when it is acknowledged by {{interfaceName}}AsynchAck
  {{interfaceName}}Asynch = interface(IInvokable)
    ['{{newguid .}}']
    /// this method is expected to be called once at the beginning of the
    // process, to receive all asynchronous acknowledgements of the other methods
    // - it would return the default delays for the associated timeouts, as
    //   defined on the server side
    function Subscribe{{uri}}(const OnAck: {{interfaceName}}AsynchAck;
      out Delays: T{{uri}}Delays): TCQRSResult;
    // all methods below map {{interfaceName}} methods, and their input parameters
  {{#methods}}
  {{^isInherited}}
    {{verb}} {{>methodasynch}}
  {{/isInherited}}
  {{/methods}}
  end;

  /// waiting semaphore associated to {{interfaceName}}Asynch
  // - used internally by T{{uri}}AsynchAck
  // - Params is filled by T{{uri}}Synch.WaitFor and by the
  //   {{interfaceName}}AsynchAck callbacks, and zeroed by ResetInternal
  T{{uri}}AsynchCall = class(TBlockingProcessPoolItem)
  protected
    procedure ResetInternal; override; // set Params to 0
  public
    Params: record // execution context
      {{#asynchkey}}
      {{.}}: {{asynchkeytype}};
      {{/asynchkey}}
      methodname: RawUTF8;
      ack: T{{uri}}Ack;
      // additional parameters, copied from {{interfaceName}}AsynchAck
      res: TCQRSResult;{{#methods}}{{^isInherited}}{{#args}}{{#dirOutput}}{{^dirResult}}
      {{argName}}{{methodIndex}}: {{typeSource}};{{/dirResult}}{{/dirOutput}}{{/args}}{{/isInherited}}{{/methods}}
    end;
  published
    {{#asynchkey}}
    property {{.}}: {{asynchkeytype}} read Params.{{.}};
    {{/asynchkey}}
    property ack: T{{uri}}Ack read Params.ack;
    property res: TCQRSResult read Params.res;
  end;

  /// propagate acknowledgements for {{interfaceName}}Asynch
  // - {{interfaceName}}AsynchAck acknowledgements would be propagated using the
  //   associated {{calltype}}, to release the wait of the main {{interfaceName}}
  //   blocking process
  // - would allow to run {{interfaceName}} blocking methods over a supplied
  //   {{interfaceName}}Asynch instance
  T{{uri}}AsynchAck = class(TCQRSServiceAsynchAck, {{interfaceName}}AsynchAck)
  protected
    function Notify({{>callparam}}; ack: T{{uri}}Ack; res: TCQRSResult;
      out process: T{{uri}}AsynchCall): boolean; overload;
    procedure Notify({{>callparam}}; ack: T{{uri}}Ack; res: TCQRSResult); overload;
    // {{interfaceName}}AsynchAck methods
    // would propagate the acknowledgement, and copy any additional parameter
    // to T{{uri}}AsynchCall.Params
  {{#methods}}
  {{^isInherited}}
    procedure {{>methodack}}
  {{/isInherited}}
  {{/methods}}
  public
    constructor Create(aLog: TSynLogClass);
    /// returns a blocking process from the internal semaphore pool
    function NewAsynchCall: T{{uri}}AsynchCall;
  end;

  /// shared synchronous (blocking) interface of {{interfaceName}}Asynch
  {{#asynchkey}}
  // - every method expects a {{.}}: {{asynchkeytype}} first input
  //   parameter, in addition to the regular {{interfaceName}} parameters
  {{/asynchkey}}
  {{interfaceName}}Synch = interface(IInvokable)
    ['{{newguid .}}']
  {{#methods}}
  {{^isInherited}}
    {{verb}} {{>methodsynch}}
  {{/isInherited}}
  {{/methods}}
  end;

  /// implements {{interfaceName}}Synch over a {{interfaceName}}Asynch instance
  // - it will use a shared T{{uri}}AsynchAck callback to wait for each
  //   command to be finished, and emulate synchronous (blocking) execution
  // - you may use this class e.g. at API level, over a blocking REST server,
  //   and communicate with the Domain event-driven services via asynchronous calls
  T{{uri}}Synch = class(TCQRSServiceSynch, {{interfaceName}}Synch)
  protected
    fLog: TSynLogClass;
    fDelays: T{{uri}}Delays;
    // true when fDelays was returned by Subscribe{{uri}}() and must be freed
    // by Destroy; false when the caller supplied its own aDelays
    fDelaysOwned: boolean;
    fAsynch: {{interfaceName}}Asynch;
    fSharedCallback: T{{uri}}AsynchAck;
    procedure WaitFor(call: T{{uri}}AsynchCall;{{#asynchkey}} const {{.}}: {{asynchkeytype}};{{/asynchkey}}
      delay: integer; ack: T{{uri}}Ack; var result: TCQRSResult);
  public
    /// initialize the blocking instance
    // - would allocate an internal T{{uri}}AsynchAck callback, and
    //   execute {{interfaceName}}Asynch.Subscribe{{uri}}()
    // - you may specify custom delays, to overload values supplied by the server
    //   during Subscribe{{uri}}()
    constructor Create(const aAsynch: {{interfaceName}}Asynch;
      aDelays: T{{uri}}Delays = nil; aLog: TSynLogClass = nil); reintroduce;
    /// finalize the instance
    destructor Destroy; override;
    /// access to the asynchronous methods
    property Asynch: {{interfaceName}}Asynch read fAsynch;
    /// associated time out values, in ms
    property Delays: T{{uri}}Delays read fDelays;
  public
    // {{interfaceName}}Synch blocking methods, returning cqrsTimeout if the
    // non-blocking calls did not respond in the expected delay, or the
    // TCQRSResult returned by the associated {{interfaceName}}Asynch method
  {{#methods}}
  {{^isInherited}}
    {{verb}} {{>methodsynch}}
  {{/isInherited}}
  {{/methods}}
  end;

/// returns the low-level text value of the enumerated, including leading "ack"
// - may be used
//   e.g. for debugging/logging purpose
function ToText(ack: T{{uri}}Ack): PShortString; overload;

/// returns the original method name without leading "ack", as defined in
// {{interfaceName}}Asynch
// - reverse function of To{{uri}}Ack()
function ToMethodName(ack: T{{uri}}Ack): RawUTF8; overload;

/// find a T{{uri}}Ack item, matching original method name
// without leading "ack", as defined in {{interfaceName}}Asynch
// - reverse function of ToMethodName()
// - returns ack{{uri}}Undefined when no item matches
function To{{uri}}Ack(const MethodName: RawUTF8): T{{uri}}Ack;

{{/unitasynch}}
{{#asynchkey}}{{#unitsynch}}{ -------- implements {{interfaceName}} over {{interfaceName}}Synch }

{{! NOTE(review): the query section below looks truncated - its class declaration and the opening methods/isInherited tags are not visible here; kept verbatim, confirm against the upstream template }}
{{#query}}{{method}}
{{/isInherited}}
{{/methods}}
end;

{{/query}}{{/unitsynch}}{{/asynchkey}}
{{/services}}

implementation

{{#services}}
{{#unitasynch}}
{ -------- asynchronous version of {{interfaceName}} }

function ToText(ack: T{{uri}}Ack): PShortString;
begin
  result := GetEnumName(TypeInfo(T{{uri}}Ack), ord(ack));
end;

function ToMethodName(ack: T{{uri}}Ack): RawUTF8;
begin
  // enumerate items are named ack<MethodName>: drop the leading lowercase part
  result := TrimLeftLowerCaseShort(ToText(ack));
end;

function To{{uri}}Ack(const MethodName: RawUTF8): T{{uri}}Ack;
var
  ndx: integer;
begin
  ndx := GetEnumNameValueTrimmed(TypeInfo(T{{uri}}Ack),
    pointer(MethodName), length(MethodName));
  // any index which is not > 0 is reported as ack{{uri}}Undefined
  if ndx > 0 then
    result := T{{uri}}Ack(ndx)
  else
    result := ack{{uri}}Undefined;
end;


{ T{{uri}}Delays }

constructor T{{uri}}Delays.Create;
begin
  inherited;
  // one default timeout per non-inherited method
{{#methods}}
{{^isInherited}}
  f{{methodName}} := {{>methoddelay}};
{{/isInherited}}
{{/methods}}
end;


{ T{{uri}}AsynchCall }

procedure T{{uri}}AsynchCall.ResetInternal;
begin
  inherited ResetInternal; // set fEvent := evNone and fCall := 0
  // release managed fields before zeroing the whole record
  Finalize(Params);
  FillCharFast(Params, sizeof(Params), 0);
end;


{ T{{uri}}AsynchAck }

constructor T{{uri}}AsynchAck.Create(aLog: TSynLogClass);
begin
  inherited Create;
  fLog := aLog;
  fCalls := TBlockingProcessPool.Create(T{{uri}}AsynchCall);
end;

{{! NOTE(review): the start of this function header was truncated in the reviewed text; it has been rebuilt from the Notify declaration of the class and from the procedure overload below - the inline definition of the callfmt partial was probably lost at the same place, confirm against the upstream template }}
// resolve the pending process matching the supplied call, and store res in it
// - returns false, after logging, if the call is invalid or no longer pending
// - on success the caller is expected to run process.NotifyFinished(true)
function T{{uri}}AsynchAck.Notify({{>callparam}}; ack: T{{uri}}Ack; res: TCQRSResult;
  out process: T{{uri}}AsynchCall): boolean;
var
  id: integer;
begin
  result := false;
  {{#callfunction}}
  if not {{.}}(call, id) then
  begin
    fLog.Add.Log(sllTrace, 'Notify: invalid %(call=%) received',
      [ToText(ack)^, call], self);
    exit;
  end;
  {{/callfunction}}
  {{^callfunction}}
  id := call;
  {{/callfunction}}
  process := pointer(fCalls.FromCall(id, true));
  if process = nil then
  begin
    fLog.Add.Log(sllTrace, 'Notify: deprecated/unexpected {{>callfmt}} received -> skipped',
      [ToText(ack)^, id, {{#asynchkey}}'?', {{/asynchkey}}ToText(res)^], self);
    exit;
  end;
  fLog.Add.Log(sllTrace, 'Notify: {{>callfmt}} received',
    [process.Params.methodname, id, {{#asynchkey}}process.{{.}}, {{/asynchkey}}ToText(res)^], self);
  process.Params.res := res;
  result := true;
end;

// acknowledgement with no extra output parameter to copy
procedure T{{uri}}AsynchAck.Notify({{>callparam}}; ack: T{{uri}}Ack; res: TCQRSResult);
var
  process: T{{uri}}AsynchCall;
begin
  if Notify(call, ack, res, process) then
    process.NotifyFinished(true); // notify caller to unlock "WaitFor" method
end;

function T{{uri}}AsynchAck.NewAsynchCall: T{{uri}}AsynchCall;
begin
  result := pointer(fCalls.NewProcess(0));
  if result = nil then
    raise {{Exception}}.CreateUTF8('%.NewAsynchCall: NewProcess=nil', [self]);
end;

// {{interfaceName}}AsynchAck methods

{{#methods}}
{{^isInherited}}
procedure T{{uri}}AsynchAck.{{>methodack}}
{{#hasOutNotResultParams}}
var
  process: T{{uri}}AsynchCall;
begin
  if Notify(call, ack{{methodName}}, res, process) then
  begin
    // copy the output parameters before releasing the waiting caller{{#args}}{{#dirOutput}}{{^dirResult}}
    process.Params.{{argName}}{{methodIndex}} := {{argName}};{{/dirResult}}{{/dirOutput}}{{/args}}
    process.NotifyFinished(true);
  end;
{{/hasOutNotResultParams}}
{{^hasOutNotResultParams}}
begin
  Notify(call, ack{{methodName}}, res);
{{/hasOutNotResultParams}}
end;

{{/isInherited}}
{{/methods}}

{ T{{uri}}Synch }

constructor T{{uri}}Synch.Create(const aAsynch: {{interfaceName}}Asynch;
  aDelays: T{{uri}}Delays; aLog: TSynLogClass);
var
  res: TCQRSResult;
  outdelays: T{{uri}}Delays;
begin
  if aAsynch = nil then
    raise {{exception}}.CreateUTF8('%.Create(aAsynch=nil)', [self]);
  fAsynch := aAsynch;
  fLog := aLog;
  fSharedCallback := T{{uri}}AsynchAck.Create(fLog);
  inherited Create(fSharedCallback);
  outdelays := T{{uri}}Delays.Create;
  try
    res := fAsynch.Subscribe{{uri}}(fSharedCallback, outdelays);
    if res <> cqrsSuccess then
      // use the template-supplied exception class, as the raise above does,
      // instead of a class hard-coded for one specific project
      raise {{exception}}.CreateUTF8('%.Create: {{interfaceName}}Asynch.Subscribe=%',
        [self, ToText(res)^]);
    if aDelays <> nil then
      fDelays := aDelays // force custom delays
    else
    begin
      // keep the server-side delays: ownership moves to fDelays, so the
      // finally block below must not free them
      fDelays := outdelays;
      fDelaysOwned := true;
      outdelays := nil;
    end;
  finally
    outdelays.Free;
  end;
end;

destructor T{{uri}}Synch.Destroy;
begin
  // caller-supplied delays are never freed here
  if fDelaysOwned then
    fDelays.Free;
  inherited Destroy;
end;

// wait for the acknowledgement of an asynchronous command
// - result is the TCQRSResult of the asynchronous call on input; on output it
//   is left unchanged if it was not cqrsSuccess, set to cqrsTimeout if no
//   acknowledgement came within delay ms, or set to the acknowledged result
procedure T{{uri}}Synch.WaitFor(call: T{{uri}}AsynchCall;
  {{#asynchkey}}const {{.}}: {{asynchkeytype}}; {{/asynchkey}}delay: integer;
  ack: T{{uri}}Ack; var result: TCQRSResult);
var
  msg: RawUTF8;
begin
  call.Lock;
  try
    {{#asynchkey}}
    call.Params.{{.}} := {{.}}; // for Notify()
    {{/asynchkey}}
    call.Params.ack := ack;
    call.Params.methodname := ToMethodName(ack);
    FormatUTF8('WaitFor: Asynch.{{>callfmt}}', [call.Params.methodname,
      call.Call, {{#asynchkey}}{{.}}, {{/asynchkey}}ToText(result)^], msg);
  finally
    call.Unlock;
  end;
  fLog.Add.Log(sllTrace, msg, self);
  if result <> cqrsSuccess then
    fLog.Add.Log(sllDDDError, '%: input parameters?', [msg])
  else if call.WaitFor(delay) = evTimeOut then
  begin
    fLog.Add.Log(sllDDDInfo, '% timeout after %ms', [msg, delay]);
    result := cqrsTimeout;
  end
  else
    result := call.Params.res;
end;

// {{interfaceName}}Synch blocking methods

{{! NOTE(review): the methods/isInherited openers and the method header below were truncated in the reviewed text; they have been rebuilt from the closing tags which follow the method body and from the T-uri-Abstract methods pattern - the inline definition of the argvalue partial was probably lost at the same place, confirm against the upstream template }}
{{#methods}}
{{^isInherited}}
{{verb}} T{{uri}}Synch.{{>methodsynch}}
var
  log: ISynLog;
  call: T{{uri}}AsynchCall;
begin
  if fLog <> nil then
    log := fLog.Enter('{{methodName}}({{#asynchkey}}{{.}}=%{{/asynchkey}}{{#args}}{{#dirInput}}, {{argName}}=%{{/dirInput}}{{/args}})',
      [{{#asynchkey}}{{.}}{{/asynchkey}}{{#args}}{{#dirInput}},{{>argvalue}}{{/dirInput}}{{/args}}], self);
  try
    call := fSharedCallback.NewAsynchCall;
    try
      // start the non-blocking command, then block until it is acknowledged
      result := Asynch.{{methodName}}({{#asynchkey}}{{.}}, {{/asynchkey}}{{#args}}{{#dirInput}}{{argName}}, {{/dirInput}}{{/args}}call.Call);
      WaitFor(call, {{#asynchkey}}{{.}}, {{/asynchkey}}Delays.{{methodName}}, ack{{methodName}}, result);
    finally{{#hasOutNotResultParams}}{{#args}}{{#dirOutput}}{{^dirResult}}
      {{argName}} := call.Params.{{argName}}{{methodIndex}};{{/dirResult}}{{/dirOutput}}{{/args}}{{/hasOutNotResultParams}}
      call.Reset;
    end;
  except
    // any exception is reported to the caller as a plain error code
    on Exception do
      result := cqrsInternalError;
  end;
  if log <> nil then
    log.Log(sllDebug, '{{methodName}}{{#asynchkey}}(%){{/asynchkey}} returned %{{#args}}{{#dirOutput}}{{^dirResult}} {{argName}}=%{{/dirResult}}{{/dirOutput}}{{/args}}',
      [{{#asynchkey}}{{.}}, {{/asynchkey}}ToText(result)^{{#args}}{{#dirOutput}}{{^dirResult}}, {{>argvalue}}{{/dirResult}}{{/dirOutput}}{{/args}}], self);
end;

{{/isInherited}}
{{/methods}}
{{/unitasynch}}
{{#asynchkey}}{{#unitsynch}}{ -------- implements {{interfaceName}} over {{interfaceName}}Synch }

{{#query}}
{ T{{uri}}Abstract }

// common prologue of every blocking method
// - returns true only if CqrsBeginMethod() succeeded and f{{uri}}Synch is
//   assigned after Set{{uri}}Synch
function T{{uri}}Abstract.BeginSynch(var aResult: TCQRSResult): boolean;
begin
  result := false;
  if CqrsBeginMethod(qaCommandOnSelect, aResult) then
  begin
    Set{{uri}}Synch;
    if f{{uri}}Synch = nil then
      CqrsSetResultMsg(cqrsInternalError, '{{uri}}Synch=nil')
    else
      result := true;
  end;
end;

{{#methods}}{{^isInherited}}
{{verb}} T{{uri}}Abstract.{{>method}}
begin
  if BeginSynch(result) then
    CqrsSetResult(f{{uri}}Synch.{{methodName}}(
      f{{asynchkey}}{{#args}}{{^dirResult}}, {{argName}}{{/dirResult}}{{/args}}));
end;

{{/isInherited}}
{{/methods}}
{{/query}}{{/unitsynch}}{{/asynchkey}}
{{/services}}

initialization
{{#services}}
{{#unitasynch}}
  TInterfaceFactory.RegisterInterfaces([
    TypeInfo({{interfaceName}}AsynchAck),
    TypeInfo({{interfaceName}}Asynch)]);
{{/unitasynch}}
{{/services}}
{{/soa}}
end.