tpws: multisplit

This commit is contained in:
bol-van
2024-11-13 22:05:06 +03:00
parent ef9f9ae428
commit 46d31003e2
11 changed files with 532 additions and 164 deletions

View File

@@ -24,6 +24,7 @@
#include "helpers.h"
#include "hostlist.h"
#define PKTDATA_MAXDUMP 32
// keep separate legs counter. counting every time thousands of legs can consume cpu
static int legs_local, legs_remote;
@@ -92,26 +93,43 @@ static bool socks_send_rep_errno(uint8_t ver, int fd, int errn)
return ver==5 ? socks5_send_rep_errno(fd,errn) : socks4_send_rep_errno(fd, errn);
}
static void packet_debug(const uint8_t *data, size_t sz)
{
hexdump_limited_dlog(data, sz, PKTDATA_MAXDUMP); VPRINT("\n");
}
static bool cork(int fd, int enable)
{
#ifdef __linux__
int e = errno;
if (setsockopt(fd, SOL_TCP, TCP_CORK, &enable, sizeof(enable))<0)
{
DLOG_PERROR("setsockopt (TCP_CORK)");
errno = e;
return false;
}
errno = e;
#endif
return true;
}
ssize_t send_with_ttl(int fd, const void *buf, size_t len, int flags, int ttl)
{
ssize_t wr;
ssize_t wr;
if (ttl)
if (!params.skip_nodelay)
{
int ttl_apply = ttl ? ttl : params.ttl_default;
DBGPRINT("send_with_ttl %d fd=%d\n",ttl,fd);
if (!set_ttl_hl(fd, ttl))
if (!set_ttl_hl(fd, ttl_apply))
//DLOG_ERR("could not set ttl %d to fd=%d\n",ttl,fd);
DLOG_ERR("could not set ttl %d to fd=%d\n",ttl,fd);
DLOG_ERR("could not set ttl %d to fd=%d\n",ttl_apply,fd);
cork(fd,true);
}
wr = send(fd, buf, len, flags);
if (ttl)
{
int e=errno;
if (!set_ttl_hl(fd, params.ttl_default))
DLOG_ERR("could not set ttl %d to fd=%d\n",params.ttl_default,fd);
errno=e;
}
if (!params.skip_nodelay)
cork(fd,false);
return wr;
}
@@ -308,19 +326,18 @@ bool set_socket_buffers(int fd, int rcvbuf, int sndbuf)
if (rcvbuf && setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(int)) <0)
{
DLOG_PERROR("setsockopt (SO_RCVBUF)");
close(fd);
return false;
}
if (sndbuf && setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(int)) <0)
{
DLOG_PERROR("setsockopt (SO_SNDBUF)");
close(fd);
return false;
}
dbgprint_socket_buffers(fd);
return true;
}
static bool proxy_remote_conn_ack(tproxy_conn_t *conn, int sock_err)
{
// if proxy mode acknowledge connection request
@@ -393,7 +410,10 @@ static int connect_remote(const struct sockaddr *remote_addr, int mss)
return -1;
}
if (!set_socket_buffers(remote_fd, params.remote_rcvbuf, params.remote_sndbuf))
{
close(remote_fd);
return -1;
}
if (!set_keepalive(remote_fd))
{
DLOG_PERROR("set_keepalive");
@@ -1068,9 +1088,9 @@ static bool in_tamper_out_range(tproxy_conn_t *conn)
}
static void tamper(tproxy_conn_t *conn, uint8_t *segment, size_t segment_buffer_size, size_t *segment_size, size_t *split_pos, uint8_t *split_flags)
static void tamper(tproxy_conn_t *conn, uint8_t *segment, size_t segment_buffer_size, size_t *segment_size, size_t *multisplit_pos, int *multisplit_count, uint8_t *split_flags)
{
*split_pos=0;
if (multisplit_count) *multisplit_count=0;
if (params.tamper)
{
if (conn->remote)
@@ -1081,26 +1101,26 @@ static void tamper(tproxy_conn_t *conn, uint8_t *segment, size_t segment_buffer_
else
{
if (in_tamper_out_range(conn))
tamper_out(&conn->track,(struct sockaddr*)&conn->dest,segment,segment_buffer_size,segment_size,split_pos,split_flags);
tamper_out(&conn->track,(struct sockaddr*)&conn->dest,segment,segment_buffer_size,segment_size,multisplit_pos,multisplit_count,split_flags);
}
}
}
// buffer must have at least one extra byte for OOB
static ssize_t send_or_buffer_oob(send_buffer_t *sb, int fd, uint8_t *buf, size_t len, int ttl, bool oob, uint8_t oob_byte)
static ssize_t send_oob(int fd, uint8_t *buf, size_t len, int ttl, bool oob, uint8_t oob_byte)
{
ssize_t wr;
if (oob)
{
VPRINT("Sending OOB byte %02X\n", oob_byte);
uint8_t oob_save;
oob_save = buf[len];
buf[len] = oob_byte;
wr = send_or_buffer(sb, fd, buf, len+1, MSG_OOB, ttl);
wr = send_with_ttl(fd, buf, len+1, MSG_OOB, ttl);
buf[len] = oob_save;
if (wr<0 && errno==EAGAIN) wr=0;
}
else
wr = send_or_buffer(sb, fd, buf, len, 0, ttl);
wr = send_with_ttl(fd, buf, len, 0, ttl);
return wr;
}
@@ -1186,36 +1206,53 @@ static bool handle_epoll(tproxy_conn_t *conn, struct tailhead *conn_list, uint32
#endif
{
// incoming data from local leg
uint8_t buf[RD_BLOCK_SIZE + 5];
uint8_t buf[RD_BLOCK_SIZE + 6];
rd = recv(conn->fd, buf, RD_BLOCK_SIZE, MSG_DONTWAIT);
DBGPRINT("recv fd=%d rd=%zd err=%d\n",conn->fd, rd,errno);
if (rd<0 && errno==EAGAIN) rd=0;
if (rd>0)
{
size_t split_pos;
size_t multisplit_pos[MAX_SPLITS];
int multisplit_count;
uint8_t split_flags;
bs = rd;
// tamper needs to know stream position of the block start
tamper(conn, buf, sizeof(buf), &bs, &split_pos, &split_flags);
tamper(conn, buf, sizeof(buf), &bs, multisplit_pos, &multisplit_count, &split_flags);
// increase after tamper
conn->tnrd++;
conn->trd+=rd;
if (split_pos && bs<sizeof(buf) && split_pos<sizeof(buf))
if (multisplit_count)
{
VPRINT("Splitting at pos %zu%s\n", split_pos, (split_flags & SPLIT_FLAG_DISORDER) ? " with disorder" : "");
wr = send_or_buffer_oob(conn->partner->wr_buf, conn->partner->fd, buf, split_pos, !!(split_flags & SPLIT_FLAG_DISORDER), !!(split_flags & SPLIT_FLAG_OOB), conn->track.dp ? conn->track.dp->oob_byte : 0);
DBGPRINT("send_or_buffer(1) fd=%d wr=%zd err=%d\n",conn->partner->fd,wr,errno);
if (wr >= 0)
ssize_t from,to,len;
int i;
bool bApplyDisorder, bApplyOOB;
for (i=0,from=0;i<=multisplit_count;i++)
{
to = i==multisplit_count ? bs : multisplit_pos[i];
bApplyDisorder = !(i & 1) && i<multisplit_count && (split_flags & SPLIT_FLAG_DISORDER);
bApplyOOB = i==0 && (split_flags & SPLIT_FLAG_OOB);
len = to-from;
VPRINT("Sending multisplit part %d %zd-%zd (len %zd)%s%s : ", i+1, from, to, len, bApplyDisorder ? " with disorder" : "", bApplyOOB ? " with OOB" : "");
packet_debug(buf+from,len);
wr = send_oob(conn->partner->fd, buf+from, len, bApplyDisorder, bApplyOOB, conn->track.dp ? conn->track.dp->oob_byte : 0);
if (wr<0) break;
conn->partner->twr += wr;
wr = send_or_buffer(conn->partner->wr_buf + 1, conn->partner->fd, buf + split_pos, bs - split_pos, 0, 0);
DBGPRINT("send_or_buffer(2) fd=%d wr=%zd err=%d\n",conn->partner->fd,wr,errno);
if (wr>0) conn->partner->twr += wr;
if (wr<len)
{
from+=wr;
VPRINT("Cannot send part %d immediately. only %zd bytes were sent (%zd left in segment). cancelling split.\n", i+1, wr, bs-from);
wr = send_or_buffer(conn->partner->wr_buf, conn->partner->fd, buf+from, bs-from, 0, 0);
if (wr>0) conn->partner->twr += wr;
break;
}
from = to;
}
}
else
@@ -1279,7 +1316,7 @@ static bool read_all_and_buffer(tproxy_conn_t *conn, int buffer_number)
DBGPRINT("read_all_and_buffer(%d) numbytes=%d\n",buffer_number,numbytes);
if (numbytes>0)
{
if (send_buffer_create(conn->partner->wr_buf+buffer_number, NULL, numbytes, 5, 0, 0))
if (send_buffer_create(conn->partner->wr_buf+buffer_number, NULL, numbytes, 6, 0, 0))
{
ssize_t rd = recv(conn->fd, conn->partner->wr_buf[buffer_number].data, numbytes, MSG_DONTWAIT);
if (rd>0)
@@ -1289,10 +1326,7 @@ static bool read_all_and_buffer(tproxy_conn_t *conn, int buffer_number)
conn->partner->bFlowOut = true;
size_t split_pos;
uint8_t split_flags;
tamper(conn, conn->partner->wr_buf[buffer_number].data, numbytes+5, &conn->partner->wr_buf[buffer_number].len, &split_pos, &split_flags);
tamper(conn, conn->partner->wr_buf[buffer_number].data, numbytes+6, &conn->partner->wr_buf[buffer_number].len, NULL, NULL, NULL);
if (epoll_update_flow(conn->partner))
return true;
@@ -1369,7 +1403,7 @@ static bool handle_resolve_pipe(tproxy_conn_t **conn, struct tailhead *conn_list
else if (rd!=sizeof(void*))
{
// partial pointer read is FATAL. in any case it will cause pointer corruption and coredump
DLOG_ERR("resolve_pipe not full read %zu\n",rd);
DLOG_ERR("resolve_pipe not full read %zd\n",rd);
exit(1000);
}
b = resolve_complete(ri, conn_list);