Skip to content

Commit

Permalink
Merge pull request #27 from andreyto/at_chunks
Browse files Browse the repository at this point in the history
Use larger static buffer for key send and send in chunks.
  • Loading branch information
bwlewis committed Oct 1, 2015
2 parents e4c5a1b + 27092af commit fb0f526
Showing 1 changed file with 67 additions and 18 deletions.
85 changes: 67 additions & 18 deletions src/alive.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <Rinternals.h>

#define BS 4096
#define BS_LARGE 16777216

int go;
#ifdef Win32
Expand Down Expand Up @@ -113,6 +114,31 @@ tcpconnect(char *host, int port)
}
#endif

/* From Brian "Beej Jorgensen" Hall Beej's Guide to Network Programming
* Keep sending until entire buffer is sent
*/
#ifdef Win32
int sendall(SOCKET s, char *buf, size_t *len)
#else
int sendall(int s, char *buf, size_t *len)
#endif
{
size_t total = 0; // how many bytes we've sent
size_t bytesleft = *len; // how many we have left to send
int n;

while(total < *len) {
n = send(s, buf+total, bytesleft, 0);
if (n == -1) { break; }
total += n;
bytesleft -= n;
}

*len = total; // return number actually sent here

return n==-1?-1:0; // return -1 on failure, 0 on success
}

#ifdef Win32
int
msg(SOCKET sock, char *cmd, char *response)
Expand All @@ -122,30 +148,57 @@ msg(int sock, char *cmd, char *response)
#endif
{
int j;
j = send(sock, cmd, strlen(cmd), 0);
size_t cmd_len = strlen(cmd);
j = sendall(sock, cmd, &cmd_len);
if(j<0) return j;
memset(response,0,BS);
j = recv(sock, response, BS, 0);
if(response[0] == '-') j = -1;
return j;
}


void thread_exit(int ex_code)
{
#ifdef Win32
ExitThread((DWORD)(ex_code));
#else
/* exit code to pthread_exit cannot be a pointer to stack variable
* In our case the thread is a singleton, so static is fine*/
static int _ex_code;
_ex_code = ex_code;
pthread_exit(&_ex_code);
#endif
}


#ifdef Win32
void *ok(LPVOID x)
#else
void *ok(void *x)
#endif
{
char set[BS];
char expire[BS];
char buf[BS];
/* this thread should be used as a singleton, so static variables are OK.
* BS_LARGE can be too large for stack variables. We can use calloc()
* instead, but then we have to handle user interrupts and other error
* conditions and deallocate. */
static char set[BS_LARGE];
static char expire[BS_LARGE];
char buf[BS]; /* asumption is that response is short */
int pr_n = -1;
int j, m;
char *key = (char *)x;
int k = strlen(key);
memset(set,0,BS);
memset(expire,0,BS);
snprintf(set,BS,"*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$2\r\nOK\r\n", k, key);
snprintf(expire,BS,"*3\r\n$6\r\nEXPIRE\r\n$%d\r\n%s\r\n$1\r\n5\r\n", k, key);
memset(set,0,BS_LARGE);
memset(expire,0,BS_LARGE);
pr_n = snprintf(set,BS_LARGE,"*3\r\n$3\r\nSET\r\n$%d\r\n%s\r\n$2\r\nOK\r\n", k, key);
if(pr_n < 0 || pr_n >= BS_LARGE) {
thread_exit(-2);
}
pr_n = snprintf(expire,BS_LARGE,"*3\r\n$6\r\nEXPIRE\r\n$%d\r\n%s\r\n$1\r\n5\r\n", k, key);
if(pr_n < 0 || pr_n >= BS_LARGE) {
thread_exit(-2);
}

/* Check for thread termination every 1/10 sec, but only update Redis
* every 3s (expire alive key after 5s).
Expand All @@ -155,17 +208,13 @@ void *ok(void *x)
m += 1;
if(m==0 || m>30) {
j = msg(s, set, buf);
#ifdef Win32
if(j<0) ExitThread((DWORD)j);
#else
if(j<0) pthread_exit(&j);
#endif
if(j<0) {
thread_exit(j);
}
j = msg(s, expire, buf);
#ifdef Win32
if(j<0) ExitThread((DWORD)j);
#else
if(j<0) pthread_exit(&j);
#endif
if(j<0) {
thread_exit(j);
}
m = 0;
}
#ifdef Win32
Expand Down

0 comments on commit fb0f526

Please sign in to comment.