Product SiteDocumentation Site

5. What's In It For Me?

Please note this example leaves out nearly all return code checking, and all necessary spin locks, to shorten the illustration. You can find the complete example in the library's source code distribution as tests/pipelined-server.c.
When we supply an input stream to the complete program, you will notice that it echos 20 bytes every second. A single worker is capable of processing only 10 bytes per second, but the two present workers double the throughput.
static void worker(void *data)
{
	struct tc_fd *tcfd = (struct tc_fd *)data;
	struct req *r;

	while (1) {
		tc_wait_fd(EPOLLIN, tcfd);
		r = malloc(sizeof(struct req));	     /* begin of READ stage */
		if (tc_read(tcfd, r->data, 10) < 10)
			break;
		r->processed = 0;
		CIRCLEQ_INSERT_TAIL(&queue, r, ll);
		tc_rearm(tcfd);			   /* READ -> PROCESS stage */
		sleep(1);		/* simulate processing for 1 second */
		r->processed = 1;		    /* end of PROCESS stage */
		if (atomic_set_if_eq(1, 0, &writing_active)) { /* WRITE st. */
			CIRCLEQ_FOREACH(r, &queue, ll) {
				if (!r->processed)
					break;
				write(fileno(stdout), r->data, 10);
				CIRCLEQ_REMOVE(&queue, r, ll);
				free(r);
			}
			atomic_set(&writing_active, 0); /* end of WRITE st. */
		}
	}
}

static void tc_main(void *arg)
{
	/* initialization of queue, writing_active etc, left out */
	tcfd = tc_register_fd(fileno(stdin));
	tc_thread_pool_new(&tp, worker, tcfd, "worker_%d", 0);
	tc_thread_pool_wait(&tp);
	tc_unregister_fd(tcfd);
}

int main(int argc, char** argv)
{
	tc_run(tc_main, NULL, "tc_main", 2);
}
Example 3. Pipelined server's worker illustration