tpws : major connection code rewrite

This commit is contained in:
bolvan 2019-05-27 11:02:56 +03:00
parent 0c3a5aac87
commit 35b429c155
17 changed files with 1118 additions and 707 deletions

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -133,3 +133,8 @@ v26
ipv6 support ipv6 support
tpws : advanced bind options tpws : advanced bind options
v27
tpws : major connection code rewrite. originally it was derived from not top quality example , with many bugs and potential problems.
next generation connection code uses nonblocking sockets. now its in EXPERIMENTAL state.

View File

@ -1,4 +1,4 @@
zapret v.26 zapret v.27
Для чего это надо Для чего это надо
----------------- -----------------

30
tpws/params.h Normal file
View File

@ -0,0 +1,30 @@
#pragma once
#include <stdbool.h>
#include <net/if.h>
#include <stdint.h>
#include "strpool.h"
enum splithttpreq { split_none = 0, split_method, split_host };
struct params_s
{
char bindaddr[64],bindiface[IFNAMSIZ];
bool bind_if6;
bool bindll,bindll_force;
int bind_wait_ifup,bind_wait_ip,bind_wait_ip_ll;
uid_t uid;
gid_t gid;
uint16_t port;
bool daemon;
bool hostcase, hostdot, hosttab, hostnospace, methodspace, methodeol, unixeol;
char hostspell[4];
enum splithttpreq split_http_req;
int split_pos;
int maxconn;
char hostfile[256];
char pidfile[256];
strpool *hostlist;
};
extern struct params_s params;

182
tpws/tamper.c Normal file
View File

@ -0,0 +1,182 @@
#include "tamper.h"
#include "params.h"
#include <string.h>
#include <stdio.h>
char *find_bin(void *data, size_t len, const void *blk, size_t blk_len)
{
while (len >= blk_len)
{
if (!memcmp(data, blk, blk_len))
return data;
data = (char*)data + 1;
len--;
}
return NULL;
}
// pHost points to "Host: ..."
bool find_host(char **pHost,char *buf,size_t bs)
{
if (!*pHost)
{
*pHost = find_bin(buf, bs, "\nHost: ", 7);
if (*pHost) (*pHost)++;
printf("Found Host: at pos %zu\n",*pHost - buf);
}
return !!*pHost;
}
static const char *http_methods[] = { "GET /","POST /","HEAD /","OPTIONS /","PUT /","DELETE /","CONNECT /","TRACE /",NULL };
void modify_tcp_segment(char *segment,size_t *size,size_t *split_pos)
{
char *p, *pp, *pHost = NULL;
size_t method_len = 0, pos;
const char **method;
bool bIsHttp = false, bBypass = false;
char bRemovedHostSpace = 0;
char Host[128];
*split_pos=0;
for (method = http_methods; *method; method++)
{
method_len = strlen(*method);
if (method_len <= *size && !memcmp(segment, *method, method_len))
{
bIsHttp = true;
method_len -= 2; // "GET /" => "GET"
break;
}
}
if (bIsHttp)
{
printf("Data block looks like http request start : %s\n", *method);
// cpu saving : we search host only if and when required. we do not research host every time we need its position
if (params.hostlist && find_host(&pHost,segment,*size))
{
bool bInHostList = false;
p = pHost + 6;
while (p < (segment + *size) && (*p == ' ' || *p == '\t')) p++;
pp = p;
while (pp < (segment + *size) && (pp - p) < (sizeof(Host) - 1) && *pp != '\r' && *pp != '\n') pp++;
memcpy(Host, p, pp - p);
Host[pp - p] = '\0';
printf("Requested Host is : %s\n", Host);
for(p = Host; *p; p++) *p=tolower(*p);
p = Host;
while (p)
{
bInHostList = StrPoolCheckStr(params.hostlist, p);
printf("Hostlist check for %s : %s\n", p, bInHostList ? "positive" : "negative");
if (bInHostList) break;
p = strchr(p, '.');
if (p) p++;
}
bBypass = !bInHostList;
}
if (!bBypass)
{
if (params.unixeol)
{
p = pp = segment;
while (p = find_bin(p, segment + *size - p, "\r\n", 2))
{
*p = '\n'; p++;
memmove(p, p + 1, segment + *size - p - 1);
(*size)--;
if (pp == (p - 1))
{
// probably end of http headers
printf("Found double EOL at pos %zu. Stop replacing.\n", pp - segment);
break;
}
pp = p;
}
pHost = NULL; // invalidate
}
if (params.methodspace)
{
// we only work with data blocks looking as HTTP query, so method is at the beginning
printf("Adding extra space after method\n");
p = segment + method_len + 1;
pos = method_len + 1;
memmove(p + 1, p, *size - pos);
*p = ' '; // insert extra space
(*size)++; // block will grow by 1 byte
if (pHost) pHost++; // Host: position will move by 1 byte
}
if ((params.hostdot || params.hosttab) && find_host(&pHost,segment,*size))
{
p = pHost + 6;
while (p < (segment + *size) && *p != '\r' && *p != '\n') p++;
if (p < (segment + *size))
{
pos = p - segment;
printf("Adding %s to host name at pos %zu\n", params.hostdot ? "dot" : "tab", pos);
memmove(p + 1, p, *size - pos);
*p = params.hostdot ? '.' : '\t'; // insert dot or tab
(*size)++; // block will grow by 1 byte
}
}
if (params.hostnospace && find_host(&pHost,segment,*size) && pHost[5] == ' ')
{
p = pHost + 6;
pos = p - segment;
printf("Removing space before host name at pos %zu\n", pos);
memmove(p - 1, p, *size - pos);
(*size)--; // block will shrink by 1 byte
bRemovedHostSpace = 1;
}
if (!params.split_pos)
{
switch (params.split_http_req)
{
case split_method:
*split_pos = method_len - 1;
break;
case split_host:
if (find_host(&pHost,segment,*size))
*split_pos = pHost + 6 - bRemovedHostSpace - segment;
break;
}
}
if (params.hostcase && find_host(&pHost,segment,*size))
{
printf("Changing 'Host:' => '%c%c%c%c:' at pos %zu\n", params.hostspell[0], params.hostspell[1], params.hostspell[2], params.hostspell[3], pHost - segment);
memcpy(pHost, params.hostspell, 4);
}
if (params.methodeol)
{
printf("Adding EOL before method\n");
if (params.unixeol)
{
memmove(segment + 1, segment, *size);
(*size)++;;
segment[0] = '\n';
if (*split_pos) (*split_pos)++;
}
else
{
memmove(segment + 2, segment, *size);
*size += 2;
segment[0] = '\r';
segment[1] = '\n';
if (*split_pos) *split_pos += 2;
}
}
if (params.split_pos && params.split_pos < *size) *split_pos = params.split_pos;
}
else
{
printf("Not acting on this request\n");
}
}
else
{
printf("Data block does not look like http request start\n");
// this is the only parameter applicable to non-http block (may be https ?)
if (params.split_pos && params.split_pos < *size) *split_pos = params.split_pos;
}
}

8
tpws/tamper.h Normal file
View File

@ -0,0 +1,8 @@
#pragma once
#include <stdbool.h>
#include <sys/types.h>
char *find_bin(void *data, size_t len, const void *blk, size_t blk_len);
bool find_host(char **pHost,char *buf,size_t bs);
void modify_tcp_segment(char *segment,size_t *size,size_t *split_pos);

View File

@ -13,8 +13,6 @@
#include <sys/select.h> #include <sys/select.h>
#include <fcntl.h> #include <fcntl.h>
#include <stdint.h> #include <stdint.h>
#include <sys/queue.h>
#include <sys/epoll.h>
#include <sys/ioctl.h> #include <sys/ioctl.h>
#include <signal.h> #include <signal.h>
#include <errno.h> #include <errno.h>
@ -29,43 +27,10 @@
#include "tpws.h" #include "tpws.h"
#include "tpws_conn.h" #include "tpws_conn.h"
#include "hostlist.h" #include "hostlist.h"
#include "params.h"
enum splithttpreq { split_none = 0, split_method, split_host };
struct params_s
{
char bindaddr[64],bindiface[IFNAMSIZ];
bool bind_if6;
bool bindll,bindll_force;
int bind_wait_ifup,bind_wait_ip,bind_wait_ip_ll;
uid_t uid;
gid_t gid;
uint16_t port;
bool daemon;
bool hostcase, hostdot, hosttab, hostnospace, methodspace, methodeol, unixeol;
char hostspell[4];
enum splithttpreq split_http_req;
int split_pos;
int maxconn;
char hostfile[256];
char pidfile[256];
strpool *hostlist;
};
struct params_s params; struct params_s params;
unsigned char *find_bin(void *data, size_t len, const void *blk, size_t blk_len)
{
while (len >= blk_len)
{
if (!memcmp(data, blk, blk_len))
return data;
data = (char*)data + 1;
len--;
}
return NULL;
}
bool bHup = false; bool bHup = false;
void onhup(int sig) void onhup(int sig)
{ {
@ -82,410 +47,16 @@ void dohup()
if (params.hostlist) if (params.hostlist)
{ {
if (!LoadHostList(&params.hostlist, params.hostfile)) if (!LoadHostList(&params.hostlist, params.hostfile))
{
// what will we do without hostlist ?? sure, gonna die
exit(1); exit(1);
} }
}
bHup = false; bHup = false;
} }
} }
size_t send_with_flush(int sockfd, const void *buf, size_t len, int flags)
{
int flag, err;
size_t wr;
flag = 1;
setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
wr = send(sockfd, buf, len, flags);
err = errno;
flag = 0;
setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, (char *)&flag, sizeof(int));
errno = err;
return wr;
}
#define RD_BLOCK_SIZE 8192
// pHost points to "Host: ..."
bool find_host(char **pHost,char *buf,size_t bs)
{
if (!*pHost)
{
*pHost = find_bin(buf, bs, "\nHost: ", 7);
if (*pHost) (*pHost)++;
printf("Found Host: at pos %zu\n",*pHost - buf);
}
return !!*pHost;
}
static const char *http_methods[] = { "GET /","POST /","HEAD /","OPTIONS /","PUT /","DELETE /","CONNECT /","TRACE /",NULL };
void modify_tcp_segment(char *segment,size_t *size,size_t *split_pos)
{
char *p, *pp, *pHost = NULL;
size_t method_len = 0, pos;
const char **method;
bool bIsHttp = false, bBypass = false;
char bRemovedHostSpace = 0;
char Host[128];
*split_pos=0;
for (method = http_methods; *method; method++)
{
method_len = strlen(*method);
if (method_len <= *size && !memcmp(segment, *method, method_len))
{
bIsHttp = true;
method_len -= 2; // "GET /" => "GET"
break;
}
}
if (bIsHttp)
{
printf("Data block looks like http request start : %s\n", *method);
// cpu saving : we search host only if and when required. we do not research host every time we need its position
if (params.hostlist && find_host(&pHost,segment,*size))
{
bool bInHostList = false;
p = pHost + 6;
while (p < (segment + *size) && (*p == ' ' || *p == '\t')) p++;
pp = p;
while (pp < (segment + *size) && (pp - p) < (sizeof(Host) - 1) && *pp != '\r' && *pp != '\n') pp++;
memcpy(Host, p, pp - p);
Host[pp - p] = '\0';
printf("Requested Host is : %s\n", Host);
for(p = Host; *p; p++) *p=tolower(*p);
p = Host;
while (p)
{
bInHostList = StrPoolCheckStr(params.hostlist, p);
printf("Hostlist check for %s : %s\n", p, bInHostList ? "positive" : "negative");
if (bInHostList) break;
p = strchr(p, '.');
if (p) p++;
}
bBypass = !bInHostList;
}
if (!bBypass)
{
if (params.unixeol)
{
p = pp = segment;
while (p = find_bin(p, segment + *size - p, "\r\n", 2))
{
*p = '\n'; p++;
memmove(p, p + 1, segment + *size - p - 1);
(*size)--;
if (pp == (p - 1))
{
// probably end of http headers
printf("Found double EOL at pos %zu. Stop replacing.\n", pp - segment);
break;
}
pp = p;
}
pHost = NULL; // invalidate
}
if (params.methodspace)
{
// we only work with data blocks looking as HTTP query, so method is at the beginning
printf("Adding extra space after method\n");
p = segment + method_len + 1;
pos = method_len + 1;
memmove(p + 1, p, *size - pos);
*p = ' '; // insert extra space
(*size)++; // block will grow by 1 byte
if (pHost) pHost++; // Host: position will move by 1 byte
}
if ((params.hostdot || params.hosttab) && find_host(&pHost,segment,*size))
{
p = pHost + 6;
while (p < (segment + *size) && *p != '\r' && *p != '\n') p++;
if (p < (segment + *size))
{
pos = p - segment;
printf("Adding %s to host name at pos %zu\n", params.hostdot ? "dot" : "tab", pos);
memmove(p + 1, p, *size - pos);
*p = params.hostdot ? '.' : '\t'; // insert dot or tab
(*size)++; // block will grow by 1 byte
}
}
if (params.hostnospace && find_host(&pHost,segment,*size) && pHost[5] == ' ')
{
p = pHost + 6;
pos = p - segment;
printf("Removing space before host name at pos %zu\n", pos);
memmove(p - 1, p, *size - pos);
(*size)--; // block will shrink by 1 byte
bRemovedHostSpace = 1;
}
if (!params.split_pos)
{
switch (params.split_http_req)
{
case split_method:
*split_pos = method_len - 1;
break;
case split_host:
if (find_host(&pHost,segment,*size))
*split_pos = pHost + 6 - bRemovedHostSpace - segment;
break;
}
}
if (params.hostcase && find_host(&pHost,segment,*size))
{
printf("Changing 'Host:' => '%c%c%c%c:' at pos %zu\n", params.hostspell[0], params.hostspell[1], params.hostspell[2], params.hostspell[3], pHost - segment);
memcpy(pHost, params.hostspell, 4);
}
if (params.methodeol)
{
printf("Adding EOL before method\n");
if (params.unixeol)
{
memmove(segment + 1, segment, *size);
(*size)++;;
segment[0] = '\n';
if (*split_pos) (*split_pos)++;
}
else
{
memmove(segment + 2, segment, *size);
*size += 2;
segment[0] = '\r';
segment[1] = '\n';
if (*split_pos) *split_pos += 2;
}
}
if (params.split_pos && params.split_pos < *size) *split_pos = params.split_pos;
}
else
{
printf("Not acting on this request\n");
}
}
else
{
printf("Data block does not look like http request start\n");
// this is the only parameter applicable to non-http block (may be https ?)
if (params.split_pos && params.split_pos < *size) *split_pos = params.split_pos;
}
}
bool handle_epollin(tproxy_conn_t *conn, ssize_t *data_transferred)
{
int numbytes;
int fd_in, fd_out;
bool bOutgoing;
ssize_t rd = 0, wr = 0;
size_t bs;
//Easy way to determin which socket is ready for reading
//TODO: Optimize. This one allows me quick lookup for conn, but
//I need to make a system call to determin which socket
numbytes = 0;
if (ioctl(conn->local_fd, FIONREAD, &numbytes) != -1
&& numbytes > 0) {
fd_in = conn->local_fd;
fd_out = conn->remote_fd;
bOutgoing = true;
}
else {
fd_in = conn->remote_fd;
fd_out = conn->local_fd;
numbytes = 0;
ioctl(fd_in, FIONREAD, &numbytes);
bOutgoing = false;
}
if (numbytes)
{
if (bOutgoing)
{
char buf[RD_BLOCK_SIZE + 4];
rd = recv(fd_in, buf, RD_BLOCK_SIZE, MSG_DONTWAIT);
if (rd > 0)
{
size_t split_pos;
bs = rd;
modify_tcp_segment(buf,&bs,&split_pos);
if (split_pos)
{
printf("Splitting at pos %zu\n", split_pos);
wr = send_with_flush(fd_out, buf, split_pos, 0);
if (wr >= 0)
wr = send(fd_out, buf + split_pos, bs - split_pos, 0);
}
else
{
wr = send(fd_out, buf, bs, 0);
}
}
}
else
{
// *** we are not interested in incoming traffic
// splice it without processing
//printf("splicing numbytes=%d\n",numbytes);
rd = numbytes = splice(fd_in, NULL, conn->splice_pipe[1], NULL,
SPLICE_LEN, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
//printf("spliced rd=%d\n",rd);
if (rd > 0)
{
wr = splice(conn->splice_pipe[0], NULL, fd_out, NULL,
rd, SPLICE_F_MOVE);
}
//printf("splice rd=%d wr=%d\n",rd,wr);
}
}
if (data_transferred) *data_transferred = rd < 0 ? 0 : rd;
return rd != -1 && wr != -1;
}
void remove_closed_connections(struct tailhead *close_list)
{
tproxy_conn_t *conn = NULL;
while (close_list->tqh_first != NULL) {
conn = (tproxy_conn_t*)close_list->tqh_first;
TAILQ_REMOVE(close_list, close_list->tqh_first, conn_ptrs);
ssize_t rd = 0;
while (handle_epollin(conn, &rd) && rd);
printf("Socket %d and %d closed, connection removed\n",
conn->local_fd, conn->remote_fd);
free_conn(conn);
}
}
void close_tcp_conn(tproxy_conn_t *conn, struct tailhead *conn_list, struct tailhead *close_list)
{
conn->state = CONN_CLOSED;
TAILQ_REMOVE(conn_list, conn, conn_ptrs);
TAILQ_INSERT_TAIL(close_list, conn, conn_ptrs);
}
int event_loop(int listen_fd)
{
int retval = 0, num_events = 0;
int tmp_fd = 0; //Used to temporarily hold the accepted file descriptor
tproxy_conn_t *conn = NULL;
int efd, i;
struct epoll_event ev, events[MAX_EPOLL_EVENTS];
struct tailhead conn_list, close_list;
uint8_t check_close = 0;
int conncount = 0;
//Initialize queue (remember that TAILQ_HEAD just defines the struct)
TAILQ_INIT(&conn_list);
TAILQ_INIT(&close_list);
if ((efd = epoll_create(1)) == -1) {
perror("epoll_create");
return -1;
}
//Start monitoring listen socket
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
//There is only one listen socket, and I want to use ptr in order to have
//easy access to the connections. So if ptr is NULL that means an event on
//listen socket.
ev.data.ptr = NULL;
if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl (listen socket)");
return -1;
}
while (1) {
if ((num_events = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1)) == -1) {
if (errno == EINTR) continue; // system call was interrupted
perror("epoll_wait");
retval = -1;
break;
}
dohup();
for (i = 0; i < num_events; i++) {
if (events[i].data.ptr == NULL) {
//Accept new connection
tmp_fd = accept(listen_fd, NULL, 0);
if (tmp_fd < 0)
{
fprintf(stderr, "Failed to accept connection\n");
}
else if (conncount >= params.maxconn)
{
close(tmp_fd);
fprintf(stderr, "Too much connections : %d\n", conncount);
}
else if ((conn = add_tcp_connection(efd, &conn_list, tmp_fd, params.port)) == NULL)
{
close(tmp_fd);
fprintf(stderr, "Failed to add connection\n");
}
else
{
conncount++;
printf("Connections : %d\n", conncount);
}
}
else {
conn = (tproxy_conn_t*)events[i].data.ptr;
//Only applies to remote_fd, connection attempt has
//succeeded/failed
if (events[i].events & EPOLLOUT) {
if (check_connection_attempt(conn, efd) == -1) {
fprintf(stderr, "Connection attempt failed for %d\n",
conn->remote_fd);
check_close = 1;
close_tcp_conn(conn, &conn_list, &close_list);
conncount--;
}
continue;
}
else if (conn->state != CONN_CLOSED &&
(events[i].events & EPOLLRDHUP ||
events[i].events & EPOLLHUP ||
events[i].events & EPOLLERR)) {
check_close = 1;
close_tcp_conn(conn, &conn_list, &close_list);
conncount--;
continue;
}
//Since I use an event cache, earlier events might cause for
//example this connection to be closed. No need to process fd if
//that is the case
if (conn->state == CONN_CLOSED) {
continue;
}
if (!handle_epollin(conn, NULL)) {
close_tcp_conn(conn, &conn_list, &close_list);
conncount--;
check_close = 1;
}
}
}
//Remove connections
if (check_close)
remove_closed_connections(&close_list);
check_close = 0;
}
//Add cleanup
return retval;
}
int8_t block_sigpipe() int8_t block_sigpipe()
{ {
@ -768,7 +339,7 @@ void parse_params(int argc, char *argv[])
void daemonize() void daemonize()
{ {
int pid; int pid,fd;
pid = fork(); pid = fork();
if (pid == -1) if (pid == -1)
@ -789,9 +360,9 @@ void daemonize()
/* redirect fd's 0,1,2 to /dev/null */ /* redirect fd's 0,1,2 to /dev/null */
open("/dev/null", O_RDWR); open("/dev/null", O_RDWR);
/* stdin */ /* stdin */
dup(0); fd=dup(0);
/* stdout */ /* stdout */
dup(0); fd=dup(0);
/* stderror */ /* stderror */
} }
@ -822,7 +393,7 @@ int getmaxcap()
FILE *F = fopen("/proc/sys/kernel/cap_last_cap","r"); FILE *F = fopen("/proc/sys/kernel/cap_last_cap","r");
if (F) if (F)
{ {
fscanf(F,"%d",&maxcap); int n=fscanf(F,"%d",&maxcap);
fclose(F); fclose(F);
} }
return maxcap; return maxcap;
@ -1068,11 +639,6 @@ int main(int argc, char *argv[]) {
perror("setsockopt (SO_REUSEADDR): "); perror("setsockopt (SO_REUSEADDR): ");
goto exiterr; goto exiterr;
} }
if (setsockopt(listen_fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) == -1)
{
perror("setsockopt (SO_KEEPALIVE): ");
goto exiterr;
}
//Mark that this socket can be used for transparent proxying //Mark that this socket can be used for transparent proxying
//This allows the socket to accept connections for non-local IPs //This allows the socket to accept connections for non-local IPs

View File

@ -1,37 +1,3 @@
#ifndef TPROXY_EXAMPLE_H #pragma once
#define TPROXY_EXAMPLE_H
#include <stdint.h> void dohup();
#include <sys/queue.h>
#define BACKLOG 10
#define MAX_EPOLL_EVENTS BACKLOG
#define IP_TRANSPARENT 19 //So that application compiles on OpenWRT
#define SPLICE_LEN 65536
#define DEFAULT_MAX_CONN 512
//Three different states of a connection
enum{
CONN_AVAILABLE=0,
CONN_CLOSED,
};
typedef uint8_t conn_state_t;
struct tproxy_conn{
int local_fd; //Connection to host on local network
int remote_fd; //Connection to remote host
int splice_pipe[2]; //Have pipes per connection for now. Multiplexing
//different connections onto pipes is tricky, for
//example when flushing pipe after one connection has
//failed.
conn_state_t state;
//Create the struct which contains ptrs to next/prev element
TAILQ_ENTRY(tproxy_conn) conn_ptrs;
};
typedef struct tproxy_conn tproxy_conn_t;
//Define the struct tailhead (code in sys/queue.h is quite intuitive)
//Use tail queue for efficient delete
TAILQ_HEAD(tailhead, tproxy_conn);
#endif

View File

@ -1,3 +1,4 @@
#define _GNU_SOURCE
#include <stdio.h> #include <stdio.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
@ -8,21 +9,151 @@
#include <sys/socket.h> #include <sys/socket.h>
#include <arpa/inet.h> #include <arpa/inet.h>
#include <netinet/ip.h> #include <netinet/ip.h>
#include <netinet/tcp.h>
#include <sys/epoll.h> #include <sys/epoll.h>
#include <sys/ioctl.h>
#include <fcntl.h> #include <fcntl.h>
#include <linux/netfilter_ipv4.h> #include <linux/netfilter_ipv4.h>
#include <ifaddrs.h> #include <ifaddrs.h>
#include "tpws.h"
#include "tpws_conn.h" #include "tpws_conn.h"
#include "tamper.h"
#include "params.h"
#ifndef IP6T_SO_ORIGINAL_DST #ifndef IP6T_SO_ORIGINAL_DST
#define IP6T_SO_ORIGINAL_DST 80 #define IP6T_SO_ORIGINAL_DST 80
#endif #endif
int linger(int sock_fd)
bool send_buffer_create(send_buffer_t *sb, char *data, size_t len)
{
if (sb->data)
{
fprintf(stderr,"FATAL : send_buffer_create but buffer is not empty\n");
exit(1);
}
sb->data = malloc(len);
if (!sb->data) return false;
if (data) memcpy(sb->data,data,len);
sb->len = len;
sb->pos = 0;
return true;
}
bool send_buffer_free(send_buffer_t *sb)
{
if (sb->data)
{
free(sb->data);
sb->data = NULL;
}
}
void send_buffers_free(send_buffer_t *sb_array, int count)
{
for (int i=0;i<count;i++)
send_buffer_free(sb_array+i);
}
void conn_free_buffers(tproxy_conn_t *conn)
{
send_buffers_free(conn->wr_buf,sizeof(conn->wr_buf)/sizeof(conn->wr_buf[0]));
}
bool send_buffer_present(send_buffer_t *sb)
{
return !!sb->data;
}
bool send_buffers_present(send_buffer_t *sb_array, int count)
{
for(int i=0;i<count;i++)
if (send_buffer_present(sb_array+i))
return true;
return false;
}
ssize_t send_buffer_send(send_buffer_t *sb, int fd)
{
ssize_t wr;
wr = send(fd, sb->data + sb->pos, sb->len - sb->pos, 0);
if (wr>0)
{
sb->pos += wr;
if (sb->pos >= sb->len)
{
send_buffer_free(sb);
}
}
else if (wr<0 && errno==EAGAIN) wr=0;
return wr;
}
ssize_t send_buffers_send(send_buffer_t *sb_array, int count, int fd, size_t *real_wr)
{
ssize_t wr=0,twr=0;
for (int i=0;i<count;i++)
{
if (send_buffer_present(sb_array+i))
{
wr = send_buffer_send(sb_array+i, fd);
if (wr<0)
{
if (real_wr) *real_wr = twr;
return wr; // send error
}
twr+=wr;
if (send_buffer_present(sb_array+i)) // send next buffer only when current is fully sent
break;
}
}
if (real_wr) *real_wr = twr;
return twr;
}
bool conn_buffers_present(tproxy_conn_t *conn)
{
return send_buffers_present(conn->wr_buf,sizeof(conn->wr_buf)/sizeof(conn->wr_buf[0]));
}
ssize_t conn_buffers_send(tproxy_conn_t *conn)
{
size_t wr,real_twr;
wr = send_buffers_send(conn->wr_buf,sizeof(conn->wr_buf)/sizeof(conn->wr_buf[0]), conn->fd, &real_twr);
conn->twr += real_twr;
return wr;
}
bool conn_has_unsent(tproxy_conn_t *conn)
{
return !conn->remote && conn->wr_unsent || conn_buffers_present(conn);
}
bool conn_has_unsent_pair(tproxy_conn_t *conn)
{
return conn_has_unsent(conn) || (conn->partner && conn_has_unsent(conn->partner));
}
ssize_t send_or_buffer(send_buffer_t *sb, int fd, char *buf, size_t len)
{
ssize_t wr=0;
if (len)
{
wr = send(fd, buf, len, 0);
if (wr<0 && errno==EAGAIN) wr=0;
if (wr>=0 && wr<len)
{
if (!send_buffer_create(sb, buf+wr, len-wr))
wr=-1;
}
}
return wr;
}
bool set_linger(int fd)
{ {
struct linger ling={1,5}; struct linger ling={1,5};
return setsockopt(sock_fd,SOL_SOCKET,SO_LINGER,&ling,sizeof(ling)); return setsockopt(fd,SOL_SOCKET,SO_LINGER,&ling,sizeof(ling))!=-1;
}
int set_keepalive(int fd)
{
int yes=1;
return setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int))!=-1;
} }
bool ismapped(const struct sockaddr_in6 *sa) bool ismapped(const struct sockaddr_in6 *sa)
@ -47,7 +178,7 @@ uint16_t saport(const struct sockaddr *sa)
sa->sa_family==AF_INET6 ? ((struct sockaddr_in6*)sa)->sin6_port : 0); sa->sa_family==AF_INET6 ? ((struct sockaddr_in6*)sa)->sin6_port : 0);
} }
// -1 = error, 0 = not local, 1 = local // -1 = error, 0 = not local, 1 = local
int check_local_ip(const struct sockaddr *saddr) bool check_local_ip(const struct sockaddr *saddr)
{ {
struct ifaddrs *addrs,*a; struct ifaddrs *addrs,*a;
@ -59,45 +190,52 @@ int check_local_ip(const struct sockaddr *saddr)
if (a->ifa_addr && sacmp(a->ifa_addr,saddr)) if (a->ifa_addr && sacmp(a->ifa_addr,saddr))
{ {
freeifaddrs(addrs); freeifaddrs(addrs);
return 1; return true;
} }
a = a->ifa_next; a = a->ifa_next;
} }
freeifaddrs(addrs); freeifaddrs(addrs);
return 0; return false;
} }
//Createas a socket and initiates the connection to the host specified by //Createas a socket and initiates the connection to the host specified by
//remote_addr. //remote_addr.
//Returns 0 if something fails, >0 on success (socket fd). //Returns 0 if something fails, >0 on success (socket fd).
static int connect_remote(struct sockaddr_storage *remote_addr){ static int connect_remote(struct sockaddr_storage *remote_addr)
{
int remote_fd = 0, yes = 1; int remote_fd = 0, yes = 1;
//Use NONBLOCK to avoid slow connects affecting the performance of other connections
//Use NONBLOCK to avoid slow connects affecting the performance of other if((remote_fd = socket(remote_addr->ss_family, SOCK_STREAM | SOCK_NONBLOCK, 0)) < 0){
//connections
if((remote_fd = socket(remote_addr->ss_family, SOCK_STREAM |
SOCK_NONBLOCK, 0)) < 0){
perror("socket (connect_remote): "); perror("socket (connect_remote): ");
return 0; return 0;
} }
if(setsockopt(remote_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0){ if(setsockopt(remote_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) < 0)
{
perror("setsockopt (SO_REUSEADDR, connect_remote): "); perror("setsockopt (SO_REUSEADDR, connect_remote): ");
close(remote_fd); close(remote_fd);
return 0; return 0;
} }
if(setsockopt(remote_fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)) < 0){ if(!set_keepalive(remote_fd))
perror("setsockopt (SO_KEEPALIVE, connect_remote): "); {
perror("set_keepalive: ");
close(remote_fd);
return 0;
}
if (setsockopt(remote_fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)) <0)
{
perror("setsockopt (SO_NODELAY, connect_remote): ");
close(remote_fd); close(remote_fd);
return 0; return 0;
} }
if(connect(remote_fd, (struct sockaddr*) remote_addr, if(connect(remote_fd, (struct sockaddr*) remote_addr,
remote_addr->ss_family == AF_INET ? sizeof(struct sockaddr_in) : remote_addr->ss_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)) < 0)
sizeof(struct sockaddr_in6)) < 0){ {
if(errno != EINPROGRESS){ if(errno != EINPROGRESS)
{
perror("connect (connect_remote): "); perror("connect (connect_remote): ");
close(remote_fd); close(remote_fd);
return 0; return 0;
@ -109,7 +247,8 @@ static int connect_remote(struct sockaddr_storage *remote_addr){
//Store the original destination address in remote_addr //Store the original destination address in remote_addr
//Return 0 on success, <0 on failure //Return 0 on success, <0 on failure
static int get_org_dstaddr(int sockfd, struct sockaddr_storage *orig_dst){ static bool get_dest_addr(int sockfd, struct sockaddr_storage *orig_dst)
{
char orig_dst_str[INET6_ADDRSTRLEN]; char orig_dst_str[INET6_ADDRSTRLEN];
socklen_t addrlen = sizeof(*orig_dst); socklen_t addrlen = sizeof(*orig_dst);
int r; int r;
@ -132,21 +271,111 @@ static int get_org_dstaddr(int sockfd, struct sockaddr_storage *orig_dst){
if (r<0) if (r<0)
{ {
perror("getsockname: "); perror("getsockname: ");
return -1; return false;
} }
} }
if(orig_dst->ss_family == AF_INET){ if (orig_dst->ss_family == AF_INET)
inet_ntop(AF_INET, {
&(((struct sockaddr_in*) orig_dst)->sin_addr), inet_ntop(AF_INET, &(((struct sockaddr_in*) orig_dst)->sin_addr), orig_dst_str, INET_ADDRSTRLEN);
orig_dst_str, INET_ADDRSTRLEN); printf("Original destination for socket fd=%d : %s:%d\n", sockfd,orig_dst_str, htons(((struct sockaddr_in*) orig_dst)->sin_port));
fprintf(stderr, "Original destination for socket %d : %s:%d\n", sockfd,orig_dst_str, htons(((struct sockaddr_in*) orig_dst)->sin_port));
} else if(orig_dst->ss_family == AF_INET6){
inet_ntop(AF_INET6,
&(((struct sockaddr_in6*) orig_dst)->sin6_addr),
orig_dst_str, INET6_ADDRSTRLEN);
fprintf(stderr, "Original destination for socket %d : [%s]:%d\n", sockfd,orig_dst_str, htons(((struct sockaddr_in6*) orig_dst)->sin6_port));
} }
return 0; else if (orig_dst->ss_family == AF_INET6)
{
inet_ntop(AF_INET6,&(((struct sockaddr_in6*) orig_dst)->sin6_addr), orig_dst_str, INET6_ADDRSTRLEN);
printf("Original destination for socket fd=%d : [%s]:%d\n", sockfd,orig_dst_str, htons(((struct sockaddr_in6*) orig_dst)->sin6_port));
}
return true;
}
//Free resources occupied by this connection
void free_conn(tproxy_conn_t *conn)
{
if (conn->fd) close(conn->fd);
if (conn->splice_pipe[0])
{
close(conn->splice_pipe[0]);
close(conn->splice_pipe[1]);
}
conn_free_buffers(conn);
if (conn->partner) conn->partner->partner=NULL;
free(conn);
}
static tproxy_conn_t *new_conn(int fd, bool remote)
{
tproxy_conn_t *conn;
//Create connection object and fill in information
if((conn = (tproxy_conn_t*) malloc(sizeof(tproxy_conn_t))) == NULL)
{
fprintf(stderr, "Could not allocate memory for connection\n");
return NULL;
}
memset(conn, 0, sizeof(tproxy_conn_t));
conn->state = CONN_UNAVAILABLE;
conn->fd = fd;
conn->remote = remote;
// pipe only needed for one leg. other we process by send/recv
// lets store pipe in local leg
if(!remote && pipe2(conn->splice_pipe, O_NONBLOCK) != 0)
{
fprintf(stderr, "Could not create the splice pipe\n");
free_conn(conn);
return NULL;
}
return conn;
}
bool epoll_set(tproxy_conn_t *conn, uint32_t events)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
ev.events = events;
ev.data.ptr = (void*) conn;
if(epoll_ctl(conn->efd, EPOLL_CTL_MOD, conn->fd, &ev)==-1 &&
epoll_ctl(conn->efd, EPOLL_CTL_ADD, conn->fd, &ev)==-1)
{
perror("epoll_ctl (add/mod)");
return false;
}
return true;
}
bool epoll_del(tproxy_conn_t *conn)
{
struct epoll_event ev;
memset(&ev, 0, sizeof(ev));
if(epoll_ctl(conn->efd, EPOLL_CTL_DEL, conn->fd, &ev)==-1)
{
perror("epoll_ctl (del)");
return false;
}
return true;
}
bool epoll_update_flow(tproxy_conn_t *conn)
{
if (conn->bFlowInPrev==conn->bFlowIn && conn->bFlowOutPrev==conn->bFlowOut && conn->bPrevRdhup==(conn->state==CONN_RDHUP))
return true; // unchanged, no need to syscall
uint32_t evtmask = (conn->state==CONN_RDHUP ? 0 : EPOLLRDHUP)|(conn->bFlowIn?EPOLLIN:0)|(conn->bFlowOut?EPOLLOUT:0);
if (!epoll_set(conn, evtmask))
return false;
//printf("SET FLOW fd=%d to in=%d out=%d state_rdhup=%d\n",conn->fd,conn->bFlowIn,conn->bFlowOut,conn->state==CONN_RDHUP);
conn->bFlowInPrev = conn->bFlowIn;
conn->bFlowOutPrev = conn->bFlowOut;
conn->bPrevRdhup = (conn->state==CONN_RDHUP);
return true;
}
bool epoll_set_flow(tproxy_conn_t *conn, bool bFlowIn, bool bFlowOut)
{
conn->bFlowIn = bFlowIn;
conn->bFlowOut = bFlowOut;
return epoll_update_flow(conn);
} }
//Acquires information, initiates a connect and initialises a new connection //Acquires information, initiates a connect and initialises a new connection
@ -157,57 +386,59 @@ tproxy_conn_t* add_tcp_connection(int efd, struct tailhead *conn_list,
struct sockaddr_storage orig_dst; struct sockaddr_storage orig_dst;
tproxy_conn_t *conn; tproxy_conn_t *conn;
int remote_fd; int remote_fd;
struct epoll_event ev; int yes=1;
if(get_org_dstaddr(local_fd, &orig_dst)){ if(!get_dest_addr(local_fd, &orig_dst))
fprintf(stderr, "Could not get local address\n"); {
fprintf(stderr, "Could not get destination address\n");
close(local_fd); close(local_fd);
return NULL; return NULL;
} }
if (check_local_ip((struct sockaddr*)&orig_dst)==1 && saport((struct sockaddr*)&orig_dst)==listen_port) if (check_local_ip((struct sockaddr*)&orig_dst) && saport((struct sockaddr*)&orig_dst)==listen_port)
{ {
fprintf(stderr, "Dropping connection to local address to the same port to avoid loop\n"); fprintf(stderr, "Dropping connection to local address to the same port to avoid loop\n");
close(local_fd); close(local_fd);
return NULL; return NULL;
} }
if(!set_keepalive(local_fd))
{
perror("set_keepalive: ");
close(local_fd);
return 0;
}
if((remote_fd = connect_remote(&orig_dst)) == 0){ if(!(remote_fd = connect_remote(&orig_dst)))
{
fprintf(stderr, "Failed to connect\n"); fprintf(stderr, "Failed to connect\n");
close(remote_fd);
close(local_fd); close(local_fd);
return NULL; return NULL;
} }
//Create connection object and fill in information if(!(conn = new_conn(local_fd, false)))
if((conn = (tproxy_conn_t*) malloc(sizeof(tproxy_conn_t))) == NULL){ {
fprintf(stderr, "Could not allocate memory for connection\n");
close(remote_fd); close(remote_fd);
close(local_fd); close(local_fd);
return NULL; return NULL;
} }
conn->state = CONN_AVAILABLE; // accepted connection is immediately available
conn->efd = efd;
memset(conn, 0, sizeof(tproxy_conn_t)); if(!(conn->partner = new_conn(remote_fd, true)))
conn->state = CONN_AVAILABLE; {
conn->remote_fd = remote_fd;
conn->local_fd = local_fd;
if(pipe(conn->splice_pipe) != 0){
fprintf(stderr, "Could not create the required pipe\n");
free_conn(conn); free_conn(conn);
close(remote_fd);
return NULL; return NULL;
} }
conn->partner->partner = conn;
conn->partner->efd = efd;
//remote_fd is connecting. Non-blocking connects are signaled as done by //remote_fd is connecting. Non-blocking connects are signaled as done by
//socket being marked as ready for writing //socket being marked as ready for writing
memset(&ev, 0, sizeof(ev)); if (!epoll_set(conn->partner, EPOLLOUT|EPOLLERR))
ev.events = EPOLLIN | EPOLLOUT; {
ev.data.ptr = (void*) conn; free_conn(conn->partner);
if(epoll_ctl(efd, EPOLL_CTL_ADD, remote_fd, &ev) == -1){
perror("epoll_ctl (remote_fd)");
free_conn(conn); free_conn(conn);
return NULL; return NULL;
} }
@ -216,78 +447,453 @@ tproxy_conn_t* add_tcp_connection(int efd, struct tailhead *conn_list,
//to detect this when waiting for connect() to complete. However, I dont //to detect this when waiting for connect() to complete. However, I dont
//want to get EPOLLIN-events, as I dont want to receive any data before //want to get EPOLLIN-events, as I dont want to receive any data before
//remote connection is established //remote connection is established
ev.events = EPOLLRDHUP; if (!epoll_set(conn, 0))
{
if(epoll_ctl(efd, EPOLL_CTL_ADD, local_fd, &ev) == -1){ free_conn(conn->partner);
perror("epoll_ctl (local_fd)");
free_conn(conn); free_conn(conn);
return NULL; return NULL;
} else }
{
TAILQ_INSERT_HEAD(conn_list, conn, conn_ptrs); TAILQ_INSERT_HEAD(conn_list, conn, conn_ptrs);
TAILQ_INSERT_HEAD(conn_list, conn->partner, conn_ptrs);
return conn; return conn;
}
}
//Free resources occupied by this connection
void free_conn(tproxy_conn_t *conn){
close(conn->remote_fd);
close(conn->local_fd);
if(conn->splice_pipe[0] != 0){
close(conn->splice_pipe[0]);
close(conn->splice_pipe[1]);
}
free(conn);
} }
//Checks if a connection attempt was successful or not //Checks if a connection attempt was successful or not
//Returns 0 if successfull, -1 if not //Returns true if successfull, false if not
int8_t check_connection_attempt(tproxy_conn_t *conn, int efd){ bool check_connection_attempt(tproxy_conn_t *conn, int efd)
struct epoll_event ev; {
int conn_success = 0;
int fd_flags = 0; int fd_flags = 0;
int conn_success = 0;
socklen_t optlen = sizeof(conn_success); socklen_t optlen = sizeof(conn_success);
//If the connection was sucessfull or not is contained in SO_ERROR if (conn->state!=CONN_UNAVAILABLE || !conn->remote)
if(getsockopt(conn->remote_fd, SOL_SOCKET, SO_ERROR, &conn_success, {
&optlen) == -1){ // locals are connected since accept
// remote need to be checked only once
return true;
}
// check the connection was sucessfull. it means its not in in SO_ERROR state
if(getsockopt(conn->fd, SOL_SOCKET, SO_ERROR, &conn_success, &optlen) == -1)
{
perror("getsockopt (SO_ERROR)"); perror("getsockopt (SO_ERROR)");
return -1; return false;
}
if(conn_success == 0)
{
printf("Socket fd=%d (remote) connected\n", conn->fd);
if (!epoll_set_flow(conn, true, false) || !epoll_set_flow(conn->partner, true, false))
return false;
conn->state = CONN_AVAILABLE;
return true;
} }
if(conn_success == 0){ return false;
fprintf(stderr, "Socket %d connected\n", conn->remote_fd);
//Set socket as blocking now, for ease of processing
//TODO: Non-blocking
if((fd_flags = fcntl(conn->remote_fd, F_GETFL)) == -1){
perror("fcntl (F_GETFL)");
return -1;
}
if(fcntl(conn->remote_fd, F_SETFL, fd_flags & ~O_NONBLOCK) == -1){
perror("fcntl (F_SETFL)");
return -1;
}
//Update both file descriptors. I am interested in EPOLLIN (if there is
//any data) and EPOLLRDHUP (remote peer closed socket). As this is just
//an example, EPOLLOUT is ignored and it is OK for send() to block
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN | EPOLLRDHUP;
ev.data.ptr = (void*) conn;
if(epoll_ctl(efd, EPOLL_CTL_MOD, conn->remote_fd, &ev) == -1 ||
epoll_ctl(efd, EPOLL_CTL_MOD, conn->local_fd, &ev) == -1){
perror("epoll_ctl (check_connection_attempt)");
return -1;
} else {
return 0;
}
}
return -1;
} }
bool epoll_set_flow_pair(tproxy_conn_t *conn)
{
bool bHasUnsent = conn_has_unsent(conn);
bool bHasUnsentPartner = conn->partner ? conn_has_unsent(conn->partner) : false;
if (!epoll_set_flow(conn, !bHasUnsentPartner, bHasUnsent))
return false;
if (conn->partner)
if (!epoll_set_flow(conn->partner, !bHasUnsent, bHasUnsentPartner))
return false;
return true;
}
bool handle_unsent(tproxy_conn_t *conn)
{
ssize_t wr=0,twr=0;
//printf("+handle_unsent, fd=%d has_unsent=%d has_unsent_partner=%d\n",conn->fd,conn_has_unsent(conn),conn->partner ? conn_has_unsent(conn->partner) : false);
// its possible to have unsent data both in the pipe and in buffers
// but we initialize pipe only on local leg
if (!conn->remote)
{
if (conn->wr_unsent)
{
wr = splice(conn->splice_pipe[0], NULL, conn->fd, NULL, conn->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
//printf("splice unsent=%zd wr=%zd err=%d\n",conn->wr_unsent,wr,errno);
if (wr<0)
{
if (errno==EAGAIN) wr=0;
else return false;
}
twr += wr;
conn->twr += wr;
conn->wr_unsent -= wr;
}
}
if (!conn->wr_unsent && conn_buffers_present(conn))
{
wr=conn_buffers_send(conn);
if (wr<0) return false;
twr += wr;
}
return epoll_set_flow_pair(conn);
}
#define RD_BLOCK_SIZE 8192
bool handle_epoll(tproxy_conn_t *conn, uint32_t evt)
{
int numbytes;
ssize_t rd = 0, wr = 0;
size_t bs;
//printf("+handle_epoll\n");
if (!handle_unsent(conn))
return false; // error
if (!conn->partner && !conn_has_unsent(conn))
return false; // when no partner, we only waste read and send unsent
if (!(evt & EPOLLIN))
return true; // nothing to read
if (!conn->partner)
{
// throw it to a black hole
char waste[1448];
ssize_t rrd;
while((rrd=recv(conn->fd, waste, sizeof(waste), MSG_DONTWAIT))>0)
{
rd+=rrd;
conn->trd+=rrd;
}
//printf("wasted recv=%zd all_rd=%zd err=%d\n",rrd,rd,errno);
return true;
}
// do not receive new until old is sent
if (conn_has_unsent(conn->partner))
return true;
numbytes=-1;
ioctl(conn->fd, FIONREAD, &numbytes)!=-1;
//printf("numbytes=%d\n",numbytes);
if (numbytes>0)
{
if (conn->remote)
{
// incoming data from remote leg we splice without touching
// pipe is in the local leg, so its in conn->partner->splice_pipe
rd = splice(conn->fd, NULL, conn->partner->splice_pipe[1], NULL, SPLICE_LEN, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
//printf("splice len=%d rd=%zd err=%d\n",SPLICE_LEN,rd,errno);
if (rd<0 && errno==EAGAIN) rd=0;
if (rd>0)
{
conn->trd += rd;
conn->partner->wr_unsent += rd;
wr = splice(conn->partner->splice_pipe[0], NULL, conn->partner->fd, NULL, conn->partner->wr_unsent, SPLICE_F_MOVE | SPLICE_F_NONBLOCK);
//printf("splice wr=%zd err=%d\n",wr,errno);
if (wr<0 && errno==EAGAIN) wr=0;
if (wr>0)
{
conn->partner->wr_unsent -= wr;
conn->partner->twr += wr;
}
}
}
else
{
// incoming data from local leg
char buf[RD_BLOCK_SIZE + 4];
rd = recv(conn->fd, buf, RD_BLOCK_SIZE, MSG_DONTWAIT);
if (rd<0 && errno==EAGAIN) rd=0;
if (rd>0)
{
conn->trd+=rd;
size_t split_pos=0;
bs = rd;
modify_tcp_segment(buf,&bs,&split_pos);
if (split_pos)
{
printf("Splitting at pos %zu\n", split_pos);
wr = send_or_buffer(conn->partner->wr_buf, conn->partner->fd, buf, split_pos);
if (wr >= 0)
{
conn->partner->twr += wr;
wr = send_or_buffer(conn->partner->wr_buf + 1, conn->partner->fd, buf + split_pos, bs - split_pos);
if (wr>0) conn->partner->twr += wr;
}
}
else
{
wr = send_or_buffer(conn->partner->wr_buf, conn->partner->fd, buf, bs);
if (wr>0) conn->partner->twr += wr;
}
}
}
if (!epoll_set_flow_pair(conn))
return false;
}
//printf("-handle_epoll rd=%zd wr=%zd\n",rd,wr);
return rd != -1 && wr != -1;
}
bool remove_closed_connections(int efd, struct tailhead *close_list)
{
tproxy_conn_t *conn = NULL;
bool bRemoved = false;
while (conn = TAILQ_FIRST(close_list))
{
TAILQ_REMOVE(close_list, conn, conn_ptrs);
shutdown(conn->fd,SHUT_RDWR);
epoll_del(conn);
printf("Socket fd=%d (partner_fd=%d, remote=%d) closed, connection removed. total_read=%zu total_write=%zu\n",
conn->fd, conn->partner ? conn->partner->fd : 0, conn->remote, conn->trd, conn->twr);
free_conn(conn);
bRemoved = true;
}
return bRemoved;
}
// move to close list connection and its partner
void close_tcp_conn(tproxy_conn_t *conn, struct tailhead *conn_list, struct tailhead *close_list)
{
conn->state = CONN_CLOSED;
TAILQ_REMOVE(conn_list, conn, conn_ptrs);
TAILQ_INSERT_TAIL(close_list, conn, conn_ptrs);
}
bool read_all_and_buffer(tproxy_conn_t *conn)
{
if (conn->partner)
{
//printf("read_all_and_buffer\n");
int numbytes=-1;
ioctl(conn->fd, FIONREAD, &numbytes);
if (numbytes>0)
{
if (send_buffer_create(conn->partner->wr_buf+2, NULL, numbytes))
{
ssize_t rd = recv(conn->fd, conn->partner->wr_buf[2].data, numbytes, MSG_DONTWAIT);
if (rd>0)
{
conn->trd+=rd;
conn->partner->wr_buf[2].len = rd;
conn->partner->bFlowOut = true;
if (epoll_update_flow(conn->partner))
return true;
}
send_buffer_free(conn->partner->wr_buf+2);
}
}
}
return false;
}
void count_legs(struct tailhead *conn_list, int *ct_local, int *ct_remote)
{
tproxy_conn_t *conn = NULL;
if (ct_local) *ct_local = 0;
if (ct_remote) *ct_remote = 0;
TAILQ_FOREACH(conn, conn_list, conn_ptrs)
{
if (conn->remote)
{
if (ct_remote) (*ct_remote)++;
}
else
{
if (ct_local) (*ct_local)++;
}
}
}
void print_legs(struct tailhead *conn_list)
{
int legs_local,legs_remote;
count_legs(conn_list, &legs_local, &legs_remote);
printf("Legs : local:%d remote:%d\n", legs_local, legs_remote);
}
#define CONN_CLOSE(conn) { \
if (conn->state!=CONN_CLOSED) close_tcp_conn(conn, &conn_list, &close_list); \
}
#define CONN_CLOSE_BOTH(conn) { \
if (conn->partner) CONN_CLOSE(conn->partner); \
CONN_CLOSE(conn); \
}
#define CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn) { \
CONN_CLOSE(conn); \
if (conn->partner) conn_shutdown(conn->partner); \
}
bool conn_shutdown(tproxy_conn_t *conn)
{
// after shutdown we must receive data remainder and send unsent
// if we dont receive data, connectin can be stalled because of "TCP window full"
return !shutdown(conn->fd,SHUT_RD) && epoll_set_flow(conn, true, true);
}
int event_loop(int listen_fd)
{
int retval = 0, num_events = 0;
int tmp_fd = 0; //Used to temporarily hold the accepted file descriptor
tproxy_conn_t *conn = NULL;
int efd, i;
struct epoll_event ev, events[MAX_EPOLL_EVENTS];
struct tailhead conn_list, close_list;
//Initialize queue (remember that TAILQ_HEAD just defines the struct)
TAILQ_INIT(&conn_list);
TAILQ_INIT(&close_list);
if ((efd = epoll_create(1)) == -1) {
perror("epoll_create");
return -1;
}
//Start monitoring listen socket
memset(&ev, 0, sizeof(ev));
ev.events = EPOLLIN;
//There is only one listen socket, and I want to use ptr in order to have
//easy access to the connections. So if ptr is NULL that means an event on
//listen socket.
ev.data.ptr = NULL;
if (epoll_ctl(efd, EPOLL_CTL_ADD, listen_fd, &ev) == -1) {
perror("epoll_ctl (listen socket)");
close(efd);
return -1;
}
while (1)
{
//printf("\nepoll_wait\n");
if ((num_events = epoll_wait(efd, events, MAX_EPOLL_EVENTS, -1)) == -1)
{
if (errno == EINTR) continue; // system call was interrupted
perror("epoll_wait");
retval = -1;
break;
}
dohup();
for (i = 0; i < num_events; i++)
{
if (events[i].data.ptr == NULL)
{
int legs_local;
count_legs(&conn_list, &legs_local, NULL);
//Accept new connection
tmp_fd = accept4(listen_fd, NULL, 0, SOCK_NONBLOCK);
if (tmp_fd < 0)
{
fprintf(stderr, "Failed to accept connection\n");
}
else if (legs_local >= params.maxconn) // each connection has 2 legs - local and remote
{
close(tmp_fd);
fprintf(stderr, "Too many local legs : %d\n", legs_local);
}
else if (!(conn=add_tcp_connection(efd, &conn_list, tmp_fd, params.port)))
{
// add_tcp_connection closes fd in case of failure
fprintf(stderr, "Failed to add connection\n");
}
else
{
printf("Socket fd=%d (local) connected\n", conn->fd);
print_legs(&conn_list);
}
}
else
{
conn = (tproxy_conn_t*)events[i].data.ptr;
//printf("\nEVENT mask %08X fd=%d fd_partner=%d\n",events[i].events,conn->fd,conn->partner ? conn->partner->fd : 0);
if (conn->state != CONN_CLOSED)
{
if (events[i].events & (EPOLLERR|EPOLLHUP))
{
// immediately shutdown both ends
CONN_CLOSE_BOTH(conn);
continue;
}
if (events[i].events & EPOLLOUT)
{
if (!check_connection_attempt(conn, efd))
{
fprintf(stderr, "Connection attempt failed for %d\n", conn->fd);
CONN_CLOSE_BOTH(conn);
continue;
}
}
if (events[i].events & EPOLLRDHUP)
{
read_all_and_buffer(conn);
if (conn_has_unsent(conn))
{
//printf("conn fd=%d has unsent, not closing\n", conn->fd);
conn_shutdown(conn);
if (conn->partner) conn_shutdown(conn->partner);
conn->state = CONN_RDHUP; // only writes
}
else
{
//printf("conn fd=%d has no unsent, closing\n", conn->fd);
CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn);
}
continue;
}
if (events[i].events & (EPOLLIN|EPOLLOUT))
{
// will not receive this until successful check_connection_attempt()
if (!handle_epoll(conn, events[i].events))
{
//printf("handle_epoll false\n");
CONN_CLOSE_WITH_PARTNER_SHUTDOWN(conn);
continue;
}
}
}
}
}
if (remove_closed_connections(efd, &close_list))
print_legs(&conn_list);
fflush(stderr); fflush(stdout); // for console messages
}
close(efd);
return retval;
}

View File

@ -1,13 +1,61 @@
#ifndef TPROXY_TEST_CONN_H #pragma once
#define TPROXY_TEST_CONN_H
#include "tpws.h"
#include <stdbool.h> #include <stdbool.h>
#include <sys/queue.h>
int check_local_ip(const struct sockaddr *saddr); #define BACKLOG 10
uint16_t saport(const struct sockaddr *sa); #define MAX_EPOLL_EVENTS BACKLOG
tproxy_conn_t* add_tcp_connection(int efd, struct tailhead *conn_list, #define IP_TRANSPARENT 19 //So that application compiles on OpenWRT
int local_fd, uint16_t listen_port); #define SPLICE_LEN 16384
void free_conn(tproxy_conn_t *conn); #define DEFAULT_MAX_CONN 512
int8_t check_connection_attempt(tproxy_conn_t *conn, int efd);
#endif int event_loop(int listen_fd);
//Three different states of a connection
enum{
CONN_UNAVAILABLE=0, // connecting
CONN_AVAILABLE, // operational
CONN_RDHUP, // received RDHUP, only sending unsent buffers. more RDHUPs are blocked
CONN_CLOSED // will be deleted soon
};
typedef uint8_t conn_state_t;
struct send_buffer
{
char *data;
size_t len,pos;
};
typedef struct send_buffer send_buffer_t;
struct tproxy_conn
{
bool remote; // false - accepted, true - connected
int efd; // epoll fd
int fd;
int splice_pipe[2];
conn_state_t state;
struct tproxy_conn *partner; // other leg
//Create the struct which contains ptrs to next/prev element
TAILQ_ENTRY(tproxy_conn) conn_ptrs;
bool bFlowIn,bFlowOut, bFlowInPrev,bFlowOutPrev, bPrevRdhup;
// total read,write
size_t trd,twr;
// connection is either spliced or send/recv
// spliced connection have pipe buffering but also can have send_buffer's
// pipe buffer comes first, then send_buffer's from 0 to countof(wr_buf)-1
// send/recv connection do not have pipe and wr_unsent is meaningless
ssize_t wr_unsent; // unsent bytes in the pipe
// buffer 1 : send before split_pos
// buffer 2 : send after split_pos
// buffer 3 : after RDHUP read all and buffer to the partner
struct send_buffer wr_buf[3];
};
typedef struct tproxy_conn tproxy_conn_t;
//Define the struct tailhead (code in sys/queue.h is quite intuitive)
//Use tail queue for efficient delete
TAILQ_HEAD(tailhead, tproxy_conn);