diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 30580b1..a0c10b3 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,9 +1,9 @@ name: CI on: pull_request: - branches: - - 'master' - - 'release-*' + # branches: + # - 'master' + # - 'release-*' push: branches: - 'master' diff --git a/src/cluster.jl b/src/cluster.jl index 82a355f..d3a903a 100644 --- a/src/cluster.jl +++ b/src/cluster.jl @@ -9,6 +9,14 @@ Cluster managers implement how workers can be added, removed and communicated wi """ abstract type ClusterManager end +# cluster_manager is a global constant +const cluster_manager = Ref{ClusterManager}() + +function throw_if_cluster_manager_unassigned() + isassigned(cluster_manager) || error("cluster_manager is unassigned") + return nothing +end + """ WorkerConfig @@ -390,8 +398,7 @@ function init_worker(cookie::AbstractString, manager::ClusterManager=DefaultClus # On workers, the default cluster manager connects via TCP sockets. Custom # transports will need to call this function with their own manager. - global cluster_manager - cluster_manager = manager + cluster_manager[] = manager # Since our pid has yet to be set, ensure no RemoteChannel / Future have been created or addprocs() called. @assert nprocs() <= 1 @@ -569,7 +576,7 @@ function setup_launched_worker(manager, wconfig, launched_q) # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`. cnt = something(wconfig.count, 1) if cnt === :auto - cnt = wconfig.environ[:cpu_threads] + cnt = (wconfig.environ::AbstractDict)[:cpu_threads] end cnt = cnt - 1 # Removing self from the requested number @@ -607,7 +614,7 @@ function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launch end end -function create_worker(manager, wconfig) +function create_worker(manager::ClusterManager, wconfig::WorkerConfig) # only node 1 can add new nodes, since nobody else has the full list of address:port @assert LPROC.id == 1 timeout = worker_timeout() diff --git a/src/macros.jl b/src/macros.jl index c58faf6..ea98c60 100644 --- a/src/macros.jl +++ b/src/macros.jl @@ -333,6 +333,7 @@ completion. To wait for completion, prefix the call with [`@sync`](@ref), like : macro distributed(args...) na = length(args) if na==1 + reducer = identity loop = args[1] elseif na==2 reducer = args[1] diff --git a/src/managers.jl b/src/managers.jl index ab79abe..2ce1a3b 100644 --- a/src/managers.jl +++ b/src/managers.jl @@ -347,7 +347,7 @@ function launch_on_machine(manager::SSHManager, machine::AbstractString, cnt, pa any(c -> c == '"', exename) && throw(ArgumentError("invalid exename")) - remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)) + remotecmd = shell_escape_wincmd(escape_microsoft_c_args(exename, exeflags...)::AbstractString) # change working directory if dir !== nothing && dir != "" any(c -> c == '"', dir) && throw(ArgumentError("invalid dir")) @@ -553,7 +553,7 @@ end function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol) if op === :interrupt - kill(config.process, 2) + kill(config.process::Process, 2) end end @@ -606,7 +606,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig) # master connecting to workers if config.io !== nothing - (bind_addr, port::Int) = read_worker_host_port(config.io) + (bind_addr, port::Int) = read_worker_host_port(config.io::IO) pubhost = something(config.host, bind_addr) config.host = pubhost config.port = port @@ -776,21 +776,22 @@ function kill(manager::LocalManager, pid::Int, config::WorkerConfig; profile_wai sleep(exit_timeout) # Check to see if our child exited, and if not, send an actual kill signal - if !process_exited(config.process) + process = config.process::Process + if !process_exited(process) @warn "Failed to gracefully kill worker $(pid)" profile_sig = Sys.iswindows() ? nothing : Sys.isbsd() ? ("SIGINFO", 29) : ("SIGUSR1" , 10) if profile_sig !== nothing @warn("Sending profile $(profile_sig[1]) to worker $(pid)") - kill(config.process, profile_sig[2]) + kill(process, profile_sig[2]) sleep(profile_wait) end @warn("Sending SIGQUIT to worker $(pid)") - kill(config.process, Base.SIGQUIT) + kill(process, Base.SIGQUIT) sleep(term_timeout) - if !process_exited(config.process) + if !process_exited(process) @warn("Worker $(pid) ignored SIGQUIT, sending SIGKILL") - kill(config.process, Base.SIGKILL) + kill(process, Base.SIGKILL) end end end diff --git a/src/messages.jl b/src/messages.jl index 6e895f0..1a5dd82 100644 --- a/src/messages.jl +++ b/src/messages.jl @@ -102,7 +102,7 @@ end function send_msg(s::IO, header, msg) id = worker_id_from_socket(s) if id > -1 - return send_msg(worker_from_id(id), header, msg) + return send_msg(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end @@ -110,7 +110,7 @@ end function send_msg_now(s::IO, header, msg::AbstractMsg) id = worker_id_from_socket(s) if id > -1 - return send_msg_now(worker_from_id(id), header, msg) + return send_msg_now(worker_from_id(id)::Worker, header, msg) end send_msg_unknown(s, header, msg) end diff --git a/src/process_messages.jl b/src/process_messages.jl index a444651..211c225 100644 --- a/src/process_messages.jl +++ b/src/process_messages.jl @@ -75,7 +75,7 @@ function run_work_thunk(thunk::Function, print_error::Bool) end return result end -function run_work_thunk(rv::RemoteValue, thunk) +function run_work_thunk_remotevalue(rv::RemoteValue, thunk) put!(rv, run_work_thunk(thunk, false)) nothing end @@ -85,7 +85,7 @@ function schedule_call(rid, thunk) rv = RemoteValue(def_rv_channel()) (PGRP::ProcessGroup).refs[rid] = rv push!(rv.clientset, rid.whence) - errormonitor(@async run_work_thunk(rv, thunk)) + errormonitor(@async run_work_thunk_remotevalue(rv, thunk)) return rv end end @@ -289,7 +289,7 @@ function handle_msg(msg::CallMsg{:call_fetch}, header, r_stream, w_stream, versi try deliver_result(w_stream, :call_fetch, header.notify_oid, v.v) finally - unlock(v.rv.synctake) + unlock(v.rv.synctake::ReentrantLock) end else deliver_result(w_stream, :call_fetch, header.notify_oid, v) @@ -315,8 +315,10 @@ function handle_msg(msg::ResultMsg, header, r_stream, w_stream, version) end function handle_msg(msg::IdentifySocketMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + # register a new peer worker connection - w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager; version=version) + w = Worker(msg.from_pid, r_stream, w_stream, cluster_manager[]; version=version)::Worker send_connection_hdr(w, false) send_msg_now(w, MsgHeader(), IdentifySocketAckMsg()) notify(w.initialized) @@ -328,8 +330,10 @@ function handle_msg(msg::IdentifySocketAckMsg, header, r_stream, w_stream, versi end function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) + throw_if_cluster_manager_unassigned() + LPROC.id = msg.self_pid - controller = Worker(1, r_stream, w_stream, cluster_manager; version=version) + controller = Worker(1, r_stream, w_stream, cluster_manager[]; version=version)::Worker notify(controller.initialized) register_worker(LPROC) topology(msg.topology) @@ -348,9 +352,9 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version) let rpid=rpid, wconfig=wconfig if lazy # The constructor registers the object with a global registry. - Worker(rpid, ()->connect_to_peer(cluster_manager, rpid, wconfig)) + Worker(rpid, ()->connect_to_peer(cluster_manager[], rpid, wconfig)) else - @async connect_to_peer(cluster_manager, rpid, wconfig) + @async connect_to_peer(cluster_manager[], rpid, wconfig) end end end @@ -362,7 +366,7 @@ end function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig) try (r_s, w_s) = connect(manager, rpid, wconfig) - w = Worker(rpid, r_s, w_s, manager; config=wconfig) + w = Worker(rpid, r_s, w_s, manager; config=wconfig)::Worker process_messages(w.r_stream, w.w_stream, false) send_connection_hdr(w, true) send_msg_now(w, MsgHeader(), IdentifySocketMsg(myid())) diff --git a/src/remotecall.jl b/src/remotecall.jl index 644ff04..a1cbb16 100644 --- a/src/remotecall.jl +++ b/src/remotecall.jl @@ -413,8 +413,8 @@ function serialize(s::AbstractSerializer, ::Future) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_fut) end -function serialize(s::AbstractSerializer, ::RemoteChannel) - zero_rc = RemoteChannel{Channel{Any}}((0,0,0)) +function serialize(s::AbstractSerializer, ::RemoteChannel{T}) where T + zero_rc = RemoteChannel{T}((0,0,0)) invoke(serialize, Tuple{AbstractSerializer, Any}, s, zero_rc) end @@ -706,8 +706,8 @@ function put_ref(rid, caller, args...) put!(rv, args...) if myid() == caller && rv.synctake !== nothing # Wait till a "taken" value is serialized out - github issue #29932 - lock(rv.synctake) - unlock(rv.synctake) + lock(rv.synctake::ReentrantLock) + unlock(rv.synctake::ReentrantLock) end nothing end @@ -731,7 +731,7 @@ function take_ref(rid, caller, args...) # special handling for local put! / remote take! on unbuffered channel # github issue #29932 synctake = true - lock(rv.synctake) + lock(rv.synctake::ReentrantLock) end v = try @@ -739,7 +739,7 @@ function take_ref(rid, caller, args...) catch e # avoid unmatched unlock when exception occurs # github issue #33972 - synctake && unlock(rv.synctake) + synctake && unlock(rv.synctake::ReentrantLock) rethrow(e) end diff --git a/src/workerpool.jl b/src/workerpool.jl index b28c726..aa73a61 100644 --- a/src/workerpool.jl +++ b/src/workerpool.jl @@ -289,7 +289,7 @@ julia> default_worker_pool() WorkerPool(Channel{Int64}(sz_max:9223372036854775807,sz_curr:3), Set([4, 2, 3]), RemoteChannel{Channel{Any}}(1, 1, 4)) ``` """ -function default_worker_pool() +function default_worker_pool()::AbstractWorkerPool # On workers retrieve the default worker pool from the master when accessed # for the first time if _default_worker_pool[] === nothing @@ -299,7 +299,7 @@ function default_worker_pool() _default_worker_pool[] = remotecall_fetch(()->default_worker_pool(), 1) end end - return _default_worker_pool[] + return _default_worker_pool[]::AbstractWorkerPool end """ diff --git a/test/distributed_exec.jl b/test/distributed_exec.jl index a218bf6..19d10eb 100644 --- a/test/distributed_exec.jl +++ b/test/distributed_exec.jl @@ -343,9 +343,12 @@ end @testset "Ser/deser to non-ClusterSerializer objects" begin function test_regular_io_ser(ref::DistributedNext.AbstractRemoteRef) io = IOBuffer() - serialize(io, ref) + # Wrapping the ref in a Dict to exercise the case when the + # type parameter of the RemoteChannel is part of an outer type. + # See https://github.com/JuliaLang/Distributed.jl/issues/178 + serialize(io, Dict("ref" => ref)) seekstart(io) - ref2 = deserialize(io) + ref2 = deserialize(io)["ref"] for fld in fieldnames(typeof(ref)) v = getfield(ref2, fld) if isa(v, Number) @@ -361,6 +364,7 @@ end test_regular_io_ser(Future()) test_regular_io_ser(RemoteChannel()) + test_regular_io_ser(RemoteChannel(() -> Channel{Bool}(1))) end @testset "@distributed and [un]buffered reads" begin