Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[RFC] Wrap and test the non-blocking collective functions #143

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions deps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,24 @@ FortranCInterface_HEADER(jlmpi_f2c.h MACRO_NAMESPACE "JLMPI_" SYMBOLS
MPI_GATHERV
MPI_GET_COUNT
MPI_GET_PROCESSOR_NAME
MPI_IALLGATHER
MPI_IALLGATHERV
MPI_IALLREDUCE
MPI_IALLTOALL
MPI_IALLTOALLV
MPI_IBARRIER
MPI_IBCAST
MPI_IEXSCAN
MPI_IGATHER
MPI_IGATHERV
MPI_INIT
MPI_INITIALIZED
MPI_IPROBE
MPI_IRECV
MPI_IREDUCE
MPI_ISCAN
MPI_ISCATTER
MPI_ISCATTERV
MPI_ISEND
MPI_OP_CREATE
MPI_OP_FREE
Expand Down
14 changes: 14 additions & 0 deletions deps/gen_functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,24 @@ int main(int argc, char *argv[]) {
printf(" :MPI_GET_COUNT => \"%s\",\n", STRING(MPI_GET_COUNT));
printf(" :MPI_GET_PROCESSOR_NAME => \"%s\",\n",
STRING(MPI_GET_PROCESSOR_NAME));
printf(" :MPI_IALLGATHER => \"%s\",\n", STRING(MPI_IALLGATHER));
printf(" :MPI_IALLGATHERV => \"%s\",\n", STRING(MPI_IALLGATHERV));
printf(" :MPI_IALLREDUCE => \"%s\",\n", STRING(MPI_IALLREDUCE));
printf(" :MPI_IALLTOALL => \"%s\",\n", STRING(MPI_IALLTOALL));
printf(" :MPI_IALLTOALLV => \"%s\",\n", STRING(MPI_IALLTOALLV));
printf(" :MPI_IBARRIER => \"%s\",\n", STRING(MPI_IBARRIER));
printf(" :MPI_IBCAST => \"%s\",\n", STRING(MPI_IBCAST));
printf(" :MPI_IEXSCAN => \"%s\",\n", STRING(MPI_IEXSCAN));
printf(" :MPI_IGATHER => \"%s\",\n", STRING(MPI_IGATHER));
printf(" :MPI_IGATHERV => \"%s\",\n", STRING(MPI_IGATHERV));
printf(" :MPI_INIT => \"%s\",\n", STRING(MPI_INIT));
printf(" :MPI_INITIALIZED => \"%s\",\n", STRING(MPI_INITIALIZED));
printf(" :MPI_IPROBE => \"%s\",\n", STRING(MPI_IPROBE));
printf(" :MPI_IRECV => \"%s\",\n", STRING(MPI_IRECV));
printf(" :MPI_IREDUCE => \"%s\",\n", STRING(MPI_IREDUCE));
printf(" :MPI_ISCAN => \"%s\",\n", STRING(MPI_ISCAN));
printf(" :MPI_ISCATTER => \"%s\",\n", STRING(MPI_ISCATTER));
printf(" :MPI_ISCATTERV => \"%s\",\n", STRING(MPI_ISCATTERV));
printf(" :MPI_ISEND => \"%s\",\n", STRING(MPI_ISEND));
printf(" :MPI_OP_CREATE => \"%s\",\n", STRING(MPI_OP_CREATE));
printf(" :MPI_OP_FREE => \"%s\",\n", STRING(MPI_OP_FREE));
Expand Down
221 changes: 221 additions & 0 deletions src/mpi-base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,11 @@ function Cancel!(req::Request)
end

# Collective communication
function Ibarrier(comm::Comm)
rval = Ref{Cint}()
ccall(MPI_IBARRIER, Void, (Ptr{Cint},Ptr{Cint},Ptr{Cint}), &comm.val, rval, &0)
Request(rval[], nothing)
end

function Barrier(comm::Comm)
ccall(MPI_BARRIER, Void, (Ptr{Cint},Ptr{Cint}), &comm.val, &0)
Expand All @@ -480,6 +485,19 @@ function Bcast!{T}(buffer::Array{T}, root::Integer, comm::Comm)
Bcast!(buffer, length(buffer), root, comm)
end

function Ibcast!{T}(buffer::MPIBuffertype{T}, count::Integer,
root::Integer, comm::Comm)
rval = Ref{Cint}()
ccall(MPI_IBCAST, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
buffer, &count, &mpitype(T), &root, &comm.val, rval, &0)
Request(rval[], buffer), buffer
end

function Ibcast!{T}(buffer::Array{T}, root::Integer, comm::Comm)
Ibcast!(buffer, length(buffer), root, comm)
end

#=
function Bcast{T}(obj::T, root::Integer, comm::Comm)
buf = [T]
Expand Down Expand Up @@ -529,6 +547,69 @@ function Reduce{T}(object::T, op::Op, root::Integer, comm::Comm)
isroot ? recvbuf[1] : nothing
end

function Ireduce{T}(sendbuf::MPIBuffertype{T}, count::Integer,
op::Op, root::Integer, comm::Comm)
rval = Ref{Cint}()
isroot = Comm_rank(comm) == root
recvbuf = Array(T, isroot ? count : 0)
ccall(MPI_IREDUCE, Void,
(Ptr{T}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint},
Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, recvbuf, &count, &mpitype(T), &op.val, &root, &comm.val,
rval, &0)
Request(rval[], sendbuf), isroot ? recvbuf : nothing
end

function Ireduce{T}(sendbuf::Array{T}, op::Op, root::Integer, comm::Comm)
Ireduce(sendbuf, length(sendbuf), op, root, comm)
end

function Ireduce{T}(object::T, op::Op, root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
sendbuf = T[object]
req, recvbuf = Ireduce(sendbuf, op, root, comm)
req, isroot ? recvbuf[1] : nothing
end

function Allreduce{T}(sendbuf::MPIBuffertype{T}, count::Integer, op::Op, comm::Comm)
recvbuf = Array(T, count)
ccall(MPI_ALLREDUCE, Void,
(Ptr{T}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, recvbuf, &count, &mpitype(T), &op.val, &comm.val, &0)
recvbuf
end

function Allreduce{T}(sendbuf::Array{T}, op::Op, comm::Comm)
Allreduce(sendbuf, length(sendbuf), op, comm)
end

function Allreduce{T}(object::T, op::Op, comm::Comm)
sendbuf = T[object]
recvbuf = Allreduce(sendbuf, op, comm)
recvbuf[1]
end

function Iallreduce{T}(sendbuf::MPIBuffertype{T}, count::Integer, op::Op, comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, count)
ccall(MPI_IALLREDUCE, Void,
(Ptr{T}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint},
Ptr{Cint}, Ptr{Cint}),
sendbuf, recvbuf, &count, &mpitype(T), &op.val, &comm.val,
rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Iallreduce{T}(sendbuf::Array{T}, op::Op, comm::Comm)
Iallreduce(sendbuf, length(sendbuf), op, comm)
end

function Iallreduce{T}(object::T, op::Op, comm::Comm)
sendbuf = T[object]
req, recvbuf = Iallreduce(sendbuf, op, comm)
req, recvbuf[1]
end

function Scatter{T}(sendbuf::MPIBuffertype{T},
count::Integer, root::Integer, comm::Comm)
recvbuf = Array(T, count)
Expand All @@ -538,6 +619,16 @@ function Scatter{T}(sendbuf::MPIBuffertype{T},
recvbuf
end

function Iscatter{T}(sendbuf::MPIBuffertype{T},
count::Integer, root::Integer, comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, count)
ccall(MPI_ISCATTER, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &count, &mpitype(T), recvbuf, &count, &mpitype(T), &root, &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Scatterv{T}(sendbuf::MPIBuffertype{T},
counts::Vector{Cint}, root::Integer,
comm::Comm)
Expand All @@ -550,6 +641,19 @@ function Scatterv{T}(sendbuf::MPIBuffertype{T},
recvbuf
end

function Iscatterv{T}(sendbuf::MPIBuffertype{T},
counts::Vector{Cint}, root::Integer,
comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, counts[Comm_rank(comm) + 1])
recvcnt = counts[Comm_rank(comm) + 1]
disps = cumsum(counts) - counts
ccall(MPI_ISCATTERV, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, counts, disps, &mpitype(T), recvbuf, &recvcnt, &mpitype(T), &root, &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Gather{T}(sendbuf::MPIBuffertype{T}, count::Integer,
root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
Expand All @@ -571,6 +675,27 @@ function Gather{T}(object::T, root::Integer, comm::Comm)
isroot ? recvbuf : nothing
end

function Igather{T}(sendbuf::MPIBuffertype{T}, count::Integer,
root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
recvbuf = Array(T, isroot ? Comm_size(comm) * count : 0)
rval = Ref{Cint}()
ccall(MPI_IGATHER, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &count, &mpitype(T), recvbuf, &count, &mpitype(T), &root, &comm.val, rval, &0)
Request(rval[], sendbuf), isroot ? recvbuf : nothing
end

function Igather{T}(sendbuf::Array{T}, root::Integer, comm::Comm)
Igather(sendbuf, length(sendbuf), root, comm)
end

function Igather{T}(object::T, root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
sendbuf = T[object]
Igather(sendbuf, root, comm)
end

function Allgather{T}(sendbuf::MPIBuffertype{T}, count::Integer,
comm::Comm)
recvbuf = Array(T, Comm_size(comm) * count)
Expand All @@ -590,6 +715,25 @@ function Allgather{T}(object::T, comm::Comm)
recvbuf
end

function Iallgather{T}(sendbuf::MPIBuffertype{T}, count::Integer,
comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, Comm_size(comm) * count)
ccall(MPI_IALLGATHER, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &count, &mpitype(T), recvbuf, &count, &mpitype(T), &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Iallgather{T}(sendbuf::Array{T}, comm::Comm)
Iallgather(sendbuf, length(sendbuf), comm)
end

function Iallgather{T}(object::T, comm::Comm)
sendbuf = T[object]
Iallgather(sendbuf, comm)
end

function Gatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
Expand All @@ -602,6 +746,19 @@ function Gatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
isroot ? recvbuf : nothing
end

function Igatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
root::Integer, comm::Comm)
isroot = Comm_rank(comm) == root
rval = Ref{Cint}()
displs = cumsum(counts) - counts
sendcnt = counts[Comm_rank(comm) + 1]
recvbuf = Array(T, isroot ? sum(counts) : 0)
ccall(MPI_IGATHERV, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &sendcnt, &mpitype(T), recvbuf, counts, displs, &mpitype(T), &root, &comm.val, rval, &0)
Request(rval[], sendbuf), isroot ? recvbuf : nothing
end

function Allgatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
comm::Comm)
displs = cumsum(counts) - counts
Expand All @@ -613,6 +770,18 @@ function Allgatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
recvbuf
end

function Iallgatherv{T}(sendbuf::MPIBuffertype{T}, counts::Vector{Cint},
comm::Comm)
rval = Ref{Cint}()
displs = cumsum(counts) - counts
sendcnt = counts[Comm_rank(comm) + 1]
recvbuf = Array(T, sum(counts))
ccall(MPI_IALLGATHERV, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &sendcnt, &mpitype(T), recvbuf, counts, displs, &mpitype(T), &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Alltoall{T}(sendbuf::MPIBuffertype{T}, count::Integer,
comm::Comm)
recvbuf = Array(T, Comm_size(comm)*count)
Expand All @@ -622,6 +791,16 @@ function Alltoall{T}(sendbuf::MPIBuffertype{T}, count::Integer,
recvbuf
end

function Ialltoall{T}(sendbuf::MPIBuffertype{T}, count::Integer,
comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, Comm_size(comm)*count)
ccall(MPI_IALLTOALL, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, &count, &mpitype(T), recvbuf, &count, &mpitype(T), &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Alltoallv{T}(sendbuf::MPIBuffertype{T}, scounts::Vector{Cint},
rcounts::Vector{Cint}, comm::Comm)
recvbuf = Array(T, sum(rcounts))
Expand All @@ -633,6 +812,18 @@ function Alltoallv{T}(sendbuf::MPIBuffertype{T}, scounts::Vector{Cint},
recvbuf
end

function Ialltoallv{T}(sendbuf::MPIBuffertype{T}, scounts::Vector{Cint},
rcounts::Vector{Cint}, comm::Comm)
rval = Ref{Cint}()
recvbuf = Array(T, sum(rcounts))
sdispls = cumsum(scounts) - scounts
rdispls = cumsum(rcounts) - rcounts
ccall(MPI_IALLTOALLV, Void,
(Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, scounts, sdispls, &mpitype(T), recvbuf, rcounts, rdispls, &mpitype(T), &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Scan{T}(sendbuf::MPIBuffertype{T}, count::Integer,
op::Op, comm::Comm)
recvbuf = Array(T, count)
Expand All @@ -647,6 +838,21 @@ function Scan{T}(object::T, op::Op, comm::Comm)
Scan(sendbuf,1,op,comm)
end

function Iscan{T}(sendbuf::MPIBuffertype{T}, count::Integer,
op::Op, comm::Comm)
recvbuf = Array(T, count)
rval = Ref{Cint}()
ccall(MPI_ISCAN, Void,
(Ptr{T}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, recvbuf, &count, &mpitype(T), &op.val, &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function Iscan{T}(object::T, op::Op, comm::Comm)
sendbuf = T[object]
Iscan(sendbuf,1,op,comm)
end

function ExScan{T}(sendbuf::MPIBuffertype{T}, count::Integer,
op::Op, comm::Comm)
recvbuf = Array(T, count)
Expand All @@ -661,6 +867,21 @@ function ExScan{T}(object::T, op::Op, comm::Comm)
ExScan(sendbuf,1,op,comm)
end

function IExScan{T}(sendbuf::MPIBuffertype{T}, count::Integer,
op::Op, comm::Comm)
recvbuf = Array(T, count)
rval = Ref{Cint}()
ccall(MPI_IEXSCAN, Void,
(Ptr{T}, Ptr{T}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}, Ptr{Cint}),
sendbuf, recvbuf, &count, &mpitype(T), &op.val, &comm.val, rval, &0)
Request(rval[], sendbuf), recvbuf
end

function IExScan{T}(object::T, op::Op, comm::Comm)
sendbuf = T[object]
IExScan(sendbuf,1,op,comm)
end

# Conversion between C and Fortran Comm handles:
if HAVE_MPI_COMM_C2F
# use MPI_Comm_f2c and MPI_Comm_c2f
Expand Down
Loading