Class: CLI::UI::WorkQueue

Inherits:
Object
  • Object
show all
Extended by:
T::Sig
Defined in:
lib/cli/ui/work_queue.rb

Defined Under Namespace

Classes: Future

Instance Method Summary collapse

Methods included from T::Sig

sig

Constructor Details

#initialize(max_concurrent) ⇒ WorkQueue

Returns a new instance of WorkQueue.



72
73
74
75
76
77
78
# File 'lib/cli/ui/work_queue.rb', line 72

# Sets up an idle work queue: the worker list starts empty and nothing is
# spawned here.
#
# @param max_concurrent [Object] limit that #enqueue compares against the
#   current worker count before starting another worker (presumably an
#   Integer -- confirm against the method's sig).
def initialize(max_concurrent)
  # Bookkeeping for spawned worker threads and the pending work items.
  @workers = T.let([], T::Array[Thread])
  @queue = T.let(Queue.new, Queue)

  # Synchronisation primitives. NOTE(review): @condition is not referenced by
  # any of the methods visible alongside this constructor -- confirm it is used.
  @mutex = T.let(Mutex.new, Mutex)
  @condition = T.let(ConditionVariable.new, ConditionVariable)

  @max_concurrent = max_concurrent
end

Instance Method Details

#close ⇒ Object



91
92
93
# File 'lib/cli/ui/work_queue.rb', line 91

# Closes the underlying work queue by delegating to `@queue.close`.
#
# Unlike #wait and #interrupt, this method does not join or signal any
# worker threads; it only closes the queue.
#
# @return [Object] whatever `@queue.close` returns
def close
  @queue.close
end

#enqueue(&block) ⇒ Object



81
82
83
84
85
86
87
88
# File 'lib/cli/ui/work_queue.rb', line 81

# Schedules a block of work and hands back the Future paired with it.
#
# A new worker is started first (under @mutex) while the number of workers is
# still below @max_concurrent; the [future, block] pair is then pushed onto
# the queue.
#
# @yield the work to be performed later
# @return [Future] the future that was queued together with the block
# @raise [ClosedQueueError] if the queue has already been closed (raised by
#   the push itself)
def enqueue(&block)
  Future.new.tap do |pending|
    @mutex.synchronize { start_worker if @workers.size < @max_concurrent }
    @queue << [pending, block]
  end
end

#interrupt ⇒ Object



102
103
104
105
106
107
108
109
110
111
112
113
114
115
# File 'lib/cli/ui/work_queue.rb', line 102

# Aborts the queue: closes it, fails every task still waiting in it with an
# Interrupt, raises Interrupt in each live worker thread, joins all workers,
# and finally empties the worker list. The whole sequence runs under @mutex.
#
# NOTE(review): Thread#join re-raises an exception that terminated the joined
# thread, so this presumably relies on the worker loop rescuing Interrupt --
# confirm against start_worker.
#
# @return [Object] the result of the synchronized block
def interrupt
  @mutex.synchronize do
    @queue.close
    # Fail any remaining tasks in the queue
    until @queue.empty?
      begin
        future, _block = @queue.pop(true)
      rescue ThreadError
        # A worker took the last item between the `empty?` check and this
        # non-blocking pop. Nothing is left to fail, so stop draining instead
        # of aborting before the workers have been interrupted and joined.
        break
      end
      future&.fail(Interrupt.new)
    end
    # Interrupt all worker threads
    @workers.each { |worker| worker.raise(Interrupt) if worker.alive? }
    @workers.each(&:join)
    @workers.clear
  end
end

#wait ⇒ Object



96
97
98
99
# File 'lib/cli/ui/work_queue.rb', line 96

# Closes the queue and then blocks until every worker thread in @workers has
# been joined.
#
# @return [Array<Thread>] the worker list (the receiver of `each`)
def wait
  @queue.close
  @workers.each { |worker| worker.join }
end