Skip to content

Commit

Permalink
Merge pull request #15 from mjkrumlauf/master
Browse files Browse the repository at this point in the history
Reformatted ECL files
  • Loading branch information
mjkrumlauf committed Jun 13, 2014
2 parents 9d55dc3 + cac3758 commit e744349
Show file tree
Hide file tree
Showing 14 changed files with 650 additions and 625 deletions.
140 changes: 74 additions & 66 deletions ecl/BuildIndex_Scheduler.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -3,83 +3,91 @@ IMPORT $,STD;
// Get all the Raw files from SuperFile and Process it (Create a base, Build an index, Add it to SuperKey and use it in ROXIE Queries).
// Delete the raw files read from Superfile
BuildIndex_Scheduler(STRING currentTime) := FUNCTION
currentCombinedFile := $.files.COMBINED_FILE_TMP + currentTime;
currentBaseFileName := $.files.BASE_FILE_NAME + currentTime;
currentKeyName_ACCDEC := $.files.KEY_ACCDEC_NAME + currentTime;
currentKeyName_BYSPEED := $.files.KEY_BYSPEED_NAME + currentTime;
// Compress all the Subfiles in Superfile and then build the index on one file
// STEP:1 Output the contents of the Superfile
ds_rawsuperfile := DATASET($.files.SUPERFILE_RAWDATA_TEMP, $.layouts.input, CSV);
combineSubFiles := OUTPUT(ds_rawsuperfile, ,currentCombinedFile , OVERWRITE, CSV);
currentCombinedFile := $.files.COMBINED_FILE_TMP + currentTime;
currentBaseFileName := $.files.BASE_FILE_NAME + currentTime;
currentKeyName_ACCDEC := $.files.KEY_ACCDEC_NAME + currentTime;
currentKeyName_BYSPEED := $.files.KEY_BYSPEED_NAME + currentTime;
// Compress all the Subfiles in Superfile and then build the index on one file
// STEP:1 Output the contents of the Superfile
ds_rawsuperfile := DATASET($.files.SUPERFILE_RAWDATA_TEMP, $.layouts.input, CSV);
combineSubFiles := OUTPUT(ds_rawsuperfile, ,currentCombinedFile , OVERWRITE, CSV);

// STEP 2: Clear SuperFile
clearTempRawSuperfile := SEQUENTIAL(
STD.File.StartSuperFileTransaction(),
STD.File.ClearSuperFile($.files.SUPERFILE_RAWDATA_TEMP),
STD.File.FinishSuperFileTransaction()
);
// STEP 2: Clear SuperFile
clearTempRawSuperfile := SEQUENTIAL(
STD.File.StartSuperFileTransaction(),
STD.File.ClearSuperFile($.files.SUPERFILE_RAWDATA_TEMP),
STD.File.FinishSuperFileTransaction()
);

// Create a Base file
contrib := $.telematics_calc(currentCombinedFile);
combined := contrib;
build_contrib := OUTPUT(combined, , currentBaseFileName, THOR, OVERWRITE);
// Create a Base file
contrib := $.telematics_calc(currentCombinedFile);
combined := contrib;
build_contrib := OUTPUT(combined, , currentBaseFileName, THOR, OVERWRITE);

// Build a INDEX
ds_baseFile := DATASET( currentBaseFileName, $.layouts.base, THOR, OPT);
buildIndexForAccDec := BUILDINDEX($.key_acc_dec(ds_baseFile, currentKeyName_ACCDEC), OVERWRITE);
buildIndexForSpeed := BUILDINDEX($.key_bySpeed(ds_baseFile, currentKeyName_BYSPEED), OVERWRITE);

// Add INDEX to Superkey
AddLogicalFilesToSuperFile := SEQUENTIAL (
STD.File.StartSuperFileTransaction(),
STD.File.AddSuperFile($.files.SUPERKEY_ACCDEC, currentKeyName_ACCDEC),
STD.File.AddSuperFile($.files.SUPERKEY_SPEED, currentKeyName_BYSPEED);
STD.File.FinishSuperFileTransaction()
);

// Delete CombinedFile
deleteLogicalFiles := STD.File.DeleteLogicalFile(currentCombinedFile, TRUE);

subfileList := NOTHOR(STD.File.SuperFileContents($.files.SUPERFILE_RAWDATA_TEMP));
deployPackage := $.DeployPackage;
subFilesExists := IF( EXISTS (subfileList),
SEQUENTIAL (
combineSubFiles,
clearTempRawSuperfile,
build_contrib,
buildIndexForAccDec,
buildIndexForSpeed,
AddLogicalFilesToSuperFile,
OUTPUT(deployPackage),
deleteLogicalFiles
));
RETURN subFilesExists;
// Build a INDEX
ds_baseFile := DATASET( currentBaseFileName, $.layouts.base, THOR, OPT);
buildIndexForAccDec := BUILDINDEX($.key_acc_dec(ds_baseFile, currentKeyName_ACCDEC), OVERWRITE);
buildIndexForSpeed := BUILDINDEX($.key_bySpeed(ds_baseFile, currentKeyName_BYSPEED), OVERWRITE);

// Add INDEX to Superkey
AddLogicalFilesToSuperFile := SEQUENTIAL (
STD.File.StartSuperFileTransaction(),
STD.File.AddSuperFile($.files.SUPERKEY_ACCDEC, currentKeyName_ACCDEC),
STD.File.AddSuperFile($.files.SUPERKEY_SPEED, currentKeyName_BYSPEED);
STD.File.FinishSuperFileTransaction()
);

// Delete CombinedFile
deleteLogicalFiles := STD.File.DeleteLogicalFile(currentCombinedFile, TRUE);

subfileList := NOTHOR(STD.File.SuperFileContents($.files.SUPERFILE_RAWDATA_TEMP));
deployPackage := $.DeployPackage;
subFilesExists :=
IF(EXISTS(subfileList),
SEQUENTIAL(
combineSubFiles,
clearTempRawSuperfile,
build_contrib,
buildIndexForAccDec,
buildIndexForSpeed,
AddLogicalFilesToSuperFile,
OUTPUT(deployPackage),
deleteLogicalFiles)
);

RETURN subFilesExists;
END;
time := $.Util.getTimeDate() : INDEPENDENT;

// Create superfiles
CreateSuperFiles := SEQUENTIAL(
IF( ~STD.File.SuperFileExists($.files.SUPERFILE_RAWDATA_TEMP),
STD.File.CreateSuperFile($.files.SUPERFILE_RAWDATA_TEMP));
IF(~STD.File.SuperFileExists($.files.SUPERKEY_ACCDEC),
STD.File.CreateSuperFile($.files.SUPERKEY_ACCDEC));
IF(~STD.File.SuperFileExists($.files.SUPERKEY_SPEED),
STD.File.CreateSuperFile($.files.SUPERKEY_SPEED));
);
CreateSuperFiles :=
SEQUENTIAL(
IF(~STD.File.SuperFileExists($.files.SUPERFILE_RAWDATA_TEMP),
STD.File.CreateSuperFile($.files.SUPERFILE_RAWDATA_TEMP));
IF(~STD.File.SuperFileExists($.files.SUPERKEY_ACCDEC),
STD.File.CreateSuperFile($.files.SUPERKEY_ACCDEC));
IF(~STD.File.SuperFileExists($.files.SUPERKEY_SPEED),
STD.File.CreateSuperFile($.files.SUPERKEY_SPEED));
);

// Swap Superfile contents to temp superfile. This acts like a Temp storage for the files to be processed.
// If there are contents then only swap.
swapSuperFileContents := IF( EXISTS($.Util.getSuperFileContents($.files.SUPERFILE_RAWDATA)),
SEQUENTIAL (
STD.File.StartSuperFileTransaction(),
STD.File.SwapSuperFile($.files.SUPERFILE_RAWDATA, $.files.SUPERFILE_RAWDATA_TEMP),
STD.File.FinishSuperFileTransaction()
));
swapSuperFileContents :=
IF(EXISTS($.Util.getSuperFileContents($.files.SUPERFILE_RAWDATA)),
SEQUENTIAL(
STD.File.StartSuperFileTransaction(),
STD.File.SwapSuperFile($.files.SUPERFILE_RAWDATA, $.files.SUPERFILE_RAWDATA_TEMP),
STD.File.FinishSuperFileTransaction()
)
);

createFilesAndgetContents := SEQUENTIAL(CreateSuperFiles, swapSuperFileContents);
buildIndexes := IF(EXISTS($.Util.getSuperFileContents($.files.SUPERFILE_RAWDATA_TEMP)), BuildIndex_Scheduler(time));
buildIndexes :=
IF(EXISTS($.Util.getSuperFileContents($.files.SUPERFILE_RAWDATA_TEMP)),
BuildIndex_Scheduler(time));

start_build_process := SEQUENTIAL(createFilesAndgetContents, buildIndexes);
start_build_process : WHEN ( CRON ( '0-59/5 * * * *' ) ); //SCHEDULE A JOB every 5 minute
148 changes: 74 additions & 74 deletions ecl/Constants.ecl
Original file line number Diff line number Diff line change
@@ -1,77 +1,77 @@
EXPORT Constants := MODULE

EXPORT bySpeed := MODULE
EXPORT range1_lo := 1;
EXPORT range1_hi := 5;
EXPORT range2_lo := 6;
EXPORT range2_hi := 10;
EXPORT range3_lo := 11;
EXPORT range3_hi := 20;
EXPORT range4_lo := 21;
EXPORT range4_hi := 30;
EXPORT range5_lo := 31;
EXPORT range5_hi := 40;
EXPORT range6_lo := 41;
EXPORT range6_hi := 50;
EXPORT range7_lo := 51;
EXPORT range7_hi := 60;
EXPORT range8_lo := 61;
EXPORT range8_hi := 70;
EXPORT range9_lo := 71;
EXPORT range9_hi := 80;
EXPORT range10_lo := 81;
EXPORT range10_hi := 85;
EXPORT range11_lo := 86;
EXPORT range11_hi := 1000; // arbitrary hi val
END;
EXPORT accel := MODULE
EXPORT range1_lo := 1;
EXPORT range1_hi := 2;
EXPORT range2_lo := 3;
EXPORT range2_hi := 5;
EXPORT range3_lo := 6;
EXPORT range3_hi := 7;
EXPORT range4_lo := 8;
EXPORT range4_hi := 10;
EXPORT range5_lo := 11;
EXPORT range5_hi := 12;
EXPORT range6_lo := 13;
EXPORT range6_hi := 15;
EXPORT range7_lo := 16;
EXPORT range7_hi := 1000; // arbitrary low val
END;
EXPORT decel := MODULE
EXPORT range1_lo := -2;
EXPORT range1_hi := -1;
EXPORT range2_lo := -5;
EXPORT range2_hi := -3;
EXPORT range3_lo := -7;
EXPORT range3_hi := -6;
EXPORT range4_lo := -10;
EXPORT range4_hi := -8;
EXPORT range5_lo := -12;
EXPORT range5_hi := -11;
EXPORT range6_lo := -15;
EXPORT range6_hi := -13;
EXPORT range7_lo := -1000;
EXPORT range7_hi := -16; // arbitrary low val
END;
EXPORT real center_lat := 31.180866;
EXPORT real center_lon := 121.601206;
// KAFKA Topic and Consumer-Group
EXPORT STRING topic_name := 'vehicle-simulator';
EXPORT STRING consumer_group_name := 'grp-vehicle-simulator';
EXPORT STRING LandingZoneIP := '127.0.0.1';
EXPORT STRING Roxie_Hostname := '127.0.0.1';
EXPORT STRING RoxieUrl_WsWorkunits := 'http://' + Roxie_Hostname + ':8010/WsWorkunits?ver_=1.44';
EXPORT STRING RoxieUrl_WsPackageProcess := 'http://'+ Roxie_Hostname + ':8010/WsPackageProcess';
EXPORT STRING Roxie_Clustername := 'roxie';
EXPORT STRING Package_Name := 'demo.pkg';
EXPORT STRING Dali_IP := LandingZoneIP;
EXPORT bySpeed := MODULE
EXPORT range1_lo := 1;
EXPORT range1_hi := 5;
EXPORT range2_lo := 6;
EXPORT range2_hi := 10;
EXPORT range3_lo := 11;
EXPORT range3_hi := 20;
EXPORT range4_lo := 21;
EXPORT range4_hi := 30;
EXPORT range5_lo := 31;
EXPORT range5_hi := 40;
EXPORT range6_lo := 41;
EXPORT range6_hi := 50;
EXPORT range7_lo := 51;
EXPORT range7_hi := 60;
EXPORT range8_lo := 61;
EXPORT range8_hi := 70;
EXPORT range9_lo := 71;
EXPORT range9_hi := 80;
EXPORT range10_lo := 81;
EXPORT range10_hi := 85;
EXPORT range11_lo := 86;
EXPORT range11_hi := 1000; // arbitrary hi val
END;
EXPORT accel := MODULE
EXPORT range1_lo := 1;
EXPORT range1_hi := 2;
EXPORT range2_lo := 3;
EXPORT range2_hi := 5;
EXPORT range3_lo := 6;
EXPORT range3_hi := 7;
EXPORT range4_lo := 8;
EXPORT range4_hi := 10;
EXPORT range5_lo := 11;
EXPORT range5_hi := 12;
EXPORT range6_lo := 13;
EXPORT range6_hi := 15;
EXPORT range7_lo := 16;
EXPORT range7_hi := 1000; // arbitrary low val
END;
EXPORT decel := MODULE
EXPORT range1_lo := -2;
EXPORT range1_hi := -1;
EXPORT range2_lo := -5;
EXPORT range2_hi := -3;
EXPORT range3_lo := -7;
EXPORT range3_hi := -6;
EXPORT range4_lo := -10;
EXPORT range4_hi := -8;
EXPORT range5_lo := -12;
EXPORT range5_hi := -11;
EXPORT range6_lo := -15;
EXPORT range6_hi := -13;
EXPORT range7_lo := -1000;
EXPORT range7_hi := -16; // arbitrary low val
END;
EXPORT real center_lat := 31.180866;
EXPORT real center_lon := 121.601206;
// KAFKA Topic and Consumer-Group
EXPORT STRING topic_name := 'vehicle-simulator';
EXPORT STRING consumer_group_name := 'grp-vehicle-simulator';
EXPORT STRING LandingZoneIP := '127.0.0.1';
EXPORT STRING Roxie_Hostname := '127.0.0.1';
EXPORT STRING RoxieUrl_WsWorkunits := 'http://' + Roxie_Hostname + ':8010/WsWorkunits?ver_=1.44';
EXPORT STRING RoxieUrl_WsPackageProcess := 'http://'+ Roxie_Hostname + ':8010/WsPackageProcess';
EXPORT STRING Roxie_Clustername := 'roxie';
EXPORT STRING Package_Name := 'demo.pkg';
EXPORT STRING Dali_IP := LandingZoneIP;
END;
72 changes: 34 additions & 38 deletions ecl/CreatePackageMapString.ecl
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,42 @@ IMPORT STD.File AS File;

EXPORT CreatePackageMapString(STRING queryName, STRING sourceSuperFilePath, STRING querySuperFilePath) := FUNCTION

queryDef := '<Package id="' + TRIM(queryName, LEFT, RIGHT) + '"><Base id="' + TRIM(querySuperFilePath, LEFT, RIGHT) + '"/></Package>';
sourceSubFiles := $.Util.getSuperFileContents(sourceSuperFilePath);
TextRec := RECORD
STRING s;
END;
queryDef := '<Package id="' + TRIM(queryName, LEFT, RIGHT) + '"><Base id="' + TRIM(querySuperFilePath, LEFT, RIGHT) + '"/></Package>';
sourceSubFiles := $.Util.getSuperFileContents(sourceSuperFilePath);
TextRec := RECORD
STRING s;
END;

subFileDefList := PROJECT (
sourceSubFiles,
TRANSFORM (
subFileDefList :=
PROJECT(sourceSubFiles,
TRANSFORM(
TextRec,
SELF.s := '<SubFile value="~' + LEFT.name + '"/>';
),
),
LOCAL
);
// OUTPUT(subFileDefList, NAMED('subFileDefList'));

subFileDefs := ROLLUP (
subFileDefList,
TRUE,
TRANSFORM
(
TextRec,
SELF.s := LEFT.s + RIGHT.s
)
);
// OUTPUT(subFileDefs, NAMED('subFileDefs'));

newSuperFileDef := '<SuperFile id="' + TRIM(sourceSuperFilePath, LEFT, RIGHT) + '">' + subFileDefs[1].s + '</SuperFile>';
// OUTPUT(newSuperFileDef, NAMED('newSuperFileDef'));

newPackageDef := '<Package id="' + TRIM(querySuperFilePath, LEFT, RIGHT) + '">' + newSuperFileDef + '</Package>';
// OUTPUT(newPackageDef, NAMED('newPackageDef'));

buildpackage := queryDef + newPackageDef;
// OUTPUT(buildpackage, NAMED('buildpackage'));

packageDefinition := IF( EXISTS(sourceSubFiles), buildpackage, '');
// packageDefinition;

RETURN packageDefinition;

);
// OUTPUT(subFileDefList, NAMED('subFileDefList'));

subFileDefs :=
ROLLUP(subFileDefList,
TRUE,
TRANSFORM(TextRec, SELF.s := LEFT.s + RIGHT.s)
);
// OUTPUT(subFileDefs, NAMED('subFileDefs'));

newSuperFileDef := '<SuperFile id="' + TRIM(sourceSuperFilePath, LEFT, RIGHT) + '">' + subFileDefs[1].s + '</SuperFile>';
// OUTPUT(newSuperFileDef, NAMED('newSuperFileDef'));

newPackageDef := '<Package id="' + TRIM(querySuperFilePath, LEFT, RIGHT) + '">' + newSuperFileDef + '</Package>';
// OUTPUT(newPackageDef, NAMED('newPackageDef'));

buildpackage := queryDef + newPackageDef;
// OUTPUT(buildpackage, NAMED('buildpackage'));

packageDefinition := IF(EXISTS(sourceSubFiles), buildpackage, '');
// packageDefinition;

RETURN packageDefinition;

END;
Loading

0 comments on commit e744349

Please sign in to comment.