////////////////////////////////////////////////////////////////////////////////
//
// UFRGS - INF01151 - Marcelo Johann - 2009/1
//
// Producer and Consumer in C with fork and POSIX kernel-persistent Semaphores
//
// This program creates separate processes for the buffer monitor and for each
// producer and consumer, despite being written in a single source file.
// Please, pay attention to some details that follow:
// Global variables are not shared. The only shared element is the structure
// called SharedBuffer, which includes the buffer itself as well as the variable
// called 'running' that serves to synchronize the end of the simulation.
// The POSIX semaphores used are named, and they are found and controlled by
// the kernel. The variables that serve as semaphore handles need not be
// shared, and therefore they are simple global variables in each process.
// It is worth noting, however, that their initialization is performed before
// the fork, and we conclude that the access to those semaphores is inherited
// by the child processes.
// If you happen to implement the same program using Object Orientation, in C++,
// don't be fooled into thinking that the buffer is one single object and the
// synchronization code is confined to any single entity. All buffer functions,
// or methods, are actually executed by the producers' and consumers' separate
// processes, and that is why they all have to access the semaphores.

#include <fcntl.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

// definitions

#define TRUE 1
#define FALSE 0
#define BUFFER_SIZE 20     // number of slots in the shared circular buffer
#define DELAY_BUFFER 400   // delay() ticks spent at each step inside a buffer operation
#define DELAY_MONITOR 500  // delay() ticks between two buffer dumps by the monitor

// prototypes

void delay (int ticks);    // busy-wait loop, defined at the end of this file

// Identity and pacing of one simulated worker (a producer or a consumer).
// Fields are initialized positionally elsewhere, so their order matters.

typedef struct {
  char name;   // single-letter label printed in the worker's log messages
  int ticks;   // value handed to delay() before each buffer operation
} ThreadID;
  
// One entry of the buffer: who produced it and which value was produced.
// Fields are initialized positionally elsewhere, so their order matters.

typedef struct {
  char owner;   // name of the producer that inserted it (' ' marks an empty slot)
  int number;   // value supplied by that producer (0 in an empty slot)
} Item;
  
// Shared circular buffer together with its bookkeeping. The operations that
// act on it are declared right after this type.

typedef struct {
  Item Buffer[BUFFER_SIZE];   // the slots of the ring
  int nextFree;               // index where the next insertion is stored
  int lastItem;               // index of the next item to be removed
  int running;                // TRUE while the simulation runs; FALSE asks every process to stop
} SharedBuffer;

// Process-local template buffer: main() initializes it and writes it to the
// backing file before that file is mapped as the shared buffer.
SharedBuffer voidBuffer; // Sample buffer to create the file

// Operations on the shared buffer, defined later in this file.
void bufferInit(SharedBuffer *buffer);    // reset every slot, both indices and the running flag
void bufferPrint(SharedBuffer *buffer);   // dump all slots to stderr
void bufferInsert(SharedBuffer *buffer,char name, int number);  // store one item; waits on empty_places first
Item bufferRemove(SharedBuffer *buffer);  // take one item; waits on available_items first
  
// The semaphore handles below are ordinary per-process globals, but the
// operations on them are implemented by the kernel and act on the same named
// semaphores that were opened in main() before the fork.
// NOTE: unnamed semaphores kept in ordinary (non-shared) globals like these
// would NOT work across processes; they would have to live in shared memory.
// Pay attention to the initial values given inside the main function.
  
sem_t *mutex;            // guards updates to the shared buffer (opened with value 1)
sem_t *empty_places;     // counts free slots (opened with value BUFFER_SIZE)
sem_t *available_items;  // counts stored items (opened with value 0)

// Consumer's and Producer's code

// Body of a consumer process.
//   buffer: the shared buffer to take items from
//   par:    points to the ThreadID describing this consumer
// While buffer->running is TRUE it pauses for the consumer's tick count,
// removes one item and logs it on stderr. It never returns to the caller:
// the process ends through exit(0) once the flag is no longer TRUE.
void * consumer( SharedBuffer *buffer, void *par)
{
  ThreadID *self = (ThreadID*) par;

  for (;;) {
    Item taken;

    if (buffer->running != TRUE)
      break;
    delay(self->ticks);
    taken = bufferRemove(buffer);
    fprintf(stderr,"Consumer %c got %c%d\n",self->name,taken.owner,taken.number);
  }
  exit(0);
}
 
// Body of a producer process.
//   buffer: the shared buffer to put items into
//   par:    points to the ThreadID describing this producer
// While buffer->running is TRUE it pauses for the producer's tick count,
// announces the next value on stderr and inserts it; values count up from 0.
// It never returns to the caller: the process ends through exit(0) once the
// flag is no longer TRUE.
void * producer( SharedBuffer *buffer, void *par)
{
  ThreadID *self = (ThreadID*) par;
  int serial = 0;

  while (buffer->running == TRUE) {
    delay(self->ticks);
    fprintf(stderr,"Producer %c to write %d\n",self->name,serial);
    bufferInsert(buffer,self->name,serial);
    serial++;
  }
  exit(0);
}
  
// Code for the process that monitors the buffer status
  
// Body of the monitor process.
//   buffer: the shared buffer to observe
// Prints the whole buffer, pauses for DELAY_MONITOR ticks and repeats for as
// long as buffer->running is TRUE. It never returns to the caller: the
// process ends through exit(0) as soon as the flag is found not TRUE.
void *bufferMonitor(SharedBuffer *buffer)
{
  for (;;) {
    if (buffer->running != TRUE)
      exit(0);
    bufferPrint(buffer);
    delay(DELAY_MONITOR);
  }
}

// Code for accessing the Shared Buffer
 
// Puts a buffer into its initial state: every slot holds the blank item
// {' ',0}, the running flag is TRUE and both ring indices are 0.
//   buffer: the buffer to initialize
void bufferInit(SharedBuffer *buffer)
{
  const Item blank = {' ',0};
  Item *slot = buffer->Buffer;
  Item *const end = buffer->Buffer + BUFFER_SIZE;

  while (slot != end) {
    *slot = blank;
    ++slot;
  }
  buffer->running = TRUE;
  buffer->nextFree = 0;
  buffer->lastItem = 0;
}
  
// Writes one line to stderr: "Buffer: " followed by every slot as
// <owner><number> and a trailing space, in index order.
//   buffer: the buffer to display
// The slots are read without taking the mutex, so the line may show an
// insertion or removal that is still in progress.
void bufferPrint(SharedBuffer *buffer)
{
  const Item *slot = buffer->Buffer;
  int remaining = BUFFER_SIZE;

  fprintf(stderr,"Buffer: ");
  while (remaining-- > 0) {
    fprintf(stderr,"%c%d ",slot->owner,slot->number);
    ++slot;
  }
  fprintf(stderr,"\n");
}
 
// Stores the item {name, number} at the nextFree position of the ring.
//   buffer: the shared buffer
//   name:   owner recorded in the item
//   number: value recorded in the item
// Waits on empty_places and then on mutex before touching the buffer, and
// posts mutex and then available_items when done. The two delay() calls
// deliberately stretch the critical section.
void bufferInsert(SharedBuffer *buffer,char name, int number)
{
  Item fresh = { name, number };
  int slot;

  sem_wait(empty_places);
  sem_wait(mutex);

  slot = buffer->nextFree;
  buffer->Buffer[slot] = fresh;
  delay(DELAY_BUFFER);
  buffer->nextFree = (slot+1)%BUFFER_SIZE;   // advance with wrap-around
  delay(DELAY_BUFFER);

  sem_post(mutex);
  sem_post(available_items);
}
 
// Takes the item at the lastItem position of the ring and returns it,
// leaving the blank item {' ',0} in its slot.
//   buffer: the shared buffer
// Waits on available_items and then on mutex before touching the buffer, and
// posts mutex and then empty_places when done. The two delay() calls
// deliberately stretch the critical section.
Item bufferRemove(SharedBuffer *buffer)
{
  const Item blank = {' ',0};
  Item taken;
  Item *slot;

  sem_wait(available_items);
  sem_wait(mutex);

  slot = &buffer->Buffer[buffer->lastItem];
  taken = *slot;
  *slot = blank;
  delay(DELAY_BUFFER);
  buffer->lastItem = (buffer->lastItem+1)%BUFFER_SIZE;   // advance with wrap-around
  delay(DELAY_BUFFER);

  sem_post(mutex);
  sem_post(empty_places);
  return taken;
}
    
// Main application, starting as the father process
 
int main (int argc, char ** argv)
  {
  char command = ' ';
  int mon, prod1,prod2,cons1,cons2;
  SharedBuffer *buffer;
  int fd = 0;
  
  ThreadID idprod1 = {'A',1000};
  ThreadID idprod2 = {'B',2000};
  ThreadID idcons1 = {'X',5000};
  ThreadID idcons2 = {'Y',6000};
  
  // Here I try to destroy the semaphores if they exist
  // The last call to our application might have been interrupted, and in this
  // case the semaphores were left at an arbitrary state that will not represent
  // the initial conditions of our simulation. As I could not solve this in any
  // way using the initialization procedures, I remove the semaphores from the
  // system before trying to open them again.
  
  sem_unlink("pncn_fork_mutex");
  sem_unlink("pncn_fork_empty_places");
  sem_unlink("pncn_fork_available_items");

  // create and initialize semaphores. Note that 511 is octal 777

  mutex =  sem_open("pncn_fork_mutex",O_CREAT,511,1);
  empty_places =  sem_open("pncn_fork_empty_places",O_CREAT,511,BUFFER_SIZE);
  available_items =  sem_open("pncn_fork_available_items",O_CREAT,511,0);
  if (mutex==SEM_FAILED || empty_places==SEM_FAILED || 
  available_items==SEM_FAILED)
    {
    printf("Error trying to open semaphores\n");
    exit(1);
    }
    
  // Here we create the shared buffer as a file and map it to memory

  fd =open("pncn_fork_shared_memory", O_RDWR | O_CREAT, 511); 
  if (fd==-1)
    {
    printf("Error opening the shared file\n");
    exit(1);
    }
  bufferInit(&voidBuffer);
  write(fd,&voidBuffer, sizeof(SharedBuffer)); 
  buffer = mmap(NULL, sizeof(SharedBuffer), PROT_READ | PROT_WRITE, 
           MAP_SHARED,fd, 0); 
  close(fd); 
  
  // Fork each different process needed
  
  if ((mon = fork())==0) bufferMonitor(buffer);
  if ((prod1 = fork())==0) producer(buffer,&idprod1);
  if ((prod2 = fork())==0) producer(buffer,&idprod2);
  if ((cons1 = fork())==0) consumer(buffer,&idcons1);
  if ((cons2 = fork())==0) consumer(buffer,&idcons2);
  
  // wait for the user to command EXIT: all processes will exit after a while
  
  fprintf(stderr,"All processes created\n");
  while (command != 'e' && command != 'E')
    command = getchar();
  buffer->running = FALSE;
  
  // wait for all processes to finish
  
  waitpid(prod1, 0,0);
  waitpid(prod2, 0,0);
  waitpid(cons1, 0,0);
  waitpid(cons2, 0,0);
  waitpid(mon, 0,0);
  
  // close the semaphores and remove them from the system
  
  sem_close(mutex);
  sem_close(empty_places);
  sem_close(available_items);
  sem_unlink("pncn_fork_mutex");
  sem_unlink("pncn_fork_empty_places");
  sem_unlink("pncn_fork_available_items");
  exit(0);
  }
 
// Busy-waits for an amount of CPU work proportional to ticks: each tick is
// 10000 iterations of an inner counting loop.
//   ticks: number of ticks to burn; zero or negative values return at once
// The inner counter is volatile so that the compiler has to perform every
// iteration. Without it the loops have no observable effect and an optimizing
// build may remove the delay entirely.
void delay (int ticks)
{
  volatile int spin;
  int tick;

  for (tick = 0; tick < ticks; ++tick) {
    for (spin = 0; spin < 10000; ++spin) {
      // nothing to do: the volatile counter update is the work
    }
  }
}

