From ae3749aa312d614ca9bded2c47bbabb5402f2a25 Mon Sep 17 00:00:00 2001 From: leitner Date: Thu, 11 Apr 2024 22:11:25 +0000 Subject: [PATCH] test client and server for iom --- test/iomcli.c | 85 +++++++++++++++++++++++++++++++++++++ test/iomsrv.c | 114 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 test/iomcli.c create mode 100644 test/iomsrv.c diff --git a/test/iomcli.c b/test/iomcli.c new file mode 100644 index 0000000..58a90a1 --- /dev/null +++ b/test/iomcli.c @@ -0,0 +1,85 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +iomux_t iom; +int s[32]; +atomic_int n; + +#define TRANSACTIONS 100000 + +int w(void* a) { + (void)a; + for (;;) { + int64 s; + unsigned int events; + switch (iom_wait(&iom,&s,&events,1000)) { + case -1: + diesys(111,"iom_wait"); + break; + case 0: + carp("iom_wait: timeout"); + break; + case 1: + if (events&IOM_WRITE) { + if (write(s,"salut!\n",7) != 7) + diesys(111,"write"); + if (iom_requeue(&iom,s,IOM_READ) == -1) + diesys(111,"iom_requeue"); + } else { + char buf[100]; + ssize_t r=read(s,buf,sizeof buf); + if (r<=0 || atomic_fetch_add(&n,1) >= TRANSACTIONS) { + close(s); + return 0; + } + if (iom_requeue(&iom,s,IOM_WRITE) == -1) + diesys(111,"iom_requeue"); + } + } + } + return 0; +} + +int main(int argc,char* argv[]) { + iom_init(&iom); + for (unsigned int i=0; i1 && scan_uint(argv[1],&nthr)>0) { + if (nthr>sizeof(t)/sizeof(t[0])) { + char buf[FMT_LONG]; + buf[fmt_ulong(buf,sizeof(t)/sizeof(t[0]))]=0; + die(111,"max threads > ",buf); + } + } + for (unsigned int i=0; i +#include +#include +#include +#include +#include +#include +#include +#include + +iomux_t iom; + +void read_data_from_socket(int s) { + char buf[1024]; + ssize_t r=read(s,buf,sizeof buf); + if (r==-1) { + r=fmt_str(buf,"I/O error from fd "); + r+=fmt_ulong(buf+r,s); + buf[r]=0; + carpsys(buf); + close(s); + return; + } + if (r==0) { + close(s); + return; + } + /* we'd usually have some protocol parsing here but for this + * simplistic test program we'll just accept any input as valid + * request */ + if (iom_requeue(&iom,s,IOM_WRITE)!=0) { + carpsys("iom_requeue"); + close(s); + } +} + +void write_data_to_socket(int s) { + if (write(s,"hello, world!\n",14) != 14) { + carpsys("write"); + close(s); + } + if (iom_requeue(&iom,s,IOM_READ)!=0) { + carpsys("iom_requeue"); + close(s); + } +} + +int w(void* a) { + int servsock=*(int*)a; + for (;;) { + int64 s; + unsigned int events; + switch (iom_wait(&iom,&s,&events,1000)) { + case -1: + diesys(111,"iom_wait"); + break; + case 0: + carp("iom_wait: timeout"); + break; + case 1: + if (s==servsock) { + char ip[16]; + uint16 port; + uint32 scope_id; + int conn=socket_accept6(s,ip,&port,&scope_id); + if (conn==-1) + diesys(111,"accept"); + if (iom_add(&iom,conn,IOM_READ)==-1) + diesys(111,"iom_add"); + if (iom_requeue(&iom,s,IOM_READ)==-1) + diesys(111,"iom_requeue"); + } else if (events&IOM_READ) + read_data_from_socket(s); + else + write_data_to_socket(s); + } + } + return 0; +} + +int main(int argc,char* argv[]) { + int servsock=socket_tcp6(); + errmsg_iam("iomsrv"); + if (servsock==-1) + diesys(111,"socket"); + if (socket_bind6_reuse(servsock,V6loopback,8000,0)==-1) + diesys(111,"bind to localhost:8000"); + if (socket_listen(servsock,16)==-1) + diesys(111,"listen"); + iom_init(&iom); + if (iom_add(&iom,servsock,IOM_READ)==-1) + diesys(111,"iom_add"); + + thrd_t t[32]; + unsigned int nthr=1; + if (argc>1 && scan_uint(argv[1],&nthr)>0) { + if (nthr>sizeof(t)/sizeof(t[0])) { + char buf[FMT_LONG]; + buf[fmt_ulong(buf,sizeof(t)/sizeof(t[0]))]=0; + die(111,"max threads > ",buf); + } + } + for (unsigned int i=0; i