Package ch.cern.dirq

Class QueueSimple

java.lang.Object
ch.cern.dirq.QueueSimple
All Implemented Interfaces:
Queue, Iterable<String>

public class QueueSimple extends Object implements Queue
QueueSimple - object oriented interface to a simple directory based queue.
A port of Perl module Directory::Queue::Simple http://search.cpan.org/dist/Directory-Queue/
The documentation from Directory::Queue::Simple module has been adapted for Java.

Usage

 
 // sample producer
 QueueSimple dirq = new QueueSimple("/tmp/test");
 for (int i=0; i < 100; i++) {
     String name = dirq.add("element " + i);
     System.out.println("# added element " + i + " as " + name);
 }

 // sample consumer
 dirq = QueueSimple("/tmp/test");
 for (String name: dirq) {
     if (! dirq.lock(name)) {
         continue;
     }
     System.out.println("# reading element " + name);
     String data = dirq.get(name);
     // one could use dirq.unlock(name) to only browse the queue...
     dirq.remove(name);
 }
 
 

Description

This module is very similar to the normal directory queue, but uses a different way to store data in the filesystem, using less directories. Its API is almost identical.
Compared to normal directory queue, this module:
  • is simpler
  • is faster
  • uses less space on disk
  • can be given existing files to store
  • does not support schemas
  • can only store and retrieve byte strings
  • is not compatible (at filesystem level) with the normal directory queue

Directory Structure

The toplevel directory contains intermediate directories that contain the stored elements, each of them in a file.
The names of the intermediate directories are time based: the element insertion time is used to create a 8-digits long hexadecimal number. The granularity (see the constructor) is used to limit the number of new directories. For instance, with a granularity of 60 (the default), new directories will be created at most once per minute.
Since there is usually a filesystem limit in the number of directories a directory can hold, there is a trade-off to be made. If you want to support many added elements per second, you should use a low granularity to keep small directories. However, in this case, you will create many directories and this will limit the total number of elements you can store.
The elements themselves are stored in files (one per element) with a 14-digits long hexadecimal name SSSSSSSSMMMMMR where:
  • SSSSSSSS represents the number of seconds since the Epoch
  • MMMMM represents the microsecond part of the time since the Epoch
  • R is a random hexadecimal digit used to reduce name collisions

A temporary element (being added to the queue) will have a .tmp suffix.
A locked element will have a hard link with the same name and the .lck suffix.
Please refer to Queue for general information about directory queues.
  • Field Details

  • Constructor Details

    • QueueSimple

      public QueueSimple(String path) throws IOException
      Constructor creating a simple directory queue from the given path.
      Parameters:
      path - path of the directory queue
      Throws:
      IOException - if any file operation fails
    • QueueSimple

      public QueueSimple(String path, int numask) throws IOException
      Constructor creating a simple directory queue from the given path and umask.
      Parameters:
      path - path of the directory queue
      numask - numerical umask of the directory queue
      Throws:
      IOException - if any file operation fails
  • Method Details

    • getQueuePath

      public String getQueuePath()
      Description copied from interface: Queue
      Return the path of the queue.
      Specified by:
      getQueuePath in interface Queue
      Returns:
      queue path
    • getId

      public String getId()
      Description copied from interface: Queue
      Return a unique identifier for the queue.
      Specified by:
      getId in interface Queue
      Returns:
      unique queue identifier
    • add

      public String add(String data) throws IOException
      Description copied from interface: Queue
      Add String data to the queue.
      Specified by:
      add in interface Queue
      Parameters:
      data - data to be added
      Returns:
      element name (as directory_name/file_name)
      Throws:
      IOException - if any file operation fails
    • add

      public String add(byte[] data) throws IOException
      Description copied from interface: Queue
      Add byte array data to the queue.
      Specified by:
      add in interface Queue
      Parameters:
      data - data to be added
      Returns:
      element name (as directory_name/file_name)
      Throws:
      IOException - if any file operation fails
    • addPath

      public String addPath(String path) throws IOException
      Description copied from interface: Queue
      Add the given file (identified by its path) to the queue and return the corresponding element name, the file must be on the same filesystem and will be moved to the queue.
      Specified by:
      addPath in interface Queue
      Parameters:
      path - path of the file to be added
      Returns:
      element name (as directory_name/file_name)
      Throws:
      IOException - if any file operation fails
    • get

      public String get(String name) throws IOException
      Description copied from interface: Queue
      Get the given locked element as String data.
      Specified by:
      get in interface Queue
      Parameters:
      name - name of the element to be retrieved
      Returns:
      data associated with the given element
      Throws:
      IOException - if any file operation fails
    • getAsByteArray

      public byte[] getAsByteArray(String name) throws IOException
      Description copied from interface: Queue
      Get the given locked element as byte array data.
      Specified by:
      getAsByteArray in interface Queue
      Parameters:
      name - name of the element to be retrieved
      Returns:
      data associated with the given element
      Throws:
      IOException - if any file operation fails
    • getPath

      public String getPath(String name)
      Description copied from interface: Queue
      Get the path of the given locked element.
      This pathFile can be read but not removed, you must use the remove() method for this purpose.
      Specified by:
      getPath in interface Queue
      Parameters:
      name - name of the element
      Returns:
      path of the element
    • lock

      public boolean lock(String name) throws IOException
      Description copied from interface: Queue
      Lock an element in permissive mode.
      Specified by:
      lock in interface Queue
      Parameters:
      name - name of the element to be locked
      Returns:
      true on success, false if the element could not be locked
      Throws:
      IOException - if any file operation fails
    • lock

      public boolean lock(String name, boolean permissive) throws IOException
      Description copied from interface: Queue
      Lock an element.
      Specified by:
      lock in interface Queue
      Parameters:
      name - name of the element to be locked
      permissive - work in permissive mode
      Returns:
      true on success, false if the element could not be locked
      Throws:
      IOException - if any file operation fails
    • unlock

      public boolean unlock(String name) throws IOException
      Description copied from interface: Queue
      Unlock an element in non-permissive mode.
      Specified by:
      unlock in interface Queue
      Parameters:
      name - name of the element to be unlocked
      Returns:
      true on success, false if the element could not be unlocked
      Throws:
      IOException - if any file operation fails
    • unlock

      public boolean unlock(String name, boolean permissive) throws IOException
      Description copied from interface: Queue
      Unlock an element.
      Specified by:
      unlock in interface Queue
      Parameters:
      name - name of the element to be unlocked
      permissive - work in permissive mode
      Returns:
      true on success, false if the element could not be unlocked
      Throws:
      IOException - if any file operation fails
    • remove

      public void remove(String name) throws IOException
      Description copied from interface: Queue
      Remove a locked element from the queue.
      Specified by:
      remove in interface Queue
      Parameters:
      name - name of the element to be removed
      Throws:
      IOException - if any file operation fails
    • count

      public int count()
      Description copied from interface: Queue
      Return the number of elements in the queue.
      Locked elements are counted but temporary elements are not.
      Specified by:
      count in interface Queue
      Returns:
      number of elements in the queue
    • purge

      public void purge() throws IOException
      Description copied from interface: Queue
      Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
      It uses default value for maxTemp and maxLock
      Specified by:
      purge in interface Queue
      Throws:
      IOException - if any file operation fails
    • purge

      public void purge(int maxLock) throws IOException
      Description copied from interface: Queue
      Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
      Specified by:
      purge in interface Queue
      Parameters:
      maxLock - maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be used
      Throws:
      IOException - if any file operation fails
    • purge

      public void purge(int maxLock, int maxTemp) throws IOException
      Description copied from interface: Queue
      Purge the queue by removing unused intermediate directories, removing too old temporary elements and unlocking too old locked elements (aka staled locks); note: this can take a long time on queues with many elements.
      Specified by:
      purge in interface Queue
      Parameters:
      maxLock - maximum time for a locked element (in seconds); if set to 0, locked elements will not be unlocked; if set to null, the object's default value will be used
      maxTemp - maximum time for a temporary element (in seconds); if set to 0, temporary elements will not be removed if set to null, the object's default value will be used
      Throws:
      IOException - if any file operation fails
    • getGranularity

      public int getGranularity()
      Get the granularity.
      Returns:
      granularity (in seconds)
    • setGranularity

      public QueueSimple setGranularity(int value)
      Set the granularity.
      Parameters:
      value - granularity to be set (in seconds)
      Returns:
      the object itself
    • getUmask

      public int getUmask()
      Get the umask.
      Returns:
      numerical umask
    • setUmask

      public QueueSimple setUmask(int value)
      Set the umask.
      Parameters:
      value - umask to be set (numerical)
      Returns:
      the object itself
    • getMaxLock

      public int getMaxLock()
      Get the default maxLock for purge().
      Returns:
      maximum lock time (in seconds)
    • setMaxLock

      public QueueSimple setMaxLock(int value)
      Set the default maxLock for purge().
      Parameters:
      value - maximum lock time (in seconds)
      Returns:
      the object itself
    • getMaxTemp

      public int getMaxTemp()
      Get the default maxTemp for purge().
      Returns:
      maximum temporary time (in seconds)
    • setMaxTemp

      public QueueSimple setMaxTemp(int value)
      Set the default maxTemp for purge().
      Parameters:
      value - maximum temporary time (in seconds)
      Returns:
      the object itself
    • getRndHex

      public int getRndHex()
      Get the random hexadecimal digit.
      Returns:
      numerical hexadecimal digit
    • setRndHex

      public QueueSimple setRndHex(int value)
      Set the random hexadecimal digit.
      Parameters:
      value - hexadecimal digit to be set (numerical)
      Returns:
      the object itself
    • directoryPerms

      private static Set<PosixFilePermission> directoryPerms(int numask)
    • filePerms

      private static Set<PosixFilePermission> filePerms(int numask)
    • directoryName

      private String directoryName()
    • elementName

      private static String elementName(int rnd)
    • addPathHelper

      private String addPathHelper(Path tmp, String dir) throws IOException
      Throws:
      IOException
    • createFile

      private Path createFile(String path) throws IOException
      Throws:
      IOException
    • getNewPath

      private Path getNewPath(String dir) throws IOException
      Throws:
      IOException
    • addDataHelper

      private Path addDataHelper(String dir, byte[] data) throws IOException
      Throws:
      IOException
    • addDataHelper

      private Path addDataHelper(String dir, String data) throws IOException
      Throws:
      IOException
    • ensureDirectory

      private void ensureDirectory(Path path) throws IOException
      Throws:
      IOException
    • touchFile

      private boolean touchFile(File file)
    • iterator

      public Iterator<String> iterator()
      Iterator for the simple directory queue.
      Specified by:
      iterator in interface Iterable<String>