Class HadoopConsumer

java.lang.Object
io.github.ajevans.dbcode.dbconsumers.HadoopConsumer
All Implemented Interfaces:
IDataConsumer

public class HadoopConsumer
extends Object
implements IDataConsumer
Class for writing out files to a Hadoop Distributed File System (HDFS).

Works with an IDataset (e.g. database) containing one or more IRecordHolders (e.g. data tables) containing IRecords (e.g. rows) of values. It writes these to a store (in this case a directory) as record stores (files). It will also write out an associated set of metadata files.

IReportingListener objects may be registered with objects of this class to receive suitable progress reporting and messaging. In general, exceptions not dealt with internally are re-thrown for calling objects to deal with. Messages are user-friendly.

If developers have a specific directory and set of filenames they'd like to use, they should first call setStore and setRecordStoreNames. If these are missing when initialise is finally called, initialise will build a directory in the user's space (named after defaultFileDirectory in application.properties). Within this will be a directory named after the dataset, or DEFAULT, and files named after the record holders, or DEFAULTx where x is an integer.
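
The fallback naming described above can be sketched as follows. This is a minimal illustration rather than the class's own code: DefaultNames and its two helper methods are hypothetical names, a null or blank title stands in for "missing", and using the record holder's index for the x in DEFAULTx is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class DefaultNames {

    /** Directory name: the dataset title, or "DEFAULT" if there is none. */
    static String directoryName(String datasetTitle) {
        boolean missing = datasetTitle == null || datasetTitle.trim().isEmpty();
        return missing ? "DEFAULT" : datasetTitle;
    }

    /** File names: each record holder's title, or "DEFAULTx" (x = index, an assumption). */
    static List<String> fileNames(List<String> holderTitles) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < holderTitles.size(); i++) {
            String title = holderTitles.get(i);
            boolean missing = title == null || title.trim().isEmpty();
            names.add(missing ? "DEFAULT" + i : title);
        }
        return names;
    }
}
```

In the real class the resulting names would additionally be passed through sanitise before use.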

Either way, all developers should call initialise with a dataset that at least has the fields and field types in it. This will build the file structures with headers and set up progress monitoring.

They can then get data written to the files by either calling bulkLoad with a complete dataset object or load with a collection of records.

If you don't want file headers written, calling connectStore will check the directories exist and are ready, but won't set up the files or progress monitoring.

Note that because instance variables will hold a wide variety of information on previous writes, it is essential that a new instance of this class is used for each new set of files / dataset.

Author:
Andy Evans
Version: 1.0 01 Mar 2021
  • Field Details

    • debug

      private boolean debug
      Debugging flag, set by System variable passed in -Ddebug=true rather than setting here / with accessor.
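
      One common way to read such a flag is Boolean.getBoolean, which returns true only when the named system property exists and equals "true" (ignoring case). Whether this class uses that exact call is an assumption; DebugFlag is a hypothetical name used for illustration.

```java
class DebugFlag {

    /** True only when the JVM was started with -Ddebug=true. */
    static boolean isDebug() {
        // Reads the "debug" system property; a missing property yields false.
        return Boolean.getBoolean("debug");
    }
}
```

      For example, starting the JVM with java -Ddebug=true ... would make isDebug() return true.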
    • store

      private String store
      Path to directory used for storage.
    • recordStoreNames

      private ArrayList<String> recordStoreNames
      Filenames used.
    • reportingListeners

      private ArrayList<IReportingListener> reportingListeners
      Listeners interested in updates on progress.
    • DEF_DB_DIR

      private final String DEF_DB_DIR
      Default directory to create within store.
      See Also:
      Constant Field Values
    • SANITISE_DIRPATH

      private final int SANITISE_DIRPATH
      Constant for aggressive sanitisation.
      See Also:
      Constant Field Values
    • SANITISE_FILENAME

      private final int SANITISE_FILENAME
      Constant for database object name sanitisation.
      See Also:
      Constant Field Values
    • SANITISE_DATA

      private final int SANITISE_DATA
      Constant for weak sanitisation to remove a few problematic chars.
      See Also:
      Constant Field Values
    • METADATA_TABLE

      private final boolean METADATA_TABLE
      Constant for proveLoaded.
      See Also:
      Constant Field Values
    • NORMAL_TABLE

      private final boolean NORMAL_TABLE
      Constant for proveLoaded.
      See Also:
      Constant Field Values
    • progress

      private int progress
      Used for progress monitoring.
    • fileSystem

      org.apache.hadoop.fs.FileSystem fileSystem
      HDFS.
    • conf

      org.apache.hadoop.conf.Configuration conf
      Configuration.
  • Constructor Details

    • HadoopConsumer

      public HadoopConsumer()
      Default constructor.
  • Method Details

    • initialise

      public void initialise​(IDataset dataset) throws DBCreationException
      Sets up the data store (directory and files).

      If a store (directory) path and/or record store (file) names haven't been set using the setStore / setRecordStoreNames methods, this method deals with this. It pulls the titles from the dataset passed in for the directory name and from the dataset's record holders for the file names. The default location for the directory is the user's home directory, where a containing directory (see class docs) will be constructed first. All names and paths are sanitised.

      It then creates the relevant files with appropriate headers.

      Specified by:
      initialise in interface IDataConsumer
      Parameters:
      dataset - Dataset to store - note that this need not be filled with records as long as it has recordHolders, metadata, and field data.
      Throws:
      DBCreationException - If issues arise.
    • buildFile

      private void buildFile​(IDataset dataset, int index) throws DBCreationException
      Sets up the relevant file.
      Parameters:
      dataset - The dataset containing the file to be built. Record holders can be empty of data at this point, but must contain field information and metadata.
      index - The index of the file to build in recordStoreNames.
      Throws:
      DBCreationException - If issues arise.
    • buildMetadataFile

      private void buildMetadataFile​(IMetadata metadata, String metadataFileName) throws DBCreationException
      Sets up a metadata MapFile for each record holder file.

      MapFiles are, despite their name, a directory of interrelated files, so this is what is produced. Note that the MapFile data will be sorted alphabetically by metadata category, as is usual with these files for fast searches.

      Parameters:
      metadata - The metadata to be written to the metadata file.
      metadataFileName - The name of the metadata file.
      Throws:
      DBCreationException - If issues arise.
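
      Hadoop's MapFile writer expects keys to be appended in sorted order, which is why the metadata ends up ordered by category. The sketch below shows only that ordering step, using a TreeMap and no Hadoop classes. MetadataOrdering is a hypothetical name, and a Map of category to value stands in for IMetadata, whose API is not shown here.

```java
import java.util.Map;
import java.util.TreeMap;

class MetadataOrdering {

    /** Returns the metadata entries sorted by category name (natural String order). */
    static Map<String, String> sortedByCategory(Map<String, String> metadata) {
        // TreeMap iterates its keys in ascending order, ready for sequential appends.
        return new TreeMap<>(metadata);
    }
}
```

      Iterating over the returned map and appending each entry in turn would satisfy the writer's ordering requirement for plain ASCII category names.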
    • bulkLoad

      public void bulkLoad​(IDataset dataset) throws DBCreationException
      This method bulk-loads a whole dataset into one or more files.

      This method calls disconnectStore when done to clean up.

      Specified by:
      bulkLoad in interface IDataConsumer
      Parameters:
      dataset - The dataset to load.
      Throws:
      DBCreationException - If there's an issue.
    • load

      public void load​(ArrayList<IRecord> records) throws DBCreationException
      Adds multiple records to current store.

      This does little at the moment other than call storeRecords with the records, but we keep it as a separate method as it acts as a gateway for data in the supplier/consumer push model and we may want to add functionality to the gateway in the future.

      Specified by:
      load in interface IDataConsumer
      Parameters:
      records - ArrayList of records.
      Throws:
      DBCreationException - Only if there is an issue.
    • storeRecords

      private void storeRecords​(ArrayList<IRecord> records) throws DBCreationException
      Add a set of records to the relevant record store (file). These are standard Hadoop flat files. Dates are US MM/dd/yyyy format.
      Parameters:
      records - An ArrayList of the IRecords to add.
      Throws:
      DBCreationException - Only if there is an issue.
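
      The US date format mentioned above corresponds to the pattern MM/dd/yyyy. A minimal sketch of producing it with java.time follows; the date type the class actually handles is not stated, so LocalDate and the name UsDateFormat are assumptions.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

class UsDateFormat {

    /** Month first, then day, then four-digit year, separated by slashes. */
    private static final DateTimeFormatter US_DATE = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    static String format(LocalDate date) {
        return date.format(US_DATE);
    }
}
```

      With this pattern, 1 March 2021 is written as 03/01/2021, so readers expecting day-first dates need to take care when parsing the files.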
    • setStore

      public void setStore​(String store) throws DBCreationException
      Sets the location for the store / directory to write to.

      If not set, the location can be set from the dataset via the initialise method.

      Sanitises input with SANITISE_DIRPATH.

      Specified by:
      setStore in interface IDataConsumer
      Parameters:
      store - File path of directory to write to.
      Throws:
      DBCreationException - Not used in this implementation.
      See Also:
      initialise(IDataset dataset)
    • getStore

      public String getStore()
      Gets the location for the store / database.

      For testing.

      Returns:
      String Store location.
    • setRecordStoreNames

      public void setRecordStoreNames​(ArrayList<String> recordStoreNames) throws DBCreationException
      Sets the record store (file) names for the store (directory).

      If not set, the names can be set from the dataset via the initialise method.

      Sanitises input with SANITISE_FILENAME.

      If set here directly (for example, the user enters the name/s), it's the implementer's responsibility to make sure the number of record stores named and the number supplied in the dataset to initialise match up, or initialise will throw an exception.

      Specified by:
      setRecordStoreNames in interface IDataConsumer
      Parameters:
      recordStoreNames - Names of record stores (files) to make.
      Throws:
      DBCreationException - Not used in this implementation.
      See Also:
      initialise(IDataset dataset)
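
      Callers that set names directly may want to check the counts themselves before calling initialise. The sketch below is one hypothetical way to do so: StoreNameCheck is not part of the library, and it throws IllegalStateException as a stand-in for the DBCreationException that initialise would raise.

```java
import java.util.List;

class StoreNameCheck {

    /** Fails fast if the number of file names differs from the number of record holders. */
    static void requireMatchingCounts(List<String> recordStoreNames, int recordHolderCount) {
        if (recordStoreNames.size() != recordHolderCount) {
            throw new IllegalStateException(
                    "Got " + recordStoreNames.size() + " record store names for "
                    + recordHolderCount + " record holders.");
        }
    }
}
```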
    • getRecordStoreNames

      public ArrayList<String> getRecordStoreNames()
      Gets the record store names.

      For testing.

      Returns:
      ArrayList Record store names.
    • addReportingListener

      public void addReportingListener​(IReportingListener reportingListener)
      For objects wishing to get progress reports on data writing.
      Specified by:
      addReportingListener in interface IDataConsumer
      Parameters:
      reportingListener - Object wishing to gain reports.
      See Also:
      IReportingListener
    • findTitle

      protected String findTitle​(IMetadata metadata)
      Finds the title category in an unknown IMetadata object.

      If it doesn't exist, defaults to "DEFAULT".

      Parameters:
      metadata - Metadata object of unknown schema.
      Returns:
      String Title, if found.
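
      The lookup with a fallback can be sketched as below. The IMetadata API is not shown in this page, so a Map of category to value stands in for it. The case-insensitive match on "title" and the name TitleFinder are assumptions made for illustration.

```java
import java.util.Map;

class TitleFinder {

    /** Returns the value of the first "title" category found, or "DEFAULT" if none. */
    static String findTitle(Map<String, String> metadata) {
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
            String value = entry.getValue();
            boolean isTitle = "title".equalsIgnoreCase(entry.getKey());
            if (isTitle && value != null && !value.trim().isEmpty()) {
                return value;
            }
        }
        return "DEFAULT";
    }
}
```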
    • connectStore

      public void connectStore() throws DBCreationException
      Creates the data directory, and the directory containing it, if missing, and calls HDFS.
      Specified by:
      connectStore in interface IDataConsumer
      Throws:
      DBCreationException - If there's an issue.
    • disconnectStore

      public void disconnectStore() throws DBCreationException
      Disconnects from current store / database and garbage collects.
      Specified by:
      disconnectStore in interface IDataConsumer
      Throws:
      DBCreationException - Not thrown in this implementation.
    • sanitise

      protected String sanitise​(String string, int level)
      Sanitise Strings.

      Directory path sanitisation removes characters that are illegal in Windows filenames, but leaves slashes etc. If these are likely, remove them using filename sanitisation on component parts.
      Filename sanitisation removes all illegal Windows filename characters and slashes for POSIX systems. It also adds "Data-" to illegal Windows filenames and removes trailing periods and spaces.
      Data sanitisation just removes commas in preparation for writing as CSV.

      Parameters:
      string - String to sanitise.
      level - One of SANITISE_DIRPATH, SANITISE_FILENAME, SANITISE_DATA.
      Returns:
      String Sanitised String.
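
      The three levels might look something like the sketch below. It is an approximation under stated assumptions, not the class's implementation: the constant values are hypothetical (the real ones are under Constant Field Values), "illegal Windows filenames" is read as the reserved device names (CON, PRN, AUX, NUL, COM1-9, LPT1-9), and the directory level is assumed to keep '/', '\' and ':'.

```java
import java.util.regex.Pattern;

class SanitiseSketch {

    // Hypothetical values, chosen only so the sketch compiles.
    static final int SANITISE_DIRPATH = 0;
    static final int SANITISE_FILENAME = 1;
    static final int SANITISE_DATA = 2;

    /** Reserved Windows device names, with or without an extension (assumed meaning). */
    private static final Pattern RESERVED =
            Pattern.compile("(?i)^(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])(\\..*)?$");

    static String sanitise(String string, int level) {
        switch (level) {
            case SANITISE_DIRPATH: {
                // Drop illegal characters but keep path separators and drive colons.
                return string.replaceAll("[<>\"|?*\\x00-\\x1F]", "");
            }
            case SANITISE_FILENAME: {
                // Drop illegal characters including both kinds of slash.
                String name = string.replaceAll("[<>:\"/\\\\|?*\\x00-\\x1F]", "");
                // Remove trailing periods and spaces.
                name = name.replaceAll("[. ]+$", "");
                // Prefix reserved names so they become legal.
                return RESERVED.matcher(name).matches() ? "Data-" + name : name;
            }
            case SANITISE_DATA: {
                // Commas would break the comma-separated output.
                return string.replace(",", "");
            }
            default:
                throw new IllegalArgumentException("Unknown sanitisation level: " + level);
        }
    }
}
```

      In this sketch, sanitising a full path at the filename level would strip its separators, which is why the text above suggests applying filename sanitisation to each path component separately.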
    • gapFillLocalisedGUIText

      private void gapFillLocalisedGUIText()
      Sets the defaults for warnings and exceptions in English if an appropriate language properties file is missing.
    • reportProgress

      public void reportProgress​(int progress, IDataset dataset)
      Reports progress to reportingListeners.

      Reports if progress is a multiple of total records / 100. If progress is zero or less, reports progress as 0 of 1.

      Parameters:
      progress - Progress in record processing.
      dataset - Dataset to extract estimate of processing to be done.
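
      The throttling rule above can be sketched as follows. This is an illustration only: the nested Listener interface is a hypothetical stand-in for IReportingListener, the total is passed as an int rather than taken from an IDataset, and guarding against a zero step for totals under 100 is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

class ProgressSketch {

    /** Hypothetical stand-in for IReportingListener. */
    interface Listener {
        void progressReported(int progress, int total);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    /** Notifies listeners roughly once per percent of the total. */
    void reportProgress(int progress, int totalRecords) {
        if (progress <= 0) {
            // Zero or negative progress is reported as 0 of 1.
            notifyListeners(0, 1);
            return;
        }
        // Avoid a zero step (and division by zero) for small totals.
        int step = Math.max(1, totalRecords / 100);
        if (progress % step == 0) {
            notifyListeners(progress, totalRecords);
        }
    }

    private void notifyListeners(int progress, int total) {
        for (Listener listener : listeners) {
            listener.progressReported(progress, total);
        }
    }
}
```

      Reporting only on these multiples keeps listeners such as progress bars from being called once per record on large datasets.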
    • reportProgress

      public void reportProgress​(int progress, int total)
      Reports progress to reportingListeners.

      Reports for an arbitrary progress and total worked towards.

      Parameters:
      progress - Value indicating progress through work total.
      total - Value indicating total work to do.
    • reportMessage

      public void reportMessage​(String message)
      Reports message to reportingListeners.
      Parameters:
      message - Message to reporting listeners.
    • load

      @Deprecated public void load​(IDataset dataset) throws DBCreationException
      Deprecated.
      Only here to satisfy deprecated interface demands conditional on other classes. Redirects to bulkLoad.
      Adds a dataset to the relevant file in the store directory.

      Specified by:
      load in interface IDataConsumer
      Parameters:
      dataset - Dataset to load.
      Throws:
      DBCreationException - Only if there is an issue.
      See Also:
      IDataConsumer.load(ArrayList<IRecord> records)
    • proveLoaded

      @Deprecated private void proveLoaded​(String tableName, boolean metadataTable) throws DBCreationException
      Deprecated.
      Not needed for this class, but a convenient add-in for proving it works without the need for unit testing or database interrogation software.
      Messages out to ReportingListeners the first and last entries in the named table.

      Will disconnect and connect to current store to prove persistence.

      Parameters:
      tableName - Table to use.
      metadataTable - One of METADATA_TABLE or NORMAL_TABLE.
      Throws:
      DBCreationException - If an issue with connecting.