Module src.Pipelines

Classes

class ApiPipeline (request_factory: RequestFactory, sleeping_time: float = None)

Abstract ApiPipeline

All ApiPipeline classes must inherit from this class. The read, process and write methods need to be overridden in the subclass.

Arguments

request_factory(Required):

RequestFactory instance (see its documentation).
It creates all the requests of the pipe.

sleeping_time(Optional):

Float.
If API calls need to be delayed, set 'sleeping_time' to the number of
seconds the pipe should sleep after each request.

Ancestors

Class variables

var request_factory

Instance variables

var err_log

List of errors that occurred during the pipe run

Log objects are 4-tuples like ("entry", "status_code_if_there_is", "datetime", "typeError"). The errors caught are requests.exceptions.ConnectionError, Timeout, and HttpError.

Methods

def err_params_log(self)

Return the parameters of the logged errors, so that the pipe can be rerun with the failed requests.
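As a rough illustration of how logged errors could feed a rerun, the sketch below pulls the "entry" field out of each 4-tuple. The log contents and the `failed_entries` helper are hypothetical. The exact return value of err_params_log is not described here, so treating it as "the list of failed entries" is an assumption.

```python
from datetime import datetime

# Hypothetical err_log contents, following the documented 4-tuple layout:
# (entry, status code if there is one, datetime, error type).
err_log = [
    (("users/42", {"lang": "en"}), 500, datetime(2020, 1, 1, 12, 0), "HttpError"),
    (("users/43", None), None, datetime(2020, 1, 1, 12, 1), "Timeout"),
]


def failed_entries(log):
    """Return the entry of each log object (assumed to be what a rerun needs)."""
    return [entry for entry, _status, _when, _kind in log]


# These entries have the same 2-tuple shape that read() expects,
# so they could be loaded again as data for a second run.
retry_data = failed_entries(err_log)
```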

def process(self, entry)

Execute the requests created by the read() method and sleep if needed.

If an error occurs during request execution, a log object is added to err_log. Log objects are 4-tuples like

    ("entry", "status_code_if_there_is", "datetime", "typeError")

The errors caught are requests.exceptions.ConnectionError, Timeout, and HttpError.

Arguments

entry:

A request element passed to this method by run_pipe;
see the read() method documentation.

def read(self, entry)

Wrap the request parameters with the RequestFactory.

Create a request from the data element passed as argument, using the RequestFactory. Data elements are not validated! A data element needs to be a 2-tuple (end_url: string, params: dict).

Arguments

entry:

A data element passed to this method by run_pipe.
A correct data element for an API call is

    ("the end of the url", {"param_name":"param_val"})
    or
    ("the end of the url", None) if there are no params
    or
    (None, None) if there are no params and no end_url
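
Because data elements are not validated by the pipe, it may be worth checking them before loading. The helper below is a minimal sketch of such a check. `is_valid_entry` is a hypothetical name and is not part of this module. It only tests the 2-tuple shape described above.

```python
def is_valid_entry(entry):
    """Check the documented shape: (end_url: str or None, params: dict or None)."""
    if not isinstance(entry, tuple) or len(entry) != 2:
        return False
    end_url, params = entry
    url_ok = end_url is None or isinstance(end_url, str)
    params_ok = params is None or isinstance(params, dict)
    return url_ok and params_ok


# The three documented forms pass the check.
candidates = [
    ("the end of the url", {"param_name": "param_val"}),
    ("the end of the url", None),
    (None, None),
]
all_valid = all(is_valid_entry(c) for c in candidates)
```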

def write(self, entry_pack)

Called third, on groups of elements of the loaded 'data' (for example, to write them to a database).

You need to override this method to provide the behavior you want for the data after processing.

Arguments

entry_pack:

A group of request results passed to this method by run_pipe.

Inherited members

class GenericPipeline

Abstract Pipeline class

All Pipeline classes must inherit from this class. The read, process and write methods need to be overridden in the subclass.

Ancestors

  • abc.ABC

Subclasses

Methods

def load_data(self, data)

Check that data is an iterable and load it into the self._data attribute.

If the data argument is not iterable (it has no iter method implemented), a ValueError is raised.
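
The check described above can be sketched as a standalone function. This is an assumption about the behavior, not the module's actual code. The real method stores the data in self._data, while this stand-in simply returns it.

```python
def load_data(data):
    """Return data unchanged if it is iterable, otherwise raise ValueError."""
    # Lists, tuples, dicts, generators and strings all define __iter__.
    if not hasattr(data, "__iter__"):
        raise ValueError("data must be an iterable")
    return data


loaded = load_data([("users/1", None), ("users/2", None)])
```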

def process(self, entry)

Called second, for each element of the loaded 'data' (to apply transformations).

Arguments

entry:

A data element passed to this method by run_pipe.

def read(self, entry)

Called first, for each element of the loaded 'data' (to parse it).

Arguments

entry:

A data element passed to this method by run_pipe.

def run_pipe(self, transaction_rate=None)

Method to call to execute the pipe.

Arguments

transaction_rate(Optional):

Integer.
The number of data elements that are written together
by the write method.
Set it to 1 (one) to write after each element is processed.
If transaction_rate is higher than the data length, the write method
is executed once for all data elements at the end.
If transaction_rate is None (not specified), the write method is called
once at the end of the pipe.
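
To make the grouping rules above concrete, here is a small sketch that returns the groups write would receive for a given transaction_rate. `write_batches` is a hypothetical helper written from the description only. It assumes that a trailing incomplete group is still written at the end.

```python
def write_batches(data, transaction_rate=None):
    """Group elements the way the transaction_rate description suggests."""
    items = list(data)
    if transaction_rate is None:
        # No rate given: a single write with every element.
        return [items]
    # Otherwise: one group per transaction_rate elements, plus any remainder.
    return [
        items[i:i + transaction_rate]
        for i in range(0, len(items), transaction_rate)
    ]


by_two = write_batches([1, 2, 3, 4, 5], 2)
one_by_one = write_batches([1, 2, 3], 1)
too_large = write_batches([1, 2, 3], 10)
unspecified = write_batches([1, 2, 3])
```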

def write(self, entry_pack)

Called third, on groups of elements of the loaded 'data' (for example, to write them to a database).

Arguments

entry_pack:

A group of data elements passed to this method by run_pipe.
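
The read, process, write sequence can be illustrated with a self-contained stand-in. `SketchPipeline` below is not GenericPipeline. It is a hypothetical base class written from the descriptions on this page, and it assumes that the result of read is handed to process and that processed results are passed to write in groups. `SquarePipeline` shows what a subclass overriding the three methods might look like.

```python
from abc import ABC, abstractmethod


class SketchPipeline(ABC):
    """Stand-in base class, written only from the documented behavior."""

    def load_data(self, data):
        self._data = data

    @abstractmethod
    def read(self, entry):
        """Called first, for each element."""

    @abstractmethod
    def process(self, entry):
        """Called second, for each element."""

    @abstractmethod
    def write(self, entry_pack):
        """Called third, for each group of elements."""

    def run_pipe(self, transaction_rate=None):
        pack = []
        for entry in self._data:
            pack.append(self.process(self.read(entry)))
            if transaction_rate is not None and len(pack) >= transaction_rate:
                self.write(pack)
                pack = []
        if pack:
            # Remaining elements are written once at the end.
            self.write(pack)


class SquarePipeline(SketchPipeline):
    """Parses strings to integers, squares them, and collects the groups."""

    def __init__(self):
        self.written = []

    def read(self, entry):
        return int(entry)

    def process(self, entry):
        return entry * entry

    def write(self, entry_pack):
        self.written.append(list(entry_pack))


pipe = SquarePipeline()
pipe.load_data(["1", "2", "3"])
pipe.run_pipe(transaction_rate=2)
```

After this run, `pipe.written` holds two groups: the first with two squared values, the second with the remaining one.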