Skip to content

Commit

Permalink
Merge pull request #15 from infrawatch/jwysogla-add-list-support
Browse files Browse the repository at this point in the history
Add AMQP list message type support.
  • Loading branch information
vyzigold authored Sep 8, 2020
2 parents 039a3bd + 9ff915f commit eda73be
Showing 1 changed file with 61 additions and 31 deletions.
92 changes: 61 additions & 31 deletions socket_snd_th.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,63 @@ static int prepare_send_socket_inet(app_data_t *app) {
return 0;
}

static int process_message_binary(app_data_t *app, pn_data_t *body) {
pn_bytes_t b = pn_data_get_bytes(body);
if (b.start != NULL) {
int send_flags = app->socket_flags;

ssize_t sent_bytes = sendto(app->send_sock, b.start, b.size, send_flags,
&app->sa, app->sa_len);
if (sent_bytes <= 0) {
// MSG_DONTWAIT is set
switch (errno) {
case EAGAIN:
// Normal backup
app->sock_would_block++;
break;
case EBADF:
case ENOTSOCK:
// sockfd is not a valid file descriptor
// TODO reopen socket
perror("SG Send");
return 1;
break;
case ECONNREFUSED:
break;
default:
perror("SG Send");
printf("%d ",errno);
return 1;
}
} else {
app->sock_sent++;
}
}
return 0;
}

static int process_message_body(app_data_t *app, pn_data_t *body) {
int err = 0;
if (pn_data_type(body) == PN_LIST) {
size_t count = pn_data_get_list(body);
pn_data_enter(body);
for (size_t i = 0; i < count; i++) {
if (pn_data_next(body)) {
err += process_message_body(app, body);
}
}
pn_data_exit(body);
} else if (pn_data_type(body) == PN_SYMBOL ||
pn_data_type(body) == PN_STRING ||
pn_data_type(body) == PN_BINARY) {
err = process_message_binary(app, body);
} else {
perror("Unexpected message datatype recieved.");
err = 1;
}
return err;
}

static int decode_message(app_data_t *app, pn_rwbytes_t data) {
pn_message_t *m;

Expand All @@ -105,36 +162,9 @@ static int decode_message(app_data_t *app, pn_rwbytes_t data) {
if (!err) {
pn_data_t *body = pn_message_body(m);
if (pn_data_next(body)) {
pn_bytes_t b = pn_data_get_bytes(body);
if (b.start != NULL) {
int send_flags = app->socket_flags;

ssize_t sent_bytes = sendto(app->send_sock, b.start, b.size, send_flags,
&app->sa, app->sa_len);
if (sent_bytes <= 0) {
// MSG_DONTWAIT is set
switch (errno) {
case EAGAIN:
// Normal backup
app->sock_would_block++;
break;
case EBADF:
case ENOTSOCK:
// sockfd is not a valid file descriptor
// TODO reopen socket
perror("SG Send");
return 1;
break;
case ECONNREFUSED:
break;
default:
perror("SG Send");
printf("%d ",errno);
return 1;
}
} else {
app->sock_sent++;
}
err = process_message_body(app, body);
if (err) {
return 1;
}
}
} else {
Expand Down Expand Up @@ -202,4 +232,4 @@ void *socket_snd_th(void *app_ptr) {
pthread_cleanup_pop(1);

return NULL;
}
}

0 comments on commit eda73be

Please sign in to comment.