Skip to content

Commit

Permalink
util/stream: Add pumping functions
Browse files Browse the repository at this point in the history
This extends in_stream and out_stream to provide pumping
functions

Additionally stream_pump() function is added to allow easy
pumping data between streams.

Signed-off-by: Jerzy Kasenberg <[email protected]>
  • Loading branch information
kasjer committed Jun 10, 2024
1 parent f2b4e41 commit dcae9c1
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 2 deletions.
52 changes: 50 additions & 2 deletions util/stream/include/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,17 @@ struct in_stream_vft {
* negative error code
*/
int (*available)(struct in_stream *istream);

/**
* Pump data from this in_stream to other out_stream
*
* @param istream - this input stream to get data from
* @param ostream - output stream to put data to
* @param count - number of bytes to pump
* @return non-negative number of bytes written
* negative error code
*/
int (*pump_to)(struct in_stream *istream, struct out_stream *ostream, uint32_t count);
};

/* Output stream functions */
Expand All @@ -83,6 +94,17 @@ struct out_stream_vft {
* negative error code
*/
int (*flush)(struct out_stream *ostream);

/**
* Pump data from in_stream to this out_stream
*
* @param ostream - this output stream to write data to
* @param istream - other input stream to get data from
* @param count - number for bytes to pump
* @return non-negative number of bytes written
* negative error code
*/
int (*pump_from)(struct out_stream *ostream, struct in_stream *istream, uint32_t count);
};

/* Plain input stream */
Expand All @@ -102,24 +124,48 @@ struct mem_in_stream {
uint32_t read_ptr;
};

#define OSTREAM_DEF(type) \
static int type ## _write(struct out_stream *ostream, const uint8_t *buf, uint32_t count); \
static int type ## _flush(struct out_stream *ostream); \
static int type ## _pump_from(struct out_stream *ostream, struct in_stream *istream, uint32_t count); \
const struct out_stream_vft type ## _vft = { \
.write = type ## _write, \
.flush = type ## _flush, \
.pump_from = type ## _pump_from, \
}

#define OSTREAM_VFT(type, _write, _flush, _pump) \
static int _write(struct out_stream *ostream, const uint8_t *buf, uint32_t count); \
static int _flush(struct out_stream *ostream); \
static int _pump_from(struct out_stream *ostream, struct in_stream *istream, uint32_t count); \
const struct out_stream_vft type ## _vft = { \
.write = _write, \
.flush = _flush, \
.pump_from = _pump_from, \
}

#define OSTREAM(type, name) \
struct type name = { \
.vft = &type ## _vft \
}

#define OSTREAM_INIT(type, field) \
.field.vft = &type ## _vft

#define ISTREAM_DEF(type) \
const struct in_stream_vft type ## _vft = { \
.available = type ## _available, \
.read = type ## _read, \
.pump_to = type ## _pump_to, \
}

#define ISTREAM(type, name) \
struct in_stream name = { \
.vft = &type ## _vft \
}

#define ISTREAM_INIT(type) \
.type.vft = &type ## _vft \
#define ISTREAM_INIT(type, field) \
.field.vft = &type ## _vft \

/**
* Check how many bytes can be read out of stream.
Expand Down Expand Up @@ -175,6 +221,8 @@ int ostream_flush(struct out_stream *ostream);
*/
int ostream_write(struct out_stream *ostream, const uint8_t *buf, uint32_t count, bool flush);

int stream_pump(struct in_stream *istream, struct out_stream *ostream, uint32_t count);

/**
* Initialize memory input stream
*
Expand Down
29 changes: 29 additions & 0 deletions util/stream/src/stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <string.h>

#include <stream/stream.h>
#include "os/os.h"

int
istream_flush(struct in_stream *istream)
Expand Down Expand Up @@ -107,3 +108,31 @@ istream_read(struct in_stream *istream, uint8_t *buf, uint32_t count)
return istream->vft->read(istream, buf, count);
}

int
stream_pump(struct in_stream *istream, struct out_stream *ostream, uint32_t count)
{
int pumped;

/* If output stream has pump_from function used it */
if (ostream->vft->pump_from) {
pumped = ostream->vft->pump_from(ostream, istream, count);
} else if (istream->vft->pump_to) {
/* When input stream has pump_to used it */
pumped = istream->vft->pump_to(istream, ostream, count);
} else {
/* Both stream don't provide pump functions, use local buffer for pumping data */
uint8_t buf[16];
pumped = 0;
while (count) {
uint32_t n = min(count, sizeof(buf));
uint32_t r = istream_read(istream, buf, n);
pumped += r;
count -= r;
ostream_write(ostream, buf, r, false);
if (r == 0) {
break;
}
}
}
return pumped;
}

0 comments on commit dcae9c1

Please sign in to comment.