Class HadoopConsumer
- All Implemented Interfaces:
IDataConsumer
public class HadoopConsumer extends Object implements IDataConsumer
Works with an IDataset (e.g. database) containing one or more IRecordHolders (e.g. data tables) containing IRecords (e.g. rows) of values. It writes these to a store (in this case a directory) as record stores (files). It will also write out an associated set of metadata files.
IReportingListener objects may be registered with objects of this class to receive suitable progress reporting and messaging. In general, exceptions not dealt with internally are re-thrown for calling objects to deal with. Messages are user-friendly.
If developers have a specific directory and set of filenames they'd like to use, they should first call setStore and setRecordStoreNames. If these are missing when initialise is finally called, initialise will build a directory in the user's space (named after defaultFileDirectory in application.properties), within which will be a directory named after the dataset (or DEFAULT) and files named after the record holders (or DEFAULTx, where x is an integer).
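The default naming rule above can be sketched in plain Java. This is a hypothetical illustration, not the class's own code: the helper names are invented, titles are passed as plain strings rather than read from IDataset metadata, sanitisation is left out, the "user's space" is assumed to be the `user.home` system property, and the x in DEFAULTx is assumed to be the record holder's 0-based position.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the default directory and file naming described above.
class DefaultNamesSketch {
    static final String DEFAULT = "DEFAULT";

    // Directory named after the dataset title, or DEFAULT if there is none.
    static String directoryName(String datasetTitle) {
        return (datasetTitle == null || datasetTitle.trim().isEmpty()) ? DEFAULT : datasetTitle;
    }

    // One file per record holder: its title, or DEFAULTx (x assumed to be the 0-based index).
    static List<String> fileNames(List<String> recordHolderTitles) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < recordHolderTitles.size(); i++) {
            String title = recordHolderTitles.get(i);
            names.add((title == null || title.trim().isEmpty()) ? DEFAULT + i : title);
        }
        return names;
    }

    // <user.home>/<defaultFileDirectory>/<dataset directory>;
    // defaultFileDirectory would be read from application.properties.
    static Path defaultStore(String defaultFileDirectory, String datasetTitle) {
        return Paths.get(System.getProperty("user.home"), defaultFileDirectory,
                directoryName(datasetTitle));
    }
}
```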
Either way, all developers should call initialise with a dataset that at least has the fields and field types in it. This will build the file structures with headers and set up progress monitoring. They can then get data written to the files by calling either bulkLoad with a complete dataset object or load with a collection of records.
If you don't want file headers written, calling connectStore will check that the directories exist and are ready, but won't set up the files or progress monitoring.
Note that because instance variables hold a wide variety of information on previous writes, it is essential that a new instance of this class is used for each new set of files / dataset.
- Author:
- Andy Evans
- Version: 1.0 01 Mar 2021
-
Field Summary
Fields
Modifier and Type | Field | Description
(package private) org.apache.hadoop.conf.Configuration | conf | Configuration.
private boolean | debug | Debugging flag, set by the system property passed in with -Ddebug=true rather than set here or with an accessor.
private String | DEF_DB_DIR | Default directory to create within store.
(package private) org.apache.hadoop.fs.FileSystem | fileSystem | HDFS.
private boolean | METADATA_TABLE | Constant for proveLoaded.
private boolean | NORMAL_TABLE | Constant for proveLoaded.
private int | progress | Used for progress monitoring.
private ArrayList<String> | recordStoreNames | Filenames used.
private ArrayList<IReportingListener> | reportingListeners | Listeners interested in updates on progress.
private int | SANITISE_DATA | Constant for weak sanitisation to remove a few problematic chars.
private int | SANITISE_DIRPATH | Constant for aggressive sanitisation.
private int | SANITISE_FILENAME | Constant for database object name sanitisation.
private String | store | Path to directory used for storage.
-
Constructor Summary
Constructors
Constructor | Description
HadoopConsumer() | Default constructor.
-
Method Summary
Modifier and Type | Method | Description
void | addReportingListener(IReportingListener reportingListener) | For objects wishing to get progress reports on data reading.
private void | buildFile(IDataset dataset, int index) | Sets up the relevant file.
private void | buildMetadataFile(IMetadata metadata, String metadataFileName) | Sets up a metadata MapFile for each record holder file.
void | bulkLoad(IDataset dataset) | This method bulk-loads a whole dataset into one or more files.
void | connectStore() | Creates the data directory, and the directory containing it, if missing, and connects to HDFS.
void | disconnectStore() | Disconnects from current store / database and garbage collects.
protected String | findTitle(IMetadata metadata) | Finds the title category in an unknown IMetadata object.
private void | gapFillLocalisedGUIText() | Sets the defaults for warnings and exceptions in English if an appropriate language properties file is missing.
ArrayList<String> | getRecordStoreNames() | Gets the record store names.
String | getStore() | Gets the location for the store / database.
void | initialise(IDataset dataset) | Sets up the data store (directory and files).
void | load(IDataset dataset) | Deprecated. Only here to satisfy deprecated interface demands conditional on other classes.
void | load(ArrayList<IRecord> records) | Adds multiple records to current store.
private void | proveLoaded(String tableName, boolean metadataTable) | Deprecated. Not needed for this class, but a convenient add-in for proving it works without the need for unit testing or database interrogation software.
void | reportMessage(String message) | Reports message to reportingListeners.
void | reportProgress(int progress, int total) | Reports progress to reportingListeners.
void | reportProgress(int progress, IDataset dataset) | Reports progress to reportingListeners.
protected String | sanitise(String string, int level) | Sanitises Strings.
void | setRecordStoreNames(ArrayList<String> recordStoreNames) | Sets the record store (file) names for the store (directory).
void | setStore(String store) | Sets the location for the store / directory to write to.
private void | storeRecords(ArrayList<IRecord> records) | Adds a set of records to the relevant record store (file).
-
Field Details
-
debug
private boolean debug
Debugging flag, set by the system property passed in with -Ddebug=true rather than set here or with an accessor.
-
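Reading a flag passed as `-Ddebug=true` needs only the standard library. A minimal sketch, assuming the class reads the system property this way (the source doesn't say how); the class name is hypothetical:

```java
// Minimal sketch: reading a -Ddebug=true command-line flag.
class DebugFlagSketch {
    static boolean isDebug() {
        // True only if the system property "debug" exists and equals "true" (case-insensitive).
        return Boolean.getBoolean("debug");
    }
}
```

Running with `java -Ddebug=true ...` switches the flag on; omitting the property leaves it false.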
store
Path to directory used for storage.
-
recordStoreNames
Filenames used.
-
reportingListeners
Listeners interested in updates on progress.
-
DEF_DB_DIR
Default directory to create within store.
- See Also:
- Constant Field Values
-
SANITISE_DIRPATH
private final int SANITISE_DIRPATH
Constant for aggressive sanitisation.
- See Also:
- Constant Field Values
-
SANITISE_FILENAME
private final int SANITISE_FILENAME
Constant for database object name sanitisation.
- See Also:
- Constant Field Values
-
SANITISE_DATA
private final int SANITISE_DATA
Constant for weak sanitisation to remove a few problematic chars.
- See Also:
- Constant Field Values
-
METADATA_TABLE
private final boolean METADATA_TABLE
Constant for proveLoaded.
- See Also:
- Constant Field Values
-
NORMAL_TABLE
private final boolean NORMAL_TABLE
Constant for proveLoaded.
- See Also:
- Constant Field Values
-
progress
private int progress
Used for progress monitoring.
-
fileSystem
org.apache.hadoop.fs.FileSystem fileSystem
HDFS.
-
conf
org.apache.hadoop.conf.Configuration conf
Configuration.
-
-
Constructor Details
-
HadoopConsumer
public HadoopConsumer()
Default constructor.
-
-
Method Details
-
initialise
Sets up the data store (directory and files).
If a store (directory) path and/or record store (file) names haven't been set using the setStore / setRecordStoreNames methods, this method sets them. It takes the directory name from the title of the dataset passed in, and the file names from the titles of the dataset's record holders. The default location for the directory is the user's home directory, where a containing directory (see class docs) will be constructed first. All names and paths are sanitised.
It then creates the relevant files with appropriate headers.
- Specified by:
initialise
in interface IDataConsumer
- Parameters:
dataset
- Dataset to store. Note that this need not be filled with records as long as it has recordHolders, metadata, and field data.
- Throws:
DBCreationException
- If issues arise.
-
buildFile
Sets up the relevant file.
- Parameters:
dataset
- The dataset containing the file to be built. Record holders can be empty of data at this point, but must contain field information and metadata.
index
- The index of the file to build in recordStoreNames.
- Throws:
DBCreationException
- If issues arise.
-
buildMetadataFile
private void buildMetadataFile(IMetadata metadata, String metadataFileName) throws DBCreationException
Sets up a metadata MapFile for each record holder file.
MapFiles are, despite their name, a directory of interrelated files, so this is what is produced. Note that the MapFile data will be sorted alphabetically by metadata category, because MapFiles store their keys in sorted order for fast searches.
- Parameters:
metadata
- The metadata to be written to the metadata file.
metadataFileName
- The name of the metadata file.
- Throws:
DBCreationException
- If issues arise.
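Hadoop's MapFile.Writer expects keys to be appended in sorted order and rejects a key that sorts before the previous one, which is why the metadata ends up ordered by category. The stand-in below uses only the JDK: a TreeMap takes the place of the Hadoop writer, plain Strings replace Writable keys and values, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// JDK-only stand-in for preparing metadata for a MapFile; no Hadoop classes are used.
class SortedMetadataSketch {
    // TreeMap iterates in ascending key order, i.e. sorted by metadata category.
    static List<String> categoriesInWriteOrder(Map<String, String> metadata) {
        return new ArrayList<>(new TreeMap<>(metadata).keySet());
    }

    // Mimics the ordering rule a MapFile writer enforces:
    // no key may sort before the key appended just before it.
    static boolean isAppendOrder(List<String> keys) {
        for (int i = 1; i < keys.size(); i++) {
            if (keys.get(i).compareTo(keys.get(i - 1)) < 0) {
                return false;
            }
        }
        return true;
    }
}
```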
-
bulkLoad
This method bulk-loads a whole dataset into one or more files.
This method calls disconnectStore when done to clean up.
- Specified by:
bulkLoad
in interface IDataConsumer
- Parameters:
dataset
- The dataset to load.
- Throws:
DBCreationException
- If there's an issue.
-
load
Adds multiple records to current store.
This does little at the moment other than call storeRecords with the records, but we keep it as a separate method because it acts as a gateway for data in the supplier/consumer push model, and we may want to add functionality to the gateway in the future.
- Specified by:
load
in interface IDataConsumer
- Parameters:
records
- ArrayList of records.
- Throws:
DBCreationException
- Only if there is an issue.
-
storeRecords
Adds a set of records to the relevant record store (file). These are standard Hadoop flat files. Dates are written in US MM/dd/yyyy format.
- Parameters:
records
- An ArrayList of the IRecords to add.
- Throws:
DBCreationException
- Only if there is an issue.
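One way to produce a line of such a flat file is sketched below. It assumes comma-separated values (the sanitise docs mention writing as CSV), plain Java objects in place of IRecord values, and java.time.LocalDate for dates; the class and method names are hypothetical.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical sketch: turning one record's values into a CSV line.
class CsvRowSketch {
    // US-style dates, e.g. 1 March 2021 becomes 03/01/2021.
    private static final DateTimeFormatter US_DATE = DateTimeFormatter.ofPattern("MM/dd/yyyy");

    static String toCsvLine(List<?> values) {
        StringJoiner line = new StringJoiner(",");
        for (Object value : values) {
            String cell;
            if (value instanceof LocalDate) {
                cell = ((LocalDate) value).format(US_DATE);
            } else if (value == null) {
                cell = "";
            } else {
                // Weak "data" sanitisation: strip commas so they can't split the cell.
                cell = value.toString().replace(",", "");
            }
            line.add(cell);
        }
        return line.toString();
    }
}
```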
-
setStore
Sets the location for the store / directory to write to.
If not set, the location can be set from the dataset via the initialise method. Sanitises input with SANITISE_DIRPATH.
- Specified by:
setStore
in interface IDataConsumer
- Parameters:
store
- File path of directory to write to.
- Throws:
DBCreationException
- Not used in this implementation.
- See Also:
initialise(IDataset dataset)
-
getStore
Gets the location for the store / database.
For testing.
- Returns:
- String Store location.
-
setRecordStoreNames
Sets the record store (file) names for the store (directory).
If not set, the names can be set from the dataset via the initialise method. Sanitises input with SANITISE_FILENAME.
If the names are set here directly (for example, when the user enters them), it is the implementer's responsibility to make sure the number of record stores named matches the number of record holders in the dataset passed to initialise, or initialise will throw an exception.
- Specified by:
setRecordStoreNames
in interface IDataConsumer
- Parameters:
recordStoreNames
- Names of record stores (files) to make.
- Throws:
DBCreationException
- Not used in this implementation.
- See Also:
initialise(IDataset dataset)
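The consistency check described above can be sketched as follows. This is a hypothetical illustration: the class and method names are invented, IllegalStateException stands in for DBCreationException, and a null list is assumed to mean "no names set, so defaults will be generated".

```java
import java.util.List;

// Hypothetical sketch; IllegalStateException stands in for DBCreationException.
class RecordStoreNameCheck {
    // Null means no names were set, so defaults will be generated and nothing needs checking.
    static void check(List<String> recordStoreNames, int recordHolderCount) {
        if (recordStoreNames != null && recordStoreNames.size() != recordHolderCount) {
            throw new IllegalStateException("Expected " + recordHolderCount
                    + " record store names but got " + recordStoreNames.size());
        }
    }
}
```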
-
getRecordStoreNames
Gets the record store names.
For testing.
- Returns:
- ArrayList Record store names.
-
addReportingListener
For objects wishing to get progress reports on data reading.
- Specified by:
addReportingListener
in interface IDataConsumer
- Parameters:
reportingListener
- Object wishing to gain reports.
- See Also:
IReportingListener
-
findTitle
Finds the title category in an unknown IMetadata object.
If it doesn't exist, defaults to "DEFAULT".
- Parameters:
metadata
- Metadata object of unknown schema.
- Returns:
- String Title, if found.
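A minimal sketch of the lookup-with-fallback, assuming the metadata can be viewed as a map from category names to values and that the title category is named "title" (matched case-insensitively). Both assumptions are hypothetical, as is the class name; IMetadata's actual schema isn't documented here.

```java
import java.util.Map;

// Hypothetical sketch: a Map stands in for IMetadata.
class FindTitleSketch {
    static String findTitle(Map<String, String> metadata) {
        for (Map.Entry<String, String> entry : metadata.entrySet()) {
            String value = entry.getValue();
            // Assumed category name "title", compared case-insensitively.
            if (entry.getKey().equalsIgnoreCase("title") && value != null && !value.trim().isEmpty()) {
                return value;
            }
        }
        // No usable title category: fall back as documented.
        return "DEFAULT";
    }
}
```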
-
connectStore
Creates the data directory, and the directory containing it, if missing, and connects to HDFS.
- Specified by:
connectStore
in interface IDataConsumer
- Throws:
DBCreationException
- If there's an issue.
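On a local file system, "create the directory and the directory it is in if missing" is what java.nio.file.Files.createDirectories does; Hadoop's FileSystem.mkdirs(Path) behaves similarly (like `mkdir -p`) for HDFS. The sketch below uses only the JDK. The class name is hypothetical, and the writability check is an assumption about what "ready" means.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Local-file-system stand-in for connectStore's directory preparation (no HDFS involved).
class ConnectStoreSketch {
    static Path ensureStore(Path store) throws IOException {
        // Creates the directory and any missing parents; no error if it already exists as a directory.
        Files.createDirectories(store);
        // Assumed meaning of "ready": the directory can be written to.
        if (!Files.isWritable(store)) {
            throw new IOException("Store directory is not writable: " + store);
        }
        return store;
    }
}
```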
-
disconnectStore
Disconnects from current store / database and garbage collects.
- Specified by:
disconnectStore
in interface IDataConsumer
- Throws:
DBCreationException
- Not thrown in this implementation.
-
sanitise
Sanitises Strings.
Directory path sanitisation removes characters that are illegal in Windows filenames, but leaves slashes etc. If slashes are likely within individual names, remove them by applying filename sanitisation to each component part.
Filename sanitisation removes all characters that are illegal in Windows filenames, plus slashes for POSIX systems. It also adds "Data-" to names that are illegal as Windows filenames and removes trailing periods and spaces.
Data sanitisation just removes commas in preparation for writing as CSV.
- Parameters:
string
- String to sanitise.
level
- One of SANITISE_DIRPATH, SANITISE_FILENAME, SANITISE_DATA.
- Returns:
- String Sanitised String.
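The three levels can be sketched as follows. This is an interpretation of the description above, not the class's code: the constant values are hypothetical, the directory-path level is assumed to keep `/`, `\` and `:` (so drive letters survive), and "illegal Windows filenames" is taken to mean the reserved device names CON, PRN, AUX, NUL, COM1 to COM9 and LPT1 to LPT9, which get "Data-" prefixed.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;

// Hypothetical sketch of the three sanitisation levels.
class SanitiseSketch {
    // Hypothetical values; the real constants' values aren't documented here.
    static final int SANITISE_DIRPATH = 0;
    static final int SANITISE_FILENAME = 1;
    static final int SANITISE_DATA = 2;

    // Reserved Windows device names (also reserved when followed by an extension).
    private static final Set<String> RESERVED = new HashSet<>(Arrays.asList("CON", "PRN", "AUX", "NUL"));
    static {
        for (int i = 1; i <= 9; i++) {
            RESERVED.add("COM" + i);
            RESERVED.add("LPT" + i);
        }
    }

    static String sanitise(String s, int level) {
        switch (level) {
            case SANITISE_DIRPATH:
                // Drop Windows-illegal characters and control chars, but keep / \ and :
                return s.replaceAll("[<>\"|?*\\x00-\\x1F]", "");
            case SANITISE_FILENAME: {
                // Drop all Windows-illegal characters, including both slash types.
                String out = s.replaceAll("[<>:\"/\\\\|?*\\x00-\\x1F]", "");
                // Remove trailing periods and spaces.
                out = out.replaceAll("[. ]+$", "");
                int dot = out.indexOf('.');
                String base = dot >= 0 ? out.substring(0, dot) : out;
                if (RESERVED.contains(base.toUpperCase(Locale.ROOT))) {
                    out = "Data-" + out;
                }
                return out;
            }
            case SANITISE_DATA:
                // Weak sanitisation: just remove commas before writing CSV.
                return s.replace(",", "");
            default:
                throw new IllegalArgumentException("Unknown sanitisation level: " + level);
        }
    }
}
```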
-
gapFillLocalisedGUIText
private void gapFillLocalisedGUIText()
Sets the defaults for warnings and exceptions in English if an appropriate language properties file is missing.
-
reportProgress
Reports progress to reportingListeners.
Reports only when progress is a multiple of (total records / 100), i.e. roughly every 1% of the records. If progress is zero or less, reports progress as 0 of 1.
- Parameters:
progress
- Progress in record processing.
dataset
- Dataset to extract estimate of processing to be done.
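The throttling rule above, together with listener registration of the kind addReportingListener provides, can be sketched as follows. BiConsumer<Integer, Integer> stands in for IReportingListener, the class and method names are hypothetical, and the Math.max guard is an added assumption so that datasets with fewer than 100 records don't cause a division by zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical sketch of throttled progress reporting to registered listeners.
class ProgressSketch {
    // Stand-in for IReportingListener: receives (progress, total).
    private final List<BiConsumer<Integer, Integer>> listeners = new ArrayList<>();

    void addListener(BiConsumer<Integer, Integer> listener) {
        listeners.add(listener);
    }

    // Unthrottled: forwards any progress / total pair to every listener.
    void reportProgress(int progress, int total) {
        for (BiConsumer<Integer, Integer> listener : listeners) {
            listener.accept(progress, total);
        }
    }

    // Throttled: reports roughly every 1% of totalRecords.
    void reportRecordProgress(int progress, int totalRecords) {
        if (progress <= 0) {
            reportProgress(0, 1);
            return;
        }
        // Assumed guard: with fewer than 100 records totalRecords / 100 is 0, so use 1.
        int step = Math.max(1, totalRecords / 100);
        if (progress % step == 0) {
            reportProgress(progress, totalRecords);
        }
    }
}
```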
-
reportProgress
public void reportProgress(int progress, int total)
Reports progress to reportingListeners.
Reports for an arbitrary progress value and the total being worked towards.
- Parameters:
progress
- Value indicating progress through the total work.
total
- Value indicating total work to do.
-
reportMessage
Reports message to reportingListeners.
- Parameters:
message
- Message to send to reporting listeners.
-
load
Deprecated.
Only here to satisfy deprecated interface demands conditional on other classes. Redirects to bulkLoad.
Adds a dataset to the relevant file in the store directory.
- Specified by:
load
in interface IDataConsumer
- Parameters:
dataset
- Dataset to load.
- Throws:
DBCreationException
- Only if there is an issue.
- See Also:
IDataConsumer.load(ArrayList<IRecord> records)
-
proveLoaded
@Deprecated private void proveLoaded(String tableName, boolean metadataTable) throws DBCreationException
Deprecated.
Not needed for this class, but a convenient add-in for proving it works without the need for unit testing or database interrogation software.
Sends the first and last entries in the named table to the ReportingListeners as messages. Disconnects from and reconnects to the current store to prove persistence.
- Parameters:
tableName
- Table to use.
metadataTable
- One of METADATA_TABLE or NORMAL_TABLE.
- Throws:
DBCreationException
- If an issue with connecting.
-