Move accept() to abstract socket class and implement in TCPSocket

Deprecate TCPServer in favor of just TCPSocket::accept()
pull/7192/head
Seppo Takalo 2018-06-14 16:58:15 +03:00
parent cdbe43c0d5
commit 407b2f275d
9 changed files with 248 additions and 175 deletions

View File

@ -20,9 +20,9 @@
using namespace mbed;
InternetSocket::InternetSocket()
: _stack(0)
, _socket(0)
, _timeout(osWaitForever)
: _stack(0), _socket(0), _timeout(osWaitForever),
_readers(0), _writers(0), _factory_allocated(false),
_pending(0)
{
}
@ -68,7 +68,19 @@ nsapi_error_t InternetSocket::close()
// on this socket
event();
// Wait until all readers and writers are gone
while (_readers || _writers) {
_lock.unlock();
_event_flag.wait_any(FINISHED_FLAG, osWaitForever);
_lock.lock();
}
_lock.unlock();
// When allocated by accept() call, will self desctruct on close();
if (_factory_allocated) {
delete this;
}
return ret;
}
@ -172,6 +184,15 @@ nsapi_error_t InternetSocket::getsockopt(int level, int optname, void *optval, u
return ret;
}
void InternetSocket::event()
{
_event_flag.set(READ_FLAG|WRITE_FLAG);
_pending += 1;
if (_callback && _pending == 1) {
_callback();
}
}
void InternetSocket::sigio(Callback<void()> callback)
{

View File

@ -23,6 +23,7 @@
#include "netsocket/Socket.h"
#include "netsocket/NetworkStack.h"
#include "rtos/Mutex.h"
#include "rtos/EventFlags.h"
#include "Callback.h"
#include "mbed_toolchain.h"
@ -217,7 +218,7 @@ public:
protected:
InternetSocket();
virtual nsapi_protocol_t get_proto() = 0;
virtual void event() = 0;
virtual void event();
int modify_multicast_group(const SocketAddress &address, nsapi_socket_option_t socketopt);
NetworkStack *_stack;
@ -225,8 +226,18 @@ protected:
uint32_t _timeout;
mbed::Callback<void()> _event;
mbed::Callback<void()> _callback;
rtos::EventFlags _event_flag;
rtos::Mutex _lock;
SocketAddress _remote_peer;
uint8_t _readers;
uint8_t _writers;
volatile unsigned _pending;
bool _factory_allocated;
// Event flags
static const int READ_FLAG = 0x1u;
static const int WRITE_FLAG = 0x2u;
static const int FINISHED_FLAG = 0x3u;
};
#endif // INTERNETSOCKET_H

View File

@ -110,11 +110,10 @@ public:
/** Receive a data from a socket
*
* Receives a data and stores the source address in address if address
* is not NULL. Returns the number of bytes written into the buffer. If the
* datagram is larger than the buffer, the excess data is silently discarded.
* is not NULL. Returns the number of bytes written into the buffer.
*
* By default, recvfrom blocks until a datagram is received. If socket is set to
* non-blocking or times out with no datagram, NSAPI_ERROR_WOULD_BLOCK
* non-blocking or times out with no data, NSAPI_ERROR_WOULD_BLOCK
* is returned.
*
* @param address Destination for the source address or NULL
@ -213,6 +212,32 @@ public:
* @return 0 on success, negative error code on failure
*/
virtual nsapi_error_t getsockopt(int level, int optname, void *optval, unsigned *optlen) = 0;
/** Accepts a connection on a socket.
*
* The server socket must be bound and set to listen for connections.
* On a new connection, returns connected network socket which user is expected to call close()
* and that deallocates the resources. Referencing a returned pointer after a close()
* call is not allowed and leads to undefined behaviour.
*
* By default, accept blocks until incomming connection occurs. If socket is set to
* non-blocking or times out, error is set to NSAPI_ERROR_WOULD_BLOCK.
*
* @param error pointer to storage of the error value or NULL
* @return pointer to a socket
*/
virtual Socket *accept(nsapi_error_t *error = NULL) = 0;
/** Listen for incoming connections.
*
* Marks the socket as a passive socket that can be used to accept
* incoming connections.
*
* @param backlog Number of pending connections that can be queued
* simultaneously, defaults to 1
* @return 0 on success, negative error code on failure
*/
virtual nsapi_error_t listen(int backlog = 1) = 0;
};

View File

@ -18,7 +18,6 @@
#include "mbed.h"
TCPServer::TCPServer()
: _pending(0), _accept_sem(0)
{
}
@ -27,29 +26,8 @@ TCPServer::~TCPServer()
close();
}
nsapi_protocol_t TCPServer::get_proto()
{
return NSAPI_TCP;
}
nsapi_error_t TCPServer::listen(int backlog)
{
_lock.lock();
nsapi_error_t ret;
if (!_socket) {
ret = NSAPI_ERROR_NO_SOCKET;
} else {
ret = _stack->socket_listen(_socket, backlog);
}
_lock.unlock();
return ret;
}
nsapi_error_t TCPServer::accept(TCPSocket *connection, SocketAddress *address)
{
_lock.lock();
nsapi_error_t ret;
while (true) {
@ -76,19 +54,19 @@ nsapi_error_t TCPServer::accept(TCPSocket *connection, SocketAddress *address)
connection->_lock.unlock();
break;
} else if (NSAPI_ERROR_WOULD_BLOCK != ret) {
} else if ((_timeout == 0) || (ret != NSAPI_ERROR_WOULD_BLOCK)) {
break;
} else {
int32_t count;
uint32_t flag;
// Release lock before blocking so other threads
// accessing this object aren't blocked
_lock.unlock();
count = _accept_sem.wait(_timeout);
flag = _event_flag.wait_any(READ_FLAG, _timeout);
_lock.lock();
if (count < 1) {
// Semaphore wait timed out so break out and return
if (flag & osFlagsError) {
// Timeout break
ret = NSAPI_ERROR_WOULD_BLOCK;
break;
}
@ -98,41 +76,3 @@ nsapi_error_t TCPServer::accept(TCPSocket *connection, SocketAddress *address)
_lock.unlock();
return ret;
}
void TCPServer::event()
{
int32_t acount = _accept_sem.wait(0);
if (acount <= 1) {
_accept_sem.release();
}
_pending += 1;
if (_callback && _pending == 1) {
_callback();
}
}
nsapi_error_t TCPServer::connect(const SocketAddress)
{
return NSAPI_ERROR_UNSUPPORTED;
}
nsapi_size_or_error_t TCPServer::send(const void *, nsapi_size_t)
{
return NSAPI_ERROR_UNSUPPORTED;
}
nsapi_size_or_error_t TCPServer::recv(void *, nsapi_size_t)
{
return NSAPI_ERROR_UNSUPPORTED;
}
nsapi_size_or_error_t TCPServer::sendto(const SocketAddress&, const void *, nsapi_size_t)
{
return NSAPI_ERROR_UNSUPPORTED;
}
nsapi_size_or_error_t TCPServer::recvfrom(SocketAddress*, void *, nsapi_size_t)
{
return NSAPI_ERROR_UNSUPPORTED;
}

View File

@ -28,12 +28,14 @@
/** TCP socket server
* @addtogroup netsocket
*/
class TCPServer : public InternetSocket {
class TCPServer : public TCPSocket {
public:
/** Create an uninitialized socket
*
* Must call open to initialize the socket on a network stack.
*/
MBED_DEPRECATED_SINCE("mbed-os-5.10",
"TCPServer is deprecated, use TCPSocket")
TCPServer();
/** Create a socket on a network interface
@ -44,8 +46,9 @@ public:
* @param stack Network stack as target for socket
*/
template <typename S>
MBED_DEPRECATED_SINCE("mbed-os-5.10",
"TCPServer is deprecated, use TCPSocket")
TCPServer(S *stack)
: _pending(0), _accept_sem(0)
{
open(stack);
}
@ -56,17 +59,6 @@ public:
*/
virtual ~TCPServer();
/** Listen for connections on a TCP socket
*
* Marks the socket as a passive socket that can be used to accept
* incoming connections.
*
* @param backlog Number of pending connections that can be queued
* simultaneously, defaults to 1
* @return 0 on success, negative error code on failure
*/
nsapi_error_t listen(int backlog = 1);
/** Accepts a connection on a TCP socket
*
* The server socket must be bound and set to listen for connections.
@ -81,51 +73,9 @@ public:
* @param address Destination for the remote address or NULL
* @return 0 on success, negative error code on failure
*/
MBED_DEPRECATED_SINCE("mbed-os-5.10",
"TCPServer::accept() is deprecated, use Socket *Socket::accept() instead")
nsapi_error_t accept(TCPSocket *connection, SocketAddress *address = NULL);
/** Not supported on TCPServer.
* @param address unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_error_t connect(const SocketAddress &address);
/** Not supported on TCPServer.
* @param data unused
* @param size unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_size_or_error_t send(const void *data, nsapi_size_t size);
/** Not supported on TCPServer.
* @param data unused
* @param size unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_size_or_error_t recv(void *data, nsapi_size_t size);
/** Not supported on TCPServer.
* @param address unused
* @param data unused
* @param size unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_size_or_error_t sendto(const SocketAddress& address, const void *data, nsapi_size_t size);
/** Not supported on TCPServer.
* @param address unused
* @param data unused
* @param size unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_size_or_error_t recvfrom(SocketAddress* address, void *data, nsapi_size_t size);
protected:
virtual nsapi_protocol_t get_proto();
virtual void event();
volatile unsigned _pending;
rtos::Semaphore _accept_sem;
};
#endif

View File

@ -18,12 +18,7 @@
#include "Timer.h"
#include "mbed_assert.h"
#define READ_FLAG 0x1u
#define WRITE_FLAG 0x2u
TCPSocket::TCPSocket()
: _pending(0), _event_flag(),
_read_in_progress(false), _write_in_progress(false)
{
}
@ -45,8 +40,8 @@ nsapi_error_t TCPSocket::connect(const SocketAddress &address)
// If this assert is hit then there are two threads
// performing a send at the same time which is undefined
// behavior
MBED_ASSERT(!_write_in_progress);
_write_in_progress = true;
MBED_ASSERT(_writers == 0);
_writers++;
bool blocking_connect_in_progress = false;
@ -77,7 +72,10 @@ nsapi_error_t TCPSocket::connect(const SocketAddress &address)
}
}
_write_in_progress = false;
_writers--;
if (!_socket) {
_event_flag.set(FINISHED_FLAG);
}
/* Non-blocking connect gives "EISCONN" once done - convert to OK for blocking mode if we became connected during this call */
if (ret == NSAPI_ERROR_IS_CONNECTED && blocking_connect_in_progress) {
@ -116,8 +114,8 @@ nsapi_size_or_error_t TCPSocket::send(const void *data, nsapi_size_t size)
// If this assert is hit then there are two threads
// performing a send at the same time which is undefined
// behavior
MBED_ASSERT(!_write_in_progress);
_write_in_progress = true;
MBED_ASSERT(_writers == 0);
_writers++;
// Unlike recv, we should write the whole thing if blocking. POSIX only
// allows partial as a side-effect of signal handling; it normally tries to
@ -156,7 +154,11 @@ nsapi_size_or_error_t TCPSocket::send(const void *data, nsapi_size_t size)
}
}
_write_in_progress = false;
_writers--;
if (!_socket) {
_event_flag.set(FINISHED_FLAG);
}
_lock.unlock();
if (ret <= 0 && ret != NSAPI_ERROR_WOULD_BLOCK) {
return ret;
@ -181,8 +183,8 @@ nsapi_size_or_error_t TCPSocket::recv(void *data, nsapi_size_t size)
// If this assert is hit then there are two threads
// performing a recv at the same time which is undefined
// behavior
MBED_ASSERT(!_read_in_progress);
_read_in_progress = true;
MBED_ASSERT(_readers == 0);
_readers++;
while (true) {
if (!_socket) {
@ -211,7 +213,11 @@ nsapi_size_or_error_t TCPSocket::recv(void *data, nsapi_size_t size)
}
}
_read_in_progress = false;
_readers--;
if (!_socket) {
_event_flag.set(FINISHED_FLAG);
}
_lock.unlock();
return ret;
}
@ -224,12 +230,74 @@ nsapi_size_or_error_t TCPSocket::recvfrom(SocketAddress *address, void *data, ns
return recv(data, size);
}
void TCPSocket::event()
nsapi_error_t TCPSocket::listen(int backlog)
{
_event_flag.set(READ_FLAG|WRITE_FLAG);
_lock.lock();
nsapi_error_t ret;
_pending += 1;
if (_callback && _pending == 1) {
_callback();
if (!_socket) {
ret = NSAPI_ERROR_NO_SOCKET;
} else {
ret = _stack->socket_listen(_socket, backlog);
}
_lock.unlock();
return ret;
}
TCPSocket *TCPSocket::accept(nsapi_error_t *error)
{
_lock.lock();
TCPSocket *connection = NULL;
_readers++;
while (true) {
if (!_socket) {
*error = NSAPI_ERROR_NO_SOCKET;
break;
}
_pending = 0;
void *socket;
SocketAddress address;
*error = _stack->socket_accept(_socket, &socket, &address);
if (0 == *error) {
TCPSocket *connection = new TCPSocket();
connection->_lock.lock();
connection->_factory_allocated = true; // Destroy automatically on close()
connection->_remote_peer = address;
connection->_stack = _stack;
connection->_socket = socket;
connection->_event = mbed::Callback<void()>(connection, &TCPSocket::event);
_stack->socket_attach(socket, &mbed::Callback<void()>::thunk, &connection->_event);
connection->_lock.unlock();
break;
} else if ((_timeout == 0) || (*error != NSAPI_ERROR_WOULD_BLOCK)) {
break;
} else {
uint32_t flag;
// Release lock before blocking so other threads
// accessing this object aren't blocked
_lock.unlock();
flag = _event_flag.wait_any(READ_FLAG, _timeout);
_lock.lock();
if (flag & osFlagsError) {
// Timeout break
*error = NSAPI_ERROR_WOULD_BLOCK;
break;
}
}
}
_readers--;
if (!_socket) {
_event_flag.set(FINISHED_FLAG);
}
_lock.unlock();
return connection;
}

View File

@ -45,8 +45,6 @@ public:
*/
template <typename S>
TCPSocket(S *stack)
: _pending(0), _event_flag(0),
_read_in_progress(false), _write_in_progress(false)
{
open(stack);
}
@ -133,19 +131,54 @@ public:
*/
virtual nsapi_size_or_error_t sendto(const SocketAddress &address,
const void *data, nsapi_size_t size);
/** Receive a data from a socket
*
* Receives a data and stores the source address in address if address
* is not NULL. Returns the number of bytes written into the buffer.
*
* By default, recvfrom blocks until a data is received. If socket is set to
* non-blocking or times out with no datagram, NSAPI_ERROR_WOULD_BLOCK
* is returned.
*
* @param address Destination for the source address or NULL
* @param data Destination buffer for datagram received from the host
* @param size Size of the buffer in bytes
* @return Number of received bytes on success, negative error
* code on failure
*/
virtual nsapi_size_or_error_t recvfrom(SocketAddress *address,
void *data, nsapi_size_t size);
/** Accepts a connection on a socket.
*
* The server socket must be bound and set to listen for connections.
* On a new connection, returns connected network socket which user is expected to call close()
* and that deallocates the resources. Referencing a returned pointer after a close()
* call is not allowed and leads to undefined behaviour.
*
* By default, accept blocks until incomming connection occurs. If socket is set to
* non-blocking or times out, error is set to NSAPI_ERROR_WOULD_BLOCK.
*
* @param error pointer to storage of the error value or NULL
* @return pointer to a socket
*/
virtual TCPSocket *accept(nsapi_error_t *error = NULL);
/** Listen for incoming connections.
*
* Marks the socket as a passive socket that can be used to accept
* incoming connections.
*
* @param backlog Number of pending connections that can be queued
* simultaneously, defaults to 1
* @return 0 on success, negative error code on failure
*/
virtual nsapi_error_t listen(int backlog = 1);
protected:
friend class TCPServer;
virtual nsapi_protocol_t get_proto();
virtual void event();
volatile unsigned _pending;
rtos::EventFlags _event_flag;
bool _read_in_progress;
bool _write_in_progress;
};

View File

@ -18,12 +18,7 @@
#include "Timer.h"
#include "mbed_assert.h"
#define TCP_EVENT "UDP_Events"
#define READ_FLAG 0x1u
#define WRITE_FLAG 0x2u
UDPSocket::UDPSocket()
: _pending(0), _event_flag()
{
}
@ -64,6 +59,8 @@ nsapi_size_or_error_t UDPSocket::sendto(const SocketAddress &address, const void
_lock.lock();
nsapi_size_or_error_t ret;
_writers++;
while (true) {
if (!_socket) {
ret = NSAPI_ERROR_NO_SOCKET;
@ -92,6 +89,10 @@ nsapi_size_or_error_t UDPSocket::sendto(const SocketAddress &address, const void
}
}
_writers--;
if (!_socket || !_writers) {
_event_flag.set(FINISHED_FLAG);
}
_lock.unlock();
return ret;
}
@ -107,6 +108,13 @@ nsapi_size_or_error_t UDPSocket::recvfrom(SocketAddress *address, void *buffer,
{
_lock.lock();
nsapi_size_or_error_t ret;
SocketAddress ignored;
if (!address) {
address = &ignored;
}
_readers++;
while (true) {
if (!_socket) {
@ -136,22 +144,29 @@ nsapi_size_or_error_t UDPSocket::recvfrom(SocketAddress *address, void *buffer,
}
}
_readers--;
if (!_socket || !_readers) {
_event_flag.set(FINISHED_FLAG);
}
_lock.unlock();
return ret;
}
nsapi_size_or_error_t UDPSocket::recv(void *buffer, nsapi_size_t size)
{
SocketAddress ignored; // Dangerous, I'm spending ~50 bytes from stack for address space that I'll ignore.
return recvfrom(&ignored, buffer, size);
return recvfrom(NULL, buffer, size);
}
void UDPSocket::event()
Socket *UDPSocket::accept(nsapi_error_t *error)
{
_event_flag.set(READ_FLAG|WRITE_FLAG);
_pending += 1;
if (_callback && _pending == 1) {
_callback();
if (error) {
*error = NSAPI_ERROR_UNSUPPORTED;
}
return NULL;
}
nsapi_error_t UDPSocket::listen(int)
{
return NSAPI_ERROR_UNSUPPORTED;
}

View File

@ -45,7 +45,6 @@ public:
*/
template <typename S>
UDPSocket(S *stack)
: _pending(0), _event_flag(0)
{
open(stack);
}
@ -152,12 +151,23 @@ public:
* code on failure.
*/
virtual nsapi_size_or_error_t recv(void *data, nsapi_size_t size);
/** Not implemented for UDP
*
* @param error unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual Socket *accept(nsapi_error_t *error = NULL);
/** Not implemented for UDP
*
* @param backlog unused
* @return NSAPI_ERROR_UNSUPPORTED
*/
virtual nsapi_error_t listen(int backlog = 1);
protected:
virtual nsapi_protocol_t get_proto();
virtual void event();
volatile unsigned _pending;
rtos::EventFlags _event_flag;
};