Documentation ¶
Overview ¶
Package logic provides the business logic for Trimetric. It handles interacting with the database and processing data.
Index ¶
- Constants
- func ConsumeKafkaTopic(ctx context.Context, c ConsumerFunc, topic string, addrs []string) error
- func PollGTFSData(ctx context.Context, ld LoaderDataset, redisPool *redis.Pool, ...)
- func ProduceTripUpdates(ctx context.Context, baseURL string, apiKey string, p Producer) error
- func ProduceVehiclePositions(ctx context.Context, p Producer, baseURL, apiKey string, delay time.Duration) error
- type Arrival
- type ConsumerFunc
- type KafkaProducer
- type LoaderDataset
- type LoaderSQLDataset
- func (ld *LoaderSQLDataset) LoadCalendarDates(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadGTFSData(baseURL string) error
- func (ld *LoaderSQLDataset) LoadRoutes(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadServices(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadShapes(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadStopTimes(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadStops(tx *sql.Tx, c *trimet.CSV) error
- func (ld *LoaderSQLDataset) LoadTrips(tx *sql.Tx, c *trimet.CSV) error
- type Point
- type Producer
- type RouteDataset
- type RoutePoint
- type RouteSQLDataset
- type RouteShape
- type Shape
- type ShapeDataset
- type ShapeSQLDataset
- type StopDataset
- type StopSQLDataset
- func (sd *StopSQLDataset) FetchAllStops() ([]StopWithDistance, error)
- func (sd *StopSQLDataset) FetchArrivals(stopIDs []string) ([]Arrival, error)
- func (sd *StopSQLDataset) FetchWithinBox(w, s, e, n string) ([]StopWithDistance, error)
- func (sd *StopSQLDataset) FetchWithinDistance(lat, lng, dist string) ([]StopWithDistance, error)
- type StopWithDistance
- type TripShape
- type TripUpdateSQLDataset
- type TripUpdatesDataset
- type VehicleDataset
- type VehiclePositionWithRouteType
- type VehicleSQLDataset
Constants ¶
const (
	TripUpdatesTopic      = "trip_updates"
	VehiclePositionsTopic = "vehicle_positions"
)
Kafka topics
Functions ¶
func ConsumeKafkaTopic ¶
ConsumeKafkaTopic reads messages from a Kafka partition and passes each one to a ConsumerFunc.
func PollGTFSData ¶
PollGTFSData makes periodic queries to fetch static GTFS data from Trimet. It stores a timestamp of the last query in Redis and will only download if it has been more than 24 hours since the last download.
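The 24-hour check described above can be sketched as a small pure function. This is only an illustration: `needsDownload`, `refreshInterval`, and the choice to represent the stored timestamp as Unix seconds are assumptions, not the package's actual implementation, and the Redis lookup itself is omitted.

```go
package main

import (
	"fmt"
	"time"
)

// refreshInterval is the minimum time between downloads (24 hours, per the
// PollGTFSData description). The name is hypothetical.
const refreshInterval = 24 * time.Hour

// needsDownload reports whether more than refreshInterval has elapsed between
// lastUnix (the stored time of the last download, assumed to be Unix seconds)
// and nowUnix. A zero lastUnix is treated as "never downloaded".
func needsDownload(lastUnix, nowUnix int64) bool {
	if lastUnix == 0 {
		return true
	}
	elapsed := time.Duration(nowUnix-lastUnix) * time.Second
	return elapsed > refreshInterval
}

func main() {
	now := time.Now().Unix()
	fmt.Println(needsDownload(0, now))           // no previous download recorded
	fmt.Println(needsDownload(now-3600, now))    // last download one hour ago
	fmt.Println(needsDownload(now-25*3600, now)) // last download 25 hours ago
}
```

Keeping the decision separate from the Redis access makes it possible to test the interval logic without a running Redis instance.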
func ProduceTripUpdates ¶
ProduceTripUpdates makes requests to the Trimet API and sends the results to a Producer.
Types ¶
type Arrival ¶
type Arrival struct {
	RouteID         string          `json:"route_id"`
	RouteShortName  string          `json:"route_short_name"`
	RouteLongName   string          `json:"route_long_name"`
	RouteType       int             `json:"route_type"`
	RouteColor      string          `json:"route_color"`
	RouteTextColor  string          `json:"route_text_color"`
	TripID          string          `json:"trip_id"`
	StopID          string          `json:"stop_id"`
	Headsign        string          `json:"headsign"`
	ArrivalTime     *trimet.Time    `json:"arrival_time"`
	DepartureTime   *trimet.Time    `json:"departure_time"`
	VehicleID       *string         `json:"vehicle_id"`
	VehicleLabel    *string         `json:"vehicle_label"`
	VehiclePosition trimet.Position `json:"vehicle_position"`
	Date            time.Time       `json:"date"`
}
Arrival describes an arrival at a stop, combining route, trip, stop, and vehicle information.
type ConsumerFunc ¶
ConsumerFunc defines a generic function type to read messages from a Producer.
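The definition of ConsumerFunc is not shown here. As a rough sketch, assuming it has the shape `func(ctx context.Context, b []byte) error` (which would match the UpdateTripUpdateBytes and UpsertVehiclePositionBytes methods documented further down), a consumer loop could hand messages to it as follows. The `consumerFunc` type and the `drain` and `collect` helpers are hypothetical, and a channel stands in for Kafka.

```go
package main

import (
	"context"
	"fmt"
)

// consumerFunc is an assumed shape for ConsumerFunc; the real definition may differ.
type consumerFunc func(ctx context.Context, b []byte) error

// drain passes each message from msgs to c until msgs is closed, c returns an
// error, or ctx is cancelled. The channel stands in for a Kafka partition.
func drain(ctx context.Context, c consumerFunc, msgs <-chan []byte) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case b, ok := <-msgs:
			if !ok {
				return nil // channel closed: nothing left to consume
			}
			if err := c(ctx, b); err != nil {
				return err
			}
		}
	}
}

// collect feeds the given messages through drain and returns what the
// consumer received, in order.
func collect(in []string) ([]string, error) {
	msgs := make(chan []byte, len(in))
	for _, s := range in {
		msgs <- []byte(s)
	}
	close(msgs)

	var got []string
	err := drain(context.Background(), func(_ context.Context, b []byte) error {
		got = append(got, string(b))
		return nil
	}, msgs)
	return got, err
}

func main() {
	got, err := collect([]string{"a", "b"})
	fmt.Println(got, err)
}
```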
type KafkaProducer ¶
type KafkaProducer struct {
// contains filtered or unexported fields
}
KafkaProducer provides a struct with methods that implement a Producer.
func NewKafkaProducer ¶
func NewKafkaProducer(topic string, addrs []string) (*KafkaProducer, error)
NewKafkaProducer returns a new KafkaProducer
func (*KafkaProducer) Close ¶
func (k *KafkaProducer) Close() error
Close wraps a sarama Close method to provide a generic method that satisfies the Producer interface.
func (*KafkaProducer) Produce ¶
func (k *KafkaProducer) Produce(b []byte) error
Produce wraps a sarama Producer to provide a generic method that satisfies the Producer interface.
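The Producer interface itself is not shown in this section. Judging from the two KafkaProducer methods above, it presumably requires Produce and Close. Under that assumption, an in-memory stand-in could replace Kafka in tests. The `producer` interface, `memProducer` type, and `errClosed` value below are hypothetical names for this sketch.

```go
package main

import (
	"errors"
	"fmt"
)

// producer mirrors the two KafkaProducer methods documented above.
// It is an assumed shape for the Producer interface.
type producer interface {
	Produce(b []byte) error
	Close() error
}

// errClosed is returned when Produce is called after Close.
var errClosed = errors.New("producer is closed")

// memProducer is a hypothetical in-memory producer that records messages.
type memProducer struct {
	msgs   [][]byte
	closed bool
}

// Produce stores a copy of b so later changes by the caller do not alter
// the recorded message.
func (m *memProducer) Produce(b []byte) error {
	if m.closed {
		return errClosed
	}
	m.msgs = append(m.msgs, append([]byte(nil), b...))
	return nil
}

// Close marks the producer as closed; further Produce calls fail.
func (m *memProducer) Close() error {
	m.closed = true
	return nil
}

// Compile-time check that *memProducer satisfies the assumed interface.
var _ producer = (*memProducer)(nil)

func main() {
	m := &memProducer{}
	var p producer = m
	fmt.Println(p.Produce([]byte("hello")))
	fmt.Println(p.Close())
	fmt.Println(p.Produce([]byte("late")))
	fmt.Println(len(m.msgs))
}
```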
type LoaderDataset ¶
LoaderDataset provides a method to bulk load static GTFS data.
type LoaderSQLDataset ¶
LoaderSQLDataset implements LoaderDataset for a SQL database.
func (*LoaderSQLDataset) LoadCalendarDates ¶
LoadCalendarDates loads calendar_dates.txt.
func (*LoaderSQLDataset) LoadGTFSData ¶
func (ld *LoaderSQLDataset) LoadGTFSData(baseURL string) error
LoadGTFSData downloads the latest static GTFS data from Trimet and updates the GTFS data in the database.
func (*LoaderSQLDataset) LoadRoutes ¶
LoadRoutes loads routes.txt.
func (*LoaderSQLDataset) LoadServices ¶
LoadServices populates the services table with the unique set of service_ids from the calendar_dates.txt file.
func (*LoaderSQLDataset) LoadShapes ¶
LoadShapes loads shapes.txt.
func (*LoaderSQLDataset) LoadStopTimes ¶
LoadStopTimes loads stop_times.txt.
type Point ¶
type Point struct {
	Lat          float64 `json:"lat"`
	Lng          float64 `json:"lng"`
	DistTraveled float64 `json:"dist_traveled"`
}
Point represents a single point along a route shape
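To illustrate the JSON field names produced by the struct tags, the following sketch redeclares Point locally (copied from the definition above) and encodes one value. The `encodePoint` helper and the coordinates are made up for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Point is redeclared here so the example is self-contained.
type Point struct {
	Lat          float64 `json:"lat"`
	Lng          float64 `json:"lng"`
	DistTraveled float64 `json:"dist_traveled"`
}

// encodePoint returns the JSON encoding of p as a string.
func encodePoint(p Point) (string, error) {
	b, err := json.Marshal(p)
	return string(b), err
}

func main() {
	s, err := encodePoint(Point{Lat: 45.5, Lng: -122.5, DistTraveled: 10})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(s) // {"lat":45.5,"lng":-122.5,"dist_traveled":10}
}
```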
type RouteDataset ¶
RouteDataset provides methods to query and update a database table of routes.
type RoutePoint ¶
RoutePoint represents a single point along a route
type RouteSQLDataset ¶
RouteSQLDataset stores a DB instance and provides access to methods to retrieve and update routes from the database.
func (*RouteSQLDataset) FetchRoutes ¶
func (sd *RouteSQLDataset) FetchRoutes() ([]trimet.Route, error)
FetchRoutes returns a slice of all routes in the database.
type RouteShape ¶
type RouteShape struct {
	DirectionID int          `json:"direction_id"`
	RouteID     string       `json:"route_id"`
	Color       string       `json:"color"`
	Points      []RoutePoint `json:"points"`
}
RouteShape represents a complete shape line for a given RouteID
type Shape ¶
type Shape struct {
	ID          string  `json:"id"`
	DirectionID int     `json:"direction_id"`
	RouteID     string  `json:"route_id"`
	Point       []Point `json:"point"`
}
Shape represents the shape line for a given route.
type ShapeDataset ¶
type ShapeDataset interface {
	FetchRouteShapes() ([]*RouteShape, error)
	FetchShapes(routeIDS, shapeIDs []string) ([]Shape, error)
	FetchTripShapes(tripIDs []string) (map[string]*TripShape, error)
}
ShapeDataset provides methods to query and update a database table of Shapes
type ShapeSQLDataset ¶
ShapeSQLDataset stores a DB instance and provides access to methods to retrieve and update shapes from the database
func (*ShapeSQLDataset) FetchRouteShapes ¶
func (sd *ShapeSQLDataset) FetchRouteShapes() ([]*RouteShape, error)
FetchRouteShapes returns all shapes for train routes and flattens them to reduce the amount of data.
func (*ShapeSQLDataset) FetchShapes ¶
func (sd *ShapeSQLDataset) FetchShapes(routeIDs, shapeIDs []string) ([]Shape, error)
FetchShapes takes a slice of route IDs or shape IDs and returns a slice of shapes.
func (*ShapeSQLDataset) FetchTripShapes ¶
func (sd *ShapeSQLDataset) FetchTripShapes(tripIDs []string) (map[string]*TripShape, error)
FetchTripShapes takes a slice of trip IDs and returns a map that associates each trip ID with its trip shape. This is used client side to render the route line for the specific trip a vehicle is on.
type StopDataset ¶
type StopDataset interface {
	FetchAllStops() ([]StopWithDistance, error)
	FetchWithinDistance(lat, lng, dist string) ([]StopWithDistance, error)
	FetchWithinBox(w, s, e, n string) ([]StopWithDistance, error)
	FetchArrivals(stopIDs []string) ([]Arrival, error)
}
StopDataset provides methods to query and update a database table of Stops
type StopSQLDataset ¶
StopSQLDataset stores a DB instance and provides access to methods to retrieve and update stops from the database
func (*StopSQLDataset) FetchAllStops ¶
func (sd *StopSQLDataset) FetchAllStops() ([]StopWithDistance, error)
FetchAllStops returns all stops in the database.
func (*StopSQLDataset) FetchArrivals ¶
func (sd *StopSQLDataset) FetchArrivals(stopIDs []string) ([]Arrival, error)
FetchArrivals returns a list of arrivals by combining multiple data sets.
func (*StopSQLDataset) FetchWithinBox ¶
func (sd *StopSQLDataset) FetchWithinBox(w, s, e, n string) ([]StopWithDistance, error)
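FetchWithinBox returns the stops located within the bounding box given by w, s, e, and n (presumably its west, south, east, and north edges).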
func (*StopSQLDataset) FetchWithinDistance ¶
func (sd *StopSQLDataset) FetchWithinDistance(lat, lng, dist string) ([]StopWithDistance, error)
FetchWithinDistance takes a point (lat,lng) and finds all stops located within 'dist'. It uses the PostGIS extension to calculate the distance to stops stored in the DB.
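FetchWithinDistance takes its arguments as strings. A caller holding numeric coordinates might format them with strconv.FormatFloat before making the call. The `formatArg` helper below is a hypothetical illustration, and the unit expected for dist is not stated here, so the value used is only a placeholder.

```go
package main

import (
	"fmt"
	"strconv"
)

// formatArg converts a float64 to the shortest decimal string that parses
// back to the same value, without exponent notation.
func formatArg(f float64) string {
	return strconv.FormatFloat(f, 'f', -1, 64)
}

func main() {
	lat := formatArg(45.5)
	lng := formatArg(-122.6765)
	dist := formatArg(100) // placeholder value; the unit is an open question

	// These strings could then be passed to a StopDataset, for example:
	//   stops, err := sd.FetchWithinDistance(lat, lng, dist)
	fmt.Println(lat, lng, dist)
}
```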
type StopWithDistance ¶
StopWithDistance augments the Stop struct with a distance field. Distance is calculated via PostGIS and the result is used to provide a list of stops within a specified distance from a specific point.
type TripShape ¶
type TripShape struct {
	RouteShape
	TripID string `json:"trip_id"`
}
TripShape adds a TripID to a RouteShape in order for it to be associated later in the client.
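Because TripShape embeds RouteShape, encoding/json promotes the embedded fields, so trip_id appears alongside the route fields in a single flat object. The sketch below redeclares the types locally to show this. The RoutePoint fields are placeholders, since its definition is not shown in this section, and `encodeTripShape` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// RoutePoint's real fields are not shown in this section; Lat and Lng are
// assumptions made for the example.
type RoutePoint struct {
	Lat float64 `json:"lat"`
	Lng float64 `json:"lng"`
}

// RouteShape and TripShape are copied from the definitions above.
type RouteShape struct {
	DirectionID int          `json:"direction_id"`
	RouteID     string       `json:"route_id"`
	Color       string       `json:"color"`
	Points      []RoutePoint `json:"points"`
}

type TripShape struct {
	RouteShape
	TripID string `json:"trip_id"`
}

// encodeTripShape returns the JSON encoding of ts as a string.
func encodeTripShape(ts TripShape) (string, error) {
	b, err := json.Marshal(ts)
	return string(b), err
}

func main() {
	ts := TripShape{
		RouteShape: RouteShape{
			DirectionID: 1,
			RouteID:     "100",
			Color:       "0000FF",
			Points:      []RoutePoint{{Lat: 45.5, Lng: -122.5}},
		},
		TripID: "t1",
	}
	s, err := encodeTripShape(ts)
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	// The embedded RouteShape fields are not nested under a "RouteShape" key.
	fmt.Println(s)
}
```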
type TripUpdateSQLDataset ¶
TripUpdateSQLDataset wraps a DB instance that is used to store trip update data
func (*TripUpdateSQLDataset) FetchTripUpdates ¶
func (tuds *TripUpdateSQLDataset) FetchTripUpdates() ([]trimet.TripUpdate, error)
FetchTripUpdates returns trip updates from the DB.
func (*TripUpdateSQLDataset) UpdateTripUpdateBytes ¶
func (tuds *TripUpdateSQLDataset) UpdateTripUpdateBytes(ctx context.Context, b []byte) error
UpdateTripUpdateBytes reads bytes and updates the TripUpdates DB
func (*TripUpdateSQLDataset) UpdateTripUpdates ¶
func (tuds *TripUpdateSQLDataset) UpdateTripUpdates(tus []trimet.TripUpdate) error
UpdateTripUpdates updates trip data in the db.
type TripUpdatesDataset ¶
type TripUpdatesDataset interface {
	UpdateTripUpdates(tus []trimet.TripUpdate) error
	UpdateTripUpdateBytes(ctx context.Context, b []byte) error
	FetchTripUpdates() ([]trimet.TripUpdate, error)
}
TripUpdatesDataset provides methods to update and retrieve trip update data
type VehicleDataset ¶
type VehicleDataset interface {
	FetchVehiclePositions(since int) ([]VehiclePositionWithRouteType, error)
	UpsertVehiclePosition(v *trimet.VehiclePosition) error
	UpsertVehiclePositionBytes(ctx context.Context, b []byte) error
}
VehicleDataset provides methods to update and retrieve vehicle data
type VehiclePositionWithRouteType ¶
type VehiclePositionWithRouteType struct {
	trimet.VehiclePosition
	RouteType trimet.RouteType `json:"route_type" msg:"route_type"`
}
VehiclePositionWithRouteType adds a RouteType to a VehiclePosition to identify the vehicle type.
type VehicleSQLDataset ¶
VehicleSQLDataset wraps a DB instance that is used to store vehicle data
func (*VehicleSQLDataset) FetchVehiclePositions ¶
func (vd *VehicleSQLDataset) FetchVehiclePositions(since int) ([]VehiclePositionWithRouteType, error)
FetchVehiclePositions makes a query against the DB and retrieves a list of vehicle data. If IDs are passed in, then vehicle data is restricted to those specific vehicle IDs. Otherwise, all vehicles with a non-expired timestamp are returned.
func (*VehicleSQLDataset) UpsertVehiclePosition ¶
func (vd *VehicleSQLDataset) UpsertVehiclePosition(v *trimet.VehiclePosition) error
UpsertVehiclePosition updates/inserts a vehicle in the DB.
func (*VehicleSQLDataset) UpsertVehiclePositionBytes ¶
func (vd *VehicleSQLDataset) UpsertVehiclePositionBytes(ctx context.Context, b []byte) error
UpsertVehiclePositionBytes decodes bytes into VehiclePositions and updates the DB.