diff options
| author | Hugo Thunnissen <devel@hugot.nl> | 2023-08-06 17:39:32 +0200 |
|---|---|---|
| committer | Hugo Thunnissen <devel@hugot.nl> | 2023-08-06 17:39:32 +0200 |
| commit | d86ef5756bc887c0cb2b4b3c8c07a2fbc9524029 (patch) | |
| tree | 2ed955ebfb126daf264799636e8c5e02e77cf121 /phpinspect-pipeline.el | |
| parent | c20df819b8aca43feff09a5c1c5fc7408f3af168 (diff) | |
Remove `phpinspect-define-pipeline-step' in favor of direct fun call
Diffstat (limited to 'phpinspect-pipeline.el')
| -rw-r--r-- | phpinspect-pipeline.el | 193 |
1 files changed, 91 insertions, 102 deletions
diff --git a/phpinspect-pipeline.el b/phpinspect-pipeline.el index 8ff789c..a55b944 100644 --- a/phpinspect-pipeline.el +++ b/phpinspect-pipeline.el @@ -124,6 +124,81 @@ user input.") phpinspect-pipeline-pause-time mx (make-condition-variable mx "phpinspect-pipeline-pause"))) (thread-yield)))) +(define-inline phpinspect--read-pipeline-emission (&rest body) + (push 'progn body) + (inline-quote + (catch 'phpinspect-pipeline-emit + ,body + nil))) + +(defmacro phpinspect--run-as-pipeline-step (func-name queue consumer-queue pipeline-ctx &optional local-ctx) + (unless (symbolp func-name) + (error "Function name must be a symbol, got: %s" func-name)) + + + (let ((thread-name (concat "phpinspect-pipeline-" (symbol-name func-name))) + (statement (list func-name)) + (incoming (gensym "incoming")) + (outgoing (gensym "outgoing")) + (inc-queue (gensym "queue")) + (out-queue (gensym "queue")) + (context-sym (gensym "context")) + (continue-running (gensym "continue-running")) + (pctx-sym (gensym "pipeline-ctx")) + (incoming-end (gensym "incoming-end")) + (end (gensym "end"))) + + (when local-ctx + (setq statement (nconc statement (list context-sym)))) + + (setq statement (nconc statement (list incoming))) + + `(let ((,inc-queue ,queue) + (,out-queue ,consumer-queue) + (,context-sym ,local-ctx) + (,pctx-sym ,pipeline-ctx)) + (make-thread + (lambda () + (let ((,continue-running t) + ,incoming ,outgoing ,end ,incoming-end) + + (phpinspect-pipeline--register-wakeup-function ,inc-queue) + (while ,continue-running + (condition-case err + (progn + (phpinspect-pipeline-pause) + (setq ,incoming (phpinspect-pipeline-receive ,inc-queue)) + + (if (phpinspect-pipeline-end-p ,incoming) + (progn + (setq ,incoming-end ,incoming) + (when (phpinspect-pipeline-end-value ,incoming) + (progn + (setq ,incoming (phpinspect-pipeline-end-value ,incoming) + ,outgoing (phpinspect--read-pipeline-emission ,statement)) + (phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify))) + + (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) + (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) + (setq ,continue-running nil) + (phpinspect-pipeline--enqueue ,out-queue ,end)) + + ;; Else + (setq ,outgoing (phpinspect--read-pipeline-emission ,statement)) + (when (phpinspect-pipeline-end-p ,outgoing) + (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) + (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) + (setq ,continue-running nil)) + (phpinspect-pipeline--enqueue ,out-queue ,outgoing))) + (phpinspect-pipeline-incoming) + (t (phpinspect--log "Pipeline thread errored: %s" err) + (setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err)) + (setq ,continue-running nil) + (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) + (phpinspect-pipeline--enqueue ,out-queue ,end)))))) + ,thread-name)))) + + (defun phpinspect--chain-pipeline-steps (steps start-queue end-queue ctx) (let ((result (gensym "result")) (incoming (gensym "incoming")) @@ -135,9 +210,9 @@ user input.") (setq statement (if (phpinspect--pipeline-step--context-var-name step) - `(,(phpinspect-pipeline-step-name name "create") - ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step)) - `(,(phpinspect-pipeline-step-name name "create") ,incoming ,outgoing ,ctx-sym))) + `(phpinspect--run-as-pipeline-step + ,name ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step)) + `(phpinspect--run-as-pipeline-step ,name ,incoming ,outgoing ,ctx-sym))) (setq body (nconc body `(,(if steps `(setq ,outgoing (phpinspect-make-queue)) `(setq ,outgoing ,end-queue)) @@ -244,9 +319,11 @@ user input.") (phpinspect-pipeline-ctx-close ,ctx-sym) ,result-sym))))) -(define-inline phpinspect-pipeline (seed-form &rest parameters) +(defmacro phpinspect-pipeline (seed-form &rest parameters) (declare (indent defun)) - (let ((result (gensym)) async macro-params) + (let ((result (gensym)) + (async-sym (gensym)) + async macro-params) (while parameters (setq key (pop parameters) value (pop parameters)) @@ -255,16 +332,15 @@ user input.") (:async (setq async value)) (_ (setq macro-params (nconc macro-params (list key value)))))) - (inline-quote - (if ,async - (make-thread - (lambda () - (condition-case err - (let ((,result ,(append '(phpinspect--pipeline) (list seed-form) macro-params))) - (funcall ,async (or ,result 'phpinspect-pipeline-nil-result) nil)) - (t (funcall ,async nil err)))) - "phpinspect-pipeline-async") - ,(append '(phpinspect--pipeline) (list seed-form) macro-params))))) + `(if-let ((,async-sym ,async)) + (make-thread + (lambda () + (condition-case err + (let ((,result (phpinspect--pipeline ,seed-form ,@macro-params))) + (funcall ,async-sym (or ,result 'phpinspect-pipeline-nil-result) nil)) + (t (funcall ,async-sym nil err)))) + "phpinspect-pipeline-async") + (phpinspect--pipeline ,seed-form ,@macro-params)))) (define-inline phpinspect-pipeline-receive (queue) (inline-letevals (queue) @@ -297,92 +373,5 @@ user input.") ,queue (pop (phpinspect-pipeline-emission-collection ,emission)) ,no-notify)) (phpinspect-queue-enqueue ,queue ,emission ,no-notify)))))) -(defmacro phpinspect-define-pipeline-step (name function-name) - (unless (symbolp name) - (error "name must be a symbol")) - - (unless (symbolp function-name) - (error "function-name must be a symbol")) - - (let ((execute-function (phpinspect-pipeline-step-name name "execute")) - (constructor-function (phpinspect-pipeline-step-name name "create"))) - - `(progn - (define-inline ,execute-function (input &optional context) - (if context - (inline-quote - (catch 'phpinspect-pipeline-emit - ,(append `(,function-name) '(,context) '(,input)) - nil)) - (inline-quote - (catch 'phpinspect-pipeline-emit - ,(append `(,function-name) '(,input)) - nil)))) - - (define-inline ,constructor-function (queue consumer-queue pipeline-ctx &optional context) - (inline-letevals (queue consumer-queue context) - (let ((thread-name ,(concat "phpinspect-pipeline-" (symbol-name name))) - (statement (list (quote ,execute-function)))) - ,@(list - '(let ((incoming (gensym "incoming")) - (outgoing (gensym "outgoing")) - (inc-queue (gensym "queue")) - (out-queue (gensym "queue")) - (context-sym (gensym "context")) - (continue-running (gensym "continue-running")) - (pctx-sym (gensym "pipeline-ctx")) - (incoming-end (gensym "incoming-end")) - (end (gensym "end"))) - - (setq statement (nconc statement (list incoming))) - (unless (and (inline-const-p context) (not (inline-const-val context))) - (setq statement (nconc statement (list context-sym)))) - - (inline-quote - (let ((,inc-queue ,queue) - (,out-queue ,consumer-queue) - (,context-sym ,context) - (,pctx-sym ,pipeline-ctx)) - (make-thread - (lambda () - (let ((,continue-running t) - ,incoming ,outgoing ,end ,incoming-end) - - (phpinspect-pipeline--register-wakeup-function ,inc-queue) - (while ,continue-running - (condition-case err - (progn - (phpinspect-pipeline-pause) - (setq ,incoming (phpinspect-pipeline-receive ,inc-queue)) - - (if (phpinspect-pipeline-end-p ,incoming) - (progn - (setq ,incoming-end ,incoming) - (when (phpinspect-pipeline-end-value ,incoming) - (progn - (setq ,incoming (phpinspect-pipeline-end-value ,incoming) - ,outgoing ,statement) - (phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify))) - - (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) - (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) - (setq ,continue-running nil) - (phpinspect-pipeline--enqueue ,out-queue ,end)) - - ;; Else - (setq ,outgoing ,statement) - (when (phpinspect-pipeline-end-p ,outgoing) - (setq ,end (phpinspect-make-pipeline-end :thread (current-thread))) - (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) - (setq ,continue-running nil)) - (phpinspect-pipeline--enqueue ,out-queue ,outgoing))) - (phpinspect-pipeline-incoming) - (t (phpinspect--log "Pipeline thread errored: %s" err) - (setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err)) - (setq ,continue-running nil) - (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end) - (phpinspect-pipeline--enqueue ,out-queue ,end)))))) - ,thread-name))))))))))) - (provide 'phpinspect-pipeline) ;;; phpinspect-pipeline.el ends here |
