MtasDataCollector.java

package mtas.codec.util.collector;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.Map.Entry;

import mtas.codec.util.DataCollector;

/**
 * The Class MtasDataCollector.
 *
 * @param <T1> the generic type
 * @param <T2> the generic type
 */
public abstract class MtasDataCollector<T1 extends Number & Comparable<T1>, T2 extends Number & Comparable<T2>>
    implements Serializable {

  /** The Constant serialVersionUID. */
  private static final long serialVersionUID = 1L;

  /** The Constant SEGMENT_SORT_ASC. */
  public static final String SEGMENT_SORT_ASC = "segment_asc";

  /** The Constant SEGMENT_SORT_DESC. */
  public static final String SEGMENT_SORT_DESC = "segment_desc";

  /** The Constant SEGMENT_BOUNDARY_ASC. */
  public static final String SEGMENT_BOUNDARY_ASC = "segment_boundary_asc";

  /** The Constant SEGMENT_BOUNDARY_DESC. */
  public static final String SEGMENT_BOUNDARY_DESC = "segment_boundary_desc";

  /** The Constant SEGMENT_KEY. */
  public static final String SEGMENT_KEY = "key";

  /** The Constant SEGMENT_NEW. */
  public static final String SEGMENT_NEW = "new";

  /** The Constant SEGMENT_KEY_OR_NEW. */
  public static final String SEGMENT_KEY_OR_NEW = "key_or_new";

  /** The Constant SEGMENT_POSSIBLE_KEY. */
  public static final String SEGMENT_POSSIBLE_KEY = "possible_key";

  /** The size. */
  protected int size;

  /** The position. */
  protected int position;

  /** The collector type. */
  // properties collector
  protected String collectorType;

  /** The stats type. */
  protected String statsType;

  /** The data type. */
  protected String dataType;

  /** The stats items. */
  private SortedSet<String> statsItems;

  /** The sort type. */
  protected String sortType;

  /** The sort direction. */
  protected String sortDirection;

  /** The start. */
  protected Integer start;

  /** The number. */
  protected Integer number;

  /** The error number. */
  // error
  protected int[] errorNumber;

  /** The error list. */
  protected HashMap<String, Integer>[] errorList;

  /** The key list. */
  protected String[] keyList;

  /** The source number list. */
  protected int[] sourceNumberList;

  /** The with total. */
  private boolean withTotal;

  /** The segment registration. */
  public transient String segmentRegistration;

  /** The segment key value list. */
  protected transient LinkedHashMap<String, Map<String, T1>> segmentKeyValueList;

  /** The segment recompute key list. */
  public transient Map<String, Set<String>> segmentRecomputeKeyList;

  /** The segment keys. */
  public transient Set<String> segmentKeys;

  /** The segment values boundary. */
  protected transient Map<String, T1> segmentValuesBoundary;

  /** The segment value boundary. */
  protected transient T1 segmentValueBoundary;

  /** The segment value top list last. */
  protected transient Map<String, T1> segmentValueTopListLast;

  /** The segment value top list. */
  protected transient ArrayList<T1> segmentValueTopList;

  /** The segment name. */
  protected transient String segmentName;

  /** The segment number. */
  protected transient int segmentNumber;

  /** The has sub. */
  private boolean hasSub;

  /** The sub collector types. */
  private String[] subCollectorTypes;

  /** The sub data types. */
  private String[] subDataTypes;

  /** The sub stats types. */
  private String[] subStatsTypes;

  /** The sub stats items. */
  private SortedSet<String>[] subStatsItems;

  /** The sub sort types. */
  private String[] subSortTypes;

  /** The sub sort directions. */
  private String[] subSortDirections;

  /** The sub start. */
  private Integer[] subStart;

  /** The sub number. */
  private Integer[] subNumber;

  /** The sub collector list next level. */
  protected MtasDataCollector<?, ?>[] subCollectorListNextLevel = null;

  /** The sub collector next level. */
  protected MtasDataCollector<?, ?> subCollectorNextLevel = null;

  /** The new size. */
  protected transient int newSize;

  /** The new position. */
  protected transient int newPosition;

  /** The new current position. */
  protected transient int newCurrentPosition;

  /** The new current existing. */
  protected transient boolean newCurrentExisting;

  /** The new key list. */
  protected transient String[] newKeyList = null;

  /** The new source number list. */
  protected transient int[] newSourceNumberList = null;

  /** The new error number. */
  protected transient int[] newErrorNumber;

  /** The new error list. */
  protected transient HashMap<String, Integer>[] newErrorList;

  /** The new known key found in segment. */
  public transient Set<String> newKnownKeyFoundInSegment;

  /** The new sub collector types. */
  private transient String[] newSubCollectorTypes;

  /** The new sub data types. */
  private transient String[] newSubDataTypes;

  /** The new sub stats types. */
  private transient String[] newSubStatsTypes;

  /** The new sub stats items. */
  private transient SortedSet<String>[] newSubStatsItems;

  /** The new sub sort types. */
  private transient String[] newSubSortTypes;

  /** The new sub sort directions. */
  private transient String[] newSubSortDirections;

  /** The new sub start. */
  private transient Integer[] newSubStart;

  /** The new sub number. */
  private transient Integer[] newSubNumber;

  /** The new sub collector list next level. */
  // subcollectors next level for adding
  protected transient MtasDataCollector<?, ?>[] newSubCollectorListNextLevel = null;

  /** The new sub collector next level. */
  protected transient MtasDataCollector<?, ?> newSubCollectorNextLevel = null;

  /** The closed. */
  protected transient boolean closed = false;

  /** The result. */
  private transient MtasDataCollectorResult<T1, T2> result = null;

  /**
   * Instantiates a new mtas data collector.
   *
   * @param collectorType the collector type
   * @param dataType the data type
   * @param statsType the stats type
   * @param statsItems the stats items
   * @param sortType the sort type
   * @param sortDirection the sort direction
   * @param start the start
   * @param number the number
   * @param segmentRegistration the segment registration
   * @param boundary the boundary
   * @throws IOException Signals that an I/O exception has occurred.
   */
  @SuppressWarnings("unchecked")
  protected MtasDataCollector(String collectorType, String dataType,
      String statsType, SortedSet<String> statsItems, String sortType,
      String sortDirection, Integer start, Integer number,
      String segmentRegistration, String boundary) throws IOException {
    // set properties
    this.closed = false;
    this.collectorType = collectorType; // data or list
    this.dataType = dataType; // long or double
    this.statsType = statsType; // basic, advanced or full
    this.statsItems = statsItems; // sum, n, all, ...
    this.sortType = sortType;
    this.sortDirection = sortDirection;
    this.start = start;
    this.number = number;
    this.segmentRegistration = segmentRegistration;
    this.withTotal = false;
    if (segmentRegistration != null) {
      segmentKeys = new HashSet<>();
      segmentKeyValueList = new LinkedHashMap<>();
      segmentValuesBoundary = new LinkedHashMap<>();
      segmentValueTopListLast = new LinkedHashMap<>();
      if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
          || segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
        if (boundary != null) {
          segmentValueBoundary = stringToBoundary(boundary);
        } else {
          throw new IOException("did expect boundary with segmentRegistration "
              + segmentRegistration);
        }
      } else if (boundary != null) {
        throw new IOException("didn't expect boundary with segmentRegistration "
            + segmentRegistration);
      }
    }
    // initialize administration
    keyList = new String[0];
    sourceNumberList = new int[0];
    errorNumber = new int[0];
    errorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[0];
    size = 0;
    position = 0;
    // subCollectors properties
    hasSub = false;
    subCollectorTypes = null;
    subDataTypes = null;
    subStatsTypes = null;
    subStatsItems = null;
    subSortTypes = null;
    subSortDirections = null;
    subStart = null;
    subNumber = null;
    subCollectorListNextLevel = null;
    subCollectorNextLevel = null;
  }

  /**
   * Instantiates a new mtas data collector.
   *
   * @param collectorType the collector type
   * @param dataType the data type
   * @param statsType the stats type
   * @param statsItems the stats items
   * @param sortType the sort type
   * @param sortDirection the sort direction
   * @param start the start
   * @param number the number
   * @param subCollectorTypes the sub collector types
   * @param subDataTypes the sub data types
   * @param subStatsTypes the sub stats types
   * @param subStatsItems the sub stats items
   * @param subSortTypes the sub sort types
   * @param subSortDirections the sub sort directions
   * @param subStart the sub start
   * @param subNumber the sub number
   * @param segmentRegistration the segment registration
   * @param boundary the boundary
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected MtasDataCollector(String collectorType, String dataType,
      String statsType, SortedSet<String> statsItems, String sortType,
      String sortDirection, Integer start, Integer number,
      String[] subCollectorTypes, String[] subDataTypes, String[] subStatsTypes,
      SortedSet<String>[] subStatsItems, String[] subSortTypes,
      String[] subSortDirections, Integer[] subStart, Integer[] subNumber,
      String segmentRegistration, String boundary) throws IOException {
    // initialize
    this(collectorType, dataType, statsType, statsItems, sortType,
        sortDirection, start, number, segmentRegistration, boundary);
    // initialize subCollectors
    if (subCollectorTypes != null) {
      hasSub = true;
      this.subCollectorTypes = subCollectorTypes;
      this.subDataTypes = subDataTypes;
      this.subStatsTypes = subStatsTypes;
      this.subStatsItems = subStatsItems;
      this.subSortTypes = subSortTypes;
      this.subSortDirections = subSortDirections;
      this.subStart = subStart;
      this.subNumber = subNumber;
      if (subCollectorTypes.length > 1) {
        newSubCollectorTypes = Arrays.copyOfRange(subCollectorTypes, 1,
            subCollectorTypes.length);
        newSubDataTypes = Arrays.copyOfRange(subDataTypes, 1,
            subStatsTypes.length);
        newSubStatsTypes = Arrays.copyOfRange(subStatsTypes, 1,
            subStatsTypes.length);
        newSubStatsItems = Arrays.copyOfRange(subStatsItems, 1,
            subStatsItems.length);
        newSubSortTypes = Arrays.copyOfRange(subSortTypes, 1,
            subSortTypes.length);
        newSubSortDirections = Arrays.copyOfRange(subSortDirections, 1,
            subSortDirections.length);
        newSubStart = Arrays.copyOfRange(subStart, 1, subStart.length);
        newSubNumber = Arrays.copyOfRange(subNumber, 1, subNumber.length);
      }
      newSubCollectorListNextLevel = new MtasDataCollector[0];
    }
  }

  /**
   * Merge.
   *
   * @param newDataCollector the new data collector
   * @param map the map
   * @param increaseSourceNumber the increase source number
   * @throws IOException Signals that an I/O exception has occurred.
   */
  abstract public void merge(MtasDataCollector<?, ?> newDataCollector,
      Map<MtasDataCollector<?, ?>, MtasDataCollector<?, ?>> map,
      boolean increaseSourceNumber) throws IOException;

  /**
   * Inits the new list.
   *
   * @param maxNumberOfTerms the max number of terms
   * @param segmentName the segment name
   * @param segmentNumber the segment number
   * @param boundary the boundary
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void initNewList(int maxNumberOfTerms, String segmentName,
      int segmentNumber, String boundary) throws IOException {
    if (closed) {
      result = null;
      closed = false;
    }
    initNewListBasic(maxNumberOfTerms);
    if (segmentRegistration != null) {
      this.segmentName = segmentName;
      this.segmentNumber = segmentNumber;
      if (!segmentKeyValueList.containsKey(segmentName)) {
        segmentKeyValueList.put(segmentName, new HashMap<String, T1>());
        if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
            || segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
          if (boundary != null) {
            segmentValuesBoundary.put(segmentName,
                stringToBoundary(boundary, segmentNumber));
          } else {
            throw new IOException("expected boundary");
          }
        } else {
          segmentValuesBoundary.put(segmentName, null);
        }
        segmentValueTopListLast.put(segmentName, null);
      }
      this.segmentValueTopList = new ArrayList<>();
    }
  }

  /**
   * Inits the new list.
   *
   * @param maxNumberOfTerms the max number of terms
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void initNewList(int maxNumberOfTerms) throws IOException {
    if (closed) {
      result = null;
      closed = false;
    }
    if (segmentRegistration != null) {
      throw new IOException("missing segment name");
    } else {
      initNewListBasic(maxNumberOfTerms);
    }
  }

  /**
   * Inits the new list basic.
   *
   * @param maxNumberOfTerms the max number of terms
   * @throws IOException Signals that an I/O exception has occurred.
   */
  @SuppressWarnings("unchecked")
  private void initNewListBasic(int maxNumberOfTerms) throws IOException {
    if (!closed) {
      position = 0;
      newPosition = 0;
      newCurrentPosition = 0;
      newSize = maxNumberOfTerms + size;
      newKeyList = new String[newSize];
      newSourceNumberList = new int[newSize];
      newErrorNumber = new int[newSize];
      newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[newSize];
      newKnownKeyFoundInSegment = new HashSet<>();
      if (hasSub) {
        newSubCollectorListNextLevel = new MtasDataCollector[newSize];
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Increase new list size.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  @SuppressWarnings("unchecked")
  protected void increaseNewListSize() throws IOException {
    if (!closed) {
      String[] tmpNewKeyList = newKeyList;
      int[] tmpNewSourceNumberList = newSourceNumberList;
      int[] tmpNewErrorNumber = newErrorNumber;
      HashMap<String, Integer>[] tmpNewErrorList = newErrorList;
      int tmpNewSize = newSize;
      newSize = 2 * newSize;
      newKeyList = new String[newSize];
      newSourceNumberList = new int[newSize];
      newErrorNumber = new int[newSize];
      newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[newSize];
      System.arraycopy(tmpNewKeyList, 0, newKeyList, 0, tmpNewSize);
      System.arraycopy(tmpNewSourceNumberList, 0, newSourceNumberList, 0,
          tmpNewSize);
      System.arraycopy(tmpNewErrorNumber, 0, newErrorNumber, 0, tmpNewSize);
      System.arraycopy(tmpNewErrorList, 0, newErrorList, 0, tmpNewSize);
      if (hasSub) {
        MtasDataCollector<?, ?>[] tmpNewSubCollectorListNextLevel = newSubCollectorListNextLevel;
        newSubCollectorListNextLevel = new MtasDataCollector[newSize];
        System.arraycopy(tmpNewSubCollectorListNextLevel, 0,
            newSubCollectorListNextLevel, 0, tmpNewSize);
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Adds the.
   *
   * @param increaseSourceNumber the increase source number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected final MtasDataCollector add(boolean increaseSourceNumber)
      throws IOException {
    if (!closed) {
      if (!collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
        throw new IOException(
            "collector should be " + DataCollector.COLLECTOR_TYPE_DATA);
      } else {
        if (newPosition > 0) {
          newCurrentExisting = true;
        } else if (position < getSize()) {
          // copy
          newKeyList[0] = keyList[0];
          newSourceNumberList[0] = sourceNumberList[0];
          if (increaseSourceNumber) {
            newSourceNumberList[0]++;
          }
          newErrorNumber[0] = errorNumber[0];
          newErrorList[0] = errorList[0];
          if (hasSub) {
            newSubCollectorNextLevel = subCollectorNextLevel;
          }
          copyToNew(0, 0);
          newPosition = 1;
          position = 1;
          newCurrentExisting = true;
        } else {
          // add key
          newKeyList[0] = DataCollector.COLLECTOR_TYPE_DATA;
          newSourceNumberList[0] = 1;
          newErrorNumber[0] = 0;
          newErrorList[0] = new HashMap<>();
          newPosition = 1;
          newCurrentPosition = newPosition - 1;
          newCurrentExisting = false;
          // ready, only handle sub
          if (hasSub) {
            newSubCollectorNextLevel = DataCollector.getCollector(
                subCollectorTypes[0], subDataTypes[0], subStatsTypes[0],
                subStatsItems[0], subSortTypes[0], subSortDirections[0],
                subStart[0], subNumber[0], newSubCollectorTypes,
                newSubDataTypes, newSubStatsTypes, newSubStatsItems,
                newSubSortTypes, newSubSortDirections, newSubStart,
                newSubNumber, segmentRegistration, null);
          } else {
            newSubCollectorNextLevel = null;
          }
        }
        return newSubCollectorNextLevel;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Adds the.
   *
   * @param key the key
   * @param increaseSourceNumber the increase source number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected final MtasDataCollector add(String key,
      boolean increaseSourceNumber) throws IOException {
    if (!closed) {
      if (collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
        throw new IOException(
            "collector should be " + DataCollector.COLLECTOR_TYPE_LIST);
      } else if (key == null) {
        throw new IOException("key shouldn't be null");
      } else {
        // check previous added
        if ((newPosition > 0)
            && newKeyList[(newPosition - 1)].compareTo(key) >= 0) {
          int i = newPosition;
          do {
            i--;
            if (newKeyList[i].equals(key)) {
              newCurrentPosition = i;
              newCurrentExisting = true;
              if (subDataTypes != null) {
                return newSubCollectorListNextLevel[newCurrentPosition];
              } else {
                return null;
              }
            }
          } while ((i > 0) && (newKeyList[i].compareTo(key) > 0));
        }
        // move position in old list
        if (position < getSize()) {
          // just add smaller or equal items
          while (keyList[position].compareTo(key) <= 0) {
            if (newPosition == newSize) {
              increaseNewListSize();
            }
            // copy
            newKeyList[newPosition] = keyList[position];
            newSourceNumberList[newPosition] = sourceNumberList[position];
            newErrorNumber[newPosition] = errorNumber[position];
            newErrorList[newPosition] = errorList[position];
            if (hasSub) {
              newSubCollectorListNextLevel[newPosition] = subCollectorListNextLevel[position];
            }
            copyToNew(position, newPosition);
            newPosition++;
            position++;
            // check if added key from list is right key
            if (newKeyList[(newPosition - 1)].equals(key)) {
              if (increaseSourceNumber) {
                newSourceNumberList[(newPosition - 1)]++;
              }
              newCurrentPosition = newPosition - 1;
              newCurrentExisting = true;
              // register known key found again in segment
              newKnownKeyFoundInSegment.add(key);
              // ready
              if (hasSub) {
                return newSubCollectorListNextLevel[newCurrentPosition];
              } else {
                return null;
              }
              // stop if position exceeds size
            } else if (position == getSize()) {
              break;
            }
          }
        }
        // check size
        if (newPosition == newSize) {
          increaseNewListSize();
        }
        // add key
        newKeyList[newPosition] = key;
        newSourceNumberList[newPosition] = 1;
        newErrorNumber[newPosition] = 0;
        newErrorList[newPosition] = new HashMap<>();
        newPosition++;
        newCurrentPosition = newPosition - 1;
        newCurrentExisting = false;
        // ready, only handle sub
        if (hasSub) {
          newSubCollectorListNextLevel[newCurrentPosition] = DataCollector
              .getCollector(subCollectorTypes[0], subDataTypes[0],
                  subStatsTypes[0], subStatsItems[0], subSortTypes[0],
                  subSortDirections[0], subStart[0], subNumber[0],
                  newSubCollectorTypes, newSubDataTypes, newSubStatsTypes,
                  newSubStatsItems, newSubSortTypes, newSubSortDirections,
                  newSubStart, newSubNumber, segmentRegistration, null);
          return newSubCollectorListNextLevel[newCurrentPosition];
        } else {
          return null;
        }
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Copy to new.
   *
   * @param position the position
   * @param newPosition the new position
   */
  protected abstract void copyToNew(int position, int newPosition);

  /**
   * Copy from new.
   */
  protected abstract void copyFromNew();

  /**
   * Compare with boundary.
   *
   * @param value the value
   * @param boundary the boundary
   * @return true, if successful
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract boolean compareWithBoundary(T1 value, T1 boundary)
      throws IOException;

  /**
   * Last for computing segment.
   *
   * @param value the value
   * @param boundary the boundary
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract T1 lastForComputingSegment(T1 value, T1 boundary)
      throws IOException;

  /**
   * Last for computing segment.
   *
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract T1 lastForComputingSegment() throws IOException;

  /**
   * Boundary for segment.
   *
   * @param segmentName the segment name
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract T1 boundaryForSegment(String segmentName)
      throws IOException;

  /**
   * Boundary for segment computing.
   *
   * @param segmentName the segment name
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract T1 boundaryForSegmentComputing(String segmentName)
      throws IOException;

  /**
   * String to boundary.
   *
   * @param boundary the boundary
   * @param segmentNumber the segment number
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected abstract T1 stringToBoundary(String boundary, Integer segmentNumber)
      throws IOException;

  /**
   * String to boundary.
   *
   * @param boundary the boundary
   * @return the t1
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected T1 stringToBoundary(String boundary) throws IOException {
    return stringToBoundary(boundary, null);
  }

  /**
   * Close segment key value registration.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void closeSegmentKeyValueRegistration() throws IOException {
    if (!closed) {
      if (segmentRegistration != null) {
        Map<String, T1> keyValueList = segmentKeyValueList.get(segmentName);
        T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
        for (Entry<String, T1> entry : keyValueList.entrySet()) {
          if (tmpSegmentValueBoundary == null || compareWithBoundary(
              entry.getValue(), tmpSegmentValueBoundary)) {
            segmentKeys.add(entry.getKey());
          }
        }
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Recompute segment keys.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void recomputeSegmentKeys() throws IOException {
    if (!closed && segmentRegistration != null) {
      if (segmentRegistration.equals(SEGMENT_SORT_ASC)
          || segmentRegistration.equals(SEGMENT_SORT_DESC)
          || segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
          || segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {

        if (segmentRegistration.equals(SEGMENT_SORT_ASC)
            || segmentRegistration.equals(SEGMENT_SORT_DESC)) {
          segmentKeys.clear();
          // recompute boundaries
          for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
              .entrySet()) {
            T1 tmpSegmentValueBoundary = boundaryForSegment(entry.getKey());
            segmentValuesBoundary.put(entry.getKey(), tmpSegmentValueBoundary);
          }
          // compute adjusted boundaries and compute keys
          for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
              .entrySet()) {
            this.segmentName = entry.getKey();
            Map<String, T1> keyValueList = entry.getValue();
            T1 tmpSegmentValueBoundaryForComputing = boundaryForSegmentComputing(
                entry.getKey());
            for (Entry<String, T1> subEntry : keyValueList.entrySet()) {
              if (tmpSegmentValueBoundaryForComputing == null
                  || compareWithBoundary(subEntry.getValue(),
                      tmpSegmentValueBoundaryForComputing)) {
                if (!segmentKeys.contains(subEntry.getKey())) {
                  segmentKeys.add(subEntry.getKey());
                }
              }
            }
          }
        }

        Map<String, T1> keyValueList;
        Set<String> recomputeKeyList;
        segmentRecomputeKeyList = new LinkedHashMap<>();
        for (String key : segmentKeys) {
          for (Entry<String, Map<String, T1>> entry : segmentKeyValueList
              .entrySet()) {
            keyValueList = entry.getValue();
            if (!keyValueList.containsKey(key)) {
              if (!segmentRecomputeKeyList.containsKey(entry.getKey())) {
                recomputeKeyList = new HashSet<>();
                segmentRecomputeKeyList.put(entry.getKey(), recomputeKeyList);
              } else {
                recomputeKeyList = segmentRecomputeKeyList.get(entry.getKey());
              }
              recomputeKeyList.add(key);
            }
          }
        }
        this.segmentName = null;
      } else {
        throw new IOException(
            "not for segmentRegistration " + segmentRegistration);
      }
    } else {
      throw new IOException("already closed or no segmentRegistration ("
          + segmentRegistration + ")");
    }
  }

  /**
   * Reduce to keys.
   *
   * @param keys the keys
   */
  public abstract void reduceToKeys(Set<String> keys);

  /**
   * Reduce to segment keys.
   */
  public void reduceToSegmentKeys() {
    if (segmentRegistration != null) {
      reduceToKeys(segmentKeys);
    }
  }

  /**
   * Check existence necessary keys.
   *
   * @return true, if successful
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public boolean checkExistenceNecessaryKeys() throws IOException {
    if (!closed) {
      if (segmentRegistration != null) {
        return segmentRecomputeKeyList.size() == 0;
      } else {
        return true;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Validate segment boundary.
   *
   * @param o the o
   * @return true, if successful
   * @throws IOException Signals that an I/O exception has occurred.
   */
  abstract public boolean validateSegmentBoundary(Object o) throws IOException;

  /**
   * Validate with segment boundary.
   *
   * @param value the value
   * @return true, if successful
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected boolean validateWithSegmentBoundary(T1 value) throws IOException {
    if (!closed && segmentRegistration != null) {
      T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
      if (tmpSegmentValueBoundary == null
          || compareWithBoundary(value, tmpSegmentValueBoundary)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Validate segment value.
   *
   * @param value the value
   * @param maximumNumber the maximum number
   * @param segmentNumber the segment number
   * @return the string
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public String validateSegmentValue(T1 value, int maximumNumber,
      int segmentNumber) throws IOException {
    if (!closed) {
      if (segmentRegistration != null) {
        if (maximumNumber > 0) {
          T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
          if (segmentValueTopList.size() < maximumNumber
              || compareWithBoundary(value, tmpSegmentValueBoundary)) {
            return SEGMENT_KEY_OR_NEW;
          } else if (segmentKeys.size() > newKnownKeyFoundInSegment.size()) {
            return SEGMENT_POSSIBLE_KEY;
          } else {
            return null;
          }
        } else {
          return null;
        }
      } else {
        return null;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Validate segment value.
   *
   * @param key the key
   * @param value the value
   * @param maximumNumber the maximum number
   * @param segmentNumber the segment number
   * @param test the test
   * @return the string
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public String validateSegmentValue(String key, T1 value, int maximumNumber,
      int segmentNumber, boolean test) throws IOException {
    if (!closed) {
      if (segmentRegistration != null) {
        if (maximumNumber > 0) {
          T1 tmpSegmentValueMaxListMin = segmentValueTopListLast
              .get(segmentName);
          T1 tmpSegmentValueBoundary = segmentValuesBoundary.get(segmentName);
          if (segmentValueTopList.size() < maximumNumber) {
            if (!test) {
              segmentKeyValueList.get(segmentName).put(key, value);
              segmentValueTopList.add(value);
              segmentValueTopListLast.put(segmentName,
                  (tmpSegmentValueMaxListMin == null) ? value
                      : lastForComputingSegment(tmpSegmentValueMaxListMin,
                          value));
              if (segmentValueTopList.size() == maximumNumber) {
                tmpSegmentValueMaxListMin = segmentValueTopListLast
                    .get(segmentName);
                segmentValueTopListLast.put(segmentName,
                    tmpSegmentValueMaxListMin);
                segmentValuesBoundary.put(segmentName,
                    boundaryForSegmentComputing(segmentName));
              }
            }
            return segmentKeys.contains(key) ? SEGMENT_KEY : SEGMENT_NEW;
          } else if (compareWithBoundary(value, tmpSegmentValueBoundary)) {
            // System.out.println(key+" "+value+" "+tmpSegmentValueBoundary);
            if (!test) {
              segmentKeyValueList.get(segmentName).put(key, value);
              if (compareWithBoundary(value, tmpSegmentValueMaxListMin)) {
                segmentValueTopList.add(value);
                segmentValueTopList.remove(tmpSegmentValueMaxListMin);
                tmpSegmentValueMaxListMin = lastForComputingSegment();
                segmentValueTopListLast.put(segmentName,
                    tmpSegmentValueMaxListMin);
                segmentValuesBoundary.put(segmentName,
                    boundaryForSegmentComputing(segmentName));
              }
            }
            return segmentKeys.contains(key) ? SEGMENT_KEY : SEGMENT_NEW;
          } else if (segmentKeys.contains(key)) {
            if (!test) {
              segmentKeyValueList.get(segmentName).put(key, value);
            }
            return SEGMENT_KEY;
          } else {
            return null;
          }
        } else {
          return null;
        }
      } else {
        return null;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Sets the error.
   *
   * @param newPosition the new position
   * @param errorNumberItem the error number item
   * @param errorListItem the error list item
   * @param currentExisting the current existing
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected final void setError(int newPosition, int errorNumberItem,
      HashMap<String, Integer> errorListItem, boolean currentExisting)
      throws IOException {
    if (!closed) {
      if (currentExisting) {
        newErrorNumber[newPosition] += errorNumberItem;
        HashMap<String, Integer> item = newErrorList[newPosition];
        for (Entry<String, Integer> entry : errorListItem.entrySet()) {
          if (item.containsKey(entry.getKey())) {
            item.put(entry.getKey(),
                item.get(entry.getKey()) + entry.getValue());
          } else {
            item.put(entry.getKey(), entry.getValue());
          }
        }
      } else {
        newErrorNumber[newPosition] = errorNumberItem;
        newErrorList[newPosition] = errorListItem;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Sorted and unique.
   *
   * @param keyList the key list
   * @param size the size
   * @return true, if successful
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private boolean sortedAndUnique(String[] keyList, int size)
      throws IOException {
    if (!closed) {
      for (int i = 1; i < size; i++) {
        if (keyList[(i - 1)].compareTo(keyList[i]) >= 0) {
          return false;
        }
      }
      return true;
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Compute sort and unique mapping.
   *
   * @param keyList the key list
   * @param size the size
   * @return the int[][]
   * @throws IOException Signals that an I/O exception has occurred.
   */
  private int[][] computeSortAndUniqueMapping(String[] keyList, int size)
      throws IOException {
    if (!closed) {
      if (size > 0) {
        SortedMap<String, int[]> sortedMap = new TreeMap<>();
        for (int i = 0; i < size; i++) {
          if (sortedMap.containsKey(keyList[i])) {
            int[] previousList = sortedMap.get(keyList[i]);
            int[] newList = new int[previousList.length + 1];
            System.arraycopy(previousList, 0, newList, 0, previousList.length);
            newList[previousList.length] = i;
            sortedMap.put(keyList[i], newList);
          } else {
            sortedMap.put(keyList[i], new int[] { i });
          }
        }
        Collection<int[]> values = sortedMap.values();
        int[][] result = new int[sortedMap.size()][];
        return values.toArray(result);
      } else {
        return null;
      }
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Remap data.
   *
   * @param mapping the mapping
   * @throws IOException Signals that an I/O exception has occurred.
   */
  protected void remapData(int[][] mapping) throws IOException {
    if (!closed) {
      // remap and merge keys
      String[] newKeyList = new String[mapping.length];
      // process mapping for functions?
      HashMap<MtasDataCollector<?, ?>, MtasDataCollector<?, ?>> map = new HashMap<>();
      int[] newSourceNumberList = new int[mapping.length];
      int[] newErrorNumber = new int[mapping.length];
      @SuppressWarnings("unchecked")
      HashMap<String, Integer>[] newErrorList = (HashMap<String, Integer>[]) new HashMap<?, ?>[mapping.length];
      for (int i = 0; i < mapping.length; i++) {
        newKeyList[i] = keyList[mapping[i][0]];
        newSourceNumberList[i] = sourceNumberList[mapping[i][0]];
        for (int j = 0; j < mapping[i].length; j++) {
          if (j == 0) {
            newErrorNumber[i] = errorNumber[mapping[i][j]];
            newErrorList[i] = errorList[mapping[i][j]];
          } else {
            newErrorNumber[i] += errorNumber[mapping[i][j]];
            for (Entry<String, Integer> entry : errorList[mapping[i][j]]
                .entrySet()) {
              if (newErrorList[i].containsKey(entry.getKey())) {
                newErrorList[i].put(entry.getKey(),
                    newErrorList[i].get(entry.getKey()) + entry.getValue());
              } else {
                newErrorList[i].put(entry.getKey(), entry.getValue());
              }
            }
          }
        }
      }
      if (hasSub) {
        newSubCollectorListNextLevel = new MtasDataCollector<?, ?>[mapping.length];
        for (int i = 0; i < mapping.length; i++) {
          for (int j = 0; j < mapping[i].length; j++) {
            if (j == 0 || newSubCollectorListNextLevel[i] == null) {
              newSubCollectorListNextLevel[i] = subCollectorListNextLevel[mapping[i][j]];
            } else {
              newSubCollectorListNextLevel[i]
                  .merge(subCollectorListNextLevel[mapping[i][j]], map, false);
            }
          }
        }
        subCollectorListNextLevel = newSubCollectorListNextLevel;
      }
      keyList = newKeyList;
      sourceNumberList = newSourceNumberList;
      errorNumber = newErrorNumber;
      errorList = newErrorList;
      size = keyList.length;
      position = 0;
    } else {
      throw new IOException("already closed");
    }
  }

  /**
   * Close new list.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void closeNewList() throws IOException {
    if (!closed) {
      if (segmentRegistration != null) {
        this.segmentName = null;
      }
      if (newSize > 0) {
        // add remaining old
        while (position < getSize()) {
          if (newPosition == newSize) {
            increaseNewListSize();
          }
          newKeyList[newPosition] = keyList[position];
          newSourceNumberList[newPosition] = sourceNumberList[position];
          newErrorNumber[newPosition] = errorNumber[position];
          newErrorList[newPosition] = errorList[position];
          if (hasSub) {
            newSubCollectorListNextLevel[newPosition] = subCollectorListNextLevel[position];
          }
          copyToNew(position, newPosition);
          position++;
          newPosition++;
        }
        // copy
        keyList = newKeyList;
        sourceNumberList = newSourceNumberList;
        errorNumber = newErrorNumber;
        errorList = newErrorList;
        subCollectorListNextLevel = newSubCollectorListNextLevel;
        copyFromNew();
        size = newPosition;
        // sort and merge
        if (!sortedAndUnique(keyList, getSize())) {
          remapData(computeSortAndUniqueMapping(keyList, getSize()));
        }
      }
      position = 0;
      newSize = 0;
      newPosition = 0;
      newCurrentPosition = 0;
    }
  }

  /**
   * Gets the item.
   *
   * @param i the i
   * @return the item
   */
  abstract protected MtasDataItem<T1, T2> getItem(int i);

  /**
   * Checks for sub.
   *
   * @return true, if successful
   */
  protected boolean hasSub() {
    return hasSub;
  }

  /**
   * Error.
   *
   * @param error the error
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract void error(String error) throws IOException;

  /**
   * Error.
   *
   * @param key the key
   * @param error the error
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract void error(String key, String error) throws IOException;

  /**
   * Adds the.
   *
   * @param valueSum the value sum
   * @param valueN the value N
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(long valueSum, long valueN)
      throws IOException;

  /**
   * Adds the.
   *
   * @param values the values
   * @param number the number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(long[] values, int number)
      throws IOException;

  /**
   * Adds the.
   *
   * @param valueSum the value sum
   * @param valueN the value N
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(double valueSum, long valueN)
      throws IOException;

  /**
   * Adds the.
   *
   * @param values the values
   * @param number the number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(double[] values, int number)
      throws IOException;

  /**
   * Adds the.
   *
   * @param key the key
   * @param valueSum the value sum
   * @param valueN the value N
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(String key, long valueSum, long valueN)
      throws IOException;

  /**
   * Adds the.
   *
   * @param key the key
   * @param values the values
   * @param number the number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(String key, long[] values, int number)
      throws IOException;

  /**
   * Adds the.
   *
   * @param key the key
   * @param valueSum the value sum
   * @param valueN the value N
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(String key, double valueSum,
      long valueN) throws IOException;

  /**
   * Adds the.
   *
   * @param key the key
   * @param values the values
   * @param number the number
   * @return the mtas data collector
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public abstract MtasDataCollector add(String key, double[] values, int number)
      throws IOException;

  /*
   * (non-Javadoc)
   * 
   * @see java.lang.Object#toString()
   */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder();
    text.append(this.getClass().getSimpleName() + "-" + this.hashCode() + "\n");
    text.append("\t=== " + collectorType + " - " + statsType + " " + statsItems
        + " " + hasSub + " ===\n");
    text.append("\tclosed: " + closed + "\n");
    text.append("\tkeylist: " + Arrays.asList(keyList) + "\n");
    text.append("\tsegmentKeys: "
        + (segmentKeys != null ? segmentKeys.contains("1") : "null") + "\n");
    return text.toString().trim();
  }

  /**
   * Gets the result.
   *
   * @return the result
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public MtasDataCollectorResult<T1, T2> getResult() throws IOException {
    if (!closed) {
      close();
    }
    return result;
  }

  /**
   * Gets the key list.
   *
   * @return the key list
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public Set<String> getKeyList() throws IOException {
    if (!closed) {
      close();
    }
    return new HashSet<>(Arrays.asList(keyList));
  }

  /**
   * Gets the stats items.
   *
   * @return the stats items
   */
  public SortedSet<String> getStatsItems() {
    return statsItems;
  }

  /**
   * Close.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  @SuppressWarnings({ "rawtypes", "unchecked" })
  public void close() throws IOException {
    if (!closed) {
      closeNewList();
      if (collectorType.equals(DataCollector.COLLECTOR_TYPE_LIST)) {
        // compute initial basic list
        TreeMap<String, MtasDataItem<T1, T2>> basicList = new TreeMap<>();
        for (int i = 0; i < getSize(); i++) {
          MtasDataItem<T1, T2> newItem = getItem(i);
          if (basicList.containsKey(keyList[i])) {
            newItem.add(basicList.get(keyList[i]));
          }
          basicList.put(keyList[i], newItem);
        }
        // create result based on basic list
        result = new MtasDataCollectorResult<>(collectorType, sortType,
            sortDirection, basicList, start, number);
        // reduce
        if (segmentRegistration != null) {
          if (segmentRegistration.equals(SEGMENT_SORT_ASC)
              || segmentRegistration.equals(SEGMENT_SORT_DESC)) {
            reduceToKeys(result.getComparatorList().keySet());
          } else if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)
              || segmentRegistration.equals(SEGMENT_BOUNDARY_DESC)) {
            Map<String, MtasDataItemNumberComparator> comparatorList = result
                .getComparatorList();
            HashSet<String> filteredKeySet = new HashSet<>();
            if (segmentRegistration.equals(SEGMENT_BOUNDARY_ASC)) {
              for (Entry<String, MtasDataItemNumberComparator> entry : comparatorList
                  .entrySet()) {
                if (entry.getValue().compareTo(segmentValueBoundary) < 0) {
                  filteredKeySet.add(entry.getKey());
                }
              }
            } else {
              for (Entry<String, MtasDataItemNumberComparator> entry : comparatorList
                  .entrySet()) {
                if (entry.getValue().compareTo(segmentValueBoundary) > 0) {
                  filteredKeySet.add(entry.getKey());
                }
              }
            }
            reduceToKeys(filteredKeySet);
            basicList.keySet().retainAll(filteredKeySet);
            result = new MtasDataCollectorResult<>(collectorType, sortType,
                sortDirection, basicList, start, number);
          }
        }
      } else if (collectorType.equals(DataCollector.COLLECTOR_TYPE_DATA)) {
        if (getSize() > 0) {
          result = new MtasDataCollectorResult<>(collectorType, getItem(0));
        } else {
          result = new MtasDataCollectorResult<>(collectorType, sortType,
              sortDirection);
        }
      } else {
        throw new IOException("type " + collectorType + " not supported");
      }
      closed = true;
    }
  }

  /**
   * Gets the collector type.
   *
   * @return the collector type
   */
  public String getCollectorType() {
    return collectorType;
  }

  /**
   * Gets the stats type.
   *
   * @return the stats type
   */
  public String getStatsType() {
    return statsType;
  }

  /**
   * Gets the data type.
   *
   * @return the data type
   */
  public String getDataType() {
    return dataType;
  }

  /**
   * Gets the size.
   *
   * @return the size
   */
  public int getSize() {
    return size;
  }

  /**
   * With total.
   *
   * @return true, if successful
   */
  public boolean withTotal() {
    return withTotal;
  }

  /**
   * Sets the with total.
   *
   * @throws IOException Signals that an I/O exception has occurred.
   */
  public void setWithTotal() throws IOException {
    if (collectorType.equals(DataCollector.COLLECTOR_TYPE_LIST)) {
      if (segmentName != null) {
        throw new IOException("can't get total with segmentRegistration");
      } else {
        withTotal = true;
      }
    } else {
      throw new IOException(
          "can't get total for dataCollector of type " + collectorType);
    }
  }

}