Module: Chore::Strategy::Ipc
- Included in:
- PreForkedWorkerStrategy, PreforkedWorker, WorkDistributor, WorkerManager
- Defined in:
- lib/chore/strategies/worker/helpers/ipc.rb
Overview
:nodoc:
Constant Summary collapse
- BIG_ENDIAN =
'L>'.freeze
- MSG_BYTES =
4
- READY_MSG =
'R'
Instance Method Summary collapse
- #add_worker_socket ⇒ Object
- #child_connection(socket) ⇒ Object
- #clear_ready(socket) ⇒ Object
- #create_master_socket ⇒ Object
- #delete_socket_file ⇒ Object
-
#ipc_help ⇒ Object
Used for unit tests.
-
#read_msg(socket) ⇒ Object
Reads a message from the socket (must be a connected socket).
- #select_sockets(sockets, self_pipe = nil, timeout = 0.5) ⇒ Object
-
#send_msg(socket, msg) ⇒ Object
Sends a message to a socket (must be a connected socket).
- #signal_ready(socket) ⇒ Object
Instance Method Details
#add_worker_socket ⇒ Object
43 44 45 46 47 |
# Creates a UNIXSocket for the path returned by socket_file and returns that
# socket (the tap block's own value is discarded).
# NOTE(review): the tap block body below is a bare "(socket)", which does
# nothing when evaluated. A helper name appears to be missing in front of it,
# presumably lost when this listing was extracted - confirm against
# lib/chore/strategies/worker/helpers/ipc.rb before relying on this listing.
# NOTE(review): socket_file is not defined in this listing; presumably a
# helper elsewhere in the module - verify.
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 43 def add_worker_socket UNIXSocket.new(socket_file).tap do |socket| (socket) end end |
#child_connection(socket) ⇒ Object
17 18 19 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 17
# Accepts a connection on the given socket.
#
# @param socket [#accept] object to call +accept+ on (presumably the
#   listening master socket - confirm against callers)
# @return [Object] whatever +socket.accept+ returns
def child_connection(socket)
  accepted = socket.accept
  accepted
end
#clear_ready(socket) ⇒ Object
49 50 51 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 49
# Consumes one line from +socket+ using +gets+.
# NOTE(review): presumably this drains the READY_MSG line a worker wrote to
# announce readiness - confirm against signal_ready and its callers.
#
# @param socket [#gets] object to read a line from
# @return [String, nil] the value returned by +socket.gets+
def clear_ready(socket)
  socket.gets
end
#create_master_socket ⇒ Object
10 11 12 13 14 15 |
# Deletes any existing file at the path returned by socket_file, then creates
# a UNIXServer on that path and returns it (the tap block's own value is
# discarded).
# NOTE(review): the tap block body below is a bare "(socket)", which does
# nothing when evaluated. A helper name appears to be missing in front of it,
# presumably lost when this listing was extracted - confirm against
# lib/chore/strategies/worker/helpers/ipc.rb before relying on this listing.
# NOTE(review): socket_file is not defined in this listing; presumably a
# helper elsewhere in the module - verify.
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 10 def create_master_socket File.delete socket_file if File.exist? socket_file UNIXServer.new(socket_file).tap do |socket| (socket) end end |
#delete_socket_file ⇒ Object
65 66 67 68 69 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 65
# Best-effort removal of the file at the path returned by socket_file.
#
# Operating-system level failures (for example the file not existing) are
# ignored and reported as nil. Other errors, such as a non-String path, are
# no longer swallowed so that programming mistakes stay visible.
#
# @return [Integer, nil] the result of +File.unlink+ on success, nil when the
#   unlink failed with a SystemCallError
def delete_socket_file
  File.unlink(socket_file)
rescue SystemCallError
  nil
end
#ipc_help ⇒ Object
Used for unit tests
72 73 74 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 72
# Used for unit tests.
#
# @return [Symbol] always +:available+
def ipc_help
  status = :available
  status
end
#read_msg(socket) ⇒ Object
Reads a message from the socket (must be a connected socket)
31 32 33 34 35 36 37 38 39 40 41 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 31
# Reads one length-prefixed message from +socket+ (must be a connected socket).
#
# A frame is a MSG_BYTES-long size header (unpacked with BIG_ENDIAN) followed
# by a Marshal-dumped payload of that many bytes.
#
# @param socket [#recv] connected socket to read from
# @return [Object, nil] the unmarshalled message, or nil when peeking at the
#   header yields nil or an empty string
# @raise [Errno::ECONNRESET] logged and then re-raised
def read_msg(socket)
  # Peek only: the header stays buffered and is consumed together with the
  # payload by the second recv below.
  encoded_size = socket.recv(MSG_BYTES, Socket::MSG_PEEK)
  return if encoded_size.nil? || encoded_size == ''

  size = encoded_size.unpack(BIG_ENDIAN).first
  # NOTE(review): a single recv may return fewer bytes than requested on a
  # stream socket - confirm whether large messages need a read loop.
  encoded_message = socket.recv(MSG_BYTES + size)
  # NOTE(review): Marshal.load must only ever see data from trusted peers;
  # presumably both ends are processes of the same application - confirm.
  Marshal.load(encoded_message[MSG_BYTES..-1])
rescue Errno::ECONNRESET => ex
  Chore.logger.info "IPC: Connection was closed on socket #{socket}"
  raise ex
end
#select_sockets(sockets, self_pipe = nil, timeout = 0.5) ⇒ Object
60 61 62 63 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 60
# Waits on +sockets+ (plus +self_pipe+, when given) with IO.select, watching
# the same set for readability and for exceptional conditions.
#
# @param sockets [IO, Array<IO>] one IO or a (possibly nested) array of IOs
# @param self_pipe [IO, nil] optional extra IO to watch alongside +sockets+
# @param timeout [Numeric] timeout passed to IO.select
# @return [Array, nil] the result of IO.select
def select_sockets(sockets, self_pipe = nil, timeout = 0.5)
  watched = [sockets, self_pipe].flatten.reject(&:nil?)
  IO.select(watched, nil, watched, timeout)
end
#send_msg(socket, msg) ⇒ Object
Sends a message to a socket (must be a connected socket)
22 23 24 25 26 27 28 |
# File 'lib/chore/strategies/worker/helpers/ipc.rb', line 22
# Sends +msg+ to +socket+ (must be a connected socket) as a single frame:
# a size header packed with BIG_ENDIAN followed by the Marshal dump of +msg+.
#
# @param socket [#send] connected socket to write to
# @param msg [#size] message to send; must be neither nil nor of size zero
# @return [Object] whatever +socket.send+ returns
# @raise [RuntimeError] when +msg+ is nil or its size is zero
def send_msg(socket, msg)
  raise 'send_msg cannot send empty messages' if msg.nil? || msg.size.zero?

  message = Marshal.dump(msg)
  # The header counts bytes of the dumped payload, so use bytesize explicitly.
  encoded_size = [message.bytesize].pack(BIG_ENDIAN)
  encoded_message = encoded_size + message
  socket.send encoded_message, 0
end