Module src.Pipelines
Classes
class ApiPipeline (request_factory: RequestFactory, sleeping_time: float = None)
-
Abstract ApiPipeline
All ApiPipeline classes must inherit from this class; the read, process and write methods must be overridden in the subclass
Arguments
request_factory (required):
A RequestFactory instance (see its documentation) that will create all the requests of the pipe
sleeping_time (optional):
Float. If API calls need to be delayed, set 'sleeping_time' to the number of seconds the pipe should sleep after each request
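Since the write method must be overridden (see its documentation below), a concrete subclass might look like the following sketch. The base class here is a minimal stand-in written for illustration, not the library's actual implementation; `MyApiPipeline` and its `results` attribute are hypothetical names.

```python
from abc import ABC, abstractmethod


class ApiPipelineStub(ABC):
    # Minimal stand-in for the real ApiPipeline (illustration only);
    # the actual class lives in src.Pipelines.
    def __init__(self, request_factory, sleeping_time=None):
        self.request_factory = request_factory
        self.sleeping_time = sleeping_time

    @abstractmethod
    def write(self, entry_pack):
        ...


class MyApiPipeline(ApiPipelineStub):
    # Hypothetical subclass: keeps the request results in memory.
    def __init__(self, request_factory, sleeping_time=None):
        super().__init__(request_factory, sleeping_time)
        self.results = []

    def write(self, entry_pack):
        # "Write" a pack of request results (here: just store them).
        self.results.extend(entry_pack)


# request_factory would normally be a RequestFactory instance (see the doc);
# None is used here only to keep the sketch self-contained.
pipe = MyApiPipeline(request_factory=None, sleeping_time=0.5)
pipe.write(["result_a", "result_b"])
```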
Ancestors
- GenericPipeline
- abc.ABC
Class variables
var request_factory
Instance variables
var err_log
-
List of errors that occurred during the pipe run
Log objects are 4-tuples like ("entry", "status_code_if_there_is", "datetime", "typeError"). Errors caught are requests.exceptions.ConnectionError, Timeout, and HTTPError
Methods
def err_params_log(self)
-
Return the parameters from the error logs so the pipe can be rerun with the failed requests
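The log tuples and the rerun extraction can be illustrated with the sketch below. The err_log contents and the standalone err_params_log function are hypothetical, written only to show the 4-tuple shape described above, not the library's actual code.

```python
from datetime import datetime

# Hypothetical err_log contents, following the documented 4-tuple shape:
# (entry, status_code_if_there_is, datetime, error_type)
err_log = [
    (("users/42", {"page": 1}), 500, datetime(2024, 1, 1, 12, 0), "HTTPError"),
    (("users/43", None), None, datetime(2024, 1, 1, 12, 1), "ConnectionError"),
]


def err_params_log(err_log):
    # Keep only the original entries so the pipe can be rerun with them.
    return [entry for entry, _status, _when, _err_type in err_log]


failed_entries = err_params_log(err_log)
```

The extracted entries have exactly the 2-tuple shape expected by read(), so they can be fed back through load_data and run_pipe.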
def process(self, entry)
-
Execute the requests created by the read() method and sleep if needed
If an error occurs during request execution, a log object is appended to the err_log attribute. Log objects are 4-tuples like
("entry", "status_code_if_there_is", "datetime", "typeError")
Errors caught are requests.exceptions.ConnectionError, Timeout, and HTTPError
Arguments
entry:
A request element passed to this method by run_pipe; see the read() method documentation
def read(self, entry)
-
Wrap request parameters in the RequestFactory
Creates a request from the data element passed as argument and the RequestFactory. Data elements are not validated! A data element must be a 2-tuple (end_url: string, params: dict)
Arguments
entry:
A data element passed to this method by run_pipe. A correct data element for an API call is ("the end of the url", {"param_name": "param_val"}), or ("the end of the url", None) if there are no params, or (None, None) if there is neither params nor end_url
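Because the pipeline does not validate entries, a caller may want to check the shapes above before loading data. The validator below is a hypothetical helper (not part of the library) that encodes the three accepted shapes; the example URLs and params are made up.

```python
def is_valid_entry(entry):
    # Hypothetical check for the documented 2-tuple shape
    # (end_url: str or None, params: dict or None).
    # The pipeline itself does NOT perform this validation.
    if not (isinstance(entry, tuple) and len(entry) == 2):
        return False
    end_url, params = entry
    return (end_url is None or isinstance(end_url, str)) and (
        params is None or isinstance(params, dict)
    )


# The three shapes described above:
entries = [
    ("users/42", {"page": 1}),  # end_url + params
    ("users/42", None),         # end_url, no params
    (None, None),               # neither end_url nor params
]
```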
def write(self, entry_pack)
-
Called third, for groups of elements of the loaded 'data' (to write them to a database, for example)
You need to override this method and provide the desired behavior for the data after processing
Arguments
entry_pack:
A group of request results passed to this method by run_pipe
Inherited members
class GenericPipeline
-
Abstract Pipeline class
All Pipeline classes must inherit from this class; the read, process and write methods must be overridden in the subclass
Ancestors
- abc.ABC
Subclasses
- ApiPipeline
Methods
def load_data(self, data)
-
Check if data is an iterable and load it into the self._data attribute
If the data argument does not implement the __iter__ method, a ValueError is raised
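The iterable check can be sketched as follows; this is a minimal illustration of the documented contract, not the library's actual implementation.

```python
class PipelineSketch:
    # Minimal sketch of the load_data contract described above.
    def load_data(self, data):
        # Reject anything that does not implement __iter__.
        if not hasattr(data, "__iter__"):
            raise ValueError("data must be an iterable")
        self._data = data


pipe = PipelineSketch()
pipe.load_data([1, 2, 3])  # lists are iterable, so this succeeds
```

Passing a non-iterable such as an integer raises ValueError instead of silently storing bad data.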
def process(self, entry)
-
Called second, for each element of the loaded 'data' (to apply transformations)
Arguments
entry:
A data element passed to this method by run_pipe
def read(self, entry)
-
Called first, for each element of the loaded 'data' (to parse it)
Arguments
entry:
A data element passed to this method by run_pipe
def run_pipe(self, transaction_rate=None)
-
The method to call to execute the pipe
Arguments
transaction_rate (optional):
Integer. The number of processed data elements to write together in one call to the write method. Set it to 1 (one) to write after each element is processed. If transaction_rate is greater than the data length, the write method is executed once for all data elements at the end. If transaction_rate is None (not specified), the write method is called once at the end of the pipe
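The read → process → write flow and the transaction_rate batching can be sketched as below. This is a self-contained illustration of the documented behavior, with trivial pass-through read/process methods and made-up data; it is not the library's actual run_pipe implementation.

```python
class BatchingPipeSketch:
    # Sketch of run_pipe's write batching as documented above.
    def __init__(self, data):
        self._data = data
        self.written_packs = []  # records each call to write()

    def read(self, entry):
        return entry  # trivial pass-through for the sketch

    def process(self, entry):
        return entry  # trivial pass-through for the sketch

    def write(self, entry_pack):
        self.written_packs.append(list(entry_pack))

    def run_pipe(self, transaction_rate=None):
        pack = []
        for entry in self._data:
            pack.append(self.process(self.read(entry)))
            # Flush a full pack when a transaction_rate is set.
            if transaction_rate is not None and len(pack) >= transaction_rate:
                self.write(pack)
                pack = []
        # transaction_rate of None, or larger than the data length:
        # everything remaining is written once at the end.
        if pack:
            self.write(pack)


pipe = BatchingPipeSketch([1, 2, 3, 4, 5])
pipe.run_pipe(transaction_rate=2)
```

With five elements and a rate of 2, write is called three times: two full packs and a final partial one.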
def write(self, entry_pack)
-
Called third, for groups of elements of the loaded 'data' (to write them to a database, for example)
Arguments
entry_pack:
A group of data elements passed to this method by run_pipe