Merge pull request #217 from geky/socket_event_coalescing

Add socket event coalescing to reduce cpu usage in lazy implementations
geky 2016-06-07 10:54:02 +01:00
commit a534c20a17
8 changed files with 89 additions and 53 deletions

View File

@ -23,12 +23,6 @@ Socket::Socket()
{ {
} }
Socket::~Socket()
{
// Underlying close is thread safe
close();
}
int Socket::open(NetworkStack *iface, nsapi_protocol_t proto) int Socket::open(NetworkStack *iface, nsapi_protocol_t proto)
{ {
_lock.lock(); _lock.lock();
@ -47,7 +41,8 @@ int Socket::open(NetworkStack *iface, nsapi_protocol_t proto)
} }
_socket = socket; _socket = socket;
_iface->socket_attach(_socket, &Socket::thunk, this); _event.attach(this, &Socket::event);
_iface->socket_attach(_socket, Callback<void()>::thunk, &_event);
_lock.unlock(); _lock.unlock();
@ -69,7 +64,7 @@ int Socket::close()
// Wakeup anything in a blocking operation // Wakeup anything in a blocking operation
// on this socket // on this socket
socket_event(); event();
_lock.unlock(); _lock.unlock();
return ret; return ret;
@ -156,16 +151,3 @@ void Socket::attach(Callback<void()> callback)
_lock.unlock(); _lock.unlock();
} }
void Socket::thunk(void *data)
{
Socket *self = (Socket *)data;
self->socket_event();
}
void Socket::socket_event(void)
{
if (_callback) {
_callback();
}
}

View File

@ -29,7 +29,7 @@ public:
* *
* Closes socket if the socket is still open * Closes socket if the socket is still open
*/ */
virtual ~Socket(); virtual ~Socket() {}
/** Opens a socket /** Opens a socket
* *
@ -169,13 +169,12 @@ public:
protected: protected:
Socket(); Socket();
int open(NetworkStack *iface, nsapi_protocol_t proto); int open(NetworkStack *iface, nsapi_protocol_t proto);
virtual void event() = 0;
static void thunk(void *);
virtual void socket_event(void);
NetworkStack *_iface; NetworkStack *_iface;
void *_socket; void *_socket;
uint32_t _timeout; uint32_t _timeout;
Callback<void()> _event;
Callback<void()> _callback; Callback<void()> _callback;
rtos::Mutex _lock; rtos::Mutex _lock;
}; };

View File

@ -17,15 +17,22 @@
#include "TCPServer.h" #include "TCPServer.h"
#include "Timer.h" #include "Timer.h"
TCPServer::TCPServer(): _accept_sem(0) TCPServer::TCPServer()
: _pending(0), _accept_sem(0)
{ {
} }
TCPServer::TCPServer(NetworkStack *iface): _accept_sem(0) TCPServer::TCPServer(NetworkStack *iface)
: _pending(0), _accept_sem(0)
{ {
open(iface); open(iface);
} }
TCPServer::~TCPServer()
{
close();
}
int TCPServer::open(NetworkStack *iface) int TCPServer::open(NetworkStack *iface)
{ {
return Socket::open(iface, NSAPI_TCP); return Socket::open(iface, NSAPI_TCP);
@ -55,6 +62,7 @@ int TCPServer::accept(TCPSocket *connection)
break; break;
} }
_pending = 0;
void *socket; void *socket;
ret = _iface->socket_accept(&socket, _socket); ret = _iface->socket_accept(&socket, _socket);
if (0 == ret) { if (0 == ret) {
@ -66,7 +74,8 @@ int TCPServer::accept(TCPSocket *connection)
connection->_iface = _iface; connection->_iface = _iface;
connection->_socket = socket; connection->_socket = socket;
_iface->socket_attach(socket, &Socket::thunk, connection); connection->_event = Callback<void()>(connection, &TCPSocket::event);
_iface->socket_attach(socket, &Callback<void()>::thunk, &connection->_event);
connection->_lock.unlock(); connection->_lock.unlock();
break; break;
@ -90,12 +99,15 @@ int TCPServer::accept(TCPSocket *connection)
return ret; return ret;
} }
void TCPServer::socket_event() void TCPServer::event()
{ {
int32_t status = _accept_sem.wait(0); int32_t acount = _accept_sem.wait(0);
if (status <= 1) { if (acount <= 1) {
_accept_sem.release(); _accept_sem.release();
} }
Socket::socket_event(); _pending += 1;
if (_callback && _pending == 1) {
_callback();
}
} }

View File

@ -32,6 +32,12 @@ public:
*/ */
TCPServer(); TCPServer();
/** Destroy a socket
*
* Closes socket if the socket is still open
*/
virtual ~TCPServer();
/** Create a socket on a network stack /** Create a socket on a network stack
* *
* Creates and opens a socket on the specified network stack. * Creates and opens a socket on the specified network stack.
@ -76,7 +82,8 @@ public:
*/ */
int accept(TCPSocket *connection); int accept(TCPSocket *connection);
protected: protected:
virtual void socket_event(void); virtual void event();
volatile unsigned _pending;
rtos::Semaphore _accept_sem; rtos::Semaphore _accept_sem;
}; };

View File

@ -17,16 +17,23 @@
#include "TCPSocket.h" #include "TCPSocket.h"
#include "Timer.h" #include "Timer.h"
TCPSocket::TCPSocket(): _read_sem(0), _write_sem(0) TCPSocket::TCPSocket()
: _pending(0), _read_sem(0), _write_sem(0)
{ {
} }
TCPSocket::TCPSocket(NetworkStack *iface): _read_sem(0), _write_sem(0) TCPSocket::TCPSocket(NetworkStack *iface)
: _pending(0), _read_sem(0), _write_sem(0)
{ {
// TCPSocket::open is thread safe // TCPSocket::open is thread safe
open(iface); open(iface);
} }
TCPSocket::~TCPSocket()
{
close();
}
int TCPSocket::open(NetworkStack *iface) int TCPSocket::open(NetworkStack *iface)
{ {
// Socket::open is thread safe // Socket::open is thread safe
@ -74,6 +81,7 @@ int TCPSocket::send(const void *data, unsigned size)
break; break;
} }
_pending = 0;
int sent = _iface->socket_send(_socket, data, size); int sent = _iface->socket_send(_socket, data, size);
if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != sent)) { if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != sent)) {
ret = sent; ret = sent;
@ -114,6 +122,7 @@ int TCPSocket::recv(void *data, unsigned size)
break; break;
} }
_pending = 0;
int recv = _iface->socket_recv(_socket, data, size); int recv = _iface->socket_recv(_socket, data, size);
if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != recv)) { if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != recv)) {
ret = recv; ret = recv;
@ -140,17 +149,19 @@ int TCPSocket::recv(void *data, unsigned size)
return ret; return ret;
} }
void TCPSocket::socket_event() void TCPSocket::event()
{ {
int32_t count; int32_t wcount = _write_sem.wait(0);
count = _write_sem.wait(0); if (wcount <= 1) {
if (count <= 1) {
_write_sem.release(); _write_sem.release();
} }
count = _read_sem.wait(0); int32_t rcount = _read_sem.wait(0);
if (count <= 1) { if (rcount <= 1) {
_read_sem.release(); _read_sem.release();
} }
Socket::socket_event(); _pending += 1;
if (_callback && _pending == 1) {
_callback();
}
} }

View File

@ -31,6 +31,12 @@ public:
*/ */
TCPSocket(); TCPSocket();
/** Destroy a socket
*
* Closes socket if the socket is still open
*/
virtual ~TCPSocket();
/** Create a socket on a network stack /** Create a socket on a network stack
* *
* Creates and opens a socket on the specified network stack. * Creates and opens a socket on the specified network stack.
@ -103,7 +109,8 @@ public:
int recv(void *data, unsigned size); int recv(void *data, unsigned size);
protected: protected:
virtual void socket_event(void); virtual void event();
volatile unsigned _pending;
rtos::Mutex _read_lock; rtos::Mutex _read_lock;
rtos::Semaphore _read_sem; rtos::Semaphore _read_sem;
rtos::Mutex _write_lock; rtos::Mutex _write_lock;

View File

@ -17,15 +17,22 @@
#include "UDPSocket.h" #include "UDPSocket.h"
#include "Timer.h" #include "Timer.h"
UDPSocket::UDPSocket(): _read_sem(0), _write_sem(0) UDPSocket::UDPSocket()
: _pending(0), _read_sem(0), _write_sem(0)
{ {
} }
UDPSocket::UDPSocket(NetworkStack *iface): _read_sem(0), _write_sem(0) UDPSocket::UDPSocket(NetworkStack *iface)
: _pending(0), _read_sem(0), _write_sem(0)
{ {
open(iface); open(iface);
} }
UDPSocket::~UDPSocket()
{
close();
}
int UDPSocket::open(NetworkStack *iface) int UDPSocket::open(NetworkStack *iface)
{ {
return Socket::open(iface, NSAPI_UDP); return Socket::open(iface, NSAPI_UDP);
@ -58,6 +65,7 @@ int UDPSocket::sendto(const SocketAddress &address, const void *data, unsigned s
break; break;
} }
_pending = 0;
int sent = _iface->socket_sendto(_socket, address, data, size); int sent = _iface->socket_sendto(_socket, address, data, size);
if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != sent)) { if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != sent)) {
ret = sent; ret = sent;
@ -98,6 +106,7 @@ int UDPSocket::recvfrom(SocketAddress *address, void *buffer, unsigned size)
break; break;
} }
_pending = 0;
int recv = _iface->socket_recvfrom(_socket, address, buffer, size); int recv = _iface->socket_recvfrom(_socket, address, buffer, size);
if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != recv)) { if ((0 == _timeout) || (NSAPI_ERROR_WOULD_BLOCK != recv)) {
ret = recv; ret = recv;
@ -124,17 +133,19 @@ int UDPSocket::recvfrom(SocketAddress *address, void *buffer, unsigned size)
return ret; return ret;
} }
void UDPSocket::socket_event() void UDPSocket::event()
{ {
int32_t count; int32_t wcount = _write_sem.wait(0);
count = _write_sem.wait(0); if (wcount <= 1) {
if (count <= 1) {
_write_sem.release(); _write_sem.release();
} }
count = _read_sem.wait(0); int32_t rcount = _read_sem.wait(0);
if (count <= 1) { if (rcount <= 1) {
_read_sem.release(); _read_sem.release();
} }
Socket::socket_event(); _pending += 1;
if (_callback && _pending == 1) {
_callback();
}
} }

View File

@ -31,6 +31,12 @@ public:
*/ */
UDPSocket(); UDPSocket();
/** Destroy a socket
*
* Closes socket if the socket is still open
*/
virtual ~UDPSocket();
/** Create a socket on a network stack /** Create a socket on a network stack
* *
* Creates and opens a socket on the specified network stack. * Creates and opens a socket on the specified network stack.
@ -102,7 +108,8 @@ public:
*/ */
int recvfrom(SocketAddress *address, void *data, unsigned size); int recvfrom(SocketAddress *address, void *data, unsigned size);
protected: protected:
virtual void socket_event(void); virtual void event();
volatile unsigned _pending;
rtos::Mutex _read_lock; rtos::Mutex _read_lock;
rtos::Semaphore _read_sem; rtos::Semaphore _read_sem;
rtos::Mutex _write_lock; rtos::Mutex _write_lock;