package five9 import ( "context" "fmt" "net/http" "github.com/equalsgibson/five9-go/five9/five9types" ) type AgentService struct { authState *authenticationState } func (s AgentService) GetAllMaintenanceNoticesForSelf(ctx context.Context) ([]five9types.MaintenanceNoticeInfo, error) { var target []five9types.MaintenanceNoticeInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/appsvcs/rs/svc/agents/:userID/maintenance_notices", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s AgentService) AcceptMaintenanceNoticeForSelf( ctx context.Context, maintenanceNoticeID five9types.MaintenanceNoticeID, ) (five9types.MaintenanceNoticeInfo, error) { var target five9types.MaintenanceNoticeInfo request, err := http.NewRequestWithContext( ctx, http.MethodPut, fmt.Sprintf( "/appsvcs/rs/svc/agents/:userID/maintenance_notices/%s/accept", maintenanceNoticeID, ), http.NoBody, ) if err != nil { return five9types.MaintenanceNoticeInfo{}, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return five9types.MaintenanceNoticeInfo{}, err } return target, nil } func (s *AgentService) GetAllReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { logoutCodes, err := s.getAllLogoutReasonCodes(ctx) if err != nil { return nil, err } notReadyCodes, err := s.getAllNotReadyReasonCodes(ctx) if err != nil { return nil, err } return append(logoutCodes, notReadyCodes...), nil } func (s *AgentService) getAllLogoutReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { var target []five9types.ReasonCodeInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/appsvcs/rs/svc/orgs/:organizationID/logout_reason_codes", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *AgentService) getAllNotReadyReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { var target []five9types.ReasonCodeInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/appsvcs/rs/svc/orgs/:organizationID/not_ready_reason_codes", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil }
package five9 import ( "context" "fmt" "net/http" "strings" "sync" "time" "github.com/equalsgibson/five9-go/five9/five9types" ) type authenticationState struct { client *client loginResponse *five9types.LoginResponse loginMutex *sync.Mutex apiContextPath string } func (a *authenticationState) endpointGetSessionMetadata(ctx context.Context) error { request, err := http.NewRequestWithContext( ctx, http.MethodGet, fmt.Sprintf("/%s/auth/metadata", a.apiContextPath), http.NoBody, ) if err != nil { return err } return a.requestWithAuthentication(request, nil) } func (a *authenticationState) requestWithAuthentication(request *http.Request, target any) error { login, err := a.getLogin(request.Context()) if err != nil { return err } request.URL.Scheme = "https" request.URL.Host = login.GetAPIHost() request.URL.Path = strings.ReplaceAll(request.URL.Path, ":userID", string(login.UserID)) request.URL.Path = strings.ReplaceAll(request.URL.Path, ":organizationID", string(login.OrgID)) var latestAttemptErr error tries := 0 for tries < 3 { tries++ latestAttemptErr = a.client.request(request, target) if latestAttemptErr != nil { if five9Error, ok := latestAttemptErr.(*Error); ok { if five9Error.StatusCode == http.StatusUnauthorized { // The login is not registered by other endpoints for a short time. // I think this has to do with Five9 propagating the session across their data centers. // We login using the app.five9.com domain but then make subsequent calls to the data center specific domain time.Sleep(time.Second * 2) continue } // Five9 reply with Status 435 if a service has been migrated. This is not an official status code, so check directly. if five9Error.StatusCode == int(435) { // Clear out the login state a.loginMutex.Lock() defer a.loginMutex.Unlock() a.loginResponse = nil return latestAttemptErr } } } return nil } return latestAttemptErr } func (a *authenticationState) getLogin( ctx context.Context, ) (*five9types.LoginResponse, error) { { // check for existing login if a.loginResponse != nil { return a.loginResponse, nil } a.loginMutex.Lock() defer a.loginMutex.Unlock() if a.loginResponse != nil { return a.loginResponse, nil } } login, err := a.endpointLogin(ctx) if err != nil { return nil, err } a.loginResponse = &login if err := a.endpointGetSessionMetadata(ctx); err != nil { return nil, err } loginState, err := a.endpointGetLoginState(ctx) if err != nil { return nil, err } switch loginState { case five9types.UserLoginStateSelectStation: // Standard response after logging in. if err := a.endpointStartSession(ctx); err != nil { return nil, err } // Check the login state after starting the session newLoginState, err := a.endpointGetLoginState(ctx) if err != nil { return nil, err } if newLoginState == five9types.UserLoginStateAcceptNotice { if err := a.handleMaintenanceNotices(ctx); err != nil { return nil, err } } case five9types.UserLoginStateAcceptNotice: // Can occur if Five9 have issued a maintenance notice if err := a.handleMaintenanceNotices(ctx); err != nil { return nil, err } case five9types.UserLoginStateRelogin: // Can occur if the service has been migrated if err := a.endpointRestartSession(ctx); err != nil { return nil, err } // Check the login state after restarting the session newLoginState, err := a.endpointGetLoginState(ctx) if err != nil { return nil, err } if newLoginState == five9types.UserLoginStateAcceptNotice { if err := a.handleMaintenanceNotices(ctx); err != nil { return nil, err } } } return a.loginResponse, nil } func (a *authenticationState) endpointLogin(ctx context.Context) (five9types.LoginResponse, error) { payload := five9types.LoginPayload{ PasswordCredentials: a.client.credentials, AppKey: "web-ui", Policy: five9types.PolicyForceIn, } request, err := http.NewRequestWithContext( ctx, http.MethodPost, fmt.Sprintf("https://app.five9.com/%s/auth/login", a.apiContextPath), structToReaderCloser(payload), ) if err != nil { return five9types.LoginResponse{}, err } target := five9types.LoginResponse{} if err := a.client.request(request, &target); err != nil { return five9types.LoginResponse{}, err } return target, nil } func (a *authenticationState) endpointGetLoginState(ctx context.Context) (five9types.UserLoginState, error) { path := agentAPIPath if a.apiContextPath == supervisorAPIContextPath { path = supervisorAPIPath } var target five9types.UserLoginState request, err := http.NewRequestWithContext( ctx, http.MethodGet, fmt.Sprintf( "/%s/%s/:userID/login_state", a.apiContextPath, path, ), http.NoBody, ) if err != nil { return "", err } if err := a.requestWithAuthentication(request, &target); err != nil { return "", err } return target, nil } func (a *authenticationState) endpointStartSession(ctx context.Context) error { path := agentAPIPath if a.apiContextPath == supervisorAPIContextPath { path = supervisorAPIPath } request, err := http.NewRequestWithContext( ctx, http.MethodPut, fmt.Sprintf( "/%s/%s/:userID/session_start?force=true", a.apiContextPath, path, ), structToReaderCloser(five9types.StationInfo{ StationID: "", StationType: "EMPTY", }), ) if err != nil { return err } if err := a.requestWithAuthentication(request, nil); err != nil { return err } return nil } func (a *authenticationState) endpointRestartSession(ctx context.Context) error { path := agentAPIPath if a.apiContextPath == supervisorAPIContextPath { path = supervisorAPIPath } request, err := http.NewRequestWithContext( ctx, http.MethodPut, fmt.Sprintf( "/%s/%s/:userID/session_restart", a.apiContextPath, path, ), http.NoBody, ) if err != nil { return err } if err := a.requestWithAuthentication(request, nil); err != nil { return err } return nil } func (a *authenticationState) handleMaintenanceNotices(ctx context.Context) error { notices, err := a.endpointGetMaintenanceNotices(ctx) if err != nil { return err } for _, notice := range notices { if err := a.endpointAcceptMaintenanceNotice(ctx, notice.ID); err != nil { return err } } loginState, err := a.endpointGetLoginState(ctx) if err != nil { return err } if loginState != five9types.UserLoginStateWorking { if err := a.endpointStartSession(ctx); err != nil { return err } } return nil } func (a *authenticationState) endpointGetMaintenanceNotices(ctx context.Context) ([]five9types.MaintenanceNoticeInfo, error) { path := agentAPIPath if a.apiContextPath == supervisorAPIContextPath { path = supervisorAPIPath } request, err := http.NewRequestWithContext( ctx, http.MethodGet, fmt.Sprintf( "/%s/%s/:userID/maintenance_notices", a.apiContextPath, path, ), http.NoBody, ) if err != nil { return nil, err } target := []five9types.MaintenanceNoticeInfo{} if err := a.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (a *authenticationState) endpointAcceptMaintenanceNotice( ctx context.Context, maintenanceNoticeID five9types.MaintenanceNoticeID, ) error { path := agentAPIPath if a.apiContextPath == supervisorAPIContextPath { path = supervisorAPIPath } request, err := http.NewRequestWithContext( ctx, http.MethodPut, fmt.Sprintf( "/%s/%s/:userID/maintenance_notices/%s/accept", a.apiContextPath, path, maintenanceNoticeID, ), http.NoBody, ) if err != nil { return err } target := five9types.MaintenanceNoticeInfo{} return a.requestWithAuthentication(request, &target) }
package five9 import ( "bytes" "encoding/json" "io" "net/http" "github.com/equalsgibson/five9-go/five9/five9types" ) type client struct { httpClient *http.Client credentials five9types.PasswordCredentials requestPreProcessors []func(r *http.Request) error } const ( supervisorAPIContextPath = "supsvcs/rs/svc" agentAPIContextPath = "appsvcs/rs/svc" agentAPIPath = "agents" supervisorAPIPath = "supervisors" ) func (c *client) request(request *http.Request, target any) error { request.Header.Set("Accept", "application/json") request.Header.Set("Content-Type", "application/json") for _, requestPreProcessor := range c.requestPreProcessors { if err := requestPreProcessor(request); err != nil { return err } } response, err := c.httpClient.Do(request) if err != nil { return err } defer response.Body.Close() if response.StatusCode >= http.StatusBadRequest { bodyBytes, err := io.ReadAll(response.Body) if err != nil { return err } responseErr := &Error{ StatusCode: response.StatusCode, Body: bodyBytes, } if err := json.Unmarshal(bodyBytes, responseErr); err != nil { return err } return responseErr } if target != nil { bodyBytes, err := io.ReadAll(response.Body) if err != nil { return err } if err := json.Unmarshal(bodyBytes, target); err != nil { return err } } return nil } func structToReaderCloser(v any) io.Reader { vBytes, err := json.Marshal(v) if err != nil { panic(err) } return bytes.NewReader(vBytes) }
package five9 import ( "net/http" ) type ConfigFunc func(*Service) func AddRequestPreprocessor(things ...func(*http.Request) error) ConfigFunc { return func(s *Service) { s.agentService.authState.client.requestPreProcessors = append( s.agentService.authState.client.requestPreProcessors, things..., ) } } func SetWebsocketHandler(w webSocketHandler) ConfigFunc { return func(s *Service) { s.supervisorService.webSocketHandler = w } } func SetRoundTripper(roundTripper http.RoundTripper) ConfigFunc { return func(s *Service) { s.agentService.authState.client.httpClient.Transport = roundTripper } } // func SetMaxCacheAge(maxAge *time.Duration) ConfigFunc { // return func(s *Service) { // s.agentService.authState.client.httpClient.Transport = roundTripper // } // }
package five9 import ( "encoding/json" "errors" "fmt" ) type five9Error struct { Five9ExceptionDetail five9ExceptionDetail `json:"five9ExceptionDetail"` } type five9ExceptionDetail struct { Timestamp uint `json:"timestamp"` ErrorCode int `json:"errorCode"` Message string `json:"message"` Context errorContext `json:"context"` } type errorContext struct { ContextCode string `json:"contextCode"` ObjectID string `json:"objectId"` } type Error struct { StatusCode int `json:"status_code"` Body []byte `json:"body"` Message string `json:"message"` } func (err *Error) Error() string { if err.Message == "" { return fmt.Sprintf("Five9 REST API Error, Status Code: %d", err.StatusCode) } return err.Message } func (err *Error) UnmarshalJSON(b []byte) error { target := five9Error{} if targetErr := json.Unmarshal(b, &target); targetErr == nil { if target.Five9ExceptionDetail.Message != "" { err.Message = target.Five9ExceptionDetail.Message return nil } } return nil } var ( ErrUnknownUserID error = errors.New("unknown userID provided") ErrWebSocketCacheNotReady error = errors.New("webSocket cache is not ready") ErrWebSocketCacheStale error = errors.New("webSocket cache is stale") )
package five9 import ( "net/http" "net/http/cookiejar" "sync" "time" "github.com/equalsgibson/five9-go/five9/five9types" "github.com/equalsgibson/five9-go/five9/internal/utils" ) func NewService( creds five9types.PasswordCredentials, configFuncs ...ConfigFunc, ) *Service { cookieJar, _ := cookiejar.New(nil) httpClient := &http.Client{ Jar: cookieJar, } defaultCacheAllowedAge := time.Hour c := &client{ credentials: creds, httpClient: httpClient, requestPreProcessors: []func(r *http.Request) error{}, } s := &Service{ // ** // agentService: &AgentService{ authState: &authenticationState{ client: c, apiContextPath: agentAPIContextPath, loginMutex: &sync.Mutex{}, }, }, // ** // supervisorService: &SupervisorService{ authState: &authenticationState{ client: c, apiContextPath: supervisorAPIContextPath, loginMutex: &sync.Mutex{}, }, domainMetadataCache: &domainMetadataCache{ agentInfoState: utils.NewMemoryCacheInstance[ five9types.UserID, five9types.AgentInfo, ](&defaultCacheAllowedAge), reasonCodeInfoState: utils.NewMemoryCacheInstance[ five9types.ReasonCodeID, five9types.ReasonCodeInfo, ](&defaultCacheAllowedAge), queueInfoState: utils.NewMemoryCacheInstance[ five9types.QueueID, five9types.QueueInfo, ](&defaultCacheAllowedAge), }, webSocketHandler: &liveWebsocketHandler{}, webSocketCache: &supervisorWebSocketCache{ agentState: utils.NewMemoryCacheInstance[ five9types.UserID, five9types.AgentState, ](&defaultCacheAllowedAge), agentStatistics: utils.NewMemoryCacheInstance[ five9types.UserID, five9types.AgentStatistics, ](&defaultCacheAllowedAge), acdState: utils.NewMemoryCacheInstance[ five9types.QueueID, five9types.ACDState, ](&defaultCacheAllowedAge), timers: utils.NewMemoryCacheInstance[ five9types.EventID, *time.Time, ](nil), }, }, } // Set the cache to default values s.supervisorService.resetCache() for _, configFunc := range configFuncs { configFunc(s) } return s } type Service struct { agentService *AgentService supervisorService *SupervisorService } func (s *Service) Supervisor() *SupervisorService { return s.supervisorService } func (s *Service) Agent() *AgentService { return s.agentService } type domainMetadataCache struct { reasonCodeInfoState *utils.MemoryCacheInstance[five9types.ReasonCodeID, five9types.ReasonCodeInfo] agentInfoState *utils.MemoryCacheInstance[five9types.UserID, five9types.AgentInfo] queueInfoState *utils.MemoryCacheInstance[five9types.QueueID, five9types.QueueInfo] }
package five9 import ( "context" "net/http" "github.com/equalsgibson/five9-go/five9/five9types" ) type SupervisorService struct { authState *authenticationState webSocketHandler webSocketHandler webSocketCache *supervisorWebSocketCache domainMetadataCache *domainMetadataCache } func (s *SupervisorService) GetOwnUserInfo(ctx context.Context) (five9types.AgentInfo, error) { users, err := s.getDomainUserInfoMap(ctx) if err != nil { return five9types.AgentInfo{}, err } self, ok := users[s.authState.loginResponse.UserID] if !ok { return five9types.AgentInfo{}, ErrUnknownUserID } return self, nil } func (s *SupervisorService) GetStatisticsFilterSettings(ctx context.Context) ([]five9types.AgentInfo, error) { var target []five9types.AgentInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/supsvcs/rs/svc/supervisors/:userID/stats_filter_settings", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) SetStatisticsFilterSettings(ctx context.Context, payload any) ([]five9types.AgentInfo, error) { var target []five9types.AgentInfo request, err := http.NewRequestWithContext( ctx, http.MethodPut, "/supsvcs/rs/svc/supervisors/:userID/stats_filter_settings", structToReaderCloser(payload), ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) getDomainUserInfoMap(ctx context.Context) (map[five9types.UserID]five9types.AgentInfo, error) { allUserInfo, err := s.domainMetadataCache.agentInfoState.GetAll() if err == nil { return allUserInfo.Items, nil } domainUserSlice, err := s.GetAllDomainUsers(ctx) if err != nil { return nil, err } freshData := map[five9types.UserID]five9types.AgentInfo{} for _, domainUser := range domainUserSlice { freshData[domainUser.ID] = domainUser } s.domainMetadataCache.agentInfoState.Replace(freshData) return freshData, nil } func (s *SupervisorService) GetAllDomainUsers(ctx context.Context) ([]five9types.AgentInfo, error) { var target []five9types.AgentInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/supsvcs/rs/svc/orgs/:organizationID/users", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) getQueueInfoMap(ctx context.Context) (map[five9types.QueueID]five9types.QueueInfo, error) { q, err := s.domainMetadataCache.queueInfoState.GetAll() if err == nil { return q.Items, nil } queues, err := s.GetAllQueues(ctx) if err != nil { return nil, err } freshData := map[five9types.QueueID]five9types.QueueInfo{} for _, queue := range queues { freshData[queue.ID] = queue } s.domainMetadataCache.queueInfoState.Replace(freshData) return freshData, nil } func (s *SupervisorService) GetAllQueues(ctx context.Context) ([]five9types.QueueInfo, error) { var target []five9types.QueueInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/supsvcs/rs/svc/orgs/:organizationID/skills", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) GetReasonCodeInfoMap(ctx context.Context) (map[five9types.ReasonCodeID]five9types.ReasonCodeInfo, error) { r, err := s.domainMetadataCache.reasonCodeInfoState.GetAll() if err == nil { return r.Items, nil } reasonCodes, err := s.GetAllReasonCodes(ctx) if err != nil { return nil, err } freshData := map[five9types.ReasonCodeID]five9types.ReasonCodeInfo{} for _, reasonCode := range reasonCodes { freshData[reasonCode.ID] = reasonCode } s.domainMetadataCache.reasonCodeInfoState.Replace(freshData) return freshData, nil } func (s *SupervisorService) GetAllReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { reasonCodes := []five9types.ReasonCodeInfo{} logoutCodes, err := s.getAllLogoutReasonCodes(ctx) if err != nil { return nil, err } reasonCodes = append(reasonCodes, logoutCodes...) notReadyCodes, err := s.getAllNotReadyReasonCodes(ctx) if err != nil { return nil, err } return append(reasonCodes, notReadyCodes...), nil } func (s *SupervisorService) getAllLogoutReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { var target []five9types.ReasonCodeInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/supsvcs/rs/svc/orgs/:organizationID/logout_reason_codes", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) getAllNotReadyReasonCodes(ctx context.Context) ([]five9types.ReasonCodeInfo, error) { var target []five9types.ReasonCodeInfo request, err := http.NewRequestWithContext( ctx, http.MethodGet, "/supsvcs/rs/svc/orgs/:organizationID/not_ready_reason_codes", http.NoBody, ) if err != nil { return nil, err } if err := s.authState.requestWithAuthentication(request, &target); err != nil { return nil, err } return target, nil } func (s *SupervisorService) requestWebSocketFullStatistics(ctx context.Context) error { request, err := http.NewRequestWithContext( ctx, http.MethodPut, "/supsvcs/rs/svc/supervisors/:userID/request_full_statistics", http.NoBody, ) if err != nil { return err } if err := s.authState.requestWithAuthentication(request, nil); err != nil { return err } return nil } func (s *SupervisorService) UpdateAgentState(ctx context.Context, agentID five9types.UserID) (five9types.UserFullStateInfo, error) { return five9types.UserFullStateInfo{}, nil }
package five9 import ( "context" "errors" "fmt" "time" "github.com/equalsgibson/concur/concur" "github.com/equalsgibson/five9-go/five9/five9types" "github.com/equalsgibson/five9-go/five9/internal/utils" "github.com/google/uuid" ) type supervisorWebSocketCache struct { agentState *utils.MemoryCacheInstance[ five9types.UserID, five9types.AgentState, ] agentStatistics *utils.MemoryCacheInstance[ five9types.UserID, five9types.AgentStatistics, ] acdState *utils.MemoryCacheInstance[ five9types.QueueID, five9types.ACDState, ] timers *utils.MemoryCacheInstance[ five9types.EventID, *time.Time, ] } func (s *SupervisorService) StartWebsocket(parentCtx context.Context) error { // Clear any stale data from a previous connection s.resetCache() // If we encounter an error on the WebsocketErr channel, cancel the context, thus cancelling all other goroutines. ctx, cancel := context.WithCancelCause(parentCtx) defer func() { // Clear the cache when closing the connection s.resetCache() }() login, err := s.authState.getLogin(ctx) if err != nil { return err } connectionURL := fmt.Sprintf("wss://%s/supsvcs/sws/%s", login.GetAPIHost(), uuid.NewString()) if err := s.webSocketHandler.Connect(ctx, connectionURL, s.authState.client.httpClient); err != nil { return err } defer s.webSocketHandler.Close() asyncReader := concur.NewAsyncReader(s.webSocketHandler.Read) go asyncReader.Loop(ctx) defer asyncReader.Close() pingTicker := time.NewTicker(time.Second * 5) defer pingTicker.Stop() go func() { for { select { case <-pingTicker.C: if err := s.ping(ctx); err != nil { cancel(err) return } case <-ctx.Done(): return } } }() pongMonitorTicker := time.NewTicker(time.Second * 5) defer pongMonitorTicker.Stop() go func() { for { select { case <-pingTicker.C: if err := s.pong(ctx); err != nil { cancel(err) return // asyncReader.Close() } case <-ctx.Done(): return } } }() // Get full statistics go func() { // When starting a new session, this is called by Five9. Account for rejoining an existing session by also // calling this. if err := s.requestWebSocketFullStatistics(ctx); err != nil { cancel(err) // asyncReader.Close() } }() for { select { case update := <-asyncReader.Updates(): if update.Err != nil { return update.Err } if err := s.handleWebsocketMessage(update.Item); err != nil { return err } case <-ctx.Done(): return context.Cause(ctx) } } } func (s *SupervisorService) WSAgentState(ctx context.Context) (map[five9types.UserName]five9types.AgentState, error) { response := map[five9types.UserName]five9types.AgentState{} domainUsers, err := s.getDomainUserInfoMap(ctx) if err != nil { return nil, err } all, err := s.webSocketCache.agentState.GetAll() if err != nil { if errors.Is(err, utils.ErrWebSocketCacheStale) { return nil, ErrWebSocketCacheStale } if errors.Is(err, utils.ErrWebSocketCacheNotReady) { return nil, ErrWebSocketCacheNotReady } return nil, err } for agentID, agentState := range all.Items { agentInfo, ok := domainUsers[agentID] if !ok { continue } response[agentInfo.UserName] = agentState } return response, nil } func (s *SupervisorService) WSAgentStatistics(ctx context.Context) (map[five9types.UserName]five9types.AgentStatistics, error) { response := map[five9types.UserName]five9types.AgentStatistics{} domainUsers, err := s.getDomainUserInfoMap(ctx) if err != nil { return nil, err } allDomainUsers, err := s.webSocketCache.agentStatistics.GetAll() if err != nil { if errors.Is(err, utils.ErrWebSocketCacheStale) { return nil, ErrWebSocketCacheStale } if errors.Is(err, utils.ErrWebSocketCacheNotReady) { return nil, ErrWebSocketCacheNotReady } return nil, err } for agentID, agentStatistic := range allDomainUsers.Items { agentInfo, ok := domainUsers[agentID] if !ok { continue } response[agentInfo.UserName] = agentStatistic } return response, nil } func (s *SupervisorService) WSACDState(ctx context.Context) (map[string]five9types.ACDState, error) { response := map[string]five9types.ACDState{} queues, err := s.getQueueInfoMap(ctx) if err != nil { return nil, err } allACDState, err := s.webSocketCache.acdState.GetAll() if err != nil { if errors.Is(err, utils.ErrWebSocketCacheStale) { return nil, ErrWebSocketCacheStale } if errors.Is(err, utils.ErrWebSocketCacheNotReady) { return nil, ErrWebSocketCacheNotReady } return nil, err } for queueID, queueState := range allACDState.Items { queueInfo, ok := queues[queueID] if !ok { continue } response[queueInfo.Name] = queueState } return response, nil } func (s *SupervisorService) ping(ctx context.Context) error { if err := s.webSocketHandler.Write(ctx, []byte("ping")); err != nil { return err } return nil } func (s *SupervisorService) pong(_ context.Context) error { lastPongReceived, ok := s.webSocketCache.timers.Get(five9types.EventIDPongReceived) if !ok { return errors.New("could not obtain last pong time from cache") } if time.Since(*lastPongReceived) > time.Second*45 { return errors.New("last valid ping response from WS is older than 45 seconds, closing connection") } return nil } func (s *SupervisorService) resetCache() { s.authState.loginMutex.Lock() s.authState.loginResponse = nil s.authState.loginMutex.Unlock() s.webSocketCache.acdState.Reset() s.webSocketCache.agentState.Reset() s.webSocketCache.agentStatistics.Reset() s.webSocketCache.timers.Reset() s.domainMetadataCache.agentInfoState.Reset() s.domainMetadataCache.queueInfoState.Reset() s.domainMetadataCache.reasonCodeInfoState.Reset() serviceReset := time.Now() s.webSocketCache.timers.Update(five9types.EventIDPongReceived, &serviceReset) }
package five9 import ( "encoding/json" "errors" "fmt" "time" "github.com/equalsgibson/five9-go/five9/five9types" ) type websocketFrameProcessingError struct { OriginalError error MessageBytes []byte } func (err websocketFrameProcessingError) Error() string { return fmt.Sprintf("Error while processing websocket frame: %s - %s", err.OriginalError.Error(), string(err.MessageBytes)) } func (s *SupervisorService) handleWebsocketMessage(messageBytes []byte) error { message := five9types.WebsocketMessage{} if err := json.Unmarshal(messageBytes, &message); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: messageBytes, } } if message.Context.EventID == "" { return websocketFrameProcessingError{ OriginalError: errors.New("unsupported message"), MessageBytes: messageBytes, } } eventReceivedTime := time.Now() s.webSocketCache.timers.Update(message.Context.EventID, &eventReceivedTime) switch message.Context.EventID { case five9types.EventIDServerConnected: return nil case five9types.EventIDPongReceived: return s.handlerPong(message.Payload) case five9types.EventIDIncrementalStatsUpdate: return s.handlerIncrementalStatsUpdate(message.Payload) case five9types.EventIDSupervisorStats: return s.handlerSupervisorStats(message.Payload) } return nil } func (s *SupervisorService) handlerPong(payload any) error { payloadString, ok := payload.(string) if !ok { return fmt.Errorf("failed type assertion for payload: %T", payload) } if payloadString != "pong" { return fmt.Errorf("payload not expected type") } return nil } func (s *SupervisorService) handlerIncrementalStatsUpdate(payload any) error { payloadSlice, ok := payload.([]any) if !ok { return fmt.Errorf("failed type assertion for payload: %T", payload) } for _, payloadItem := range payloadSlice { payloadMap, ok := payloadItem.(map[string]any) if !ok { return fmt.Errorf("failed type assertion for payload item: %T", payloadItem) } dataSourceRaw, ok := payloadMap["dataSource"] if !ok { return fmt.Errorf("data source not found: %+v", payloadItem) } dataSourceString, ok := dataSourceRaw.(string) if !ok { return fmt.Errorf("data source is not a string, %T", dataSourceRaw) } dataSource := five9types.DataSource(dataSourceString) payloadItemBytes, err := json.Marshal(payloadItem) if err != nil { return err } switch dataSource { // ** // case five9types.DataSourceAgentState: eventTarget := five9types.WebSocketIncrementalAgentStateData{} if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: payloadItemBytes, } } if err := s.handleAgentStateUpdate(eventTarget); err != nil { return err } // ** // case five9types.DataSourceACDStatus: eventTarget := five9types.WebSocketIncrementalACDStateData{} if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: payloadItemBytes, } } if err := s.handleACDStateUpdate(eventTarget); err != nil { return err } } } return nil } func (s *SupervisorService) handlerSupervisorStats(payload any) error { payloadSlice, ok := payload.([]any) if !ok { return fmt.Errorf("failed type assertion for payload: %T", payload) } for _, payloadItem := range payloadSlice { payloadMap, ok := payloadItem.(map[string]any) if !ok { return fmt.Errorf("failed type assertion for payload item: %T", payloadItem) } dataSourceRaw, ok := payloadMap["dataSource"] if !ok { return fmt.Errorf("data source not found: %+v", payloadItem) } dataSourceString, ok := dataSourceRaw.(string) if !ok { return fmt.Errorf("data source is not a string, %T", dataSourceRaw) } dataSource := five9types.DataSource(dataSourceString) payloadItemBytes, err := json.Marshal(payloadItem) if err != nil { return err } switch dataSource { // ** // case five9types.DataSourceAgentState: eventTarget := five9types.WebsocketSupervisorStateData{} if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: payloadItemBytes, } } freshData := map[five9types.UserID]five9types.AgentState{} for _, agent := range eventTarget.Data { freshData[agent.ID] = agent } s.webSocketCache.agentState.Replace(freshData) // ** // case five9types.DataSourceAgentStatistic: eventTarget := five9types.WebsocketSupervisorStatisticsData{} if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: payloadItemBytes, } } freshData := map[five9types.UserID]five9types.AgentStatistics{} for _, agentStatistic := range eventTarget.Data { freshData[agentStatistic.ID] = agentStatistic } s.webSocketCache.agentStatistics.Replace(freshData) // ** // case five9types.DataSourceACDStatus: eventTarget := five9types.WebsocketSupervisorACDData{} if err := json.Unmarshal(payloadItemBytes, &eventTarget); err != nil { return websocketFrameProcessingError{ OriginalError: err, MessageBytes: payloadItemBytes, } } freshData := map[five9types.QueueID]five9types.ACDState{} for _, acd := range eventTarget.Data { freshData[acd.ID] = acd } s.webSocketCache.acdState.Replace(freshData) } } return nil } func (s *SupervisorService) handleAgentStateUpdate(eventData five9types.WebSocketIncrementalAgentStateData) error { for _, addedData := range eventData.Added { s.webSocketCache.agentState.Update(addedData.ID, addedData) } for _, updatedData := range eventData.Updated { s.webSocketCache.agentState.Update(updatedData.ID, updatedData) } for _, removedID := range eventData.Removed { s.webSocketCache.agentState.Delete(removedID) } return nil } func (s *SupervisorService) handleACDStateUpdate(eventData five9types.WebSocketIncrementalACDStateData) error { for _, addedData := range eventData.Added { s.webSocketCache.acdState.Update(addedData.ID, addedData) } for _, updatedData := range eventData.Updated { s.webSocketCache.acdState.Update(updatedData.ID, updatedData) } for _, removedID := range eventData.Removed { s.webSocketCache.acdState.Delete(removedID) } return nil }
package five9 import ( "context" "errors" "math" "net/http" "nhooyr.io/websocket" ) type webSocketHandler interface { Connect(ctx context.Context, s string, c *http.Client) error Read(ctx context.Context) ([]byte, error) Write(ctx context.Context, b []byte) error Close() } type liveWebsocketHandler struct { c *websocket.Conn } func (h *liveWebsocketHandler) Connect(ctx context.Context, connectionURL string, httpClient *http.Client) error { if h.c != nil { h.c.Close(websocket.StatusNormalClosure, "Restarting connection") } h.c = nil connection, response, err := websocket.Dial(ctx, connectionURL, &websocket.DialOptions{ HTTPClient: httpClient, }) if err != nil { return err } defer func() { if response != nil && response.Body != nil { response.Body.Close() } }() // Set a very large read limit connection.SetReadLimit(math.MaxInt32) h.c = connection return nil } func (h *liveWebsocketHandler) Read(ctx context.Context) ([]byte, error) { messageType, b, err := h.c.Read(ctx) if err != nil { return nil, err } if messageType != websocket.MessageText { return nil, errors.New("binary messages are not supported") } return b, nil } func (h *liveWebsocketHandler) Close() { if h.c != nil { h.c.Close(websocket.StatusNormalClosure, "closed") } } func (h *liveWebsocketHandler) Write(ctx context.Context, data []byte) error { return h.c.Write(ctx, websocket.MessageText, data) }