Class: SizedQueue
Overview
This class represents queues with a specified maximum capacity. A push operation may block when the queue is full.
See Queue for an example of how a SizedQueue works.
Instance Method Summary collapse
-
#clear ⇒ Object
Removes all objects from the queue.
-
#close ⇒ Object
Similar to Queue#close.
-
#empty? ⇒ Boolean
Returns true if the queue is empty.
-
#new(max) ⇒ Object
constructor
Creates a fixed-length queue with a maximum size of max.
-
#length ⇒ Object
(also: #size)
Returns the length of the queue.
-
#max ⇒ Object
Returns the maximum size of the queue.
-
#max=(number) ⇒ Object
Sets the maximum size of the queue to the given number.
-
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
-
#pop(*args) ⇒ Object
(also: #deq, #shift)
Retrieves data from the queue.
-
#push(*args) ⇒ Object
(also: #enq, #<<)
Pushes object to the queue.
Constructor Details
#new(max) ⇒ Object
Creates a fixed-length queue with a maximum size of max.
1122 1123 1124 1125 1126 1127 1128 1129 1130 1131 1132 1133 1134 1135 1136 1137 1138 1139 |
# File 'thread_sync.c', line 1122
/*
 * Implements SizedQueue#new(max): sets up an empty queue whose capacity
 * limit is taken from +vmax+.
 *
 * Raises ArgumentError when the requested limit is zero or negative.
 * Returns +self+.
 */
static VALUE
rb_szqueue_initialize(VALUE self, VALUE vmax)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    long capacity = NUM2LONG(vmax);

    if (capacity <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Fresh element storage first, then both waiter lists start out empty. */
    RB_OBJ_WRITE(self, &szq->q.que, ary_buf_new());
    list_head_init(szqueue_waitq(szq));
    list_head_init(szqueue_pushq(szq));
    szq->max = capacity;

    return self;
}
|
Instance Method Details
#clear ⇒ Object
Removes all objects from the queue.
1305 1306 1307 1308 1309 1310 1311 1312 1313 |
# File 'thread_sync.c', line 1305
/*
 * Implements SizedQueue#clear: empties the backing array and then wakes
 * every waiter registered on the push list.
 *
 * Returns +self+.
 */
static VALUE
rb_szqueue_clear(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);
    VALUE items = check_array(self, szq->q.que);

    rb_ary_clear(items);
    /* The queue is empty now, so threads blocked in push get another try. */
    wakeup_all(szqueue_pushq(szq));

    return self;
}
|
#close ⇒ Object
Similar to Queue#close.
The difference is behavior with waiting enqueuing threads.
If there are waiting enqueuing threads, they are interrupted by raising ClosedQueueError(‘queue closed’).
1153 1154 1155 1156 1157 1158 1159 1160 1161 1162 1163 1164 |
# File 'thread_sync.c', line 1153
/*
 * Implements SizedQueue#close: flags the queue as closed and wakes the
 * waiters on both the wait list and the push list. Calling it on a queue
 * that is already closed does nothing.
 *
 * Returns +self+.
 */
static VALUE
rb_szqueue_close(VALUE self)
{
    struct rb_szqueue *szq;

    if (queue_closed_p(self)) {
        return self;
    }

    szq = szqueue_ptr(self);
    FL_SET(self, QUEUE_CLOSED);

    /* Set the flag before waking anyone so woken threads observe it. */
    wakeup_all(szqueue_waitq(szq));
    wakeup_all(szqueue_pushq(szq));

    return self;
}
|
#empty? ⇒ Boolean
Returns true if the queue is empty.
1353 1354 1355 1356 1357 1358 1359 |
# File 'thread_sync.c', line 1353
/*
 * Implements SizedQueue#empty?: Qtrue when the queue currently holds no
 * elements, Qfalse otherwise.
 */
static VALUE
rb_szqueue_empty_p(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    if (queue_length(self, &szq->q) == 0) {
        return Qtrue;
    }
    return Qfalse;
}
|
#length ⇒ Object #size ⇒ Object Also known as: size
Returns the length of the queue.
1324 1325 1326 1327 1328 1329 1330 |
# File 'thread_sync.c', line 1324
/*
 * Implements SizedQueue#length (alias #size): the current number of
 * queued elements, converted to a Ruby Integer.
 */
static VALUE
rb_szqueue_length(VALUE self)
{
    struct rb_szqueue *szq;

    szq = szqueue_ptr(self);

    return LONG2NUM(queue_length(self, &szq->q));
}
|
#max ⇒ Object
Returns the maximum size of the queue.
1172 1173 1174 1175 1176 |
# File 'thread_sync.c', line 1172
/*
 * Implements SizedQueue#max: the configured capacity limit, converted to
 * a Ruby Integer.
 */
static VALUE
rb_szqueue_max_get(VALUE self)
{
    struct rb_szqueue *szq = szqueue_ptr(self);

    return LONG2NUM(szq->max);
}
|
#max=(number) ⇒ Object
Sets the maximum size of the queue to the given number.
1185 1186 1187 1188 1189 1190 1191 1192 1193 1194 1195 1196 1197 1198 1199 1200 1201 |
# File 'thread_sync.c', line 1185
/*
 * Implements SizedQueue#max=: replaces the capacity limit with +vmax+.
 *
 * Raises ArgumentError when the new limit is zero or negative.
 * After storing the new limit, calls sync_wakeup() on the push list with
 * the amount the limit grew by (0 when it stayed the same or shrank).
 * Returns +vmax+ unchanged.
 */
static VALUE
rb_szqueue_max_set(VALUE self, VALUE vmax)
{
    long new_limit = NUM2LONG(vmax);
    struct rb_szqueue *szq = szqueue_ptr(self);
    long grown_by;

    if (new_limit <= 0) {
        rb_raise(rb_eArgError, "queue size must be positive");
    }

    /* Only an increase yields a non-zero count for the wakeup call. */
    grown_by = (new_limit > szq->max) ? (new_limit - szq->max) : 0;

    szq->max = new_limit;
    sync_wakeup(szqueue_pushq(szq), grown_by);

    return vmax;
}
|
#num_waiting ⇒ Object
Returns the number of threads waiting on the queue.
1338 1339 1340 1341 1342 1343 1344 |
# File 'thread_sync.c', line 1338
/*
 * Implements SizedQueue#num_waiting: the sum of the base queue's waiter
 * count and the count of waiters blocked in push, as a Ruby Integer.
 */
static VALUE
rb_szqueue_num_waiting(VALUE self)
{
    struct rb_szqueue *szq;

    szq = szqueue_ptr(self);

    return INT2NUM(szq->q.num_waiting + szq->num_waiting_push);
}
|
#pop(non_block = false) ⇒ Object #deq(non_block = false) ⇒ Object #shift(non_block = false) ⇒ Object Also known as: deq, shift
Retrieves data from the queue.
If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn’t suspended, and ThreadError is raised.
1292 1293 1294 1295 1296 1297 |
# File 'thread_sync.c', line 1292
/*
 * Implements SizedQueue#pop (aliases #deq, #shift): works out from the
 * arguments whether the call may block, then hands off to
 * szqueue_do_pop() and returns its result.
 */
static VALUE
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
{
    return szqueue_do_pop(self, queue_pop_should_block(argc, argv));
}
|
#push(object, non_block = false) ⇒ Object #enq(object, non_block = false) ⇒ Object #<<(object) ⇒ Object Also known as: enq, <<
Pushes object to the queue.
If there is no space left in the queue, waits until space becomes available, unless non_block is true. If non_block is true, the thread isn’t suspended, and ThreadError is raised.
1228 1229 1230 1231 1232 1233 1234 1235 1236 1237 1238 1239 1240 1241 1242 1243 1244 1245 1246 1247 1248 1249 1250 1251 1252 1253 1254 1255 1256 1257 1258 1259 1260 1261 1262 1263 |
# File 'thread_sync.c', line 1228
/*
 * Implements SizedQueue#push (aliases #enq, #<<): appends argv[0] to the
 * queue, waiting for free space when the queue is at its limit.
 *
 * While the queue length is at or above the limit:
 *   - a non-blocking call raises ThreadError ("queue full");
 *   - a closed queue stops the wait, and the closed-queue error is raised
 *     below;
 *   - otherwise the caller registers itself on the push wait list and
 *     sleeps, then re-checks the condition after waking.
 *
 * Returns whatever queue_do_push() returns.
 */
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
{
struct rb_szqueue *sq = szqueue_ptr(self);
int should_block = szqueue_push_should_block(argc, argv);
/* Loop rather than test once: the condition is re-evaluated after every wakeup. */
while (queue_length(self, &sq->q) >= sq->max) {
if (!should_block) {
rb_raise(rb_eThreadError, "queue full");
}
else if (queue_closed_p(self)) {
/* Stop waiting; the closed check after the loop raises the error. */
break;
}
else {
rb_execution_context_t *ec = GET_EC();
COROUTINE_STACK_LOCAL(struct queue_waiter, qw);
struct list_head *pushq = szqueue_pushq(sq);
/* Describe the current thread/fiber as a waiter on this queue. */
qw->w.self = self;
qw->w.th = ec->thread_ptr;
qw->w.fiber = ec->fiber_ptr;
qw->as.sq = sq;
/* Enqueue at the tail of the push wait list and count the waiter. */
list_add_tail(pushq, &qw->w.node);
sq->num_waiting_push++;
/*
 * rb_ensure() runs szqueue_sleep_done even if queue_sleep exits via an
 * exception. NOTE(review): szqueue_sleep_done presumably undoes the
 * registration above (unlink + counter decrement) -- confirm against
 * its definition.
 */
rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)qw);
}
}
/* Covers both a queue closed while waiting and one closed before the call. */
if (queue_closed_p(self)) {
raise_closed_queue_error(self);
}
return queue_do_push(self, &sq->q, argv[0]);
}
|