#include <pthread.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <mqtt.h>
#include "templates/posix_sockets.h"
/**
 * State handed to reconnect_client() through its opaque state pointer.
 *
 * main() fills this in once; the strings and buffers it points at are owned
 * by main() and are not copied.
 */
struct reconnect_state_t {
    const char *hostname;  /* address given to open_nb_socket() */
    const char *port;      /* port given to open_nb_socket(), as a string */
    const char *topic;     /* topic name selected in main() */
    uint8_t    *sendbuf;   /* send buffer storage */
    size_t      sendbufsz; /* size of sendbuf in bytes */
    uint8_t    *recvbuf;   /* receive buffer storage */
    size_t      recvbufsz; /* size of recvbuf in bytes */
};
/**
 * Reconnect callback: opens a fresh socket to the host/port stored in the
 * reconnect state and issues an MQTT connect on it.
 *
 * @param client                the client being (re)connected.
 * @param reconnect_state_vptr  points at a pointer to a struct reconnect_state_t.
 */
void reconnect_client(
    struct mqtt_client* client,
    void **reconnect_state_vptr);

/**
 * Callback for received publishes: prints the topic and the message.
 * Declared here because main() refers to it before its definition.
 */
void publish_callback(void** unused, struct mqtt_response_publish *published);

/**
 * Thread entry point handed to pthread_create(); receives the client as its
 * argument.
 */
void* client_refresher(void* client);

/**
 * Closes sockfd (unless it is -1), cancels *client_daemon (unless the pointer
 * is NULL) and terminates the process with the given status.
 */
void exit_example(int status, int sockfd, pthread_t *client_daemon);
int main(int argc, const char *argv[])
{
const char* addr;
const char* port;
const char* topic;
if (argc > 1) {
addr = argv[1];
} else {
addr = "test.mosquitto.org";
}
if (argc > 2) {
port = argv[2];
} else {
port = "1883";
}
if (argc > 3) {
topic = argv[3];
} else {
topic = "datetime";
}
struct reconnect_state_t reconnect_state;
reconnect_state.hostname = addr;
reconnect_state.port = port;
reconnect_state.topic = topic;
uint8_t sendbuf[2048];
uint8_t recvbuf[1024];
reconnect_state.sendbuf = sendbuf;
reconnect_state.sendbufsz = sizeof(sendbuf);
reconnect_state.recvbuf = recvbuf;
reconnect_state.recvbufsz = sizeof(recvbuf);
reconnect_client, &reconnect_state,
publish_callback
);
pthread_t client_daemon;
if(pthread_create(&client_daemon, NULL, client_refresher, &client)) {
fprintf(stderr, "Failed to start client daemon.\n");
exit_example(EXIT_FAILURE, -1, NULL);
}
printf("%s listening for '%s' messages.\n", argv[0], topic);
printf("Press ENTER to inject an error.\n");
printf("Press CTRL-D to exit.\n\n");
while(fgetc(stdin) != EOF) {
printf("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"\n");
client.
error = MQTT_ERROR_SOCKET_ERROR;
}
printf("\n%s disconnecting from %s\n", argv[0], addr);
sleep(1);
exit_example(EXIT_SUCCESS, client.
socketfd, &client_daemon);
}
void reconnect_client(
struct mqtt_client* client,
void **reconnect_state_vptr)
{
struct reconnect_state_t *reconnect_state = *((struct reconnect_state_t**) reconnect_state_vptr);
if (client->
error != MQTT_ERROR_INITIAL_RECONNECT) {
}
if (client->
error != MQTT_ERROR_INITIAL_RECONNECT) {
printf("reconnect_client: called while client was in error state \"%s\"\n",
);
}
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
if (sockfd == -1) {
perror("Failed to open socket: ");
exit_example(EXIT_FAILURE, sockfd, NULL);
}
reconnect_state->sendbuf, reconnect_state->sendbufsz,
reconnect_state->recvbuf, reconnect_state->recvbufsz
);
mqtt_connect(client,
"subscribing_client", NULL, NULL, 0, NULL, NULL, 0, 400);
}
/**
 * Tears the example down and terminates the process.
 *
 * @param status         exit status passed to exit().
 * @param sockfd         socket to close, or -1 when there is none.
 * @param client_daemon  thread to cancel, or NULL when none was started.
 */
void exit_example(int status, int sockfd, pthread_t *client_daemon)
{
    const int have_socket = (sockfd != -1);

    if (have_socket) {
        close(sockfd);
    }

    if (client_daemon) {
        pthread_cancel(*client_daemon);
    }

    exit(status);
}
{
printf(
"Received publish('%s'): %s\n", topic_name, (
const char*) published->
application_message);
free(topic_name);
}
/**
 * Thread body of the client daemon.
 *
 * Loops forever, servicing the client with mqtt_sync() and then sleeping for
 * 100000 microseconds. The thread never exits on its own; it is stopped by
 * pthread_cancel() in exit_example().
 *
 * @param client  the struct mqtt_client to service, passed as void*.
 * @return never returns in practice; NULL is only there to satisfy the
 *         pthread start-routine signature.
 */
void* client_refresher(void* client)
{
    while(1)
    {
        /* Without this call no traffic is sent or received at all. */
        mqtt_sync((struct mqtt_client*) client);
        usleep(100000U);
    }
    return NULL;
}