Class HadoopConsumer
- All Implemented Interfaces:
IDataConsumer
public class HadoopConsumer extends Object implements IDataConsumer
Works with an IDataset (e.g. database) containing one or more
IRecordHolders (e.g. data tables) containing
IRecords (e.g. rows) of values. It writes these to a
store (in this case a directory) as record stores (files). It will also
write out an associated set of metadata files.
IReportingListener objects may be registered with objects of
this class to receive suitable progress reporting and messaging. In general,
exceptions not dealt with internally are re-thrown for calling objects to
deal with. Messages are user-friendly.
If developers have a specific directory and set of filenames
they'd like to use, they should first call setStore
and setRecordStoreNames. If these are missing when
initialise is finally called, initialise
will build a directory in the user's space (named after the value of
defaultFileDirectory in application.properties)
within which will be a directory named after the dataset or
DEFAULT and files named after the record holders or
DEFAULTx where x is an integer.
Either way, all developers should call initialise with
a dataset that at least has the fields and fieldtypes in it. This will
build the file structures with headers and set up progress monitoring.
They can then get data written to the files by either calling
bulkLoad with a complete dataset object or
load with a collection of records.
If you don't want file headers written, calling connectStore will
check the directories exist and are ready, but won't set up the files or
progress monitoring.
Note that because instance variables hold a wide variety of information on previous writes, it is essential that a new instance of this class is used for each new set of files / dataset.
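The fallback naming described above (a DEFAULT directory, and DEFAULTx files for record holders without a usable name) can be sketched roughly as follows. This is an illustration only, not the class's actual code: the class and method names are hypothetical, titles are passed in as plain Strings rather than taken from IDataset / IRecordHolder objects, and x is assumed to be the zero-based position of the record holder.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of the fallback naming; not HadoopConsumer's code. */
final class DefaultNamesSketch {

    /** Returns the trimmed title if it is usable, otherwise the fallback. */
    static String nameOrDefault(String title, String fallback) {
        return (title == null || title.trim().isEmpty()) ? fallback : title.trim();
    }

    /**
     * Builds one file name per record holder title.
     * Assumption: the x in DEFAULTx is the holder's zero-based index.
     */
    static List<String> recordStoreNames(List<String> holderTitles) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < holderTitles.size(); i++) {
            names.add(nameOrDefault(holderTitles.get(i), "DEFAULT" + i));
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> titles = new ArrayList<>();
        titles.add("Rainfall");
        titles.add(null); // a record holder with no title
        System.out.println(nameOrDefault(null, "DEFAULT")); // directory name fallback
        System.out.println(recordStoreNames(titles));
    }
}
```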
- Author:
- Andy Evans
- Version: 1.0 01 Mar 2021
-
Field Summary
Fields (Modifier and Type, Field - Description)
- (package private) org.apache.hadoop.conf.Configuration conf - Configuration.
- private boolean debug - Debugging flag, set by System variable passed in as -Ddebug=true rather than setting here / with accessor.
- private String DEF_DB_DIR - Default directory to create within store.
- (package private) org.apache.hadoop.fs.FileSystem fileSystem - HDFS.
- private boolean METADATA_TABLE - Constant for use with proveLoaded.
- private boolean NORMAL_TABLE - Constant for use with proveLoaded.
- private int progress - Used for progress monitoring.
- private ArrayList<String> recordStoreNames - Filenames used.
- private ArrayList<IReportingListener> reportingListeners - Listeners interested in updates on progress.
- private int SANITISE_DATA - Constant for weak sanitisation to remove a few problematic chars.
- private int SANITISE_DIRPATH - Constant for aggressive sanitisation.
- private int SANITISE_FILENAME - Constant for database object name sanitisation.
- private String store - Path to directory used for storage.
-
Constructor Summary
Constructors (Constructor - Description)
- HadoopConsumer() - Default constructor.
-
Method Summary
Methods (Modifier and Type, Method - Description)
- void addReportingListener(IReportingListener reportingListener) - For objects wishing to get progress reports on data reading.
- private void buildFile(IDataset dataset, int index) - Sets up the relevant file.
- private void buildMetadataFile(IMetadata metadata, String metadataFileName) - Sets up a metadata MapFile for each record holder file.
- void bulkLoad(IDataset dataset) - This method bulk-loads a whole dataset into one or more files.
- void connectStore() - Creates data directory and directory it is in if missing, and calls HDFS.
- void disconnectStore() - Disconnects from current store / database and garbage collects.
- protected String findTitle(IMetadata metadata) - Finds the title category in an unknown IMetadata object.
- private void gapFillLocalisedGUIText() - Sets the defaults for warnings and exceptions in English if an appropriate language properties file is missing.
- ArrayList<String> getRecordStoreNames() - Gets the record store names.
- String getStore() - Gets the location for the store / database.
- void initialise(IDataset dataset) - Sets up the data store (directory and files).
- void load(IDataset dataset) - Deprecated. Only here to satisfy deprecated interface demands conditional on other classes.
- void load(ArrayList<IRecord> records) - Adds multiple records to current store.
- private void proveLoaded(String tableName, boolean metadataTable) - Deprecated. Not needed for this class, but a convenient add-in for proving it works without the need for unit testing or database interrogation software.
- void reportMessage(String message) - Reports message to reportingListeners.
- void reportProgress(int progress, int total) - Reports progress to reportingListeners.
- void reportProgress(int progress, IDataset dataset) - Reports progress to reportingListeners.
- protected String sanitise(String string, int level) - Sanitise Strings.
- void setRecordStoreNames(ArrayList<String> recordStoreNames) - Sets the record store (file) names for the store (directory).
- void setStore(String store) - Sets the location for the store / directory to write to.
- private void storeRecords(ArrayList<IRecord> records) - Add a set of records to the relevant record store (file).
-
Field Details
-
debug
private boolean debug
Debugging flag, set by System variable passed in as -Ddebug=true rather than setting here / with accessor.
-
store
Path to directory used for storage.
-
recordStoreNames
Filenames used.
-
reportingListeners
Listeners interested in updates on progress.
-
DEF_DB_DIR
Default directory to create within store.
- See Also:
- Constant Field Values
-
SANITISE_DIRPATH
private final int SANITISE_DIRPATH
Constant for aggressive sanitisation.
- See Also:
- Constant Field Values
-
SANITISE_FILENAME
private final int SANITISE_FILENAME
Constant for database object name sanitisation.
- See Also:
- Constant Field Values
-
SANITISE_DATA
private final int SANITISE_DATA
Constant for weak sanitisation to remove a few problematic chars.
- See Also:
- Constant Field Values
-
METADATA_TABLE
private final boolean METADATA_TABLE
Constant for use with proveLoaded.
- See Also:
- Constant Field Values
-
NORMAL_TABLE
private final boolean NORMAL_TABLE
Constant for use with proveLoaded.
- See Also:
- Constant Field Values
-
progress
private int progress
Used for progress monitoring.
-
fileSystem
org.apache.hadoop.fs.FileSystem fileSystem
HDFS.
-
conf
org.apache.hadoop.conf.Configuration conf
Configuration.
-
-
Constructor Details
-
HadoopConsumer
public HadoopConsumer()
Default constructor.
-
-
Method Details
-
initialise
Sets up the data store (directory and files).
If a store (directory) path and/or record store (file) names haven't been set using the setStore / setRecordStoreNames methods, this method supplies them. It takes the directory name from the title of the dataset passed in, and the file names from the dataset's record holders. The default location for the directory is the user's home directory, where a containing directory (see class docs) will be constructed first. All names and paths are sanitised.
It then creates the relevant files with appropriate headers.
- Specified by:
- initialise in interface IDataConsumer
- Parameters:
- dataset - Dataset to store. Note that this need not be filled with records as long as it has record holders, metadata, and field data.
- Throws:
- DBCreationException - If issues arise.
-
buildFile
Sets up the relevant file.
- Parameters:
- dataset - The dataset containing the file to be built. Record holders can be empty of data at this point, but must contain field information and metadata.
- index - The index of the file to build in recordStoreNames.
- Throws:
- DBCreationException - If issues arise.
-
buildMetadataFile
private void buildMetadataFile(IMetadata metadata, String metadataFileName) throws DBCreationException
Sets up a metadata MapFile for each record holder file.
MapFiles are, despite their name, a directory of interrelated files, so this is what is produced. Note that the MapFile data will be sorted alphabetically by metadata category, as is usual with these files for fast searches.
- Parameters:
- metadata - The metadata to be written to the metadata file.
- metadataFileName - The name of the metadata file.
- Throws:
- DBCreationException - If issues arise.
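Hadoop's MapFile.Writer expects keys to be appended in sorted order, which is why the metadata ends up ordered by category. The sketch below shows only that ordering step, using a plain Map as a stand-in for IMetadata (whose real accessors are not shown in this documentation) and without touching HDFS. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of ordering metadata categories before writing. */
final class SortedMetadataSketch {

    /**
     * Returns the metadata categories in the order they would need to be
     * appended. TreeMap uses String's natural ordering, which is by character
     * code, so upper-case letters sort before lower-case ones.
     */
    static List<String> categoriesInWriteOrder(Map<String, String> metadata) {
        return new ArrayList<>(new TreeMap<>(metadata).keySet());
    }

    public static void main(String[] args) {
        // Assumption: metadata is a simple category -> value mapping.
        Map<String, String> metadata = new HashMap<>();
        metadata.put("title", "Example dataset");
        metadata.put("creator", "Example creator");
        metadata.put("date", "03/01/2021");
        for (String category : categoriesInWriteOrder(metadata)) {
            System.out.println(category + " = " + metadata.get(category));
        }
    }
}
```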
-
bulkLoad
This method bulk-loads a whole dataset into one or more files.
This method calls disconnectStore when done to clean up.
- Specified by:
- bulkLoad in interface IDataConsumer
- Parameters:
- dataset - The dataset to load.
- Throws:
- DBCreationException - If there's an issue.
-
load
Adds multiple records to current store.
This does little at the moment other than call storeRecords with the records, but we keep it as a separate method as it acts as a gateway for data in the supplier/consumer push model and we may want to add functionality to the gateway in the future.
- Specified by:
- load in interface IDataConsumer
- Parameters:
- records - ArrayList of records.
- Throws:
- DBCreationException - Only if there is an issue.
-
storeRecords
Add a set of records to the relevant record store (file). These are standard Hadoop flat files. Dates are US MM/dd/yyyy format.
- Parameters:
- records - An ArrayList of the IRecords to add.
- Throws:
- DBCreationException - Only if there is an issue.
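As a rough illustration of the row format implied here (comma-separated values, with dates written as MM/dd/yyyy and commas stripped from data), the following sketch formats one row. It is not the class's actual code: the class and method names are hypothetical, a record is represented as a plain List of values rather than an IRecord, and dates are assumed to arrive as java.time.LocalDate.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration of formatting one record as a CSV line. */
final class RecordLineSketch {

    // US-style date pattern, as stated in the storeRecords description.
    private static final DateTimeFormatter US_DATE = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    /** Joins the values with commas, formatting dates and stripping embedded commas. */
    static String toLine(List<?> values) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < values.size(); i++) {
            if (i > 0) {
                line.append(',');
            }
            Object value = values.get(i);
            if (value instanceof LocalDate) {
                line.append(((LocalDate) value).format(US_DATE));
            } else {
                // Weak data sanitisation: drop commas so they cannot split a field.
                line.append(String.valueOf(value).replace(",", ""));
            }
        }
        return line.toString();
    }

    public static void main(String[] args) {
        List<Object> row = Arrays.asList("Leeds, UK", 42, LocalDate.of(2021, 3, 1));
        System.out.println(toLine(row));
    }
}
```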
-
setStore
Sets the location for the store / directory to write to.
If not set, the location can be derived from the dataset via the initialise method.
Sanitises input with SANITISE_DIRPATH.
- Specified by:
- setStore in interface IDataConsumer
- Parameters:
- store - File path of directory to write to.
- Throws:
- DBCreationException - Not used in this implementation.
- See Also:
- initialise(IDataset dataset)
-
getStore
Gets the location for the store / database.
For testing.
- Returns:
- String Store location.
-
setRecordStoreNames
Sets the record store (file) names for the store (directory).
If not set, the names can be derived from the dataset via the initialise method.
Sanitises input with SANITISE_FILENAME.
If set here directly (for example, the user enters the name/s), it's the implementer's responsibility to make sure the number of record stores named matches the number supplied in the dataset passed to initialise; otherwise initialise will throw an exception.
- Specified by:
- setRecordStoreNames in interface IDataConsumer
- Parameters:
- recordStoreNames - Names of record stores (files) to make.
- Throws:
- DBCreationException - Not used in this implementation.
- See Also:
- initialise(IDataset dataset)
-
getRecordStoreNames
Gets the record store names.
For testing.
- Returns:
- ArrayList Record store names.
-
addReportingListener
For objects wishing to get progress reports on data reading.
- Specified by:
- addReportingListener in interface IDataConsumer
- Parameters:
- reportingListener - Object wishing to gain reports.
- See Also:
- IReportingListener
-
findTitle
Finds the title category in an unknown IMetadata object.
If it doesn't exist, defaults to "DEFAULT".
- Parameters:
- metadata - Metadata object of unknown schema.
- Returns:
- String Title, if found.
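One way such a lookup could work is sketched below. The real IMetadata API is not shown in this documentation, so the sketch uses a plain Map of category to value as a stand-in, and it assumes the title category is identified by a case-insensitive match on "title". Both are assumptions, as are the class and method names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of a title lookup with a DEFAULT fallback. */
final class FindTitleSketch {

    /**
     * Returns the value of the first category named "title" (any case) that
     * has a non-blank value, or "DEFAULT" if there is none.
     */
    static String findTitle(Map<String, String> metadata) {
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
            String category = entry.getKey();
            String value = entry.getValue();
            if (category != null && category.trim().equalsIgnoreCase("title")
                    && value != null && !value.trim().isEmpty()) {
                return value.trim();
            }
        }
        return "DEFAULT";
    }

    public static void main(String[] args) {
        Map<String, String> metadata = new LinkedHashMap<>();
        metadata.put("Creator", "Example creator");
        metadata.put("Title", "Example dataset");
        System.out.println(findTitle(metadata));
        System.out.println(findTitle(new LinkedHashMap<>()));
    }
}
```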
-
connectStore
Creates data directory and directory it is in if missing, and calls HDFS.
- Specified by:
- connectStore in interface IDataConsumer
- Throws:
- DBCreationException - If there's an issue.
-
disconnectStore
Disconnects from current store / database and garbage collects.
- Specified by:
- disconnectStore in interface IDataConsumer
- Throws:
- DBCreationException - Not thrown in this implementation.
-
sanitise
Sanitise Strings.
Directory path sanitisation removes characters that are illegal in Windows filenames, but leaves slashes etc. If these are likely, remove them using filename sanitisation on the component parts.
Filename sanitisation removes all illegal Windows filename characters and slashes for POSIX systems. It also adds "Data-" to illegal Windows filenames and removes trailing periods and spaces.
Data sanitisation just removes commas in preparation for writing as CSV.
- Parameters:
- string - String to sanitise.
- level - One of SANITISE_DIRPATH, SANITISE_FILENAME, SANITISE_DATA.
- Returns:
- String Sanitised String.
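The three levels might look something like the sketch below. This is an approximation under several assumptions, not the class's actual code: the constant values are invented, the directory-path level is assumed to leave slashes, backslashes and colons in place, "Data-" is assumed to be added as a prefix, and "illegal Windows filenames" is taken to mean the reserved device names (CON, PRN, AUX, NUL, COM1 to COM9, LPT1 to LPT9).

```java
import java.util.Locale;

/** Hypothetical illustration of three sanitisation levels. */
final class SanitiseSketch {

    // Assumption: the real constant values are not documented; these are placeholders.
    static final int SANITISE_DIRPATH = 0;
    static final int SANITISE_FILENAME = 1;
    static final int SANITISE_DATA = 2;

    static String sanitise(String string, int level) {
        switch (level) {
            case SANITISE_DIRPATH:
                // Remove characters Windows forbids, keeping path separators and colons.
                return string.replaceAll("[<>\"|?*]", "");
            case SANITISE_FILENAME: {
                // Remove forbidden characters, including both kinds of slash.
                String name = string.replaceAll("[<>:\"/\\\\|?*]", "");
                // Remove trailing periods and spaces.
                name = name.replaceAll("[. ]+$", "");
                // Prefix reserved device names (with or without an extension).
                if (name.toUpperCase(Locale.ROOT)
                        .matches("(CON|PRN|AUX|NUL|COM[1-9]|LPT[1-9])(\\..*)?")) {
                    name = "Data-" + name;
                }
                return name;
            }
            case SANITISE_DATA:
                // Remove commas so values cannot break CSV fields.
                return string.replace(",", "");
            default:
                throw new IllegalArgumentException("Unknown sanitisation level: " + level);
        }
    }

    public static void main(String[] args) {
        System.out.println(sanitise("C:\\data\\my*dir?", SANITISE_DIRPATH));
        System.out.println(sanitise("a/b:c*?.txt. ", SANITISE_FILENAME));
        System.out.println(sanitise("nul", SANITISE_FILENAME));
        System.out.println(sanitise("Leeds, UK", SANITISE_DATA));
    }
}
```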
-
gapFillLocalisedGUIText
private void gapFillLocalisedGUIText()
Sets the defaults for warnings and exceptions in English if an appropriate language properties file is missing.
-
reportProgress
Reports progress to reportingListeners.
Reports if progress is a multiple of total records / 100. If progress is zero or less, reports progress as 0 of 1.
- Parameters:
- progress - Progress in record processing.
- dataset - Dataset to extract estimate of processing to be done.
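The throttling rule described here (report roughly once per percent, and report 0 of 1 for non-positive progress) could be sketched as follows. The class and method names are hypothetical, the total is passed as an int rather than estimated from an IDataset, and the handling of totals below 100 (where total / 100 would be zero) is an assumption made to avoid dividing by zero.

```java
/** Hypothetical illustration of the once-per-percent reporting rule. */
final class ProgressSketch {

    /**
     * Returns {progress, total} to pass on to listeners, or null if this
     * call should not produce a report.
     */
    static int[] reportFor(int progress, int totalRecords) {
        if (progress <= 0) {
            return new int[] {0, 1}; // nothing processed yet: report 0 of 1
        }
        // Assumption: for fewer than 100 records, report on every record.
        int step = Math.max(1, totalRecords / 100);
        return (progress % step == 0) ? new int[] {progress, totalRecords} : null;
    }

    public static void main(String[] args) {
        for (int progress : new int[] {0, 250, 255}) {
            int[] report = reportFor(progress, 1000);
            System.out.println(progress + " -> "
                    + (report == null ? "no report" : report[0] + " of " + report[1]));
        }
    }
}
```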
-
reportProgress
public void reportProgress(int progress, int total)
Reports progress to reportingListeners.
Reports for an arbitrary progress and total worked towards.
- Parameters:
- progress - Value indicating progress through work total.
- total - Value indicating total work to do.
-
reportMessage
Reports message to reportingListeners.
- Parameters:
- message - Message to reporting listeners.
-
load
Deprecated. Only here to satisfy deprecated interface demands conditional on other classes. Redirects to bulkLoad.
Adds a dataset to the relevant file in the store directory.
- Specified by:
- load in interface IDataConsumer
- Parameters:
- dataset - Dataset to load.
- Throws:
- DBCreationException - Only if there is an issue.
- See Also:
- IDataConsumer.load(ArrayList<IRecord> records)
-
proveLoaded
@Deprecated private void proveLoaded(String tableName, boolean metadataTable) throws DBCreationException
Deprecated. Not needed for this class, but a convenient add-in for proving it works without the need for unit testing or database interrogation software.
Messages out to ReportingListeners the first and last entries in the named table.
Will disconnect and connect to current store to prove persistence.
- Parameters:
- tableName - Table to use.
- metadataTable - One of METADATA_TABLE or NORMAL_TABLE.
- Throws:
- DBCreationException - If an issue with connecting.
-