MtasDataCollector.java
package mtas.codec.util.collector;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.Map.Entry;
import mtas.codec.util.DataCollector;
/**
* The Class MtasDataCollector.
*
* @param <T1> the generic type
* @param <T2> the generic type
*/
public abstract class MtasDataCollector<T1 extends Number & Comparable<T1>, T2 extends Number & Comparable<T2>>
implements Serializable {
/** The Constant serialVersionUID. */
private static final long serialVersionUID = 1L;
/** The Constant SEGMENT_SORT_ASC. */
public static final String SEGMENT_SORT_ASC = "segment_asc";
/** The Constant SEGMENT_SORT_DESC. */
public static final String SEGMENT_SORT_DESC = "segment_desc";
/** The Constant SEGMENT_BOUNDARY_ASC. */
public static final String SEGMENT_BOUNDARY_ASC = "segment_boundary_asc";
/** The Constant SEGMENT_BOUNDARY_DESC. */
public static final String SEGMENT_BOUNDARY_DESC = "segment_boundary_desc";
/** The Constant SEGMENT_KEY. */
public static final String SEGMENT_KEY = "key";
/** The Constant SEGMENT_NEW. */
public static final String SEGMENT_NEW = "new";
/** The Constant SEGMENT_KEY_OR_NEW. */
public static final String SEGMENT_KEY_OR_NEW = "key_or_new";
/** The Constant SEGMENT_POSSIBLE_KEY. */
public static final String SEGMENT_POSSIBLE_KEY = "possible_key";
/** The size. */
protected int size;
/** The position. */
protected int position;
/** The collector type. */
// properties collector
protected String collectorType;
/** The stats type. */
protected String statsType;
/** The data type. */
protected String dataType;
/** The stats items. */
private SortedSet<String> statsItems;
/** The sort type. */
protected String sortType;
/** The sort direction. */
protected String sortDirection;
/** The start. */
protected Integer start;
/** The number. */
protected Integer number;
/** The error number. */
// error
protected int[] errorNumber;
/** The error list. */
protected HashMap<String, Integer>[] errorList;
/** The key list. */
protected String[] keyList;
/** The source number list. */
protected int[] sourceNumberList;
/** The with total. */
private boolean withTotal;
/** The segment registration. */
public transient String segmentRegistration;
/** The segment key value list. */
protected transient LinkedHashMap<String, Map<String, T1>> segmentKeyValueList;
/** The segment recompute key list. */
public transient Map<String, Set<String>> segmentRecomputeKeyList;
/** The segment keys. */
public transient Set<String> segmentKeys;
/** The segment values boundary. */
protected transient Map<String, T1> segmentValuesBoundary;
/** The segment value boundary. */
protected transient T1 segmentValueBoundary;
/** The segment value top list last. */
protected transient Map<String, T1> segmentValueTopListLast;
/** The segment value top list. */
protected transient ArrayList<T1> segmentValueTopList;
/** The segment name. */
protected transient String segmentName;
/** The segment number. */
protected transient int segmentNumber;
/** The has sub. */
private boolean hasSub;
/** The sub collector types. */
private String[] subCollectorTypes;
/** The sub data types. */
private String[] subDataTypes;
/** The sub stats types. */
private String[] subStatsTypes;
/** The sub stats items. */
private SortedSet<String>[] subStatsItems;
/** The sub sort types. */
private String[] subSortTypes;
/** The sub sort directions. */
private String[] subSortDirections;
/** The sub start. */
private Integer[] subStart;
/** The sub number. */
private Integer[] subNumber;
/** The sub collector list next level. */
protected MtasDataCollector<?, ?>[] subCollectorListNextLevel = null;
/** The sub collector next level. */
protected MtasDataCollector<?, ?> subCollectorNextLevel = null;
/** The new size. */
protected transient int newSize;
/** The new position. */
protected transient int newPosition;
/** The new current position. */
protected transient int newCurrentPosition;
/** The new current existing. */
protected transient boolean newCurrentExisting;
/** The new key list. */
protected transient String[] newKeyList = null;
/** The new source number list. */
protected transient int[] newSourceNumberList = null;
/** The new error number. */
protected transient int[] newErrorNumber;
/** The new error list. */
protected transient HashMap<String, Integer>[] newErrorList;
/** The new known key found in segment. */
public transient Set<String> newKnownKeyFoundInSegment;
/** The new sub collector types. */
private transient String[] newSubCollectorTypes;
/** The new sub data types. */
private transient String[] newSubDataTypes;
/** The new sub stats types. */
private transient String[] newSubStatsTypes;
/** The new sub stats items. */
private transient SortedSet<String>[] newSubStatsItems;
/** The new sub sort types. */
private transient String[] newSubSortTypes;
/** The new sub sort directions. */
private transient String[] newSubSortDirections;
/** The new sub start. */
private transient Integer[] newSubStart;
/** The new sub number. */
private transient Integer[] newSubNumber;
/** The new sub collector list next level. */
// subcollectors next level for adding
protected transient MtasDataCollector<?, ?>[] newSubCollectorListNextLevel = null;
/** The new sub collector next level. */
protected transient MtasDataCollector<?, ?> newSubCollectorNextLevel = null;
/** The closed. */
protected transient boolean closed = false;
/** The result. */
private transient MtasDataCollectorResult<T1, T2> result = null;
/**
* Instantiates a new mtas data collector.
*
* @param collectorType the collector type
* @param dataType the data type
* @param statsType the stats type
* @param statsItems the stats items
* @param sortType the sort type
* @param sortDirection the sort direction
* @param start the start
* @param number the number
* @param segmentRegistration the segment registration
* @param boundary the boundary
* @throws IOException Signals that an I/O exception has occurred.
*/
@SuppressWarnings("unchecked")
protected MtasDataCollector(String collectorType, String dataType,
String statsType, SortedSet<String> statsItems, String sortType,
String sortDirection, Integer start, Integer number,
String segmentRegistration, String boundary) throws IOException {
// set properties
this.closed = false;
this.collectorType = collectorType; // data or list
this.dataType = dataType; // long or double
this.statsType = statsType; // basic, advanced or full
this.statsItems = statsItems; // sum, n, all, ...
this.sortType = sortType;
this.sortDirection = sortDirection;
this.start = start;
this.number = number;
this.segmentRegistration = segmentRegistration;
this.withTotal = false;
if (segmentRegistration != null) {
segmentKeys = new HashSet<>();
segmentKeyValueList = new LinkedHashMap<>();
segmentValuesBoundary = new LinkedHashMap<>();
segmentValueTopListLast = new LinkedHashMap<>();
if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
|| segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
if (boundary != null) {
segmentValueBoundary = stringToBoundary(boundary);
} else {
throw new IOException("did expect boundary with segmentRegistration "
+ segmentRegistration);
}
} else if (boundary != null) {
throw new IOException("didn't expect boundary with segmentRegistration "
+ segmentRegistration);
}
}
// initialize administration
keyList = new String[0];
sourceNumberList = new int[0];
errorNumber = new int[0];
errorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[0];
size = 0;
position = 0;
// subCollectors properties
hasSub = false;
subCollectorTypes = null;
subDataTypes = null;
subStatsTypes = null;
subStatsItems = null;
subSortTypes = null;
subSortDirections = null;
subStart = null;
subNumber = null;
subCollectorListNextLevel = null;
subCollectorNextLevel = null;
}
/**
* Instantiates a new mtas data collector.
*
* @param collectorType the collector type
* @param dataType the data type
* @param statsType the stats type
* @param statsItems the stats items
* @param sortType the sort type
* @param sortDirection the sort direction
* @param start the start
* @param number the number
* @param subCollectorTypes the sub collector types
* @param subDataTypes the sub data types
* @param subStatsTypes the sub stats types
* @param subStatsItems the sub stats items
* @param subSortTypes the sub sort types
* @param subSortDirections the sub sort directions
* @param subStart the sub start
* @param subNumber the sub number
* @param segmentRegistration the segment registration
* @param boundary the boundary
* @throws IOException Signals that an I/O exception has occurred.
*/
protected MtasDataCollector(String collectorType, String dataType,
String statsType, SortedSet<String> statsItems, String sortType,
String sortDirection, Integer start, Integer number,
String[] subCollectorTypes, String[] subDataTypes, String[] subStatsTypes,
SortedSet<String>[] subStatsItems, String[] subSortTypes,
String[] subSortDirections, Integer[] subStart, Integer[] subNumber,
String segmentRegistration, String boundary) throws IOException {
// initialize
this(collectorType, dataType, statsType, statsItems, sortType,
sortDirection, start, number, segmentRegistration, boundary);
// initialize subCollectors
if (subCollectorTypes != null) {
hasSub = true;
this.subCollectorTypes = subCollectorTypes;
this.subDataTypes = subDataTypes;
this.subStatsTypes = subStatsTypes;
this.subStatsItems = subStatsItems;
this.subSortTypes = subSortTypes;
this.subSortDirections = subSortDirections;
this.subStart = subStart;
this.subNumber = subNumber;
if (subCollectorTypes.length > 1) {
newSubCollectorTypes = Arrays.copyOfRange(subCollectorTypes, 1,
subCollectorTypes.length);
newSubDataTypes = Arrays.copyOfRange(subDataTypes, 1,
subStatsTypes.length);
newSubStatsTypes = Arrays.copyOfRange(subStatsTypes, 1,
subStatsTypes.length);
newSubStatsItems = Arrays.copyOfRange(subStatsItems, 1,
subStatsItems.length);
newSubSortTypes = Arrays.copyOfRange(subSortTypes, 1,
subSortTypes.length);
newSubSortDirections = Arrays.copyOfRange(subSortDirections, 1,
subSortDirections.length);
newSubStart = Arrays.copyOfRange(subStart, 1, subStart.length);
newSubNumber = Arrays.copyOfRange(subNumber, 1, subNumber.length);
}
newSubCollectorListNextLevel = new MtasDataCollector[0];
}
}
/**
* Merge.
*
* @param newDataCollector the new data collector
* @param map the map
* @param increaseSourceNumber the increase source number
* @throws IOException Signals that an I/O exception has occurred.
*/
abstract public void merge(MtasDataCollector<?, ?> newDataCollector,
Map<MtasDataCollector<?, ?>, MtasDataCollector<?, ?>> map,
boolean increaseSourceNumber) throws IOException;
/**
* Inits the new list.
*
* @param maxNumberOfTerms the max number of terms
* @param segmentName the segment name
* @param segmentNumber the segment number
* @param boundary the boundary
* @throws IOException Signals that an I/O exception has occurred.
*/
public void initNewList(int maxNumberOfTerms, String segmentName,
int segmentNumber, String boundary) throws IOException {
if (closed) {
result = null;
closed = false;
}
initNewListBasic(maxNumberOfTerms);
if (segmentRegistration != null) {
this.segmentName = segmentName;
this.segmentNumber = segmentNumber;
if (!segmentKeyValueList.containsKey(segmentName)) {
segmentKeyValueList.put(segmentName, new HashMap<String, T1>());
if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
|| segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
if (boundary != null) {
segmentValuesBoundary.put(segmentName,
stringToBoundary(boundary, segmentNumber));
} else {
throw new IOException("expected boundary");
}
} else {
segmentValuesBoundary.put(segmentName, null);
}
segmentValueTopListLast.put(segmentName, null);
}
this.segmentValueTopList = new ArrayList<>();
}
}
/**
* Inits the new list.
*
* @param maxNumberOfTerms the max number of terms
* @throws IOException Signals that an I/O exception has occurred.
*/
public void initNewList(int maxNumberOfTerms) throws IOException {
if (closed) {
result = null;
closed = false;
}
if (segmentRegistration != null) {
throw new IOException("missing segment name");
} else {
initNewListBasic(maxNumberOfTerms);
}
}
/**
* Inits the new list basic.
*
* @param maxNumberOfTerms the max number of terms
* @throws IOException Signals that an I/O exception has occurred.
*/
@SuppressWarnings("unchecked")
private void initNewListBasic(int maxNumberOfTerms) throws IOException {
if (!closed) {
position = 0;
newPosition = 0;
newCurrentPosition = 0;
newSize = maxNumberOfTerms + size;
newKeyList = new String[newSize];
newSourceNumberList = new int[newSize];
newErrorNumber = new int[newSize];
newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[newSize];
newKnownKeyFoundInSegment = new HashSet<>();
if (hasSub) {
newSubCollectorListNextLevel = new MtasDataCollector[newSize];
}
} else {
throw new IOException("already closed");
}
}
/**
* Increase new list size.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
@SuppressWarnings("unchecked")
protected void increaseNewListSize() throws IOException {
if (!closed) {
String[] tmpNewKeyList = newKeyList;
int[] tmpNewSourceNumberList = newSourceNumberList;
int[] tmpNewErrorNumber = newErrorNumber;
HashMap<String, Integer>[] tmpNewErrorList = newErrorList;
int tmpNewSize = newSize;
newSize = 2 * newSize;
newKeyList = new String[newSize];
newSourceNumberList = new int[newSize];
newErrorNumber = new int[newSize];
newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[newSize];
System.arraycopy(tmpNewKeyList, 0, newKeyList, 0, tmpNewSize);
System.arraycopy(tmpNewSourceNumberList, 0, newSourceNumberList, 0,
tmpNewSize);
System.arraycopy(tmpNewErrorNumber, 0, newErrorNumber, 0, tmpNewSize);
System.arraycopy(tmpNewErrorList, 0, newErrorList, 0, tmpNewSize);
if (hasSub) {
MtasDataCollector<?, ?>[] tmpNewSubCollectorListNextLevel = newSubCollectorListNextLevel;
newSubCollectorListNextLevel = new MtasDataCollector[newSize];
System.arraycopy(tmpNewSubCollectorListNextLevel, 0,
newSubCollectorListNextLevel, 0, tmpNewSize);
}
} else {
throw new IOException("already closed");
}
}
/**
* Adds the.
*
* @param increaseSourceNumber the increase source number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
protected final MtasDataCollector add(boolean increaseSourceNumber)
throws IOException {
if (!closed) {
if (!collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
throw new IOException(
"collector should be " + DataCollector.COLLECTOR_TYPE_DATA);
} else {
if (newPosition > 0) {
newCurrentExisting = true;
} else if (position < getSize()) {
// copy
newKeyList[0] = keyList[0];
newSourceNumberList[0] = sourceNumberList[0];
if (increaseSourceNumber) {
newSourceNumberList[0]++;
}
newErrorNumber[0] = errorNumber[0];
newErrorList[0] = errorList[0];
if (hasSub) {
newSubCollectorNextLevel = subCollectorNextLevel;
}
copyToNew(0, 0);
newPosition = 1;
position = 1;
newCurrentExisting = true;
} else {
// add key
newKeyList[0] = DataCollector.COLLECTOR_TYPE_DATA;
newSourceNumberList[0] = 1;
newErrorNumber[0] = 0;
newErrorList[0] = new HashMap<>();
newPosition = 1;
newCurrentPosition = newPosition - 1;
newCurrentExisting = false;
// ready, only handle sub
if (hasSub) {
newSubCollectorNextLevel = DataCollector.getCollector(
subCollectorTypes[0], subDataTypes[0], subStatsTypes[0],
subStatsItems[0], subSortTypes[0], subSortDirections[0],
subStart[0], subNumber[0], newSubCollectorTypes,
newSubDataTypes, newSubStatsTypes, newSubStatsItems,
newSubSortTypes, newSubSortDirections, newSubStart,
newSubNumber, segmentRegistration, null);
} else {
newSubCollectorNextLevel = null;
}
}
return newSubCollectorNextLevel;
}
} else {
throw new IOException("already closed");
}
}
/**
* Adds the.
*
* @param key the key
* @param increaseSourceNumber the increase source number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
protected final MtasDataCollector add(String key,
boolean increaseSourceNumber) throws IOException {
if (!closed) {
if (collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
throw new IOException(
"collector should be " + DataCollector.COLLECTOR_TYPE_LIST);
} else if (key == null) {
throw new IOException("key shouldn't be null");
} else {
// check previous added
if ((newPosition > 0)
&& newKeyList[(newPosition - 1)].compareTo(key) >= 0) {
int i = newPosition;
do {
i--;
if (newKeyList[i].equals(key)) {
newCurrentPosition = i;
newCurrentExisting = true;
if (subDataTypes != null) {
return newSubCollectorListNextLevel[newCurrentPosition];
} else {
return null;
}
}
} while ((i > 0) && (newKeyList[i].compareTo(key) > 0));
}
// move position in old list
if (position < getSize()) {
// just add smaller or equal items
while (keyList[position].compareTo(key) <= 0) {
if (newPosition == newSize) {
increaseNewListSize();
}
// copy
newKeyList[newPosition] = keyList[position];
newSourceNumberList[newPosition] = sourceNumberList[position];
newErrorNumber[newPosition] = errorNumber[position];
newErrorList[newPosition] = errorList[position];
if (hasSub) {
newSubCollectorListNextLevel[newPosition] = subCollectorListNextLevel[position];
}
copyToNew(position, newPosition);
newPosition++;
position++;
// check if added key from list is right key
if (newKeyList[(newPosition - 1)].equals(key)) {
if (increaseSourceNumber) {
newSourceNumberList[(newPosition - 1)]++;
}
newCurrentPosition = newPosition - 1;
newCurrentExisting = true;
// register known key found again in segment
newKnownKeyFoundInSegment.add(key);
// ready
if (hasSub) {
return newSubCollectorListNextLevel[newCurrentPosition];
} else {
return null;
}
// stop if position exceeds size
} else if (position == getSize()) {
break;
}
}
}
// check size
if (newPosition == newSize) {
increaseNewListSize();
}
// add key
newKeyList[newPosition] = key;
newSourceNumberList[newPosition] = 1;
newErrorNumber[newPosition] = 0;
newErrorList[newPosition] = new HashMap<>();
newPosition++;
newCurrentPosition = newPosition - 1;
newCurrentExisting = false;
// ready, only handle sub
if (hasSub) {
newSubCollectorListNextLevel[newCurrentPosition] = DataCollector
.getCollector(subCollectorTypes[0], subDataTypes[0],
subStatsTypes[0], subStatsItems[0], subSortTypes[0],
subSortDirections[0], subStart[0], subNumber[0],
newSubCollectorTypes, newSubDataTypes, newSubStatsTypes,
newSubStatsItems, newSubSortTypes, newSubSortDirections,
newSubStart, newSubNumber, segmentRegistration, null);
return newSubCollectorListNextLevel[newCurrentPosition];
} else {
return null;
}
}
} else {
throw new IOException("already closed");
}
}
/**
* Copy to new.
*
* @param position the position
* @param newPosition the new position
*/
protected abstract void copyToNew(int position, int newPosition);
/**
* Copy from new.
*/
protected abstract void copyFromNew();
/**
* Compare with boundary.
*
* @param value the value
* @param boundary the boundary
* @return true, if successful
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract boolean compareWithBoundary(T1 value, T1 boundary)
throws IOException;
/**
* Last for computing segment.
*
* @param value the value
* @param boundary the boundary
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract T1 lastForComputingSegment(T1 value, T1 boundary)
throws IOException;
/**
* Last for computing segment.
*
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract T1 lastForComputingSegment() throws IOException;
/**
* Boundary for segment.
*
* @param segmentName the segment name
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract T1 boundaryForSegment(String segmentName)
throws IOException;
/**
* Boundary for segment computing.
*
* @param segmentName the segment name
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract T1 boundaryForSegmentComputing(String segmentName)
throws IOException;
/**
* String to boundary.
*
* @param boundary the boundary
* @param segmentNumber the segment number
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected abstract T1 stringToBoundary(String boundary, Integer segmentNumber)
throws IOException;
/**
* String to boundary.
*
* @param boundary the boundary
* @return the t1
* @throws IOException Signals that an I/O exception has occurred.
*/
protected T1 stringToBoundary(String boundary) throws IOException {
return stringToBoundary(boundary, null);
}
/**
* Close segment key value registration.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public void closeSegmentKeyValueRegistration() throws IOException {
if (!closed) {
if (segmentRegistration != null) {
Map<String, T1> keyValueList = segmentKeyValueList.get(segmentName);
T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
for (Entry<String, T1> entry : keyValueList.entrySet()) {
if (tmpSegmentValueBoundary == null || compareWithBoundary(
entry.getValue(), tmpSegmentValueBoundary)) {
segmentKeys.add(entry.getKey());
}
}
}
} else {
throw new IOException("already closed");
}
}
/**
* Recompute segment keys.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public void recomputeSegmentKeys() throws IOException {
if (!closed && segmentRegistration != null) {
if (segmentRegistration.equals(SEGMENT_SORT_ASC)
|| segmentRegistration.equals(SEGMENT_SORT_DESC)
|| segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
|| segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
if (segmentRegistration.equals(SEGMENT_SORT_ASC)
|| segmentRegistration.equals(SEGMENT_SORT_DESC)) {
segmentKeys.clear();
// recompute boundaries
for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
.entrySet()) {
T1 tmpSegmentValueBoundary = boundaryForSegment(entry.getKey());
segmentValuesBoundary.put(entry.getKey(), tmpSegmentValueBoundary);
}
// compute adjusted boundaries and compute keys
for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
.entrySet()) {
this.segmentName = entry.getKey();
Map<String, T1> keyValueList = entry.getValue();
T1 tmpSegmentValueBoundaryForComputing = boundaryForSegmentComputing(
entry.getKey());
for (Entry<String, T1> subEntry : keyValueList.entrySet()) {
if (tmpSegmentValueBoundaryForComputing == null
|| compareWithBoundary(subEntry.getValue(),
tmpSegmentValueBoundaryForComputing)) {
if (!segmentKeys.contains(subEntry.getKey())) {
segmentKeys.add(subEntry.getKey());
}
}
}
}
}
Map<String, T1> keyValueList;
Set<String> recomputeKeyList;
segmentRecomputeKeyList = new LinkedHashMap<>();
for (String key : segmentKeys) {
for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
.entrySet()) {
keyValueList = entry.getValue();
if (!keyValueList.containsKey(key)) {
if (!segmentRecomputeKeyList.containsKey(entry.getKey())) {
recomputeKeyList = new HashSet<>();
segmentRecomputeKeyList.put(entry.getKey(), recomputeKeyList);
} else {
recomputeKeyList = segmentRecomputeKeyList.get(entry.getKey());
}
recomputeKeyList.add(key);
}
}
}
this.segmentName = null;
} else {
throw new IOException(
"not for segmentRegistration " + segmentRegistration);
}
} else {
throw new IOException("already closed or no segmentRegistration ("
+ segmentRegistration + ")");
}
}
/**
* Reduce to keys.
*
* @param keys the keys
*/
public abstract void reduceToKeys(Set<String> keys);
/**
* Reduce to segment keys.
*/
public void reduceToSegmentKeys() {
if (segmentRegistration != null) {
reduceToKeys(segmentKeys);
}
}
/**
* Check existence necessary keys.
*
* @return true, if successful
* @throws IOException Signals that an I/O exception has occurred.
*/
public boolean checkExistenceNecessaryKeys() throws IOException {
if (!closed) {
if (segmentRegistration != null) {
return segmentRecomputeKeyList.size() == 0;
} else {
return true;
}
} else {
throw new IOException("already closed");
}
}
/**
* Validate segment boundary.
*
* @param o the o
* @return true, if successful
* @throws IOException Signals that an I/O exception has occurred.
*/
abstract public boolean validateSegmentBoundary(Object o) throws IOException;
/**
* Validate with segment boundary.
*
* @param value the value
* @return true, if successful
* @throws IOException Signals that an I/O exception has occurred.
*/
protected boolean validateWithSegmentBoundary(T1 value) throws IOException {
if (!closed && segmentRegistration != null) {
T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
if (tmpSegmentValueBoundary == null
|| compareWithBoundary(value, tmpSegmentValueBoundary)) {
return true;
}
}
return false;
}
/**
* Validate segment value.
*
* @param value the value
* @param maximumNumber the maximum number
* @param segmentNumber the segment number
* @return the string
* @throws IOException Signals that an I/O exception has occurred.
*/
public String validateSegmentValue(T1 value, int maximumNumber,
int segmentNumber) throws IOException {
if (!closed) {
if (segmentRegistration != null) {
if (maximumNumber > 0) {
T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
if (segmentValueTopList.size() < maximumNumber
|| compareWithBoundary(value, tmpSegmentValueBoundary)) {
return SEGMENT_KEY_OR_NEW;
} else if (segmentKeys.size() > newKnownKeyFoundInSegment.size()) {
return SEGMENT_POSSIBLE_KEY;
} else {
return null;
}
} else {
return null;
}
} else {
return null;
}
} else {
throw new IOException("already closed");
}
}
/**
* Validate segment value.
*
* @param key the key
* @param value the value
* @param maximumNumber the maximum number
* @param segmentNumber the segment number
* @param test the test
* @return the string
* @throws IOException Signals that an I/O exception has occurred.
*/
public String validateSegmentValue(String key, T1 value, int maximumNumber,
int segmentNumber, boolean test) throws IOException {
if (!closed) {
if (segmentRegistration != null) {
if (maximumNumber > 0) {
T1 tmpSegmentValueMaxListMin = segmentValueTopListLast
.get(segmentName);
T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
if (segmentValueTopList.size() < maximumNumber) {
if (!test) {
segmentKeyValueList.get(segmentName).put(key, value);
segmentValueTopList.add(value);
segmentValueTopListLast.put(segmentName,
(tmpSegmentValueMaxListMin == null) ? value
: lastForComputingSegment(tmpSegmentValueMaxListMin,
value));
if (segmentValueTopList.size() == maximumNumber) {
tmpSegmentValueMaxListMin = segmentValueTopListLast
.get(segmentName);
segmentValueTopListLast.put(segmentName,
tmpSegmentValueMaxListMin);
segmentValuesBoundary.put(segmentName,
boundaryForSegmentComputing(segmentName));
}
}
return segmentKeys.contains(key) ? SEGMENT_KEY : SEGMENT_NEW;
} else if (compareWithBoundary(value, tmpSegmentValueBoundary)) {
// System.out.println(key+" "+value+" "+tmpSegmentValueBoundary);
if (!test) {
segmentKeyValueList.get(segmentName).put(key, value);
if (compareWithBoundary(value, tmpSegmentValueMaxListMin)) {
segmentValueTopList.add(value);
segmentValueTopList.remove(tmpSegmentValueMaxListMin);
tmpSegmentValueMaxListMin = lastForComputingSegment();
segmentValueTopListLast.put(segmentName,
tmpSegmentValueMaxListMin);
segmentValuesBoundary.put(segmentName,
boundaryForSegmentComputing(segmentName));
}
}
return segmentKeys.contains(key) ? SEGMENT_KEY : SEGMENT_NEW;
} else if (segmentKeys.contains(key)) {
if (!test) {
segmentKeyValueList.get(segmentName).put(key, value);
}
return SEGMENT_KEY;
} else {
return null;
}
} else {
return null;
}
} else {
return null;
}
} else {
throw new IOException("already closed");
}
}
/**
* Sets the error.
*
* @param newPosition the new position
* @param errorNumberItem the error number item
* @param errorListItem the error list item
* @param currentExisting the current existing
* @throws IOException Signals that an I/O exception has occurred.
*/
protected final void setError(int newPosition, int errorNumberItem,
HashMap<String, Integer> errorListItem, boolean currentExisting)
throws IOException {
if (!closed) {
if (currentExisting) {
newErrorNumber[newPosition] += errorNumberItem;
HashMap<String, Integer> item = newErrorList[newPosition];
for (Entry<String, Integer> entry : errorListItem.entrySet()) {
if (item.containsKey(entry.getKey())) {
item.put(entry.getKey(),
item.get(entry.getKey()) + entry.getValue());
} else {
item.put(entry.getKey(), entry.getValue());
}
}
} else {
newErrorNumber[newPosition] = errorNumberItem;
newErrorList[newPosition] = errorListItem;
}
} else {
throw new IOException("already closed");
}
}
/**
* Sorted and unique.
*
* @param keyList the key list
* @param size the size
* @return true, if successful
* @throws IOException Signals that an I/O exception has occurred.
*/
private boolean sortedAndUnique(String[] keyList, int size)
throws IOException {
if (!closed) {
for (int i = 1; i < size; i++) {
if (keyList[(i - 1)].compareTo(keyList[i]) >= 0) {
return false;
}
}
return true;
} else {
throw new IOException("already closed");
}
}
/**
* Compute sort and unique mapping.
*
* @param keyList the key list
* @param size the size
* @return the int[][]
* @throws IOException Signals that an I/O exception has occurred.
*/
private int[][] computeSortAndUniqueMapping(String[] keyList, int size)
throws IOException {
if (!closed) {
if (size > 0) {
SortedMap<String, int[]> sortedMap = new TreeMap<>();
for (int i = 0; i < size; i++) {
if (sortedMap.containsKey(keyList[i])) {
int[] previousList = sortedMap.get(keyList[i]);
int[] newList = new int[previousList.length + 1];
System.arraycopy(previousList, 0, newList, 0, previousList.length);
newList[previousList.length] = i;
sortedMap.put(keyList[i], newList);
} else {
sortedMap.put(keyList[i], new int[] { i });
}
}
Collection<int[]> values = sortedMap.values();
int[][] result = new int[sortedMap.size()][];
return values.toArray(result);
} else {
return null;
}
} else {
throw new IOException("already closed");
}
}
/**
* Remap data.
*
* @param mapping the mapping
* @throws IOException Signals that an I/O exception has occurred.
*/
protected void remapData(int[][] mapping) throws IOException {
if (!closed) {
// remap and merge keys
String[] newKeyList = new String[mapping.length];
// process mapping for functions?
HashMap<MtasDataCollector<?, ?>, MtasDataCollector<?, ?>> map = new HashMap<>();
int[] newSourceNumberList = new int[mapping.length];
int[] newErrorNumber = new int[mapping.length];
@SuppressWarnings("unchecked")
HashMap<String, Integer>[] newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[mapping.length];
for (int i = 0; i < mapping.length; i++) {
newKeyList[i] = keyList[mapping[i][0]];
newSourceNumberList[i] = sourceNumberList[mapping[i][0]];
for (int j = 0; j < mapping[i].length; j++) {
if (j == 0) {
newErrorNumber[i] = errorNumber[mapping[i][j]];
newErrorList[i] = errorList[mapping[i][j]];
} else {
newErrorNumber[i] += errorNumber[mapping[i][j]];
for (Entry<String, Integer> entry : errorList[mapping[i][j]]
.entrySet()) {
if (newErrorList[i].containsKey(entry.getKey())) {
newErrorList[i].put(entry.getKey(),
newErrorList[i].get(entry.getKey()) + entry.getValue());
} else {
newErrorList[i].put(entry.getKey(), entry.getValue());
}
}
}
}
}
if (hasSub) {
newSubCollectorListNextLevel = new MtasDataCollector<?, ?>[mapping.length];
for (int i = 0; i < mapping.length; i++) {
for (int j = 0; j < mapping[i].length; j++) {
if (j == 0 || newSubCollectorListNextLevel[i] == null) {
newSubCollectorListNextLevel[i] = subCollectorListNextLevel[mapping[i][j]];
} else {
newSubCollectorListNextLevel[i]
.merge(subCollectorListNextLevel[mapping[i][j]], map, false);
}
}
}
subCollectorListNextLevel = newSubCollectorListNextLevel;
}
keyList = newKeyList;
sourceNumberList = newSourceNumberList;
errorNumber = newErrorNumber;
errorList = newErrorList;
size = keyList.length;
position = 0;
} else {
throw new IOException("already closed");
}
}
/**
* Close new list.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public void closeNewList() throws IOException {
if (!closed) {
if (segmentRegistration != null) {
this.segmentName = null;
}
if (newSize > 0) {
// add remaining old
while (position < getSize()) {
if (newPosition == newSize) {
increaseNewListSize();
}
newKeyList[newPosition] = keyList[position];
newSourceNumberList[newPosition] = sourceNumberList[position];
newErrorNumber[newPosition] = errorNumber[position];
newErrorList[newPosition] = errorList[position];
if (hasSub) {
newSubCollectorListNextLevel[newPosition] = subCollectorListNextLevel[position];
}
copyToNew(position, newPosition);
position++;
newPosition++;
}
// copy
keyList = newKeyList;
sourceNumberList = newSourceNumberList;
errorNumber = newErrorNumber;
errorList = newErrorList;
subCollectorListNextLevel = newSubCollectorListNextLevel;
copyFromNew();
size = newPosition;
// sort and merge
if (!sortedAndUnique(keyList, getSize())) {
remapData(computeSortAndUniqueMapping(keyList, getSize()));
}
}
position = 0;
newSize = 0;
newPosition = 0;
newCurrentPosition = 0;
}
}
/**
* Gets the item.
*
* @param i the i
* @return the item
*/
abstract protected MtasDataItem<T1, T2> getItem(int i);
/**
* Checks for sub.
*
* @return true, if successful
*/
protected boolean hasSub() {
return hasSub;
}
/**
* Error.
*
* @param error the error
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract void error(String error) throws IOException;
/**
* Error.
*
* @param key the key
* @param error the error
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract void error(String key, String error) throws IOException;
/**
* Adds the.
*
* @param valueSum the value sum
* @param valueN the value N
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(long valueSum, long valueN)
throws IOException;
/**
* Adds the.
*
* @param values the values
* @param number the number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(long[] values, int number)
throws IOException;
/**
* Adds the.
*
* @param valueSum the value sum
* @param valueN the value N
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(double valueSum, long valueN)
throws IOException;
/**
* Adds the.
*
* @param values the values
* @param number the number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(double[] values, int number)
throws IOException;
/**
* Adds the.
*
* @param key the key
* @param valueSum the value sum
* @param valueN the value N
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(String key, long valueSum, long valueN)
throws IOException;
/**
* Adds the.
*
* @param key the key
* @param values the values
* @param number the number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(String key, long[] values, int number)
throws IOException;
/**
* Adds the.
*
* @param key the key
* @param valueSum the value sum
* @param valueN the value N
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(String key, double valueSum,
long valueN) throws IOException;
/**
* Adds the.
*
* @param key the key
* @param values the values
* @param number the number
* @return the mtas data collector
* @throws IOException Signals that an I/O exception has occurred.
*/
public abstract MtasDataCollector add(String key, double[] values, int number)
throws IOException;
/*
* (non-Javadoc)
*
* @see java.lang.Object#toString()
*/
@Override
public String toString() {
StringBuilder text = new StringBuilder();
text.append(this.getClass().getSimpleName() + "-" + this.hashCode() + "\n");
text.append("\t=== " + collectorType + " - " + statsType + " " + statsItems
+ " " + hasSub + " ===\n");
text.append("\tclosed: " + closed + "\n");
text.append("\tkeylist: " + Arrays.asList(keyList) + "\n");
text.append("\tsegmentKeys: "
+ (segmentKeys != null ? segmentKeys.contains("1") : "null") + "\n");
return text.toString().trim();
}
/**
* Gets the result.
*
* @return the result
* @throws IOException Signals that an I/O exception has occurred.
*/
public MtasDataCollectorResult<T1, T2> getResult() throws IOException {
if (!closed) {
close();
}
return result;
}
/**
* Gets the key list.
*
* @return the key list
* @throws IOException Signals that an I/O exception has occurred.
*/
public Set<String> getKeyList() throws IOException {
if (!closed) {
close();
}
return new HashSet<>(Arrays.asList(keyList));
}
/**
* Gets the stats items.
*
* @return the stats items
*/
public SortedSet<String> getStatsItems() {
return statsItems;
}
/**
* Close.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public void close() throws IOException {
if (!closed) {
closeNewList();
if (collectorType.equals(DataCollector.COLLECTOR_TYPE_LIST)) {
// compute initial basic list
TreeMap<String, MtasDataItem<T1, T2>> basicList = new TreeMap<>();
for (int i = 0; i < getSize(); i++) {
MtasDataItem<T1, T2> newItem = getItem(i);
if (basicList.containsKey(keyList[i])) {
newItem.add(basicList.get(keyList[i]));
}
basicList.put(keyList[i], newItem);
}
// create result based on basic list
result = new MtasDataCollectorResult<>(collectorType, sortType,
sortDirection, basicList, start, number);
// reduce
if (segmentRegistration != null) {
if (segmentRegistration.equals(SEGMENT_SORT_ASC)
|| segmentRegistration.equals(SEGMENT_SORT_DESC)) {
reduceToKeys(result.getComparatorList().keySet());
} else if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
|| segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
Map<String, MtasDataItemNumberComparator> comparatorList = result
.getComparatorList();
HashSet<String> filteredKeySet = new HashSet<>();
if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)) {
for (Entry<String, MtasDataItemNumberComparator> entry : comparatorList
.entrySet()) {
if (entry.getValue().compareTo(segmentValueBoundary) < 0) {
filteredKeySet.add(entry.getKey());
}
}
} else {
for (Entry<String, MtasDataItemNumberComparator> entry : comparatorList
.entrySet()) {
if (entry.getValue().compareTo(segmentValueBoundary) > 0) {
filteredKeySet.add(entry.getKey());
}
}
}
reduceToKeys(filteredKeySet);
basicList.keySet().retainAll(filteredKeySet);
result = new MtasDataCollectorResult<>(collectorType, sortType,
sortDirection, basicList, start, number);
}
}
} else if (collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
if (getSize() > 0) {
result = new MtasDataCollectorResult<>(collectorType, getItem(0));
} else {
result = new MtasDataCollectorResult<>(collectorType, sortType,
sortDirection);
}
} else {
throw new IOException("type " + collectorType + " not supported");
}
closed = true;
}
}
/**
* Gets the collector type.
*
* @return the collector type
*/
public String getCollectorType() {
return collectorType;
}
/**
* Gets the stats type.
*
* @return the stats type
*/
public String getStatsType() {
return statsType;
}
/**
* Gets the data type.
*
* @return the data type
*/
public String getDataType() {
return dataType;
}
/**
* Gets the size.
*
* @return the size
*/
public int getSize() {
return size;
}
/**
* With total.
*
* @return true, if successful
*/
public boolean withTotal() {
return withTotal;
}
/**
* Sets the with total.
*
* @throws IOException Signals that an I/O exception has occurred.
*/
public void setWithTotal() throws IOException {
if (collectorType.equals(DataCollector.COLLECTOR_TYPE_LIST)) {
if (segmentName != null) {
throw new IOException("can't get total with segmentRegistration");
} else {
withTotal = true;
}
} else {
throw new IOException(
"can't get total for dataCollector of type " + collectorType);
}
}
}