Add Source Code and Testing for Google Cloud's Data Pipelines Create Operator (#32843)

* CreateDataPipeline operator and hook created.
* Added DAG for datapipeline to test if it runs as hoped.
* Created the run pipeline hook; updated the data pipeline create hook and operator to properly intake the body.
* Updated the create pipeline to accept the body; created the run pipeline hook; updated the operator accordingly.
* Add RunDataPipelineOperator.
* Example DAG is working for creating a new pipeline. Committing minor changes before working on the run operator/hook example DAG and testing.
* Return full response body update.
* The Create and Run operators are meant to work together, so it was recommended that the Run operator accept the entire response body. Updated the Run Operator to accept the response body dict and removed the other parameters, because they were only used to build the pipeline name, which the response body already includes. Created a variable for the pipeline name that takes it from the given response body and used it to call the hook. Updated the hook to accept the pipeline name and removed the building of the name. Added run_data_pipeline to the example DAG to call the Run Data Pipeline operator, passing a pipeline name taken from the create_data_pipeline result (which should be a response dict).
* Update variable names to be consistent.
* Update the Run Operator to accept the pipeline name instead of the response body.
* Update example DAG to reflect that only the name is being returned from the Create Operator.
* Create Operator returns full response body; example DAG reflects that.
* Fixed syntax errors; gave the run example DAG a task id and pipeline name for testing.
* Added project_id and location to the run pipeline hook and run operator. Updated the DAGs to work with dataflow-interns. Running properly.
* Add testing file.
* Change created pipeline source to airflow.
* Fix source format and remove whitespace.
* Raise exception if data_pipeline_name not given.
* Removed the unnecessary variable data_pipeline_name from the operator and all places referenced for the create. Added code comments to explain parameters for operators and hooks. Updated the run hook to utilize all variables. Removed some whitespace.
* Updated code comments; updated comments to describe the return values.
* Create hooks test file; rename operator test file.
* Updating branch (repeated merge commits deduplicated).
* Add Airflow exceptions for run operator.
* Test variables and operator used for testing the create op, plus some imports that will likely be necessary for testing the create op.
* Merge conflicts resolved.
* Added an Airflow exception for when the body parameter is not given; the pipeline cannot be created without it.
* Fixed indentation.
* Address PR comments.
* Restored the service access line that was accidentally deleted.
* Re: xianhualiu; raise exception on pipeline run error.
* Passing test cases for when the Create DataPipeline Operator is given the correct input.
* Passing test for when the project_id param is not given.
* Passing tests that check that the operator is passed the body and location.
* Create test for run operator execute.
* Accidentally added to the wrong branch; undoing.
* Passing test that an Airflow exception is raised if the response body contains an error.
* Removed the Run Operator test because not everything was pushed.
* Create tests for run operator.
* Remove import; using mock for the hook.
* Change test names to be consistent with the create operator tests.
* Commit for host to review. Renders an error that gcp_conn_id doesn't exist because it is not mocking properly, and that .create is not called in reference to line 127. These should only get feedback and not be approved to merge.
* Minor fix to params in mock request.
* Create hooks tests for running pipeline.
* Update grammar; remove comment; minor updates.
* Revert "Add tests for run operator".
* Minor removals based on Manav's comments.
* Revert "Revert "Add tests for run operator"".
* Test hook for running data pipeline.
* test_create_data_pipeline was failing because create() was not being called: the wrong test_parent was used and the project id wasn't the same throughout. Fixed this and removed an unnecessary patch; now focusing on the mock GCP conn issue.
* Assert that the run_data_pipeline call returns the job.
* Completed code; cleaned up unnecessary imports.
* Removed an overlooked conflict from when the run test was merged.
* Formatting all files created for both operators and hooks.
* Merging the Create Operator code to Apache.
* Prepare for sync.
* Delete DAGs for dataflow.
* Altered DAG to use variables instead of hard-coded values.
* Renamed tests to better suit their function.
* Create and start documentation for operators; added documentation draft for the create operator.
* Adding the Data Pipelines API REST resource to the documentation as a reference.
* Move DAGs file to the system test DataPipelines folder.
* Updated DAGs to create a local bucket and pull from that instead of the external resource; added "resources" folder.
* Added impersonation chain & updated comments.
* Static check fixes.
* Adding license.
* Adding resource files; add None check for the data_pipeline.
* Updated unit test to account for impersonation chain.
* Fixed failing pre-commits.
* Updated documentation to pull from the correct file location.
* Increase underline length.

---------

Co-authored-by: shaniyaclement <shaniya.clement17@gmail.com>
Co-authored-by: Brenda Pham <bloop@google.com>
Co-authored-by: Brenda Pham <93027753+blpham@users.noreply.github.com>
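The commit log above describes a contract between the two operators: the Create operator returns the full API response body, the Run step derives the pipeline name from that body, and exceptions are raised when the name is missing or the run response contains an error. A minimal sketch of that flow in plain Python follows; `DataPipelineError`, `pipeline_name_from_body`, and `check_run_response` are hypothetical stand-ins for illustration, not the provider's actual classes or functions.

```python
class DataPipelineError(Exception):
    """Stand-in for Airflow's AirflowException in this sketch."""


def pipeline_name_from_body(response_body: dict) -> str:
    """Extract the pipeline name from the Create operator's response body.

    Mirrors "Raise exception if data_pipeline_name not given": the Run step
    cannot proceed without a name to call the API with.
    """
    name = response_body.get("name")
    if not name:
        raise DataPipelineError("data_pipeline_name was not given")
    return name


def check_run_response(run_response: dict) -> dict:
    """Fail fast if the run response body contains an error, per the log:
    "if the response body contains an error an Airflow Exception is raised".
    """
    if "error" in run_response:
        raise DataPipelineError(f"Pipeline run failed: {run_response['error']}")
    return run_response


# Illustrative values only: the Create result feeds the Run step, the way the
# example DAG passes create_data_pipeline's result to run_data_pipeline.
create_result = {
    "name": "projects/my-project/locations/us-central1/pipelines/my-pipeline"
}
pipeline_name = pipeline_name_from_body(create_result)
run_result = check_run_response({"job": {"id": "2023-08-01_00_00_00-1"}})
```

The design choice described in the log (returning the whole response body rather than just the name) keeps the Create operator's XCom value general, while the Run operator ultimately accepts only the name it actually needs.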