Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement assynchronous SFTP file transfer #641

Draft
wants to merge 5 commits into
base: devel
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build-manylinux-container-images.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ jobs:
YEAR: 2010

env:
LIBSSH_VERSION: 0.9.6
LIBSSH_VERSION: 0.11.1
PYPA_MANYLINUX_TAG: >-
manylinux${{ matrix.YEAR }}_${{ matrix.IMAGE.ARCH }}
FULL_IMAGE_NAME: >-
Expand Down
1 change: 1 addition & 0 deletions docs/changelog-fragments/636.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Bumped the libssh version to latest release 0.11.1 -- by :user:`Jakuje`.
1 change: 1 addition & 0 deletions docs/changelog-fragments/641.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added support for asynchronous SFTP file transfers to increase throughput -- by :user:`Jakuje`.
27 changes: 27 additions & 0 deletions src/pylibsshext/includes/sftp.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -55,5 +55,32 @@ cdef extern from "libssh/sftp.h" nogil:
ssize_t sftp_read(sftp_file file, const void *buf, size_t count)
int sftp_get_error(sftp_session sftp)

struct sftp_attributes_struct:
char *name
char *longname
unsigned int flags
unsigned int type
unsigned int size
# ...
ctypedef sftp_attributes_struct * sftp_attributes
sftp_attributes sftp_stat(sftp_session session, const char *path)

struct sftp_aio_struct:
pass
ctypedef sftp_aio_struct * sftp_aio
ssize_t sftp_aio_begin_read(sftp_file file, size_t len, sftp_aio *aio)
ssize_t sftp_aio_wait_read(sftp_aio *aio, void *buf, size_t buf_size)
ssize_t sftp_aio_begin_write(sftp_file file, const void *buf, size_t len, sftp_aio *aio)
ssize_t sftp_aio_wait_write(sftp_aio *aio)
void sftp_aio_free(sftp_aio aio)

struct sftp_limits_struct:
unsigned int max_packet_length
unsigned int max_read_length
unsigned int max_write_length
unsigned int max_open_handles
ctypedef sftp_limits_struct * sftp_limits_t
sftp_limits_t sftp_limits(sftp_session sftp)

cdef extern from "sys/stat.h" nogil:
cdef int S_IRWXU
12 changes: 12 additions & 0 deletions src/pylibsshext/sftp.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,15 @@ from pylibsshext.session cimport Session
cdef class SFTP:
cdef Session session
cdef sftp.sftp_session _libssh_sftp_session

cdef class SFTP_AIO:
cdef _aio_queue
cdef _remote_file
cdef _file_size
cdef _total_bytes_requested
cdef sftp.sftp_session _sftp
cdef sftp.sftp_limits_t _limits
cdef sftp.sftp_file _rf

cdef class C_AIO:
cdef sftp.sftp_aio aio
225 changes: 173 additions & 52 deletions src/pylibsshext/sftp.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@
# License along with this library; if not, see file LICENSE.rst in this
# repository.

import os
from collections import deque

from posix.fcntl cimport O_CREAT, O_RDONLY, O_TRUNC, O_WRONLY

from cpython.bytes cimport PyBytes_AS_STRING
from cpython.mem cimport PyMem_Free, PyMem_Malloc

from pylibsshext.errors cimport LibsshSFTPException
from pylibsshext.session cimport get_libssh_session
Expand Down Expand Up @@ -54,69 +58,186 @@ cdef class SFTP:
self._libssh_sftp_session = NULL

def put(self, local_file, remote_file):
SFTP_AIO(self).put(local_file, remote_file)

def get(self, remote_file, local_file):
SFTP_AIO(self).get(remote_file, local_file)

def close(self):
if self._libssh_sftp_session is not NULL:
sftp.sftp_free(self._libssh_sftp_session)
self._libssh_sftp_session = NULL

def _get_sftp_error_str(self):
error = sftp.sftp_get_error(self._libssh_sftp_session)
if error in MSG_MAP and error != sftp.SSH_FX_FAILURE:
return MSG_MAP[error]
return "Generic failure: %s" % self.session._get_session_error_str()

cdef sftp.sftp_session get_sftp_session(SFTP sftp_obj):
return sftp_obj._libssh_sftp_session

cdef class SFTP_AIO:
def __cinit__(self, SFTP sftp_obj):
self._sftp = get_sftp_session(sftp_obj)

self._limits = sftp.sftp_limits(self._sftp)
if self._limits is NULL:
raise LibsshSFTPException("Failed to get remote SFTP limits [%s]" % (self._get_sftp_error_str()))

def __init__(self, SFTP sftp_obj):
self._aio_queue = deque()

def __dealloc__(self):
if self._rf is not NULL:
sftp.sftp_close(self._rf)
self._rf = NULL

def put(self, local_file, remote_file):
# reset
self._aio_queue = deque()
self._total_bytes_requested = 0

cdef C_AIO aio
cdef sftp.sftp_file rf
with open(local_file, "rb") as f:
remote_file_b = remote_file
if isinstance(remote_file_b, unicode):
remote_file_b = remote_file.encode("utf-8")
self._remote_file = remote_file

rf = sftp.sftp_open(self._libssh_sftp_session, remote_file_b, O_WRONLY | O_CREAT | O_TRUNC, sftp.S_IRWXU)
if rf is NULL:
raise LibsshSFTPException("Opening remote file [%s] for write failed with error [%s]" % (remote_file, self._get_sftp_error_str()))
buffer = f.read(1024)

while buffer != b"":
length = len(buffer)
written = sftp.sftp_write(rf, PyBytes_AS_STRING(buffer), length)
if written != length:
sftp.sftp_close(rf)
remote_file_b = remote_file
if isinstance(remote_file_b, unicode):
remote_file_b = remote_file.encode("utf-8")

rf = sftp.sftp_open(self._sftp, remote_file_b, O_WRONLY | O_CREAT | O_TRUNC, sftp.S_IRWXU)
if rf is NULL:
raise LibsshSFTPException("Opening remote file [%s] for write failed with error [%s]" % (remote_file, self._get_sftp_error_str()))
self._rf = rf

with open(local_file, "rb") as f:
f.seek(0, os.SEEK_END)
self._file_size = f.tell()
f.seek(0, os.SEEK_SET)

# start up to 10 requests before waiting for responses
i = 0
while i < 10 and self._total_bytes_requested < self._file_size:
self._put_chunk(f)
i += 1

while len(self._aio_queue):
aio = self._aio_queue.popleft()
bytes_written = sftp.sftp_aio_wait_write(&aio.aio)
if bytes_written == libssh.SSH_ERROR:
raise LibsshSFTPException(
"Writing to remote file [%s] failed with error [%s]" % (
remote_file,
self._get_sftp_error_str(),
)
"Failed to write to remote file [%s]: error [%s]" % (self._remote_file, self._get_sftp_error_str())
)
buffer = f.read(1024)
# was freed in the wait if it did not fail
aio.aio = NULL

# whole file read
if self._total_bytes_requested == self._file_size:
continue

# else issue more read requests
self._put_chunk(f)

sftp.sftp_close(rf)
self._rf = NULL

def _put_chunk(self, f):
to_write = min(self._file_size - self._total_bytes_requested, self._limits.max_write_length)
buffer = f.read(to_write)
if len(buffer) != to_write:
raise LibsshSFTPException("Read only [%d] but requested [%d] when reading from local file [%s] " % (len(buffer), to_write, self._remote_file))

cdef sftp.sftp_aio aio = NULL
bytes_requested = sftp.sftp_aio_begin_write(self._rf, PyBytes_AS_STRING(buffer), to_write, &aio)
if bytes_requested != to_write:
raise LibsshSFTPException("Failed to write chunk of size [%d] of file [%s] with error [%s]"
% (to_write, self._remote_file, self._get_sftp_error_str()))
self._total_bytes_requested += bytes_requested
c_aio = C_AIO()
c_aio.aio = aio
self._aio_queue.append(c_aio)

def get(self, remote_file, local_file):
cdef sftp.sftp_file rf
cdef char read_buffer[1024]
# reset
self._aio_queue = deque()
self._total_bytes_requested = 0

cdef C_AIO aio
cdef sftp.sftp_file rf = NULL
cdef sftp.sftp_attributes attrs
cdef char *buffer = NULL
self._remote_file = remote_file

remote_file_b = remote_file
if isinstance(remote_file_b, unicode):
remote_file_b = remote_file.encode("utf-8")

rf = sftp.sftp_open(self._libssh_sftp_session, remote_file_b, O_RDONLY, sftp.S_IRWXU)
if rf is NULL:
raise LibsshSFTPException("Opening remote file [%s] for read failed with error [%s]" % (remote_file, self._get_sftp_error_str()))

while True:
file_data = sftp.sftp_read(rf, <void *>read_buffer, sizeof(char) * 1024)
if file_data == 0:
break
elif file_data < 0:
sftp.sftp_close(rf)
raise LibsshSFTPException("Reading data from remote file [%s] failed with error [%s]"
% (remote_file, self._get_sftp_error_str()))

with open(local_file, 'wb+') as f:
bytes_written = f.write(read_buffer[:file_data])
if bytes_written and file_data != bytes_written:
sftp.sftp_close(rf)
raise LibsshSFTPException("Number of bytes [%s] read from remote file [%s]"
" does not match number of bytes [%s] written to local file [%s]"
" due to error [%s]"
% (file_data, remote_file, bytes_written, local_file, self._get_sftp_error_str()))
sftp.sftp_close(rf)
attrs = sftp.sftp_stat(self._sftp, remote_file_b)
if attrs is NULL:
raise LibsshSFTPException("Failed to stat the remote file [%s] with error [%s]"
% (remote_file, self._get_sftp_error_str()))
self._file_size = attrs.size

def close(self):
if self._libssh_sftp_session is not NULL:
sftp.sftp_free(self._libssh_sftp_session)
self._libssh_sftp_session = NULL
buffer_size = min(self._limits.max_read_length, self._file_size)
try:
buffer = <char *>PyMem_Malloc(buffer_size)

def _get_sftp_error_str(self):
error = sftp.sftp_get_error(self._libssh_sftp_session)
if error in MSG_MAP and error != sftp.SSH_FX_FAILURE:
return MSG_MAP[error]
return "Generic failure: %s" % self.session._get_session_error_str()
rf = sftp.sftp_open(self._sftp, remote_file_b, O_RDONLY, sftp.S_IRWXU)
if rf is NULL:
raise LibsshSFTPException("Opening remote file [%s] for reading failed with error [%s]" % (remote_file, self._get_sftp_error_str()))
self._rf = rf

with open(local_file, 'wb') as f:
# start up to 10 write requests before waiting for responses
i = 0
while i < 10 and self._total_bytes_requested < self._file_size:
self._get_chunk()
i += 1

while len(self._aio_queue):
aio = self._aio_queue.popleft()
bytes_read = sftp.sftp_aio_wait_read(&aio.aio, <void *>buffer, buffer_size)
if bytes_read == libssh.SSH_ERROR:
raise LibsshSFTPException(
"Failed to read from remote file [%s]: error [%s]" % (self._remote_file, self._get_sftp_error_str())
)
# was freed in the wait if it did not fail -- otherwise the __dealloc__ will free it
aio.aio = NULL

# write the file
f.write(buffer[:bytes_read])

# whole file read
if self._total_bytes_requested == self._file_size:
continue

# else issue more read requests
self._get_chunk()

finally:
if buffer is not NULL:
PyMem_Free(buffer)
sftp.sftp_close(rf)
self._rf = NULL

def _get_chunk(self):
to_read = min(self._file_size - self._total_bytes_requested, self._limits.max_read_length)
cdef sftp.sftp_aio aio = NULL
bytes_requested = sftp.sftp_aio_begin_read(self._rf, to_read, &aio)
if bytes_requested != to_read:
raise LibsshSFTPException("Failed to request to read chunk of size [%d] of file [%s] with error [%s]"
% (to_read, self._remote_file, self._get_sftp_error_str()))
self._total_bytes_requested += bytes_requested
c_aio = C_AIO()
c_aio.aio = aio
self._aio_queue.append(c_aio)


cdef class C_AIO:
def __cinit__(self):
self.aio = NULL

def __dealloc__(self):
sftp.sftp_aio_free(self.aio)
self.aio = NULL
40 changes: 40 additions & 0 deletions tests/unit/sftp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

"""Tests suite for sftp."""

import random
import string
import uuid

import pytest
Expand Down Expand Up @@ -63,3 +65,41 @@ def test_get(dst_path, src_path, sftp_session, transmit_payload):
"""Check that SFTP file download works."""
sftp_session.get(str(src_path), str(dst_path))
assert dst_path.read_bytes() == transmit_payload


@pytest.fixture
def large_payload():
"""
Generate a large 255 * 1024 + 1 B test payload.

The OpenSSH SFTP server supports maximum reads and writes of 256 * 1024 - 1024 B per request.
"""
random_char_kilobyte = [ord(random.choice(string.printable)) for _ in range(1024)]
full_bytes_number = 255
a_255kB_chunk = bytes(random_char_kilobyte * full_bytes_number)
the_last_byte = random.choice(random_char_kilobyte).to_bytes(length=1, byteorder='big')
return a_255kB_chunk + the_last_byte


@pytest.fixture
def src_path_large(tmp_path, large_payload):
"""Return a remote path to a 255kB + 1B sized file.

The openssh max read/write chunk size is 255kB so the test needs
a file that would execute at least two loops.
"""
path = tmp_path / 'large.txt'
path.write_bytes(large_payload)
return path


def test_put_large(dst_path, src_path_large, sftp_session, large_payload):
"""Check that SFTP can upload large file."""
sftp_session.put(str(src_path_large), str(dst_path))
assert dst_path.read_bytes() == large_payload


def test_get_large(dst_path, src_path_large, sftp_session, large_payload):
"""Check that SFTP can download large file."""
sftp_session.get(str(src_path_large), str(dst_path))
assert dst_path.read_bytes() == large_payload
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ commands =
-i --rm \
-v {toxinidir}:/io \
-e ANSIBLE_PYLIBSSH_CYTHON_TRACING \
ghcr.io/ansible/pylibssh-manylinux{env:MANYLINUX_VERSION_TAG}_{env:MANYLINUX_ARCH_TAG}:libssh-v{env:LIBSSH_VERSION:0.9.6} \
ghcr.io/ansible/pylibssh-manylinux{env:MANYLINUX_VERSION_TAG}_{env:MANYLINUX_ARCH_TAG}:libssh-v{env:LIBSSH_VERSION:0.11.1} \
/io/build-scripts/build-manylinux-wheels.sh \
"manylinux{env:MANYLINUX_VERSION_TAG}_{env:MANYLINUX_ARCH_TAG}" \
{posargs:}
Expand Down