////////////////////////////////////////////////////////////////////////////////
//
// UFRGS - INF01151 - Marcelo Johann - 2009/1
//
// Producer and Consumer in C with POSIX kernel-persistent Semaphores

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>
#include <fcntl.h>

// definitions

#define TRUE 1
#define FALSE 0
#define BUFFER_SIZE 20
#define DELAY_BUFFER 400
#define DELAY_MONITOR 500

// prototypes

void delay (int ticks);

// structure for thread identification and timing

typedef struct
  {
  char name;
  int ticks;
  } ThreadID;
  
// structure for the item that goes in the buffer

typedef struct
  {
  char owner;
  int number;
  } Item;
  
// Global buffer and its opperations  
  
Item Buffer[BUFFER_SIZE];
int nextFree=0;
int lastItem=0;
void bufferInit(void);
void bufferPrint(void);
void bufferInsert(char name, int number);
Item bufferRemove(void);

// Global synchronizing semaphores
// Pay attention to initial values inside the main function

sem_t *mutex;
sem_t *empty_places;
sem_t *available_items;

// This variable is used by all threads to know when  the user wants to stop

int running = TRUE;

// Consumer and Producer´s code

void * consumer( void *par)
 {
 ThreadID *me = (ThreadID*) par;
 Item it;
 while (running==TRUE)
   {
   delay(me->ticks);
   it = bufferRemove();
   fprintf(stderr,"Consumer %c got %c%d\n",me->name,it.owner,it.number);
   }
 return 0;
 }
 
void * producer( void *par)
  {
  int i=0;
  ThreadID *me = (ThreadID*) par;
  for (i=0; running==TRUE ;++i)
    {
    delay(me->ticks);
    fprintf(stderr,"Producer %c to write %d\n",me->name,i);
    bufferInsert(me->name,i);
    }
  return 0;
  }

// Buffer´s code
 
void bufferInit(void)
  {
  int i=0;
  Item nothing = {' ',0};
  for (i=0; i<BUFFER_SIZE; ++i)
    Buffer[i]= nothing;
  }
  
void bufferPrint(void)
  {
  int i=0;
  fprintf(stderr,"Buffer: ");
  for (i=0; i<BUFFER_SIZE; ++i)
    fprintf(stderr,"%c%d ",Buffer[i].owner,Buffer[i].number);
  fprintf(stderr,"\n");
  }
 
void bufferInsert(char name, int number)
  {
  Item fresh;
  fresh.owner = name;
  fresh.number = number;
  sem_wait(empty_places);
  sem_wait(mutex);
  Buffer[nextFree]=fresh;
  delay(DELAY_BUFFER);
  nextFree = (nextFree+1)%BUFFER_SIZE;
  delay(DELAY_BUFFER);
  sem_post(mutex);
  sem_post(available_items);
  }
 
Item bufferRemove(void)
  {
  Item out;
  Item nothing = {' ',0};
  sem_wait(available_items);
  sem_wait(mutex);
  out = Buffer[lastItem];
  Buffer[lastItem] = nothing;
  delay(DELAY_BUFFER);
  lastItem = (lastItem+1)%BUFFER_SIZE;
  delay(DELAY_BUFFER);
  sem_post(mutex);
  sem_post(empty_places);
  return out;
  }
  
void *bufferMonitor(void *nothing)
  {
  while (running==TRUE)
    {
    bufferPrint();
    delay(DELAY_MONITOR);
    }
  return 0;
  }
  
// Main application
 
int main (int argc, char ** argv)
  {
  char command = ' ';
  pthread_t mon;
  pthread_t prod1,prod2;
  pthread_t cons1,cons2;
  pthread_attr_t at;

  pthread_attr_init(&at);
  pthread_attr_setscope(&at,PTHREAD_SCOPE_SYSTEM);

  ThreadID idprod1 = {'A',1000};
  ThreadID idprod2 = {'B',2000};
  ThreadID idcons1 = {'X',5000};
  ThreadID idcons2 = {'Y',6000};
  
  // Here I try to destroy the semaphores if they exist
  // The last call to our application might have been interrupted, and in this
  // case the semaphores were left at an arbitrary state that will not represent
  // the initial conditions of our simulation. As I could not solve this in any
  // way using the initialization procedures, I remove the semaphores from the
  // system before trying to use open them again.
  
  sem_unlink("pncn15_mutex");
  sem_unlink("pncn15_empty_places");
  sem_unlink("pncn15_available_items");

  // create and initialize semaphores. Note that 511 is octal 777

  mutex =  sem_open("pncn15_mutex",O_CREAT,511,1);
  empty_places =  sem_open("pncn15_empty_places",O_CREAT,511,BUFFER_SIZE);
  available_items =  sem_open("pncn15_available_items",O_CREAT,511,0);
  if (mutex==SEM_FAILED || empty_places==SEM_FAILED || 
  available_items==SEM_FAILED)
    {
    printf("error\n");
    exit(0);
    }

  bufferInit();
  pthread_create(&mon, &at ,bufferMonitor,  0);
  pthread_create(&prod1, &at ,producer,  (void*) &idprod1);
  pthread_create(&prod2, &at ,producer,  (void*) &idprod2);
  pthread_create(&cons1, &at ,consumer,  (void*) &idcons1);
  pthread_create(&cons2, &at ,consumer,  (void*) &idcons2);
  
  // wait for the user to command EXIT: all threads will stop after a while
  
  fprintf(stderr,"All created\n");
  while (command != 'e' && command != 'E')
    command = getchar();
  running = FALSE;
  
  // wait for all threads to finish
  
  pthread_join(prod1, NULL);
  pthread_join(prod2, NULL);
  pthread_join(cons1, NULL);
  pthread_join(cons2, NULL);
  pthread_join(mon, NULL);
  
  // close the semaphores and remove them from the system
  
  sem_close(mutex);
  sem_close(empty_places);
  sem_close(available_items);
  sem_unlink("pncn15_mutex");
  sem_unlink("pncn15_empty_places");
  sem_unlink("pncn15_available_items");
  exit(0);
  }
 
void delay (int ticks)
  {
  int j,k;
    for (j=0;j<ticks;++j)
      {
      for (k=0;k<10000;++k)
        {
        j = j + 100;
        j = j - 100;
        }
      }
  }

