2019年4月19日金曜日

ジョブキューもどき

 Threadごとにconnectionが張れるようになったのだからマルチプロセスでキューイングしたやつをマルチスレッド化してみよう。mailboxというライブラリを使おうとしたのだけど、キューが空だったとき、readで待ってくれない。nilが返ってくる。
 自分でsleepするの1
 ABCLのmailboxは待ってくれるのだが。
 ほかのライブラリはないか、と探してみると、lparallelのqueueなら待ってくれることがわかった。ちょこちょこと組んでみたらあっさりできあがって驚いた。
 これで立ち上げたworker-jobの数分だけ並列でキューが処理される——はず。

;; ジョブキューもどき
(in-package :cl-user)
(defpackage thread-queue
(:use :cl
:alexandria
:bordeaux-threads
:lparallel.queue))
(in-package :thread-queue)
(annot:enable-annot-syntax)
(defvar *queue* (make-queue))
@export
(defun worker-job(&key (name "worker")(proc (lambda(x)(print x)))(exit (lambda()(print "END"))))
"queueを処理するworkerをスレッド起動する。queueがnilだったら終了する。"
(bt:make-thread (lambda()
(loop (let ((obj (pop-queue *queue*)))
(unless obj
(funcall exit)
(return))
(funcall proc obj))))
:name name))
@export
(defun regist-queue(obj)
"queueを登録する"
(unless (queue-full-p *queue*)
(push-queue obj *queue*)))
@export
(defun clear-queue()
"queueをクリアする。queueをlistにして返却する。"
(with-locked-queue *queue*
(loop repeat (queue-count/no-lock *queue*)
collect (pop-queue/no-lock *queue*))))
@export
(defun cutin-queue(&rest obj)
"先頭にobjを追加する。"
(with-locked-queue *queue*
(let ((list (loop repeat (queue-count/no-lock *queue*)
collect (pop-queue/no-lock *queue*))))
(loop for x in obj
do (push-queue/no-lock x *queue*))
(loop for x in list
do (push-queue/no-lock x *queue*)))))

Footnotes:

1

mutexを使えば、sleepは必要ないだろうけど。