Simplify I/O handling

This commit is contained in:
Marc André Tanner 2014-03-21 20:40:55 +01:00
parent 6bd887dc96
commit a99e46fd23
4 changed files with 111 additions and 265 deletions

View File

@ -77,20 +77,8 @@ typedef struct {
} u;
} Packet;
typedef struct {
Packet pkt;
size_t off;
} ClientPacketState;
typedef struct {
Packet *pkt;
size_t off;
} ServerPacketState;
typedef struct Client Client;
struct Client {
ServerPacketState output; /* display output as received from server */
ClientPacketState input; /* input as sent to the server */
int socket;
enum {
STATE_CONNECTED,
@ -98,7 +86,6 @@ struct Client {
STATE_DETACHED,
STATE_DISCONNECTED,
} state;
time_t last_activity;
bool need_resize;
Client *next;
};
@ -108,11 +95,6 @@ typedef struct {
int client_count;
int socket;
Packet pty_output;
ClientPacketState pty_input;
Packet queue[10];
unsigned int queue_count;
unsigned int queue_insert;
unsigned int queue_remove;
int pty;
int exit_status;
struct termios term;
@ -134,6 +116,8 @@ static int create_socket(const char *name);
static void die(const char *s);
static void info(const char *str, ...);
#include "debug.c"
static inline size_t packet_header_size() {
return offsetof(Packet, u);
}
@ -142,19 +126,61 @@ static size_t packet_size(Packet *pkt) {
return packet_header_size() + pkt->len;
}
static bool is_client_packet_complete(ClientPacketState *pkt) {
return pkt->off >= packet_header_size() && pkt->off == packet_size(&pkt->pkt);
static ssize_t write_all(int fd, const char *buf, size_t len) {
debug("write_all(%d)\n", len);
ssize_t ret = len;
while (len > 0) {
ssize_t res = write(fd, buf, len);
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
continue;
return -1;
}
if (res == 0)
return ret - len;
buf += res;
len -= res;
}
return ret;
}
static bool is_server_packet_complete(ServerPacketState *pkt) {
return pkt->pkt && pkt->off == packet_size(pkt->pkt);
static ssize_t read_all(int fd, char *buf, size_t len) {
debug("read_all(%d)\n", len);
ssize_t ret = len;
while (len > 0) {
ssize_t res = read(fd, buf, len);
if (res < 0) {
if (errno == EWOULDBLOCK)
return ret - len;
if (errno == EAGAIN || errno == EINTR)
continue;
return -1;
}
if (res == 0)
return ret - len;
buf += res;
len -= res;
}
return ret;
}
static bool is_server_packet_nonempty(ServerPacketState *pkt) {
return pkt->pkt && pkt->pkt->len > 0;
static bool send_packet(int socket, Packet *pkt) {
size_t size = packet_size(pkt);
return write_all(socket, (char *)pkt, size) == size;
}
static bool recv_packet(int socket, Packet *pkt) {
ssize_t len = read_all(socket, (char*)pkt, packet_header_size());
if (len <= 0 || len != packet_header_size())
return false;
if (pkt->len > 0) {
len = read_all(socket, pkt->u.msg, pkt->len);
if (len <= 0 || len != pkt->len)
return false;
}
return true;
}
#include "debug.c"
#include "client.c"
#include "server.c"

View File

@ -4,64 +4,21 @@ static void client_sigwinch_handler(int sig) {
client.need_resize = true;
}
static ssize_t write_all(int fd, const char *buf, size_t len) {
ssize_t ret = len;
while (len > 0) {
ssize_t res = write(fd, buf, len);
if (res < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
continue;
return -1;
}
if (res == 0)
return ret - len;
buf += res;
len -= res;
}
return ret;
}
static ssize_t read_all(int fd, char *buf, size_t len) {
ssize_t ret = len;
while (len > 0) {
ssize_t res = read(fd, buf, len);
if (res < 0) {
if (errno == EWOULDBLOCK)
return ret - len;
if (errno == EAGAIN || errno == EINTR)
continue;
return -1;
}
if (res == 0)
return ret - len;
buf += res;
len -= res;
}
return ret;
}
static bool client_send_packet(Packet *pkt) {
print_packet("client-send:", pkt);
size_t size = packet_size(pkt);
if (write_all(server.socket, (char *)pkt, size) != size) {
debug("FAILED\n");
server.running = false;
return false;
}
return true;
if (send_packet(server.socket, pkt))
return true;
debug("FAILED\n");
server.running = false;
return false;
}
static bool client_recv_packet(Packet *pkt) {
ssize_t len = read_all(server.socket, (char*)pkt, packet_header_size());
if (len <= 0 || len != packet_header_size() || pkt->len == 0)
goto error;
len = read_all(server.socket, pkt->u.msg, pkt->len);
print_packet("client-recv:", pkt);
if (len <= 0 || len != pkt->len)
goto error;
return true;
error:
debug("FAILED here\n");
if (recv_packet(server.socket, pkt)) {
print_packet("client-recv:", pkt);
return true;
}
debug("client-recv: FAILED\n");
server.running = false;
return false;
}
@ -123,7 +80,7 @@ static int client_mainloop() {
if (len == -1 && errno != EAGAIN && errno != EINTR)
die("client-stdin");
if (len > 0) {
debug("client-stdin: %c", pkt.u.msg[0]);
debug("client-stdin: %c\n", pkt.u.msg[0]);
pkt.len = len;
if (pkt.u.msg[0] == KEY_REDRAW) {
client.need_resize = true;

17
debug.c
View File

@ -1,8 +1,6 @@
#ifdef NDEBUG
static void debug(const char *errstr, ...) { }
static void print_packet(const char *prefix, Packet *pkt) { }
static void print_client_packet_state(const char *prefix, ClientPacketState *pkt) { }
static void print_server_packet_state(const char *prefix, ServerPacketState *pkt) { }
#else
static void debug(const char *errstr, ...) {
@ -13,6 +11,7 @@ static void debug(const char *errstr, ...) {
}
static void print_packet(const char *prefix, Packet *pkt) {
// XXX: look up table
char *s = "UNKNOWN";
switch (pkt->type) {
case MSG_CONTENT:
@ -40,21 +39,11 @@ static void print_packet(const char *prefix, Packet *pkt) {
for (size_t i = 0; i < pkt->len && i < sizeof(pkt->u.msg); i++)
fprintf(stderr, "%c", pkt->u.msg[i]);
fprintf(stderr, "\n");
} else if (pkt->type == MSG_RESIZE) {
fprintf(stderr, "%s %s %d x %d\n", prefix, s, pkt->u.ws.ws_col, pkt->u.ws.ws_row);
} else {
fprintf(stderr, "%s %s len: %d\n", prefix, s, pkt->len);
}
}
static void print_client_packet_state(const char *prefix, ClientPacketState *pkt) {
fprintf(stderr, "%s %d/%d\n", prefix, pkt->off, packet_size(&pkt->pkt));
if (is_client_packet_complete(pkt))
print_packet(prefix, &pkt->pkt);
}
static void print_server_packet_state(const char *prefix, ServerPacketState *pkt) {
fprintf(stderr, "%s %d/%d\n", prefix, pkt->off, packet_size(pkt->pkt));
if (is_server_packet_complete(pkt))
print_packet(prefix, pkt->pkt);
}
#endif /* NDEBUG */

218
server.c
View File

@ -43,7 +43,6 @@ static int server_create_socket(const char *name) {
goto error;
if (listen(socket, 5) == -1)
goto error;
debug("old: %d new: %d\n", server.socket, socket);
return socket;
error:
unlink(sockaddr.sun_path);
@ -57,7 +56,7 @@ static int server_set_socket_non_blocking(int sock) {
return fcntl(sock, F_SETFL, flags | O_NONBLOCK);
}
static Client *server_accept_client(time_t now) {
static Client *server_accept_client() {
int newfd = accept(server.socket, NULL, NULL);
if (newfd == -1)
return NULL;
@ -69,7 +68,6 @@ static Client *server_accept_client(time_t now) {
server_set_socket_non_blocking(newfd);
c->socket = newfd;
c->state = STATE_CONNECTED;
c->last_activity = now;
c->next = server.clients;
server.clients = c;
server.client_count++;
@ -87,87 +85,33 @@ static bool server_read_pty(Packet *pkt) {
return len > 0;
}
static bool server_write_pty(ClientPacketState *pkt) {
size_t count = pkt->pkt.len - pkt->off;
ssize_t len = write(server.pty, pkt->pkt.u.msg + pkt->off, count);
if (len == -1) {
if (errno != EAGAIN && errno != EINTR)
server.running = false;
} else {
pkt->off += len;
}
print_client_packet_state("server-write-pty:", pkt);
return len == count;
}
static void server_place_packet(Client *c, Packet *pkt) {
c->output.pkt = pkt;
c->output.off = 0;
}
static bool server_recv_packet_header(Client *c) {
ClientPacketState *pkt = &c->input;
if (pkt->off >= packet_header_size())
static bool server_write_pty(Packet *pkt) {
print_packet("server-write-pty:", pkt);
size_t size = pkt->len;
if (write_all(server.pty, pkt->u.msg, size) == size)
return true;
size_t count = packet_header_size() - pkt->off;
ssize_t len = recv(c->socket, ((char *)&pkt->pkt) + pkt->off, count, 0);
switch (len) {
case -1:
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
case 0:
c->state = STATE_DISCONNECTED;
}
break;
default:
pkt->off += len;
break;
}
print_client_packet_state("server-recv:", pkt);
return len == count;
debug("FAILED\n");
server.running = false;
return false;
}
static bool server_recv_packet(Client *c) {
ClientPacketState *pkt = &c->input;
if (is_client_packet_complete(pkt))
static bool server_recv_packet(Client *c, Packet *pkt) {
if (recv_packet(c->socket, pkt)) {
print_packet("server-recv:", pkt);
return true;
if (!server_recv_packet_header(c))
return false;
size_t count = packet_size(&pkt->pkt) - pkt->off;
ssize_t len = recv(c->socket, ((char *)&pkt->pkt) + pkt->off, count, 0);
switch (len) {
case -1:
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
case 0:
c->state = STATE_DISCONNECTED;
}
break;
default:
pkt->off += len;
break;
}
print_client_packet_state("server-recv:", pkt);
return len == count;
debug("server-recv: FAILED\n");
c->state = STATE_DISCONNECTED;
return false;
}
static bool server_send_packet(Client *c) {
ServerPacketState *pkt = &c->output;
if (is_server_packet_complete(pkt))
static bool server_send_packet(Client *c, Packet *pkt) {
print_packet("server-send:", pkt);
if (send_packet(c->socket, pkt))
return true;
size_t count = packet_size(pkt->pkt) - pkt->off;
ssize_t len = send(c->socket, (char*)pkt->pkt + pkt->off, count, 0);
switch (len) {
case -1:
if (errno != EAGAIN && errno != EWOULDBLOCK && errno != EINTR) {
case 0:
c->state = STATE_DISCONNECTED;
}
break;
default:
pkt->off += len;
break;
}
print_server_packet_state("server-send:", pkt);
return len == count;
debug("FAILED\n");
c->state = STATE_DISCONNECTED;
return false;
}
static void server_pty_died_handler(int sig) {
@ -201,30 +145,6 @@ static void server_atexit_handler() {
unlink(sockaddr.sun_path);
}
static bool server_queue_empty() {
return server.queue_count == 0;
}
static bool server_queue_packet(Packet *pkt) {
if (server.queue_count >= countof(server.queue))
return false;
server.queue[server.queue_insert] = *pkt;
server.queue_insert++;
server.queue_insert %= countof(server.queue);
server.queue_count++;
return true;
}
static Packet *server_peek_packet() {
return &server.queue[server.queue_remove];
}
static void server_dequeue_packet() {
server.queue_remove++;
server.queue_remove %= countof(server.queue);
server.queue_count--;
}
static void server_mainloop() {
atexit(server_atexit_handler);
fd_set new_readfds, new_writefds;
@ -232,25 +152,14 @@ static void server_mainloop() {
FD_ZERO(&new_writefds);
FD_SET(server.socket, &new_readfds);
int new_fdmax = server.socket;
Packet *exit_pkt = NULL;
for (;;) {
while (!exit_pkt) {
int fdmax = new_fdmax;
fd_set readfds = new_readfds;
fd_set writefds = new_writefds;
FD_SET_MAX(server.socket, &readfds, fdmax);
if (!server.running && server.exit_status != -1 && server.clients) { /* application terminated */
Packet pkt = { .type = MSG_EXIT, .len = sizeof(int), .u.i = server.exit_status };
server.pty_output = pkt;
time_t now = time(NULL);
for (Client *c = server.clients; c; c = c->next) {
server_place_packet(c, &server.pty_output);
c->last_activity = now;
FD_SET_MAX(c->socket, &writefds, fdmax);
}
server.exit_status = -1;
}
if (select(fdmax+1, &readfds, &writefds, NULL, NULL) == -1) {
if (errno == EINTR)
continue;
@ -261,15 +170,21 @@ static void server_mainloop() {
FD_ZERO(&new_writefds);
new_fdmax = server.socket;
time_t now = time(NULL);
time_t timeout = now - CLIENT_TIMEOUT;
bool pty_data = false, clients_ready = true, exit_sent = false;
bool pty_data = false;
Packet server_packet, client_packet;
if (FD_ISSET(server.socket, &readfds))
server_accept_client(now);
server_accept_client();
if (FD_ISSET(server.pty, &readfds))
pty_data = server_read_pty(&server.pty_output);
pty_data = server_read_pty(&server_packet);
if (!server.running && server.exit_status != -1 && server.clients) { /* application terminated */
Packet pkt = { .type = MSG_EXIT, .len = sizeof(int), .u.i = server.exit_status };
exit_pkt = &pkt;
server.exit_status = -1;
}
for (Client **prev_next = &server.clients, *c = server.clients; c;) {
if (c->state == STATE_DISCONNECTED) {
@ -281,20 +196,19 @@ static void server_mainloop() {
continue;
}
if (FD_ISSET(c->socket, &readfds))
server_recv_packet(c);
if (is_client_packet_complete(&c->input)) {
bool packet_handled = true;
switch (c->input.pkt.type) {
if (FD_ISSET(c->socket, &readfds) && server_recv_packet(c, &client_packet)) {
switch (client_packet.type) {
case MSG_CONTENT:
packet_handled = server_queue_packet(&c->input.pkt);
server_write_pty(&client_packet);
break;
case MSG_ATTACH:
case MSG_RESIZE:
c->state = STATE_ATTACHED;
case MSG_REDRAW:
if (c->input.pkt.type == MSG_REDRAW || c == server.clients)
ioctl(server.pty, TIOCSWINSZ, &c->input.pkt.u.ws);
if (client_packet.type == MSG_REDRAW || c == server.clients) {
debug("server-ioct: TIOCSWINSZ\n");
ioctl(server.pty, TIOCSWINSZ, &client_packet.u.ws);
}
kill(-server.pid, SIGWINCH);
break;
case MSG_DETACH:
@ -303,61 +217,21 @@ static void server_mainloop() {
default: /* ignore package */
break;
}
if (packet_handled) {
c->input.off = 0;
FD_SET_MAX(c->socket, &new_readfds, new_fdmax);
}
} else {
FD_SET_MAX(c->socket, &new_readfds, new_fdmax);
}
if (pty_data) {
server_place_packet(c, &server.pty_output);
c->last_activity = now;
}
if (FD_ISSET(c->socket, &writefds)) {
server_send_packet(c);
c->last_activity = now;
}
FD_SET_MAX(c->socket, &new_readfds, new_fdmax);
if (!is_server_packet_complete(&c->output)) {
if (c->last_activity < timeout) {
c->state = STATE_DISCONNECTED;
} else if (is_server_packet_nonempty(&c->output)) {
clients_ready = false;
FD_SET_MAX(c->socket, &new_writefds, new_fdmax);
}
} else {
exit_sent = (c->output.pkt && c->output.pkt->type == MSG_EXIT);
}
if (pty_data)
server_send_packet(c, &server_packet);
if (exit_pkt)
server_send_packet(c, exit_pkt);
if (c->state != STATE_ATTACHED)
clients_ready = false;
prev_next = &c->next;
c = c->next;
}
if (server.running && clients_ready)
if (server.running)
FD_SET_MAX(server.pty, &new_readfds, new_fdmax);
if (FD_ISSET(server.pty, &writefds)) {
while (!server_queue_empty()) {
if (!server.pty_input.off)
server.pty_input.pkt = *server_peek_packet();
if (!server_write_pty(&server.pty_input))
break;
server_dequeue_packet();
server.pty_input.off = 0;
}
}
if (!server_queue_empty())
FD_SET_MAX(server.pty, &new_writefds, new_fdmax);
if (exit_sent)
break;
}
exit(EXIT_SUCCESS);