diff options
Diffstat (limited to 'common/trans.c')
-rw-r--r-- | common/trans.c | 257 |
1 files changed, 210 insertions, 47 deletions
diff --git a/common/trans.c b/common/trans.c index 3828a174..432b6334 100644 --- a/common/trans.c +++ b/common/trans.c @@ -24,9 +24,11 @@ #include "parse.h" #include "ssl_calls.h" +#define MAX_SBYTES 0 + /*****************************************************************************/ int APP_CC -trans_tls_recv(struct trans *self, void *ptr, int len) +trans_tls_recv(struct trans *self, char *ptr, int len) { if (self->tls == NULL) { @@ -37,7 +39,7 @@ trans_tls_recv(struct trans *self, void *ptr, int len) /*****************************************************************************/ int APP_CC -trans_tls_send(struct trans *self, const void *data, int len) +trans_tls_send(struct trans *self, const char *data, int len) { if (self->tls == NULL) { @@ -59,14 +61,14 @@ trans_tls_can_recv(struct trans *self, int sck, int millis) /*****************************************************************************/ int APP_CC -trans_tcp_recv(struct trans *self, void *ptr, int len) +trans_tcp_recv(struct trans *self, char *ptr, int len) { return g_tcp_recv(self->sck, ptr, len, 0); } /*****************************************************************************/ int APP_CC -trans_tcp_send(struct trans *self, const void *data, int len) +trans_tcp_send(struct trans *self, const char *data, int len) { return g_tcp_send(self->sck, data, len, 0); } @@ -75,7 +77,7 @@ trans_tcp_send(struct trans *self, const void *data, int len) int APP_CC trans_tcp_can_recv(struct trans *self, int sck, int millis) { - return g_tcp_can_recv(sck, millis); + return g_sck_can_recv(sck, millis); } /*****************************************************************************/ @@ -169,13 +171,29 @@ trans_get_wait_objs(struct trans *self, tbus *objs, int *count) /*****************************************************************************/ int APP_CC trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, - tbus *wobjs, int *wcount) + tbus *wobjs, int *wcount, int *timeout) { - if (trans_get_wait_objs(self, robjs, rcount) != 0) + if (self == 0) { return 1; } + if (self->status != TRANS_STATUS_UP) + { + return 1; + } + + if ((self->si != 0) && (self->si->source[self->my_source] > MAX_SBYTES)) + { + } + else + { + if (trans_get_wait_objs(self, robjs, rcount) != 0) + { + return 1; + } + } + if (self->wait_s != 0) { wobjs[*wcount] = self->sck; @@ -187,7 +205,7 @@ trans_get_wait_objs_rw(struct trans *self, tbus *robjs, int *rcount, /*****************************************************************************/ int APP_CC -send_waiting(struct trans *self, int block) +trans_send_waiting(struct trans *self, int block) { struct stream *temp_s; int bytes; @@ -209,9 +227,13 @@ send_waiting(struct trans *self, int block) if (sent > 0) { temp_s->p += sent; + if (temp_s->source != 0) + { + temp_s->source[0] -= sent; + } if (temp_s->p >= temp_s->end) { - self->wait_s = (struct stream *) (temp_s->next_packet); + self->wait_s = temp_s->next; free_stream(temp_s); } } @@ -227,6 +249,18 @@ send_waiting(struct trans *self, int block) } } } + else if (block) + { + /* check for term here */ + if (self->is_term != 0) + { + if (self->is_term()) + { + /* term */ + return 1; + } + } + } } else { @@ -247,6 +281,7 @@ trans_check_wait_objs(struct trans *self) int to_read = 0; int read_so_far = 0; int rv = 0; + int cur_source; if (self == 0) { @@ -262,7 +297,7 @@ trans_check_wait_objs(struct trans *self) if (self->type1 == TRANS_TYPE_LISTENER) /* listening */ { - if (g_tcp_can_recv(self->sck, 0)) + if (g_sck_can_recv(self->sck, 0)) { in_sck = g_sck_accept(self->sck, self->addr, sizeof(self->addr), self->port, sizeof(self->port)); @@ -295,7 +330,7 @@ trans_check_wait_objs(struct trans *self) sizeof(self->addr) - 1); g_strncpy(in_trans->port, self->port, sizeof(self->port) - 1); - + g_sck_set_non_blocking(in_sck); if (self->trans_conn_in(self, in_trans) != 0) { trans_delete(in_trans); @@ -310,8 +345,17 @@ trans_check_wait_objs(struct trans *self) } else /* connected server or client (2 or 3) */ { - if (self->trans_can_recv(self, self->sck, 0)) + if (self->si != 0 && self->si->source[self->my_source] > MAX_SBYTES) { + } + else if (self->trans_can_recv(self, self->sck, 0)) + { + cur_source = 0; + if (self->si != 0) + { + cur_source = self->si->cur_source; + self->si->cur_source = self->my_source; + } read_so_far = (int) (self->in_s->end - self->in_s->data); to_read = self->header_size - read_so_far; @@ -329,6 +373,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } } @@ -336,6 +384,10 @@ trans_check_wait_objs(struct trans *self) { /* error */ self->status = TRANS_STATUS_DOWN; + if (self->si != 0) + { + self->si->cur_source = cur_source; + } return 1; } else @@ -357,8 +409,12 @@ trans_check_wait_objs(struct trans *self) } } } + if (self->si != 0) + { + self->si->cur_source = cur_source; + } } - if (send_waiting(self, 0) != 0) + if (trans_send_waiting(self, 0) != 0) { /* error */ self->status = TRANS_STATUS_DOWN; @@ -368,6 +424,7 @@ trans_check_wait_objs(struct trans *self) return rv; } + /*****************************************************************************/ int APP_CC trans_force_read_s(struct trans *self, struct stream *in_s, int size) @@ -378,7 +435,6 @@ trans_force_read_s(struct trans *self, struct stream *in_s, int size) { return 1; } - while (size > 0) { /* make sure stream has room */ @@ -386,14 +442,12 @@ trans_force_read_s(struct trans *self, struct stream *in_s, int size) { return 1; } - rcvd = self->trans_recv(self, in_s->end, size); - if (rcvd == -1) { if (g_tcp_last_error_would_block(self->sck)) { - if (!g_tcp_can_recv(self->sck, 100)) + if (!self->trans_can_recv(self, self->sck, 100)) { /* check for term here */ if (self->is_term != 0) @@ -426,7 +480,6 @@ trans_force_read_s(struct trans *self, struct stream *in_s, int size) size -= rcvd; } } - return 0; } @@ -449,20 +502,16 @@ trans_force_write_s(struct trans *self, struct stream *out_s) { return 1; } - size = (int) (out_s->end - out_s->data); total = 0; - - if (send_waiting(self, 1) != 0) + if (trans_send_waiting(self, 1) != 0) { self->status = TRANS_STATUS_DOWN; return 1; } - while (total < size) { sent = self->trans_send(self, out_s->data + total, size - total); - if (sent == -1) { if (g_tcp_last_error_would_block(self->sck)) @@ -499,7 +548,6 @@ trans_force_write_s(struct trans *self, struct stream *out_s) total = total + sent; } } - return 0; } @@ -512,23 +560,69 @@ trans_force_write(struct trans *self) /*****************************************************************************/ int APP_CC -trans_write_copy(struct trans *self) +trans_write_copy_s(struct trans *self, struct stream *out_s) { int size; - struct stream *out_s; + int sent; struct stream *wait_s; struct stream *temp_s; + char *out_data; if (self->status != TRANS_STATUS_UP) { return 1; } - - out_s = self->out_s; + /* try to send any left over */ + if (trans_send_waiting(self, 0) != 0) + { + /* error */ + self->status = TRANS_STATUS_DOWN; + return 1; + } + out_data = out_s->data; + sent = 0; size = (int) (out_s->end - out_s->data); + if (self->wait_s == 0) + { + /* if no left over, try to send this new data */ + if (g_tcp_can_send(self->sck, 0)) + { + sent = self->trans_send(self, out_s->data, size); + if (sent > 0) + { + out_data += sent; + size -= sent; + } + else if (sent == 0) + { + return 1; + } + else + { + if (!g_tcp_last_error_would_block(self->sck)) + { + return 1; + } + } + } + } + if (size < 1) + { + return 0; + } + /* did not send right away, have to copy */ make_stream(wait_s); init_stream(wait_s, size); - out_uint8a(wait_s, out_s->data, size); + if (self->si != 0) + { + if ((self->si->cur_source != 0) && + (self->si->cur_source != self->my_source)) + { + self->si->source[self->si->cur_source] += size; + wait_s->source = self->si->source + self->si->cur_source; + } + } + out_uint8a(wait_s, out_data, size); s_mark_end(wait_s); wait_s->p = wait_s->data; if (self->wait_s == 0) @@ -538,53 +632,110 @@ trans_write_copy(struct trans *self) else { temp_s = self->wait_s; - while (temp_s->next_packet != 0) + while (temp_s->next != 0) { - temp_s = (struct stream *) (temp_s->next_packet); + temp_s = temp_s->next; } - temp_s->next_packet = (char *) wait_s; - } - - /* try to send */ - if (send_waiting(self, 0) != 0) - { - /* error */ - self->status = TRANS_STATUS_DOWN; - return 1; + temp_s->next = wait_s; } - return 0; } /*****************************************************************************/ int APP_CC +trans_write_copy(struct trans* self) +{ + return trans_write_copy_s(self, self->out_s); +} + +/*****************************************************************************/ +int APP_CC trans_connect(struct trans *self, const char *server, const char *port, int timeout) { int error; + int now; + int start_time; + + start_time = g_time3(); if (self->sck != 0) { g_tcp_close(self->sck); + self->sck = 0; } if (self->mode == TRANS_MODE_TCP) /* tcp */ { self->sck = g_tcp_socket(); if (self->sck < 0) + { + self->status = TRANS_STATUS_DOWN; return 1; - + } g_tcp_set_non_blocking(self->sck); - error = g_tcp_connect(self->sck, server, port); + while (1) + { + error = g_tcp_connect(self->sck, server, port); + if (error == 0) + { + break; + } + else + { + if (timeout < 1) + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + now = g_time3(); + if (now - start_time < timeout) + { + g_sleep(timeout / 5); + } + else + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + } + } } else if (self->mode == TRANS_MODE_UNIX) /* unix socket */ { self->sck = g_tcp_local_socket(); if (self->sck < 0) + { + self->status = TRANS_STATUS_DOWN; return 1; - + } g_tcp_set_non_blocking(self->sck); - error = g_tcp_local_connect(self->sck, port); + while (1) + { + error = g_tcp_local_connect(self->sck, port); + if (error == 0) + { + break; + } + else + { + if (timeout < 1) + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + now = g_time3(); + if (now - start_time < timeout) + { + g_sleep(timeout / 5); + } + else + { + self->status = TRANS_STATUS_DOWN; + return 1; + } + } + } } else { @@ -596,6 +747,15 @@ trans_connect(struct trans *self, const char *server, const char *port, { if (g_tcp_last_error_would_block(self->sck)) { + now = g_time3(); + if (now - start_time < timeout) + { + timeout = timeout - (now - start_time); + } + else + { + timeout = 0; + } if (g_tcp_can_send(self->sck, timeout)) { self->status = TRANS_STATUS_UP; /* ok */ @@ -717,10 +877,12 @@ trans_get_out_s(struct trans *self, int size) return rv; } + /*****************************************************************************/ /* returns error */ int APP_CC -trans_set_tls_mode(struct trans *self, const char *key, const char *cert) +trans_set_tls_mode(struct trans *self, const char *key, const char *cert, + int disableSSLv3, const char *tls_ciphers) { self->tls = ssl_tls_create(self, key, cert); if (self->tls == NULL) @@ -729,7 +891,7 @@ trans_set_tls_mode(struct trans *self, const char *key, const char *cert) return 1; } - if (ssl_tls_accept(self->tls) != 0) + if (ssl_tls_accept(self->tls, disableSSLv3, tls_ciphers) != 0) { g_writeln("trans_set_tls_mode: ssl_tls_accept failed"); return 1; @@ -742,6 +904,7 @@ trans_set_tls_mode(struct trans *self, const char *key, const char *cert) return 0; } + /*****************************************************************************/ /* returns error */ int APP_CC |