Class: Datadog::Statsd::Sender
- Inherits: Object
  - Object
  - Datadog::Statsd::Sender
- Defined in: lib/datadog/statsd/sender.rb
Overview
Sender uses a companion thread to flush and pack messages in a `MessageBuffer`. Communication with this thread goes through a `Queue`. If the thread is dead, Sender starts a new one so that it is never left blocked with no companion thread to communicate with (most of the time, a dead companion thread means that a fork just happened and that we are running in the child process).
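The restart behaviour described above can be sketched in isolation. The class below is a hypothetical stand-in, not the library's code: `SelfHealingSender`, `kill_worker` and `drain` are illustrative names, and a plain array takes the place of the `MessageBuffer`. It assumes Ruby 2.3 or later for `Queue#close`.

```ruby
# Hypothetical sketch of a sender that re-creates its companion thread.
class SelfHealingSender
  attr_reader :delivered, :restarts

  def initialize
    @mx = Mutex.new
    @delivered = []   # stand-in for the message buffer
    @restarts = 0
    start
  end

  def add(message)
    unless @thread.alive?
      @mx.synchronize do
        # Another caller may have restarted the worker while we waited for the lock.
        unless @thread.alive?
          @restarts += 1
          start   # the old queue is discarded along with the dead thread
        end
      end
    end
    @queue << message
  end

  # Illustrative hook that simulates losing the companion thread (as after a fork).
  def kill_worker
    @thread.kill
    @thread.join
  end

  # Close the queue and wait until the worker has consumed what is left.
  def drain
    @queue.close
    @thread.join
  end

  private

  def start
    @queue = Queue.new
    queue = @queue   # each worker keeps reading from its own queue
    @thread = Thread.new do
      # pop returns nil once the queue is closed and empty, which ends the loop
      while (message = queue.pop)
        @delivered << message
      end
    end
  end
end

sender = SelfHealingSender.new
sender.add('a:1|c')
sender.kill_worker
sender.add('b:1|c')   # detects the dead worker and starts a new one
sender.drain
```

The liveness check is repeated inside the mutex so that two callers noticing the dead thread at the same time do not both start a replacement.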
Constant Summary
- CLOSEABLE_QUEUES = Queue.instance_methods.include?(:close)
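As a rough illustration of what this feature check guards, the snippet below runs the same test and then exercises `Queue#close` on a throwaway queue. The variable names are illustrative; `Queue#close` and `ClosedQueueError` are available from Ruby 2.3 onward.

```ruby
# Same feature check as the constant above.
closeable = Queue.instance_methods.include?(:close)

queue = Queue.new
queue << 'a'

if closeable
  queue.close
  # Items enqueued before the close can still be popped...
  first = queue.pop
  # ...after which pop returns nil instead of blocking.
  second = queue.pop
  # Pushing to a closed queue raises ClosedQueueError.
  begin
    queue << 'b'
  rescue ClosedQueueError => e
    error_class = e.class
  end
end
```

On interpreters without `Queue#close`, the `if` branch is skipped entirely, which is the situation the constant lets the sender handle.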
Instance Method Summary
- #add(message) ⇒ Object
- #flush(sync: false) ⇒ Object
- #initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) ⇒ Sender (constructor)
  A new instance of Sender.
- #rendez_vous ⇒ Object
- #start ⇒ Object
- #stop(join_worker: true) ⇒ Object
  When calling stop, make sure that no other thread is trying to close the sender or to `#add` more messages to it.
Constructor Details
#initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread) ⇒ Sender
Returns a new instance of Sender.
# File 'lib/datadog/statsd/sender.rb', line 15

def initialize(message_buffer, telemetry: nil, queue_size: UDP_DEFAULT_BUFFER_SIZE, logger: nil, flush_interval: nil, queue_class: Queue, thread_class: Thread)
  @message_buffer = message_buffer
  @telemetry = telemetry
  @queue_size = queue_size
  @logger = logger
  @mx = Mutex.new
  @queue_class = queue_class
  @thread_class = thread_class
  @flush_timer = if flush_interval
    Datadog::Statsd::Timer.new(flush_interval) { flush(sync: true) }
  else
    nil
  end
end
Instance Method Details
#add(message) ⇒ Object
# File 'lib/datadog/statsd/sender.rb', line 63

def add(message)
  raise ArgumentError, 'Start sender first' unless message_queue

  # if the thread does not exist, we assume we are running in a forked process,
  # empty the message queue and message buffers (these messages belong to
  # the parent process) and spawn a new companion thread.
  if !sender_thread.alive?
    @mx.synchronize {
      # a call from another thread has already re-created
      # the companion thread before this one acquired the lock
      break if sender_thread.alive?
      @logger.debug { "Statsd: companion thread is dead, re-creating one" } if @logger

      message_queue.close if CLOSEABLE_QUEUES
      @message_queue = nil
      message_buffer.reset
      start
      @flush_timer.start if @flush_timer && @flush_timer.stop?
    }
  end

  if message_queue.length <= @queue_size
    message_queue << message
  else
    if @telemetry
      bytesize = message.respond_to?(:bytesize) ? message.bytesize : 0
      @telemetry.dropped_queue(packets: 1, bytes: bytesize)
    end
  end
end
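The final branch of `#add` either enqueues the message or records it as dropped. A minimal sketch of that branch, using a hypothetical `BoundedInbox` class with plain counters in place of the telemetry object, could look like this:

```ruby
# Hypothetical sketch of the "enqueue or drop" decision; names are illustrative.
class BoundedInbox
  attr_reader :dropped_packets, :dropped_bytes

  def initialize(queue_size)
    @queue = Queue.new
    @queue_size = queue_size
    @dropped_packets = 0
    @dropped_bytes = 0
  end

  def add(message)
    if @queue.length <= @queue_size
      @queue << message
    else
      # Count the drop; objects without #bytesize count as zero bytes.
      @dropped_packets += 1
      @dropped_bytes += message.respond_to?(:bytesize) ? message.bytesize : 0
    end
  end

  def length
    @queue.length
  end
end

inbox = BoundedInbox.new(2)
4.times { inbox.add('m:1|c') }
```

Because the comparison is `<=` rather than `<`, a queue configured with size 2 accepts a third message before the fourth is dropped. Nothing consumes the queue in this sketch, so the counts are deterministic.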
#flush(sync: false) ⇒ Object
# File 'lib/datadog/statsd/sender.rb', line 30

def flush(sync: false)
  # keep a copy around in case another thread is calling #stop while this method is running
  current_message_queue = message_queue

  # don't try to flush if there is no message_queue instantiated or
  # no companion thread running
  if !current_message_queue
    @logger.debug { "Statsd: can't flush: no message queue ready" } if @logger
    return
  end
  if !sender_thread.alive?
    @logger.debug { "Statsd: can't flush: no sender_thread alive" } if @logger
    return
  end

  current_message_queue.push(:flush)
  rendez_vous if sync
end
#rendez_vous ⇒ Object
# File 'lib/datadog/statsd/sender.rb', line 49

def rendez_vous
  # could happen if #start hasn't been called
  return unless message_queue

  # Initialize and get the thread's sync queue
  queue = (@thread_class.current[:statsd_sync_queue] ||= @queue_class.new)
  # tell sender-thread to notify us in the current
  # thread's queue
  message_queue.push(queue)
  # wait for the sender thread to send a message
  # once the flush is done
  queue.pop
end
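The synchronous flush relies on a rendezvous: the caller pushes `:flush`, then pushes its own per-thread queue, and blocks until the worker answers on it. The worker loop is not shown on this page, so the loop below is an assumption about how such a worker might treat the three kinds of item. The names `pending`, `flushed` and `:sketch_sync_queue` are illustrative.

```ruby
flushed = []   # what the worker has "sent" so far
pending = []   # stand-in for the message buffer
message_queue = Queue.new

worker = Thread.new do
  while (item = message_queue.pop)
    case item
    when :flush
      flushed.concat(pending)
      pending.clear
    when Queue
      # A queue in the message stream is a rendezvous request: answer on it.
      item.push(:go_on)
    else
      pending << item
    end
  end
end

message_queue << 'a:1|c'
message_queue << 'b:1|c'
message_queue.push(:flush)

# One sync queue per calling thread, created lazily and reused.
sync_queue = (Thread.current[:sketch_sync_queue] ||= Queue.new)
message_queue.push(sync_queue)
sync_queue.pop   # blocks until the worker has handled everything queued before it

flushed_at_rendezvous = flushed.dup

message_queue.close
worker.join
```

Since the queue is first-in, first-out, the reply on `sync_queue` can only arrive after the preceding `:flush` has been processed, which is what makes the flush synchronous from the caller's point of view.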
#start ⇒ Object
# File 'lib/datadog/statsd/sender.rb', line 94

def start
  raise ArgumentError, 'Sender already started' if message_queue

  # initialize a new message queue for the background thread
  @message_queue = @queue_class.new
  # start background thread
  @sender_thread = @thread_class.new(&method(:send_loop))
  @sender_thread.name = "Statsd Sender" unless Gem::Version.new(RUBY_VERSION) < Gem::Version.new('2.3')
  @flush_timer.start if @flush_timer
end
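The version guard around `name=` exists because `Thread#name=` was introduced in Ruby 2.3. A small sketch of the same guard on a throwaway thread, next to an equivalent feature check (`respond_to?`, shown only for comparison and not taken from the source):

```ruby
require 'rubygems' # provides Gem::Version; normally loaded by default

worker = Thread.new { sleep }

# Version-based guard, as in #start.
version_ok = Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('2.3')
# Feature-based alternative.
feature_ok = worker.respond_to?(:name=)

worker.name = 'Statsd Sender' if version_ok
label = worker.respond_to?(:name) ? worker.name : nil

worker.kill
worker.join
```

A named thread is easier to spot in debugger and thread-dump output, which is presumably why the sender labels its companion thread.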
#stop(join_worker: true) ⇒ Object
When calling stop, make sure that no other thread is trying to close the sender or to `#add` more messages to it.
# File 'lib/datadog/statsd/sender.rb', line 109

def stop(join_worker: true)
  @flush_timer.stop if @flush_timer

  message_queue = @message_queue
  message_queue.close if message_queue

  sender_thread = @sender_thread
  sender_thread.join if sender_thread && join_worker
end
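Stopping by closing the queue works because a closed queue still hands out the items it already holds and only then returns `nil` from `pop`. The hypothetical `StoppableWorker` below sketches that shutdown path, assuming Ruby 2.3 or later. It is not the library's worker loop.

```ruby
# Hypothetical worker that shuts down when its queue is closed.
class StoppableWorker
  attr_reader :processed

  def initialize
    @processed = []
    @queue = Queue.new
    @thread = Thread.new do
      # pop returns nil only once the queue is closed and drained
      while (item = @queue.pop)
        @processed << item
      end
    end
  end

  def add(item)
    @queue << item
  end

  def stop(join_worker: true)
    @queue.close
    # Joining waits for the worker to finish the items that were already queued.
    @thread.join if join_worker
  end

  def alive?
    @thread.alive?
  end
end

worker = StoppableWorker.new
3.times { |i| worker.add(i) }
worker.stop
```

With `join_worker: false`, the call would return immediately and the worker would finish draining in the background, so callers could not assume that every queued item had been handled by the time `stop` returned.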