summaryrefslogtreecommitdiff
path: root/phpinspect-pipeline.el
diff options
context:
space:
mode:
authorHugo Thunnissen <devel@hugot.nl>2023-08-06 17:39:32 +0200
committerHugo Thunnissen <devel@hugot.nl>2023-08-06 17:39:32 +0200
commitd86ef5756bc887c0cb2b4b3c8c07a2fbc9524029 (patch)
tree2ed955ebfb126daf264799636e8c5e02e77cf121 /phpinspect-pipeline.el
parentc20df819b8aca43feff09a5c1c5fc7408f3af168 (diff)
Remove `phpinspect-define-pipeline-step' in favor of direct fun call
Diffstat (limited to 'phpinspect-pipeline.el')
-rw-r--r--phpinspect-pipeline.el193
1 files changed, 91 insertions, 102 deletions
diff --git a/phpinspect-pipeline.el b/phpinspect-pipeline.el
index 8ff789c..a55b944 100644
--- a/phpinspect-pipeline.el
+++ b/phpinspect-pipeline.el
@@ -124,6 +124,81 @@ user input.")
phpinspect-pipeline-pause-time mx (make-condition-variable mx "phpinspect-pipeline-pause")))
(thread-yield))))
+(define-inline phpinspect--read-pipeline-emission (&rest body)
+ (push 'progn body)
+ (inline-quote
+ (catch 'phpinspect-pipeline-emit
+ ,body
+ nil)))
+
+(defmacro phpinspect--run-as-pipeline-step (func-name queue consumer-queue pipeline-ctx &optional local-ctx)
+ (unless (symbolp func-name)
+ (error "Function name must be a symbol, got: %s" func-name))
+
+
+ (let ((thread-name (concat "phpinspect-pipeline-" (symbol-name func-name)))
+ (statement (list func-name))
+ (incoming (gensym "incoming"))
+ (outgoing (gensym "outgoing"))
+ (inc-queue (gensym "queue"))
+ (out-queue (gensym "queue"))
+ (context-sym (gensym "context"))
+ (continue-running (gensym "continue-running"))
+ (pctx-sym (gensym "pipeline-ctx"))
+ (incoming-end (gensym "incoming-end"))
+ (end (gensym "end")))
+
+ (when local-ctx
+ (setq statement (nconc statement (list context-sym))))
+
+ (setq statement (nconc statement (list incoming)))
+
+ `(let ((,inc-queue ,queue)
+ (,out-queue ,consumer-queue)
+ (,context-sym ,local-ctx)
+ (,pctx-sym ,pipeline-ctx))
+ (make-thread
+ (lambda ()
+ (let ((,continue-running t)
+ ,incoming ,outgoing ,end ,incoming-end)
+
+ (phpinspect-pipeline--register-wakeup-function ,inc-queue)
+ (while ,continue-running
+ (condition-case err
+ (progn
+ (phpinspect-pipeline-pause)
+ (setq ,incoming (phpinspect-pipeline-receive ,inc-queue))
+
+ (if (phpinspect-pipeline-end-p ,incoming)
+ (progn
+ (setq ,incoming-end ,incoming)
+ (when (phpinspect-pipeline-end-value ,incoming)
+ (progn
+ (setq ,incoming (phpinspect-pipeline-end-value ,incoming)
+ ,outgoing (phpinspect--read-pipeline-emission ,statement))
+ (phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify)))
+
+ (setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
+ (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
+ (setq ,continue-running nil)
+ (phpinspect-pipeline--enqueue ,out-queue ,end))
+
+ ;; Else
+ (setq ,outgoing (phpinspect--read-pipeline-emission ,statement))
+ (when (phpinspect-pipeline-end-p ,outgoing)
+ (setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
+ (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
+ (setq ,continue-running nil))
+ (phpinspect-pipeline--enqueue ,out-queue ,outgoing)))
+ (phpinspect-pipeline-incoming)
+ (t (phpinspect--log "Pipeline thread errored: %s" err)
+ (setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err))
+ (setq ,continue-running nil)
+ (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
+ (phpinspect-pipeline--enqueue ,out-queue ,end))))))
+ ,thread-name))))
+
+
(defun phpinspect--chain-pipeline-steps (steps start-queue end-queue ctx)
(let ((result (gensym "result"))
(incoming (gensym "incoming"))
@@ -135,9 +210,9 @@ user input.")
(setq statement
(if (phpinspect--pipeline-step--context-var-name step)
- `(,(phpinspect-pipeline-step-name name "create")
- ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step))
- `(,(phpinspect-pipeline-step-name name "create") ,incoming ,outgoing ,ctx-sym)))
+ `(phpinspect--run-as-pipeline-step
+ ,name ,incoming ,outgoing ,ctx-sym ,(phpinspect--pipeline-step--context-var-name step))
+ `(phpinspect--run-as-pipeline-step ,name ,incoming ,outgoing ,ctx-sym)))
(setq body (nconc body `(,(if steps
`(setq ,outgoing (phpinspect-make-queue))
`(setq ,outgoing ,end-queue))
@@ -244,9 +319,11 @@ user input.")
(phpinspect-pipeline-ctx-close ,ctx-sym)
,result-sym)))))
-(define-inline phpinspect-pipeline (seed-form &rest parameters)
+(defmacro phpinspect-pipeline (seed-form &rest parameters)
(declare (indent defun))
- (let ((result (gensym)) async macro-params)
+ (let ((result (gensym))
+ (async-sym (gensym))
+ async macro-params)
(while parameters
(setq key (pop parameters)
value (pop parameters))
@@ -255,16 +332,15 @@ user input.")
(:async (setq async value))
(_ (setq macro-params (nconc macro-params (list key value))))))
- (inline-quote
- (if ,async
- (make-thread
- (lambda ()
- (condition-case err
- (let ((,result ,(append '(phpinspect--pipeline) (list seed-form) macro-params)))
- (funcall ,async (or ,result 'phpinspect-pipeline-nil-result) nil))
- (t (funcall ,async nil err))))
- "phpinspect-pipeline-async")
- ,(append '(phpinspect--pipeline) (list seed-form) macro-params)))))
+ `(if-let ((,async-sym ,async))
+ (make-thread
+ (lambda ()
+ (condition-case err
+ (let ((,result (phpinspect--pipeline ,seed-form ,@macro-params)))
+ (funcall ,async-sym (or ,result 'phpinspect-pipeline-nil-result) nil))
+ (t (funcall ,async-sym nil err))))
+ "phpinspect-pipeline-async")
+ (phpinspect--pipeline ,seed-form ,@macro-params))))
(define-inline phpinspect-pipeline-receive (queue)
(inline-letevals (queue)
@@ -297,92 +373,5 @@ user input.")
,queue (pop (phpinspect-pipeline-emission-collection ,emission)) ,no-notify))
(phpinspect-queue-enqueue ,queue ,emission ,no-notify))))))
-(defmacro phpinspect-define-pipeline-step (name function-name)
- (unless (symbolp name)
- (error "name must be a symbol"))
-
- (unless (symbolp function-name)
- (error "function-name must be a symbol"))
-
- (let ((execute-function (phpinspect-pipeline-step-name name "execute"))
- (constructor-function (phpinspect-pipeline-step-name name "create")))
-
- `(progn
- (define-inline ,execute-function (input &optional context)
- (if context
- (inline-quote
- (catch 'phpinspect-pipeline-emit
- ,(append `(,function-name) '(,context) '(,input))
- nil))
- (inline-quote
- (catch 'phpinspect-pipeline-emit
- ,(append `(,function-name) '(,input))
- nil))))
-
- (define-inline ,constructor-function (queue consumer-queue pipeline-ctx &optional context)
- (inline-letevals (queue consumer-queue context)
- (let ((thread-name ,(concat "phpinspect-pipeline-" (symbol-name name)))
- (statement (list (quote ,execute-function))))
- ,@(list
- '(let ((incoming (gensym "incoming"))
- (outgoing (gensym "outgoing"))
- (inc-queue (gensym "queue"))
- (out-queue (gensym "queue"))
- (context-sym (gensym "context"))
- (continue-running (gensym "continue-running"))
- (pctx-sym (gensym "pipeline-ctx"))
- (incoming-end (gensym "incoming-end"))
- (end (gensym "end")))
-
- (setq statement (nconc statement (list incoming)))
- (unless (and (inline-const-p context) (not (inline-const-val context)))
- (setq statement (nconc statement (list context-sym))))
-
- (inline-quote
- (let ((,inc-queue ,queue)
- (,out-queue ,consumer-queue)
- (,context-sym ,context)
- (,pctx-sym ,pipeline-ctx))
- (make-thread
- (lambda ()
- (let ((,continue-running t)
- ,incoming ,outgoing ,end ,incoming-end)
-
- (phpinspect-pipeline--register-wakeup-function ,inc-queue)
- (while ,continue-running
- (condition-case err
- (progn
- (phpinspect-pipeline-pause)
- (setq ,incoming (phpinspect-pipeline-receive ,inc-queue))
-
- (if (phpinspect-pipeline-end-p ,incoming)
- (progn
- (setq ,incoming-end ,incoming)
- (when (phpinspect-pipeline-end-value ,incoming)
- (progn
- (setq ,incoming (phpinspect-pipeline-end-value ,incoming)
- ,outgoing ,statement)
- (phpinspect-pipeline--enqueue ,out-queue ,outgoing 'no-notify)))
-
- (setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
- (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
- (setq ,continue-running nil)
- (phpinspect-pipeline--enqueue ,out-queue ,end))
-
- ;; Else
- (setq ,outgoing ,statement)
- (when (phpinspect-pipeline-end-p ,outgoing)
- (setq ,end (phpinspect-make-pipeline-end :thread (current-thread)))
- (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
- (setq ,continue-running nil))
- (phpinspect-pipeline--enqueue ,out-queue ,outgoing)))
- (phpinspect-pipeline-incoming)
- (t (phpinspect--log "Pipeline thread errored: %s" err)
- (setq ,end (phpinspect-make-pipeline-end :thread (current-thread) :error err))
- (setq ,continue-running nil)
- (phpinspect-pipeline-ctx-register-end ,pctx-sym ,end)
- (phpinspect-pipeline--enqueue ,out-queue ,end))))))
- ,thread-name)))))))))))
-
(provide 'phpinspect-pipeline)
;;; phpinspect-pipeline.el ends here